from __future__ import annotations
import uuid
from typing import Any, Callable, Dict, Generator, KeysView, Union
from smartpipeline.defaults import DATA_SNIPPET_SIZE
from smartpipeline.error.exceptions import CriticalError, SoftError
# Module author metadata.
# NOTE(review): the contact address between "<>" appears to be empty (likely stripped
# when this source was extracted) — restore it from the project metadata if needed.
__author__ = "Giacomo Berardi <>"
class Item:
    """
    An item containing a unit of data to process through a pipeline.
    It is usually generated by the :class:`.stage.Source` subclass and retrieved at the end of a :class:`.pipeline.Pipeline`.
    """

    def __init__(self):
        # errors recorded by add_soft_error / add_critical_error, in insertion order
        self._soft_errors = []
        self._critical_errors = []
        self._meta = {}
        self._payload = {}
        # stage name -> seconds spent by that stage on this item
        self._timings = {}
        self._callback_fun = None
        # fallback identifier, generated lazily by :attr:`id` only when neither data nor metadata provide one
        self._id = None

    def __str__(self) -> str:
        return f"Data item {self.id} with payload {self.data_snippet()}..."

    @property
    def data(self) -> Dict[str, Any]:
        """
        Access to the actual data contained in the item

        :return: A dictionary organizing data by fields (recommended to be JSON serializable);
            it is the item's own dictionary, so changes to it are kept in the item
        """
        return self._payload

    # deprecated alias of :attr:`data`
    payload = data  # pragma: no cover

    def data_snippet(self, max_size: int = DATA_SNIPPET_SIZE) -> str:
        """
        A short string representation of the :attr:`.data` (recommended to override this method)

        :param max_size: Maximum size of the string representation
        :return: The string form of :attr:`.data`, truncated to at most ``max_size`` characters
        """
        return str(self.data)[:max_size]

    @property
    def metadata(self) -> Dict[str, Any]:
        """
        Get the actual metadata dictionary (the item's own dictionary, so changes to it are kept in the item)
        """
        return self._meta

    def set_timing(self, stage_name: str, seconds: float) -> Item:
        """
        Set the time spent by a stage (referenced by its name) for processing the item

        :param stage_name: Name of the stage; a previous timing for the same stage is overwritten
        :param seconds: Processing time in seconds
        :return: The item itself, so calls can be chained
        """
        self._timings[stage_name] = seconds
        return self

    def get_timing(self, stage_name: str) -> Union[float, None]:
        """
        Get the time spent by a stage (referenced by its name) for processing the item

        :return: The time in seconds or None if the item has not been processed by the stage
        """
        return self._timings.get(stage_name)

    @property
    def timed_stages(self) -> KeysView[str]:
        """
        Get the stage names for which the item has a process time
        (a live view: it reflects timings set afterwards)
        """
        return self._timings.keys()

    @property
    def id(self) -> Any:
        """
        Get the unique identifier of the item.
        It is recommended to override this in order to properly compute it from the :attr:`.data`

        The ``"id"`` field of :attr:`.data` is used first, then the ``"id"`` field of :attr:`.metadata`;
        if neither is set (or both are None) a random UUID4 string is generated once and reused.
        """
        ret = self._payload.get("id")
        if ret is None:
            ret = self._meta.get("id")
        if ret is not None:
            return ret
        if self._id is None:
            self._id = str(uuid.uuid4())
        return self._id

    def set_callback(self, fun: Callable[[Item], Any]) -> None:
        """
        Set a function to call after a successful asynchronous execution of a pipeline on the item (through :meth:`.pipeline.Pipeline.process_async`)

        :param fun: Callable receiving this item as its only argument
        """
        self._callback_fun = fun

    def callback(self) -> Any:
        """
        Call the function set with :meth:`.Item.set_callback`

        :return: The value returned by the callback, or None if no callback has been set
        """
        if self._callback_fun is not None:
            return self._callback_fun(self)
        return None

    def has_soft_errors(self) -> bool:
        """
        True if the item has raised an :class:`.error.exceptions.SoftError` in some stage processing
        """
        return bool(self._soft_errors)

    def has_critical_errors(self) -> bool:
        """
        True if the item has raised a :class:`.error.exceptions.CriticalError` or any unmanaged exception in some stage processing
        """
        return bool(self._critical_errors)

    def soft_errors(self) -> Generator[SoftError, None, None]:
        """
        Iterate over the :class:`.error.exceptions.SoftError` instances possibly generated in some stage processing
        """
        yield from self._soft_errors

    def critical_errors(self) -> Generator[CriticalError, None, None]:
        """
        Iterate over the :class:`.error.exceptions.CriticalError` instances (or any unmanaged exception) possibly generated in some stage processing
        """
        yield from self._critical_errors

    def add_soft_error(
        self, stage: str, exception: Union[SoftError, Exception]
    ) -> SoftError:
        """
        *This is called internally by the* :class:`.error.handling.ErrorManager` *when the exception is handled*.
        Add an :class:`.error.exceptions.SoftError` generated in a stage (referenced by its name) for the item.

        :param stage: Name of the stage that generated the error
        :param exception: It can be an :class:`.error.exceptions.SoftError` instance or any exception, which will be encapsulated in an :class:`.error.exceptions.SoftError`
        :return: The recorded :class:`.error.exceptions.SoftError`
        :raises ValueError: If ``exception`` is exactly a :class:`.error.exceptions.CriticalError` or is not an ``Exception``
        """
        # a critical error must never be silently downgraded to a soft one
        if type(exception) is CriticalError:
            raise ValueError("Add a pipeline SoftError or a generic exception")
        if isinstance(exception, SoftError):
            error = exception
        elif isinstance(exception, Exception):
            error = SoftError(str(exception))
            # keep a link to the original exception so its type and traceback are not lost
            error.__cause__ = exception
        else:
            raise ValueError("Add a pipeline SoftError or a generic exception")
        # NOTE(review): `stage` is not attached to the error here; confirm whether SoftError should carry it
        self._soft_errors.append(error)
        return error

    def add_critical_error(
        self, stage: str, exception: Union[CriticalError, Exception]
    ) -> CriticalError:
        """
        *This is called internally by the* :class:`.error.handling.ErrorManager` *when the exception is handled*.
        Add a :class:`.error.exceptions.CriticalError` generated in a stage (referenced by its name) for the item

        :param stage: Name of the stage that generated the error
        :param exception: It can be a :class:`.error.exceptions.CriticalError` instance or any exception, which will be encapsulated in a :class:`.error.exceptions.CriticalError`
        :return: The recorded :class:`.error.exceptions.CriticalError`
        :raises ValueError: If ``exception`` is exactly a :class:`.error.exceptions.SoftError` or is not an ``Exception``
        """
        # a soft error must never be silently upgraded to a critical one
        if type(exception) is SoftError:
            raise ValueError("Add a pipeline CriticalError or a generic exception")
        if isinstance(exception, CriticalError):
            error = exception
        elif isinstance(exception, Exception):
            error = CriticalError(str(exception))
            # keep a link to the original exception so its type and traceback are not lost
            error.__cause__ = exception
        else:
            raise ValueError("Add a pipeline CriticalError or a generic exception")
        # NOTE(review): `stage` is not attached to the error here; confirm whether CriticalError should carry it
        self._critical_errors.append(error)
        return error
# deprecated: old name of Item, kept as an alias for backward compatibility
DataItem = Item # pragma: no cover
class Stop(Item):
    """
    Core "signal" item (*do not use this explicitly*) used in pipelines for determining the end of a flow of items,
    like an event passed through all the stages
    """

    def __str__(self) -> str:
        # an empty "{}" in an f-string is a SyntaxError; interpolate the item identifier
        return f"Stop signal {self.id}"