Source code for smartpipeline.stage

import uuid
from abc import ABC, abstractmethod
from logging import Logger, getLogger
from queue import Queue
from typing import Any, Optional, Sequence, TypeVar, Union

from smartpipeline.item import Item

__author__ = "Giacomo Berardi <giacbrd.com>"


class AliveMixin:
    """
    Simple mixin for defining an object that can "exist and communicate"
    """

    def set_name(self, name: str):
        self._name = name

    @property
    def name(self) -> str:
        if getattr(self, "_name", None) is None:
            self._name = f"{self.__class__.__name__}-{str(uuid.uuid4())[:8]}"
        return self._name

    def __str__(self) -> str:
        return self.name

    @property
    def logger(self) -> Logger:
        """Specific logger for this object"""
        if getattr(self, "_logger", None) is None:
            self._logger = getLogger(self.name)
        return self._logger


class ConstructorMixin:
    def on_start(self) -> Any:
        """
        Called after concurrent stage executor initialization in a process (only on multiprocessing concurrency)
        or after construction in all other cases, by the pipeline.
        The stage in the executor is a copy of the original,
        by overriding this method one can initialize variables specifically for the copies, that is mandatory
        when they are not serializable.
        """
        pass

    def on_end(self) -> Any:
        """
        Called when the stage terminates, useful for executing closing operations (e.g. on files)
        """
        pass


class Processor(ABC):
    @abstractmethod
    def process(self, item: Item) -> Item:
        """
        Process a single item received by the stage.
        Must be overridden for properly defining a stage

        :return: The same item instance processed and enriched by the stage
        """
        return item


class BatchProcessor(ABC):
    @abstractmethod
    def process_batch(self, items: Sequence[Item]) -> Sequence[Item]:
        """
        Process a batch of items received by the stage.
        Must be overridden for properly defining a batch stage

        :return: The same batch with items processed and enriched by the stage
        """
        return items


[docs]class Stage(AliveMixin, ConstructorMixin, Processor): """ Extend this class and override :meth:`.Stage.process` for defining a stage """ def __str__(self) -> str: return f"Stage {self.name}"
[docs]class BatchStage(AliveMixin, ConstructorMixin, BatchProcessor): """ Extend this class and override :meth:`.BatchStage.process_batch` for defining a batch stage """ def __init__(self, size: int, timeout: float = 1.0): """ :param size: Maximum size of item batches that can be processed together :param timeout: Seconds to wait before flushing a batch (calling :meth:`.BatchStage.process_batch` on it) """ self._size = size self._timeout = timeout def __str__(self) -> str: return f"Batch stage {self.name}" @property def size(self) -> int: """ Get the maximum size of item batches that can be processed together """ return self._size @property def timeout(self) -> float: """ Seconds to wait before flushing a batch (calling :meth:`.BatchStage.process_batch` on it) """ return self._timeout
[docs]class Source(ABC, AliveMixin): """ Extend this for defining a pipeline source """
[docs] @abstractmethod def pop(self) -> Optional[Item]: """ Generate items for feeding a pipeline. Must be overridden for properly defining a source. Call :meth:`.Source.stop` when item generation is ended :return: The generated item, if None it is simply ignored (e.g. after calling :meth:`.Source.stop`) """ pass
[docs] def stop(self): """ Declare the end item generation, this event will be spread through the pipeline """ self._is_stopped = True
@property def is_stopped(self) -> bool: """ True if the source has called the stop event """ return getattr(self, "_is_stopped", False)
ItemsQueue = Queue[Optional[Item]] StageType = Union[Stage, BatchStage] StageTypeVar = TypeVar("StageTypeVar", bound=StageType)