Source code for smartpipeline.error.handling

from __future__ import annotations

import logging
import uuid
from typing import Any, Optional, Tuple, Type, Union

from smartpipeline.error.exceptions import CriticalError, SoftError
from smartpipeline.item import Item
from smartpipeline.stage import AliveMixin

__author__ = "Giacomo Berardi <giacbrd.com>"


[docs]class ErrorManager: """ Basic error handling of a pipeline, principally manages :class:`.exceptions.SoftError` and :class:`.exceptions.CriticalError` types """ _raise_on_critical = False _skip_on_critical = True
[docs] def raise_on_critical_error(self) -> ErrorManager: """ Set the error manager so that if a :class:`.exceptions.CriticalError` (or any un-managed exception) is encountered, it raises it "externally", at the pipeline run level, killing the pipeline """ self._raise_on_critical = True return self
[docs] def no_skip_on_critical_error(self) -> ErrorManager: """ Change default behaviour of the error manager on :class:`.exceptions.CriticalError`: only skip the stage which raises it, like the :class:`.exceptions.SoftError` . Valid only if :meth:`.ErrorManager.raise_on_critical_error` is not set """ self._skip_on_critical = False return self
[docs] def on_start(self) -> Any: """ Called for a concurrent stage executor in a process (only when multiprocessing concurrency) or simply after construction, by the pipeline. The error manager in the executor is a copy of the original, by overriding this method one can initialize variables specifically for the copies, that is mandatory when they are not serializable. """ pass
[docs] def on_end(self) -> Any: """ Called when the pipeline terminates, useful for executing closing operations (e.g. on files) """ pass
[docs] def handle( self, error: Exception, stage: AliveMixin, item: Item ) -> Optional[CriticalError]: """ Manage an error produced by a stage :param error: It can be a generic exception or an error from :mod:`.exceptions` explicitly raised by a stage :param stage: Stage which raised the exception during processing :param item: Item which raised the exception when processed :return: If the handled error results to be critical return the generated :class:`.exceptions.CriticalError` :raises Exception: When a :meth:`.ErrorManager.raise_on_critical_error` has been set and the error is critical """ item_error: Union[SoftError, CriticalError] if isinstance(error, SoftError): item_error = item.add_soft_error(stage.name, error) else: # any un-managed exception is a potential critical error item_error = item.add_critical_error(stage.name, error) exc_info = (type(item_error), item_error, item_error.__traceback__) self.logger.exception( "%s has generated an error on item %s", stage, item, exc_info=exc_info ) if isinstance(item_error, CriticalError): exception = self._check_critical(item_error) if exception: return item_error return None
@property def logger(self) -> logging.Logger: if getattr(self, "_logger", None) is None: self._logger = logging.getLogger( f"{self.__class__.__name__}-{str(uuid.uuid4())[:8]}" ) return self._logger def _check_critical( self, error: CriticalError ) -> Optional[Union[BaseException, CriticalError]]: """ Manage a critical error, usually after an item has been processed by a stage :return: The exception which caused a critical error if any, otherwise the :class:`.exceptions.CriticalError` itself :raises Exception: When a :meth:`.ErrorManager.raise_on_critical_error` has been set """ ex = error.get_exception() if self._raise_on_critical: raise ex or error elif self._skip_on_critical: return ex or error return None
[docs] def check_critical_errors(self, item: Item) -> Optional[BaseException]: """ Check the critical errors registered for an item """ if item.has_critical_errors(): for er in item.critical_errors(): ex = self._check_critical(er) if ex: return ex return None
class RetryManager: """ This class encapsulate the parameters used to handle the retry strategy in case some kind of error are raise by the stage """ def __init__( self, retryable_errors: Tuple[Type[Exception], ...] = tuple(), max_retries: int = 0, backoff: Union[float, int] = 0, ): """ :param retryable_errors: tuple of errors types which the retry strategy is applied for :param max_retries: maximum number of attempts for a which a stage is run in case of one of the `retryable_errors` is raised during its execution :param backoff: weight for the exponential back-off strategy """ self._backoff = backoff self._max_retries = max_retries self._retryable_errors = retryable_errors @property def backoff(self) -> float: return self._backoff @property def max_retries(self) -> int: return self._max_retries @property def retryable_errors(self) -> Tuple[Type[Exception], ...]: return self._retryable_errors