Guide

Data items

The unit of data in a pipeline is the item, represented by the Item class or a subclass of it. Data is kept in Item.data, a read-only dictionary. Item.metadata is for extra values, such as temporary data or descriptors of the item. If the pipeline is going to run on multiple processes (see the parallel parameter later), an item must be serializable with the pickle module, both its data and its metadata.
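
For instance, here is a minimal sketch of building an item by hand (the key names and values are purely illustrative):

import pickle

from smartpipeline.item import Item

item = Item()
# the payload the stages will work on
item.data["text"] = "some content"
# auxiliary information that is not part of the payload
item.metadata["source_file"] = "notes.txt"
# if stages will run on multiple processes, the item must survive pickling
assert pickle.loads(pickle.dumps(item)).data["text"] == "some content"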

Defining the source

To define your own processing pipeline you need to define the source and the stages. The Source class should be extended and the Source.pop() method overridden. This method returns a single new item every time it is called; it can also return None if it cannot provide a new item at that moment. When the source has no more items to generate, the method Source.stop() must be called. A stop can happen when the resources from which we retrieve data items are exhausted, e.g. all the documents in an index have been retrieved or all the lines of a CSV file have been read. A source can also generate items indefinitely, in which case the pipeline will never end.

In this example we define a source that generates 1000 items, each with a random string in its data.

import random, string


class RandomTextSource(Source):
    def __init__(self, total=1000):
        self._total = total
        self._counter = 0

    def pop(self):
        self._counter += 1
        # when the configured total is reached we declare a stop,
        # which will still hold for all subsequent calls of pop()
        if self._counter > self._total:
            self.stop()
            return
        else:
            item = Item()
            text = ''.join(
                random.choices(
                    string.ascii_letters + string.digits,
                    k=random.randint(50, 200)
                )
            )
            item.data.update({
                "text": text,
                "id": hash(text)
            })
            item.metadata['count'] = self._counter
            return item

Defining your stages

A stage is a subclass of Stage or BatchStage.

The actual item data processing happens in the stage method Stage.process(), or BatchStage.process_batch() for lists of items.

A stage receives a single item and returns it, enriched with the stage’s computations. A batch stage, instead, processes multiple items at once. This is useful when the computation benefits from handling more data together, e.g. an HTTP API that accepts lists of values, where fewer calls have to be made, or a machine learning model that is optimized for predicting on multiple samples.

When concurrency is enabled, each concurrent execution of a stage calls the process method on a different subset of the data flow.

Each stage provides its own logger in Stage.logger.
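
To make the batch processing idea concrete, here is a hedged sketch of a batch stage that upper-cases the text of a whole batch of the items above in a single pass; it assumes that only process_batch() needs to be overridden (check the BatchStage documentation for how the batch size and timeout are configured) and it also uses the stage logger:

class TextUpperCaser(BatchStage):
    def process_batch(self, items):
        # items is the list of items collected for this batch
        self.logger.debug("Upper-casing a batch of %d items", len(items))
        for item in items:
            item.data["text"] = item.data["text"].upper()
        return items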

Here is a simple example of a stage that takes the items generated in the previous example and replaces occurrences of a specific pattern in the text with a fixed string.

import re


class TextReplacer(Stage):
    def __init__(self, substitution):
        self._pattern = re.compile(r"\d\d\D\D")
        self._sub = substitution

    def process(self, item):
        new_text = self._pattern.sub(self._sub, item.data["text"])
        if item.data["text"] == new_text:
            # even if we raise a SoftError the item will continue its path through the pipeline
            raise SoftError("Text has not been modified")
        item.data["text"] = new_text
        return item

Error handling

In the previous code snippet we raise a SoftError in case no modifications are made to the content of the item. The ErrorManager will take care of it, but the item will still be processed by the next stages of the pipeline. By extending ErrorManager you can define custom handling for this kind of “soft” error, but also for all other exceptions.

SoftError exceptions have to be explicitly raised. A soft error does not interrupt an item’s journey through the pipeline; only the processing by the current stage is skipped. Be careful with batch stages: raising a soft error while iterating over the batch items will also skip all the items of the batch that follow the item that produced the error.

A CriticalError is raised for any non-captured exception, or it may be raised explicitly: it stops the processing of an item so that the pipeline goes ahead with the next one.

It is recommended to use explicit exception chaining (raise ... from ...) when explicitly raising a SoftError or a CriticalError.
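
For instance, a hypothetical stage could chain the original exception as follows (a minimal sketch):

class NumberParser(Stage):
    def process(self, item):
        try:
            item.data["number"] = int(item.data["text"][:2])
        except ValueError as e:
            # the original exception is preserved as the cause of the soft error
            raise SoftError("Cannot parse a number from the text") from e
        return item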

Setting and running the pipeline

Once you have your set of stages you can add them in sequence to a Pipeline instance, following a “builder” pattern. Pipeline.append() is the main method for adding stages to a pipeline. One must give stages unique names and, optionally, set their concurrency. The concurrency parameter defaults to 0; a stage is concurrent when the value is 1 or greater. With values equal to or greater than 1, and by setting parallel to True, Python multiprocessing is used: concurrent executions of the stage will run in parallel, and the stage instance will be copied into each process.

Consider using threads when I/O-blocking operations are prevalent, and multiprocessing when stages perform long computations on data. With no concurrency the pipeline simply runs a “chain” of Stage.process() calls on each item, while with concurrency Python queues are involved and items may be serialized.

If you intend to define stages that can run on multiple processes, please read Parallel stages and on_start method below for further, important details.

Through Pipeline.append() one can also define a retry policy for specific errors (see the method documentation for further details).

Another method is Pipeline.append_concurrently(), which allows stage creation to run concurrently with the other append calls. This is useful when long tasks must be executed at creation time, e.g. when the stage builds large data structures.

Remember to call Pipeline.build() at the end of stages “concatenation”.

Finally, continuing the previous example, we define another stage that reduces the text size, and we run the pipeline.

class TextReducer(Stage):
    def process(self, item):
        item.data["text"] = item.data["text"][:40]
        return item

pipeline = (
    Pipeline()
    .set_source(RandomTextSource())
    .append("text_replacer", TextReplacer(substitution="XXX"))
    .append("text_reducer", TextReducer())
    .build()
)

for item in pipeline.run():
    print(item.data["text"])

A different example, in which we process 100 items concurrently with Pipeline.process_async() (here “async” is not related to Python asyncio): instead of running the pipeline, we explicitly submit each item for processing. Note that no source is defined here.

pipeline = (
    Pipeline()
    .append("text_replacer", TextReplacer(substitution="XXX"), concurrency=3)
    .append("text_reducer", TextReducer(), concurrency=1)
    .build()
)
# "manually" send 100 items to the pipeline
for _ in range(100):
    item = Item()
    text = ''.join(
        random.choices(
            string.ascii_letters + string.digits,
            k=random.randint(50, 200)
        )
    )
    item.data.update({
        "text": text,
        "id": hash(text)
    })
    pipeline.process_async(item)
# retrieve the processed items
for _ in range(100):
    print(pipeline.get_item().data["text"])
# explicitly stop the pipeline when there are no more items
pipeline.stop()

It is possible to use Pipeline.process() when no stage is concurrent; each item will be processed and returned directly by this method.
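
For example, reusing the stages defined above (a sketch, with a hand-built item):

pipeline = (
    Pipeline()
    .append("text_replacer", TextReplacer(substitution="XXX"))
    .append("text_reducer", TextReducer())
    .build()
)
item = Item()
item.data["text"] = "12ab" * 30
# with no concurrent stages the item is processed and returned synchronously
print(pipeline.process(item).data["text"])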

A further example

An example of a pipeline that processes local files contained in the ./document_files directory, extracts their texts and finds occurrences of VAT codes; finally, it indexes the results in an Elasticsearch cluster. Errors, if any, are logged to the Elasticsearch cluster as well. Here the developer has defined a custom error manager and, obviously, the stages. The source usually has to be defined as well; here a trivial one from the codebase has been used, together with a custom data item type that provides a file reference.

More executable examples can be found in the examples sub-directory of the repository root.

from smartpipeline.pipeline import Pipeline
from smartpipeline.stage import Stage, AliveMixin
from smartpipeline.item import Item
from smartpipeline.error.handling import ErrorManager
from smartpipeline.error.exceptions import SoftError, CriticalError
from smartpipeline.helpers import LocalFilesSource, FilePathItem
from elasticsearch import Elasticsearch
from typing import Optional
import logging, re


class ESErrorLogger(ErrorManager):
    """An error manager that writes error info into an Elasticsearch index"""

    def __init__(self, es_host: str, es_index: str):
        self.es_host = es_host
        self.es_index = es_index
        self.es_client = Elasticsearch(self.es_host)

    def handle(
        self, error: Exception, stage: AliveMixin, item: Item
    ) -> Optional[CriticalError]:
        if isinstance(error, SoftError):
            error = error.get_exception()
        self.es_client.index(
            index=self.es_index,
            body={
                "stage": str(stage),
                "item": str(item),
                "exception": type(error),
                "message": str(error),
            },
        )
        return super().handle(error, stage, item)


class TextExtractor(Stage):
    """Read the text content of files"""

    def process(self, item: FilePathItem) -> Item:
        try:
            with open(item.path) as f:
                item.data["text"] = f.read()
        except IOError as e:
            # even if we are unable to read the file content, the item will be processed by the next stages
            # we encapsulate the exception in a "soft error"
            raise SoftError(f"Problems in reading file {item.path}") from e
        return item


class VatFinder(Stage):
    """Identify Italian VAT codes in texts"""

    def __init__(self):
        self.regex = re.compile(
            "^[A-Za-z]{2,4}(?=.{2,12}$)[-_\s0-9]*(?:[a-zA-Z][-_\s0-9]*){0,2}$"
        )

    def process(self, item: Item) -> Item:
        vat_codes = []
        for vat_match in self.regex.finditer(item.data.get("text", "")):
            vat_codes.append((vat_match.start(), vat_match.end()))
        item.data["vat_codes"] = vat_codes
        return item


class Indexer(Stage):
    """Write item payloads into an Elasticsearch index"""

    def __init__(self, es_host: str, es_index: str):
        self.es_host = es_host
        self.es_index = es_index
        self.es_client = Elasticsearch(self.es_host)

    def process(self, item: Item) -> Item:
        self.es_client.index(index=self.es_index, body=item.data)
        return item


pipeline = (
    Pipeline()
    .set_error_manager(
        ESErrorLogger(
            es_host="localhost:9200", es_index="error_logs"
        ).raise_on_critical_error()
    )
    .set_source(LocalFilesSource("./document_files", postfix=".html"))
    .append("text_extractor", TextExtractor(), concurrency=2)
    .append("vat_finder", VatFinder())
    .append("indexer", Indexer(es_host="localhost:9200", es_index="documents"))
    .build()
)

for item in pipeline.run():
    logging.info("Processed document: %s", item)

Parallel stages and on_start method

The only way Python allows code to run in parallel is through multiple OS processes, with the multiprocessing package (threads cannot run in parallel because of the GIL).

When we submit a Python function to a spawned/forked process we are actually copying memory from the current process to the new one, because OS processes cannot share memory, unlike threads. To do this (at least for spawned processes), the data we want to pass to the new process must be serialized. Communication between processes also involves copying data from one to the other (e.g. through queues). Moreover, for child processes that are not created with the “fork” method, the memory of the parent won’t be copied completely.

Therefore, if we decide to run a pipeline stage concurrently and in parallel, the stage is going to be copied into each process. This means that the stage must be “pickleable”: serializable with the pickle module. If we want to define non-serializable attributes in our stage object and still run it on multiple processes, we must find a way to generate these attributes for each object copy in each process.
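
A quick way to verify that a stage can be used this way is to check that an instance survives a pickle round trip, for example with the TextReplacer defined earlier (a sketch):

import pickle

stage = TextReplacer(substitution="XXX")
# if this round trip fails, the stage cannot be copied to other processes as it is
stage_copy = pickle.loads(pickle.dumps(stage))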

This is the problem that the Stage.on_start() method solves. It is simply used to initialize attributes “a posteriori”. It is normally called after __init__, but in case of execution on multiple processes it is called once, on the stage copy, at process start. This allows stages to keep state locally to each process; it is also useful for safety and for avoiding the copying of large data.

For the ErrorManager as well it is necessary to define ErrorManager.on_start(), because the manager must be coupled with a stage when it is copied.

Let’s go back to the previous example: the error manager and one stage need to be modified if we want to run that stage in parallel. The inconvenience here is the Elasticsearch client, which is not serializable (try it yourself, e.g. pickle.dumps(Elasticsearch('localhost'))). Moreover, an Elasticsearch client opens a connection, so we want an independent connection for each process; sharing one is impractical.

This is how we refactor the original __init__ methods

class ESErrorLogger(ErrorManager):

    def __init__(self, es_host: str, es_index: str):
        self.es_host = es_host
        self.es_index = es_index
        self.es_client = None

    def on_start(self):
        self.es_client = Elasticsearch(self.es_host)


class Indexer(Stage):

    def __init__(self, es_host: str, es_index: str):
        self.es_host = es_host
        self.es_index = es_index
        self.es_client = None

    def on_start(self):
        self.es_client = Elasticsearch(self.es_host)

The effort required of the developer is minimal, but the advantage is big. We can now execute these pipeline abstractions in parallel, without being limited to stateless functions as we normally would be with multiprocessing. In general, it is convenient to always override on_start when the attributes we construct require this special treatment, so that the stage remains compatible with all three ways of running it: sequentially, concurrently on threads and concurrently on processes.
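
With the refactored classes, the stages from the complete example above can now be appended with parallel set to True (the concurrency values here are just illustrative):

pipeline = (
    Pipeline()
    .set_error_manager(
        ESErrorLogger(es_host="localhost:9200", es_index="error_logs")
    )
    .set_source(LocalFilesSource("./document_files", postfix=".html"))
    .append("text_extractor", TextExtractor(), concurrency=2, parallel=True)
    .append("vat_finder", VatFinder())
    .append(
        "indexer",
        Indexer(es_host="localhost:9200", es_index="documents"),
        concurrency=2,
        parallel=True,
    )
    .build()
)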

A complementary method is on_end, available both for stages and for the error manager, which allows operations to be executed at pipeline exit, even when the exit is caused by an error. It is useful, for example, for closing files or connections opened in on_start.
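
For instance, the refactored Indexer could release its connection when the pipeline exits by adding this method (a sketch, assuming the Elasticsearch client exposes a close() method):

    def on_end(self):
        # release the connection opened in on_start
        if self.es_client is not None:
            self.es_client.close()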

Parallel stages and logging

The on_start method is especially useful for configuring stage loggers.

Unfortunately a stage logger configuration, like the log level, and even the global logging configuration, won’t be inherited by stages running in sub-processes (this actually happens when the fork method is not used for creating child processes).

This issue can be solved by defining the logging configuration of Stage.logger inside the on_start override.
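
For example, the refactored Indexer above could configure its logger level like this (a sketch; the chosen level is just illustrative):

    def on_start(self):
        # logging configuration done here reaches every child process
        self.logger.setLevel(logging.DEBUG)
        self.es_client = Elasticsearch(self.es_host)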