Developer Interface

Source and Stages

Base abstract classes for pipeline source and stages

class smartpipeline.stage.Source[source]

Extend this for defining a pipeline source

property is_stopped: bool

True if the source has triggered the stop event by calling Source.stop()

property logger: Logger

Specific logger for this object

abstract pop()[source]

Generate items for feeding a pipeline. Must be overridden to properly define a source. Call Source.stop() when item generation has ended

Return type

Optional[Item]

Returns

The generated item; if None, it is simply ignored (e.g. after calling Source.stop())

stop()[source]

Declare the end of item generation; this event will be propagated through the pipeline
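
The following is a minimal sketch of a custom source; the class name ListSource and the "text" field are illustrative and not part of the library:

    from typing import Optional

    from smartpipeline.item import Item
    from smartpipeline.stage import Source


    class ListSource(Source):
        """Hypothetical source emitting one item per element of a list."""

        def __init__(self, documents):
            self._documents = iter(documents)

        def pop(self) -> Optional[Item]:
            try:
                text = next(self._documents)
            except StopIteration:
                # Declare the end of item generation
                self.stop()
                return None
            item = Item()
            item.data["text"] = text  # organize the payload by fields
            return item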

class smartpipeline.stage.Stage[source]

Extend this class and override Stage.process() to define a stage

property logger: Logger

Specific logger for this object

on_end()

Called when the stage terminates, useful for executing closing operations (e.g. on files)

Return type

Any

on_start()

Called by the pipeline after the initialization of a concurrent stage executor in a process (only with multiprocessing concurrency) or after construction in all other cases. The stage in the executor is a copy of the original; by overriding this method one can initialize variables specifically for the copies, which is mandatory when they are not serializable.

Return type

Any

abstract process(item)

Process a single item received by the stage. Must be overridden to properly define a stage

Return type

Item

Returns

The same item instance processed and enriched by the stage
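
A minimal sketch of a stage that enriches each item, assuming the illustrative "text" field produced by a source like the one above:

    from smartpipeline.item import Item
    from smartpipeline.stage import Stage


    class TextCleaner(Stage):
        """Hypothetical stage that normalizes whitespace in a text field."""

        def process(self, item: Item) -> Item:
            raw = item.data.get("text", "")
            item.data["clean_text"] = " ".join(raw.split())
            return item  # return the same item instance, enriched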

class smartpipeline.stage.BatchStage(size, timeout=1.0)[source]

Extend this class and override BatchStage.process_batch() to define a batch stage

property logger: Logger

Specific logger for this object

on_end()

Called when the stage terminates, useful for executing closing operations (e.g. on files)

Return type

Any

on_start()

Called by the pipeline after the initialization of a concurrent stage executor in a process (only with multiprocessing concurrency) or after construction in all other cases. The stage in the executor is a copy of the original; by overriding this method one can initialize variables specifically for the copies, which is mandatory when they are not serializable.

Return type

Any

abstract process_batch(items)

Process a batch of items received by the stage. Must be overridden to properly define a batch stage

Return type

Sequence[Item]

Returns

The same batch with items processed and enriched by the stage

property size: int

Get the maximum size of item batches that can be processed together

property timeout: float

Seconds to wait before flushing a batch (calling BatchStage.process_batch() on it)
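
A minimal sketch of a batch stage; since the base constructor is BatchStage(size, timeout=1.0), an instance could be created e.g. as BatchUppercaser(size=16, timeout=2.0). Class and field names are illustrative:

    from typing import Sequence

    from smartpipeline.item import Item
    from smartpipeline.stage import BatchStage


    class BatchUppercaser(BatchStage):
        """Hypothetical batch stage processing up to `size` items at once."""

        def process_batch(self, items: Sequence[Item]) -> Sequence[Item]:
            # A real stage would typically run a bulk or vectorized operation here
            for item in items:
                item.data["upper_text"] = item.data.get("text", "").upper()
            return items  # return the same batch, processed and enriched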

Data Items

The unit of data in a pipeline

class smartpipeline.item.Item[source]

An item containing a unit of data to process through a pipeline. It is usually generated by a stage.Source subclass and retrieved at the end of a pipeline.Pipeline.

add_critical_error(stage, exception)[source]

This is called internally by the error.handling.ErrorManager when the exception is handled. Add an error.exceptions.CriticalError generated in a stage (referenced by its name) to the item

Parameters

exception (Union[CriticalError, Exception]) – It can be a error.exceptions.CriticalError instance or any exception, which will be encapsulated in a error.exceptions.CriticalError

Return type

CriticalError

add_soft_error(stage, exception)[source]

This is called internally by the error.handling.ErrorManager when the exception is handled. Add an error.exceptions.SoftError generated in a stage (referenced by its name) to the item.

Parameters

exception (Union[SoftError, Exception]) – It can be an error.exceptions.SoftError instance or any exception, which will be encapsulated in an error.exceptions.SoftError

Return type

SoftError

callback()[source]

Call the function set with Item.set_callback()

Return type

Any

critical_errors()[source]

Iterate over the error.exceptions.CriticalError instances, or any unmanaged exceptions, possibly generated during processing in some stage

Return type

Generator[CriticalError, None, None]

property data: Dict[str, Any]

Access to the actual data contained in the item

Returns

A dictionary in which data is organized by fields (recommended to be JSON serializable)

data_snippet(max_size=100)[source]

A short string representation of the Item.data (recommended to override this method)

Parameters

max_size (int) – Maximum size of the string representation

get_metadata(field)[source]

DEPRECATED: Get a metadata value by its name

Return type

Any

Returns

A value or None if the metadata does not exist in the item

get_timing(stage_name)[source]

Get the time spent by a stage (referenced by its name) for processing the item

Return type

float

Returns

The time in seconds or None if the item has not been processed by the stage

has_critical_errors()[source]

True if the item has raised an error.exceptions.CriticalError or any unmanaged exception in some stage processing

Return type

bool

has_soft_errors()[source]

True if the item has raised an error.exceptions.SoftError in some stage processing

Return type

bool

property id: Any

Get the unique identifier of the item. It is recommended to override this in order to properly compute it from the Item.data

property metadata: Dict[str, Any]

Get the actual metadata dictionary

property payload: Dict[str, Any]

Access to the actual data contained in the item

Returns

A dictionary in which data is organized by fields (recommended to be JSON serializable)

set_callback(fun)[source]

Set a function to call after a successful asynchronous execution of a pipeline on the item (through pipeline.Pipeline.process_async())

set_metadata(field, value)[source]

DEPRECATED: Add a metadata value, something to remember but keep outside the actual data in Item.data

Parameters

field (str) – Name of the metadata variable

Return type

Item

set_timing(stage_name, seconds)[source]

Set the time spent by a stage (referenced by its name) for processing the item

Return type

Item

soft_errors()[source]

Iterate over the error.exceptions.SoftError instances possibly generated during processing in some stage

Return type

Generator[SoftError, None, None]

property timed_stages: KeysView[str]

Get the stage names for which the item has a process time
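
A brief sketch of how an item is typically filled before processing and inspected afterwards; the field name and the prints are illustrative:

    from smartpipeline.item import Item

    item = Item()
    item.data["text"] = "a unit of data"  # payload organized by fields

    # After the item has gone through a pipeline:
    if item.has_critical_errors():
        for error in item.critical_errors():
            print(error)
    for stage_name in item.timed_stages:
        print(stage_name, item.get_timing(stage_name))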

Pipeline

Main class for designing the sequence of stages and executing them

class smartpipeline.pipeline.Pipeline(max_init_workers=None, max_queues_size=1000)[source]
append(name, stage, concurrency=0, parallel=False, retryable_errors=(), max_retries=0, backoff=0.0)[source]

Append a stage to the pipeline just after the last one appended, or after the source if it is the first stage

Parameters
  • name (str) – Name that identifies the stage in the pipeline; it is also set in the stage and must be unique within the pipeline

  • stage (Union[Stage, BatchStage]) – Instance of a stage

  • concurrency (int) – Number of concurrent stage executions; if 0, threads/processes won’t be involved for this stage

  • parallel (bool) – If True use multiprocessing, otherwise threads

  • retryable_errors (Tuple[Type[Exception], ...]) – List of exceptions for which the stage applies an exponential backoff strategy. When the maximum number of retries is hit, an error.exceptions.RetryError is raised

  • max_retries (int) – Maximum number of retries for the stage before raising a RetryError(SoftError) (default 0)

  • backoff (Union[float, int]) – Backoff factor for the exponential backoff strategy used by the stage when it raises one of the exceptions declared in the retryable_errors parameter (default 0.0)

Return type

Pipeline

append_concurrently(name, stage_class, args=None, kwargs=None, concurrency=0, parallel=False, retryable_errors=(), max_retries=0, backoff=0.0)[source]

Append a stage class to the pipeline just after the last one appended, or after the source if it is the first stage. The stage construction will be executed concurrently with respect to the rest of the pipeline construction

Parameters
  • name (str) – Name that identifies the stage in the pipeline; it is also set in the stage and must be unique within the pipeline

  • stage_class (Type[Union[Stage, BatchStage]]) – Class of a stage

  • args (Optional[Sequence]) – List of positional arguments for the stage constructor

  • kwargs (Optional[Mapping]) – Dictionary of keyword arguments for the stage constructor

  • concurrency (int) – Number of concurrent stage executions; if 0, threads/processes won’t be involved for this stage

  • parallel (bool) – If True use multiprocessing, otherwise threads (also for the concurrent construction)

  • retryable_errors (Tuple[Type[Exception], ...]) – List of exceptions for which the stage applies an exponential backoff strategy. When the maximum number of retries is hit, an error.exceptions.RetryError is raised

  • max_retries (int) – Maximum number of retries for the stage before raising a RetryError(SoftError) (default 0)

  • backoff (Union[float, int]) – Backoff factor for the exponential backoff strategy used by the stage when it raises one of the exceptions declared in the retryable_errors parameter (default 0.0)

Return type

Pipeline

append_stage(name, stage, concurrency=0, parallel=False, retryable_errors=(), max_retries=0, backoff=0.0)

Append a stage to the pipeline just after the last one appended, or after the source if it is the first stage

Parameters
  • name (str) – Name that identifies the stage in the pipeline; it is also set in the stage and must be unique within the pipeline

  • stage (Union[Stage, BatchStage]) – Instance of a stage

  • concurrency (int) – Number of concurrent stage executions; if 0, threads/processes won’t be involved for this stage

  • parallel (bool) – If True use multiprocessing, otherwise threads

  • retryable_errors (Tuple[Type[Exception], ...]) – List of exceptions for which the stage applies an exponential backoff strategy. When the maximum number of retries is hit, an error.exceptions.RetryError is raised

  • max_retries (int) – Maximum number of retries for the stage before raising a RetryError(SoftError) (default 0)

  • backoff (Union[float, int]) – Backoff factor for the exponential backoff strategy used by the stage when it raises one of the exceptions declared in the retryable_errors parameter (default 0.0)

Return type

Pipeline

append_stage_concurrently(name, stage_class, args=None, kwargs=None, concurrency=0, parallel=False, retryable_errors=(), max_retries=0, backoff=0.0)

Append a stage class to the pipeline just after the last one appended, or after the source if it is the first stage. The stage construction will be executed concurrently with respect to the rest of the pipeline construction

Parameters
  • name (str) – Name that identifies the stage in the pipeline; it is also set in the stage and must be unique within the pipeline

  • stage_class (Type[Union[Stage, BatchStage]]) – Class of a stage

  • args (Optional[Sequence]) – List of positional arguments for the stage constructor

  • kwargs (Optional[Mapping]) – Dictionary of keyword arguments for the stage constructor

  • concurrency (int) – Number of concurrent stage executions; if 0, threads/processes won’t be involved for this stage

  • parallel (bool) – If True use multiprocessing, otherwise threads (also for the concurrent construction)

  • retryable_errors (Tuple[Type[Exception], ...]) – List of exceptions for which the stage applies an exponential backoff strategy. When the maximum number of retries is hit, an error.exceptions.RetryError is raised

  • max_retries (int) – Maximum number of retries for the stage before raising a RetryError(SoftError) (default 0)

  • backoff (Union[float, int]) – Backoff factor for the exponential backoff strategy used by the stage when it raises one of the exceptions declared in the retryable_errors parameter (default 0.0)

Return type

Pipeline

build()[source]

Pipeline builder method

Return type

Pipeline
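
A construction sketch reusing the hypothetical ListSource, TextCleaner and BatchUppercaser classes defined above (they are assumptions for illustration, not part of the library):

    from smartpipeline.pipeline import Pipeline

    pipeline = (
        Pipeline()
        .set_source(ListSource(["first document", "second document"]))
        .append("cleaner", TextCleaner(), concurrency=2, parallel=False)
        # Expensive constructors can run concurrently with the rest of the setup
        .append_concurrently("uppercaser", BatchUppercaser, kwargs={"size": 16})
        .build()
    )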

property count: int

Get the number of items processed across all executed runs, including items that have failed

Returns

Count of processed items

get_item(block=True)[source]

Get a single item resulting from the asynchronous execution of the pipeline on items submitted through Pipeline.process_async()

Parameters

block (bool) – If True wait indefinitely for the next processed item

Raises
  • ValueError – When no output queue is set, i.e. the pipeline is not running asynchronously

  • queue.Empty – When block is False and the queue is empty

Return type

Item

get_stage(name)[source]

Get a stage instance by its name

Return type

Union[Stage, BatchStage, None]

property name: str

Pipeline unique name, equivalent to its logger name

process(item)[source]

Process a single item synchronously (no concurrency) through the pipeline

Return type

Item

process_async(item, callback=None)[source]

Process a single item asynchronously through the pipeline; stages may run concurrently. The call returns immediately; processed items are retrieved with Pipeline.get_item()

Parameters

callback (Optional[Callable[[Item], Any]]) – A function to call after the item has been successfully processed
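
A sketch of asynchronous single-item processing on a built pipeline such as the one above:

    from smartpipeline.item import Item

    item = Item()
    item.data["text"] = "process me asynchronously"

    # Returns immediately; stages may run concurrently
    pipeline.process_async(item, callback=lambda it: print("processed:", it.id))

    # Retrieve the processed item (blocks until it is available)
    processed = pipeline.get_item(block=True)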

run()[source]

Run the pipeline, reading items from the source and sending them through the concatenation of stages. Obtain the processed items by iterating over the returned generator

Return type

Generator[Item, None, None]

Returns

Iterator over processed items

Raises

ValueError – When a source has not been set for the pipeline
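
A sketch of a full run over the source, continuing the construction example above; each item is yielded after it has passed through all stages:

    for item in pipeline.run():
        print(item.id, item.data)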

set_error_manager(error_manager)[source]

Set the error manager for handling errors raised by each stage while processing items

Return type

Pipeline

set_source(source)[source]

Set the source of the pipeline: an instance of a stage.Source subclass

Return type

Pipeline

shutdown()[source]

Shut down the pool executors and the multiprocessing machinery. The developer should not normally need to call this explicitly.

stop()[source]

Tell the source to stop generating items and consequently terminate the pipeline after all remaining items are processed.

Error Handling

Exceptions to raise in case of errors, and how to handle them

class smartpipeline.error.exceptions.SoftError(*args, **kwargs)[source]

A type of exception which usually only causes the current stage to be skipped for an item

add_note()

Exception.add_note(note) – add a note to the exception

get_exception()

Get the original exception (if any) that has generated this error, equivalent to the __cause__ attribute

Return type

Optional[BaseException]

with_exception(exception)

Set the original exception (if any) that has generated this error, equivalent to explicit exception chaining

Return type

Error

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

class smartpipeline.error.exceptions.CriticalError(*args, **kwargs)[source]

A type of exception which usually causes the whole pipeline to be skipped for an item

add_note()

Exception.add_note(note) – add a note to the exception

get_exception()

Get the original exception (if any) that has generated this error, equivalent to the __cause__ attribute

Return type

Optional[BaseException]

with_exception(exception)

Set the original exception (if any) that has generated this error, equivalent to explicit exception chaining

Return type

Error

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
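
A sketch of how a stage might raise these exceptions while processing an item (class and field names are illustrative):

    from smartpipeline.error.exceptions import CriticalError, SoftError
    from smartpipeline.item import Item
    from smartpipeline.stage import Stage


    class Validator(Stage):
        """Hypothetical stage raising soft and critical errors."""

        def process(self, item: Item) -> Item:
            text = item.data.get("text")
            if text is None:
                # By default the item skips the rest of the pipeline
                raise CriticalError("missing mandatory field: text")
            if not text.strip():
                # The item only skips this stage
                raise SoftError("empty text, nothing to validate")
            item.data["valid"] = True
            return item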

class smartpipeline.error.handling.ErrorManager[source]

Basic error handling for a pipeline; it principally manages the exceptions.SoftError and exceptions.CriticalError types

check_critical_errors(item)[source]

Check the critical errors registered for an item

Return type

Optional[BaseException]

handle(error, stage, item)[source]

Manage an error produced by a stage

Parameters
  • error (Exception) – It can be a generic exception or an error from exceptions explicitly raised by a stage

  • stage (AliveMixin) – Stage which raised the exception during processing

  • item (Item) – Item that was being processed when the exception was raised

Return type

Optional[CriticalError]

Returns

If the handled error turns out to be critical, return the generated exceptions.CriticalError

Raises

Exception – When ErrorManager.raise_on_critical_error() has been set and the error is critical

no_skip_on_critical_error()[source]

Change the default behaviour of the error manager on exceptions.CriticalError: only skip the stage which raised it, as for exceptions.SoftError. Valid only if ErrorManager.raise_on_critical_error() is not set

Return type

ErrorManager

on_end()[source]

Called when the pipeline terminates, useful for executing closing operations (e.g. on files)

Return type

Any

on_start()[source]

Called by the pipeline for a concurrent stage executor in a process (only with multiprocessing concurrency) or simply after construction in all other cases. The error manager in the executor is a copy of the original; by overriding this method one can initialize variables specifically for the copies, which is mandatory when they are not serializable.

Return type

Any

raise_on_critical_error()[source]

Set the error manager so that if an exceptions.CriticalError (or any unmanaged exception) is encountered, it is raised “externally”, at the pipeline run level, killing the pipeline

Return type

ErrorManager
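
A sketch of a customized error manager; the reporting behaviour added in the overridden handle() is an assumption for illustration:

    from smartpipeline.error.handling import ErrorManager
    from smartpipeline.pipeline import Pipeline


    class LoggingErrorManager(ErrorManager):
        """Hypothetical error manager reporting every handled error."""

        def handle(self, error, stage, item):
            print(f"error in {stage} on item {item.id}: {error}")
            return super().handle(error, stage, item)


    pipeline = Pipeline().set_error_manager(
        LoggingErrorManager().raise_on_critical_error()
    )
    # ...then set the source, append stages, build and run as usual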