Developer Interface
Source and Stages
Base abstract classes for pipeline source and stages
- class smartpipeline.stage.Source[source]
Extend this class to define a pipeline source
- property is_stopped: bool
True if the source has set the stop event
- property logger: Logger
Specific logger for this object
- abstract pop()[source]
Generate items for feeding a pipeline. Must be overridden to properly define a source. Call Source.stop() when item generation has ended
- Return type
Optional[Item]
- Returns
The generated item; if None it is simply ignored (e.g. after calling Source.stop())
- class smartpipeline.stage.Stage[source]
Extend this class and override Stage.process() to define a stage
- property logger: Logger
Specific logger for this object
- on_end()
Called when the stage terminates, useful for executing closing operations (e.g. on files)
- Return type
Any
- on_start()
Called by the pipeline after the concurrent stage executor has been initialized in a process (only with multiprocessing concurrency), or after construction in all other cases. The stage in the executor is a copy of the original: by overriding this method one can initialize variables specifically for the copies, which is mandatory when they are not serializable.
- Return type
Any
- class smartpipeline.stage.BatchStage(size, timeout=1.0)[source]
Extend this class and override BatchStage.process_batch() to define a batch stage
- property logger: Logger
Specific logger for this object
- on_end()
Called when the stage terminates, useful for executing closing operations (e.g. on files)
- Return type
Any
- on_start()
Called by the pipeline after the concurrent stage executor has been initialized in a process (only with multiprocessing concurrency), or after construction in all other cases. The stage in the executor is a copy of the original: by overriding this method one can initialize variables specifically for the copies, which is mandatory when they are not serializable.
- Return type
Any
- abstract process_batch(items)
Process a batch of items received by the stage. Must be overridden to properly define a batch stage
- Return type
Sequence[Item]
- Returns
The same batch with items processed and enriched by the stage
- property size: int
Get the maximum size of item batches that can be processed together
- property timeout: float
Seconds to wait before flushing a batch (i.e. calling BatchStage.process_batch() on it)
Data Items
The unit of data in a pipeline
- class smartpipeline.item.Item[source]
An item containing a unit of data to process through a pipeline. It is usually generated by a stage.Source subclass and retrieved at the end of a pipeline.Pipeline
- add_critical_error(stage, exception)[source]
This is called internally by the error.handling.ErrorManager when the exception is handled. Add an error.exceptions.CriticalError generated in a stage (referenced by its name) for the item
- Parameters
exception (Union[CriticalError, Exception]) – It can be an error.exceptions.CriticalError instance or any exception, which will be encapsulated in an error.exceptions.CriticalError
- Return type
- add_soft_error(stage, exception)[source]
This is called internally by the error.handling.ErrorManager when the exception is handled. Add an error.exceptions.SoftError generated in a stage (referenced by its name) for the item
- Parameters
exception (Union[SoftError, Exception]) – It can be an error.exceptions.SoftError instance or any exception, which will be encapsulated in an error.exceptions.SoftError
- Return type
- callback()[source]
Call the function set with Item.set_callback()
- Return type
Any
- critical_errors()[source]
Iterate over the error.exceptions.CriticalError instances, or any unmanaged exceptions, eventually generated in some stage processing
- Return type
Generator[CriticalError, None, None]
- property data: Dict[str, Any]
Access to the actual data contained in the item
- Returns
A dictionary in which data is organized by fields (recommended to be JSON serializable)
- data_snippet(max_size=100)[source]
A short string representation of the Item.data (it is recommended to override this method)
- Parameters
max_size (int) – Maximum size of the string representation
- get_metadata(field)[source]
DEPRECATED: Get a metadata value by its name
- Return type
Any
- Returns
A value or None if the metadata does not exist in the item
- get_timing(stage_name)[source]
Get the time spent by a stage (referenced by its name) for processing the item
- Return type
float
- Returns
The time in seconds or None if the item has not been processed by the stage
- has_critical_errors()[source]
True if the item has raised an error.exceptions.CriticalError or any unmanaged exception in some stage processing
- Return type
bool
- has_soft_errors()[source]
True if the item has raised an error.exceptions.SoftError in some stage processing
- Return type
bool
- property id: Any
Get the unique identifier of the item. It is recommended to override this in order to properly compute it from the Item.data
- property metadata: Dict[str, Any]
Get the actual metadata dictionary
- property payload: Dict[str, Any]
Access to the actual data contained in the item
- Returns
A dictionary in which data is organized by fields (recommended to be JSON serializable)
- set_callback(fun)[source]
Set a function to call after a successful asynchronous execution of a pipeline on the item (through pipeline.Pipeline.process_async())
- set_metadata(field, value)[source]
DEPRECATED: Add a metadata entry, something we want to remember but keep outside the actual data in Item.data
- Parameters
field (str) – Name of the metadata variable
- Return type
- set_timing(stage_name, seconds)[source]
Set the time spent by a stage (referenced by its name) for processing the item
- Return type
- soft_errors()[source]
Iterate over the error.exceptions.SoftError instances eventually generated in some stage processing
- Return type
Generator[SoftError, None, None]
- property timed_stages: KeysView[str]
Get the stage names for which the item has a process time
Pipeline
Main class for designing the sequence of stages and executing them
- class smartpipeline.pipeline.Pipeline(max_init_workers=None, max_queues_size=1000)[source]
- append(name, stage, concurrency=0, parallel=False, retryable_errors=(), max_retries=0, backoff=0.0)[source]
Append a stage to the pipeline just after the last one appended, or after the source if it is the first stage
- Parameters
name (str) – Name identifying the stage in the pipeline; it is also set in the stage and must be unique in the pipeline
stage (Union[Stage, BatchStage]) – Instance of a stage
concurrency (int) – Number of concurrent stage executions; if 0, threads/processes won't be involved for this stage
parallel (bool) – If True use multiprocessing, otherwise threads
retryable_errors (Tuple[Type[Exception], ...]) – List of exceptions for which the stage applies an exponential backoff strategy; when the maximum number of retries is hit, an error.exceptions.RetryError is raised
max_retries (int) – Maximum number of retries for the stage before raising a RetryError(SoftError) (default 0)
backoff (Union[float, int]) – Backoff factor for the exponential backoff strategy used by the stage when it raises one of the exceptions declared in the retryable_errors parameter (default 0.0)
- Return type
- append_concurrently(name, stage_class, args=None, kwargs=None, concurrency=0, parallel=False, retryable_errors=(), max_retries=0, backoff=0.0)[source]
Append a stage class to the pipeline just after the last one appended, or after the source if it is the first stage. The stage construction will be executed concurrently with respect to the general pipeline construction
- Parameters
name (str) – Name identifying the stage in the pipeline; it is also set in the stage and must be unique in the pipeline
stage_class (Type[Union[Stage, BatchStage]]) – Class of a stage
args (Optional[Sequence]) – List of positional arguments for the stage constructor
kwargs (Optional[Mapping]) – Dictionary of keyword arguments for the stage constructor
concurrency (int) – Number of concurrent stage executions; if 0, threads/processes won't be involved for this stage
parallel (bool) – If True use multiprocessing, otherwise threads (also for the concurrent construction)
retryable_errors (Tuple[Type[Exception], ...]) – List of exceptions for which the stage applies an exponential backoff strategy; when the maximum number of retries is hit, an error.exceptions.RetryError is raised
max_retries (int) – Maximum number of retries for the stage before raising a RetryError(SoftError) (default 0)
backoff (Union[float, int]) – Backoff factor for the exponential backoff strategy used by the stage when it raises one of the exceptions declared in the retryable_errors parameter (default 0.0)
- Return type
- append_stage(name, stage, concurrency=0, parallel=False, retryable_errors=(), max_retries=0, backoff=0.0)
Append a stage to the pipeline just after the last one appended, or after the source if it is the first stage
- Parameters
name (str) – Name identifying the stage in the pipeline; it is also set in the stage and must be unique in the pipeline
stage (Union[Stage, BatchStage]) – Instance of a stage
concurrency (int) – Number of concurrent stage executions; if 0, threads/processes won't be involved for this stage
parallel (bool) – If True use multiprocessing, otherwise threads
retryable_errors (Tuple[Type[Exception], ...]) – List of exceptions for which the stage applies an exponential backoff strategy; when the maximum number of retries is hit, an error.exceptions.RetryError is raised
max_retries (int) – Maximum number of retries for the stage before raising a RetryError(SoftError) (default 0)
backoff (Union[float, int]) – Backoff factor for the exponential backoff strategy used by the stage when it raises one of the exceptions declared in the retryable_errors parameter (default 0.0)
- Return type
- append_stage_concurrently(name, stage_class, args=None, kwargs=None, concurrency=0, parallel=False, retryable_errors=(), max_retries=0, backoff=0.0)
Append a stage class to the pipeline just after the last one appended, or after the source if it is the first stage. The stage construction will be executed concurrently with respect to the general pipeline construction
- Parameters
name (str) – Name identifying the stage in the pipeline; it is also set in the stage and must be unique in the pipeline
stage_class (Type[Union[Stage, BatchStage]]) – Class of a stage
args (Optional[Sequence]) – List of positional arguments for the stage constructor
kwargs (Optional[Mapping]) – Dictionary of keyword arguments for the stage constructor
concurrency (int) – Number of concurrent stage executions; if 0, threads/processes won't be involved for this stage
parallel (bool) – If True use multiprocessing, otherwise threads (also for the concurrent construction)
retryable_errors (Tuple[Type[Exception], ...]) – List of exceptions for which the stage applies an exponential backoff strategy; when the maximum number of retries is hit, an error.exceptions.RetryError is raised
max_retries (int) – Maximum number of retries for the stage before raising a RetryError(SoftError) (default 0)
backoff (Union[float, int]) – Backoff factor for the exponential backoff strategy used by the stage when it raises one of the exceptions declared in the retryable_errors parameter (default 0.0)
- Return type
- property count: int
Get the number of items processed across all executed runs, including items which have failed
- Returns
Count of processed items
- get_item(block=True)[source]
Get a single item from the asynchronous execution of the pipeline on single items through Pipeline.process_async()
- Parameters
block (bool) – If True wait indefinitely for the next processed item
- Raises
ValueError – When no output queue is set, i.e. the pipeline is not running asynchronously
queue.Empty – When we do not block and the queue is empty
- Return type
- get_stage(name)[source]
Get a stage instance by its name
- Return type
Union[Stage, BatchStage, None]
- property name: str
Pipeline unique name, equivalent to its logger name
- process(item)[source]
Process a single item synchronously (no concurrency) through the pipeline
- Return type
- process_async(item, callback=None)[source]
Process a single item asynchronously through the pipeline; stages may run concurrently. The call returns immediately; processed items are retrieved with Pipeline.get_item()
- Parameters
callback (Optional[Callable[[Item], Any]]) – A function to call after a successful process of the item
- run()[source]
Run the pipeline given a source and a concatenation of stages. Get the sequence of items through iteration
- Return type
Generator[Item, None, None]
- Returns
Iterator over processed items
- Raises
ValueError – When a source has not been set for the pipeline
- set_error_manager(error_manager)[source]
Set the error manager for handling errors from each stage item processing
- Return type
- set_source(source)[source]
Set the source of the pipeline: a subclass of stage.Source
- Return type
Error Handling
Exceptions to generate in case of errors and how to handle them
- class smartpipeline.error.exceptions.SoftError(*args, **kwargs)[source]
A type of exception which usually only provokes skipping a stage for an item
- add_note()
Exception.add_note(note) – add a note to the exception
- get_exception()
Get the original exception (if any) that generated this error, equivalent to the __cause__ attribute
- Return type
Optional[BaseException]
- with_exception(exception)
Set the original exception (if any) that generated this error, equivalent to explicit exception chaining
- Return type
Error
- with_traceback()
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
- class smartpipeline.error.exceptions.CriticalError(*args, **kwargs)[source]
A type of exception which usually provokes skipping the whole pipeline for an item
- add_note()
Exception.add_note(note) – add a note to the exception
- get_exception()
Get the original exception (if any) that generated this error, equivalent to the __cause__ attribute
- Return type
Optional[BaseException]
- with_exception(exception)
Set the original exception (if any) that generated this error, equivalent to explicit exception chaining
- Return type
Error
- with_traceback()
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
- class smartpipeline.error.handling.ErrorManager[source]
Basic error handling of a pipeline; principally manages the exceptions.SoftError and exceptions.CriticalError types
- check_critical_errors(item)[source]
Check the critical errors registered for an item
- Return type
Optional[BaseException]
- handle(error, stage, item)[source]
Manage an error produced by a stage
- Parameters
error (Exception) – It can be a generic exception or an error from exceptions explicitly raised by a stage
stage (AliveMixin) – Stage which raised the exception during processing
item (Item) – Item which raised the exception when processed
- Return type
Optional[CriticalError]
- Returns
If the handled error results to be critical, the generated exceptions.CriticalError is returned
- Raises
Exception – When ErrorManager.raise_on_critical_error() has been set and the error is critical
- no_skip_on_critical_error()[source]
Change the default behaviour of the error manager on exceptions.CriticalError: only skip the stage which raises it, like for exceptions.SoftError. Valid only if ErrorManager.raise_on_critical_error() is not set
- Return type
- on_end()[source]
Called when the pipeline terminates, useful for executing closing operations (e.g. on files)
- Return type
Any
- on_start()[source]
Called by the pipeline for a concurrent stage executor in a process (only with multiprocessing concurrency), or simply after construction in all other cases. The error manager in the executor is a copy of the original: by overriding this method one can initialize variables specifically for the copies, which is mandatory when they are not serializable.
- Return type
Any
- raise_on_critical_error()[source]
Set the error manager so that if an exceptions.CriticalError (or any unmanaged exception) is encountered, it is raised "externally", at the pipeline run level, killing the pipeline
- Return type