Source code for dispel.processing.core

r"""Core functionality to process :class:`~dispel.data.core.Reading`\ s."""
import copy
import inspect
import warnings
from abc import abstractmethod
from collections import abc
from dataclasses import dataclass
from enum import Enum
from typing import (
    Any,
    Callable,
    Dict,
    Generator,
    Generic,
    Iterable,
    List,
    Optional,
    Tuple,
    TypeVar,
    Union,
)

from dispel.data.core import EntityType, Reading
from dispel.data.flags import Flag, FlagSeverity, FlagType
from dispel.data.levels import Level, LevelEpoch
from dispel.data.measures import MeasureSet, MeasureValue
from dispel.data.raw import RawDataSet
from dispel.data.values import AbbreviatedValue as AV
from dispel.processing.flags import FlagStepMixin

# A single entity, or an iterable of entities, used as the source(s) of a
# processing result (see ``ProcessingResult.sources``).
SourcesType = Union[Iterable[EntityType], EntityType]

# A single entity, or an iterable of entities, that a flag is attached to
# (see ``ProcessingControlResult.targets``).
TargetsType = Union[Iterable[EntityType], EntityType]

# Type variable for the value carried by a ``Parameter``.
ParameterType = TypeVar("ParameterType")


class ProcessingError(Exception):
    """Base class for all errors raised while processing a reading.

    The class name of the offending step is appended to the message, so the
    exception text alone identifies where the processing failed.

    Parameters
    ----------
    message
        The error message
    step
        The processing step that raised the exception
    """

    def __init__(self, message: str, step: "ProcessingStep"):
        step_name = step.__class__.__name__
        super().__init__(f"{message} at step: {step_name}.")
        # Keep a handle on the step so handlers can inspect it later.
        self.step = step
class StopProcessingError(ProcessingError):
    """A processing error that halts the processing of a reading."""
class FlagError(ProcessingError):
    """A processing error that flags the processing results from a reading.

    Parameters
    ----------
    flag
        The flag that caused the error. Its ``reason`` is used as the error
        message.
    step
        The processing step that raised the exception
    """

    def __init__(self, flag: Flag, step: "ProcessingStep"):
        super().__init__(flag.reason, step)
        # Expose the originating flag to whoever handles the error.
        self.flag = flag
class InvalidDataError(ProcessingError):
    """A processing error that flags the processing input as invalid."""
@dataclass(frozen=True)
class ProcessingResultBase:
    """Common base of the results yielded by a processing step."""

    #: The processing step associated with the processing
    step: "ProcessingStep"

    def get_kwargs(self) -> Dict[str, Any]:
        """Collect the additional instance attributes as keyword arguments.

        The additional arguments are passed to the setter function in
        :class:`~dispel.data.core.Reading` while assigning processing results
        and creating the data trace. It allows to handle the result to be set
        depending on the arguments passed.

        Returns
        -------
        Dict[str, Any]
            A fresh dictionary of the instance attributes without the step.
        """
        # Work on a copy so the (frozen) instance itself is never modified.
        attributes = dict(vars(self))
        attributes.pop("step")
        return attributes
@dataclass(frozen=True)
class ProcessingResult(ProcessingResultBase):
    """The processing result of a processing step."""

    #: The sources used for the processing function
    sources: SourcesType
    #: The result of the processing function
    result: Union[Level, MeasureValue, MeasureSet, LevelEpoch, RawDataSet]

    def get_kwargs(self) -> Dict[str, Any]:
        """Get key word arguments for the additional class attributes.

        The additional arguments are passed to the setter function in
        :class:`~dispel.data.core.Reading` while assigning processing results
        and creating the data trace. It allows to handle the result to be set
        depending on the arguments passed.

        Returns
        -------
        Dict[str, Any]
            Returns a dictionary of attributes of the instance omitting the
            step, sources, and result.
        """
        kwargs = super().get_kwargs()
        del kwargs["sources"]
        del kwargs["result"]
        return kwargs

    def get_sources(self) -> Iterable[EntityType]:
        """Get the sources of the processing result.

        Returns
        -------
        Iterable[EntityType]
            ``sources`` itself when it is iterable, otherwise a one-element
            list wrapping the single source.
        """
        if isinstance(self.sources, abc.Iterable):
            return self.sources
        # Normalise a single source so callers can always iterate.
        return [self.sources]
class ErrorHandling(Enum):
    """Different ways of dealing with an exception."""

    RAISE = "raise"
    IGNORE = "ignore"

    @property
    def should_raise(self) -> bool:
        """Return ``True`` if the handling is to raise the captured error."""
        # Reference the member through the class, as ``from_bool`` does.
        return self is ErrorHandling.RAISE

    def __bool__(self) -> bool:
        """Make a member truthy only when the error is to be raised."""
        return self.should_raise

    @classmethod
    def from_bool(cls, stop_processing: bool) -> "ErrorHandling":
        """Create the error handling class from a stop_processing boolean.

        Parameters
        ----------
        stop_processing
            Whether the processing is to be stopped.

        Returns
        -------
        ErrorHandling
            ``RAISE`` if ``stop_processing`` is truthy, ``IGNORE`` otherwise.
        """
        if stop_processing:
            return ErrorHandling.RAISE
        return ErrorHandling.IGNORE
@dataclass(frozen=True)
class ProcessingControlResult(ProcessingResultBase):
    """The processing result of a processing error."""

    #: The captured processing error
    error: Union[ProcessingError, StopProcessingError, FlagError, InvalidDataError]

    #: The type of handling for the captured error
    error_handling: ErrorHandling

    #: The flag targets (if the error is a flag error)
    targets: Optional[TargetsType] = None

    def __post_init__(self):
        """Validate that the captured error is a :class:`ProcessingError`."""
        if not isinstance(self.error, ProcessingError):
            raise ValueError("Processing control result has to be an exception.")

    def get_targets(self) -> Iterable[EntityType]:
        """Get the targets of the flag.

        Returns
        -------
        Iterable[EntityType]
            ``targets`` itself when it is iterable, otherwise a one-element
            list wrapping the single target.

        Raises
        ------
        ValueError
            If no targets were set.
        """
        if self.targets is None:
            raise ValueError("Missing flag targets.")

        if isinstance(self.targets, abc.Iterable):
            return self.targets
        return [self.targets]

    @classmethod
    def from_assertion_error(
        cls,
        step: "ProcessingStep",
        error: AssertionError,
        level: Optional[Level] = None,
    ):
        """Initialize object from a caught assertion error.

        The first argument of the assertion error is either a plain string
        message (handled with ``ErrorHandling.IGNORE``) or an iterable of
        exactly two elements: the message and the ``ErrorHandling`` action.

        Parameters
        ----------
        step
            The processing step from where the assertion error has been caught.
        error
            The assertion error that has been caught.
        level
            The level corresponding to the
            :class:`~dispel.processing.level.LevelProcessingControlResult`.

        Returns
        -------
        ProcessingControlResult
            The initialized processing control result object.

        Raises
        ------
        ValueError
            If the level is passed.
        ValueError
            If the caught exception does not contain a message or if the message
            is inconsistent.
        """
        if level is not None:
            raise ValueError(
                "No level should be passed for ``ProcessingControlResult`` "
                "Initialization. If you would like to register the result in a level "
                "use ``LevelProcessingControlResult`` class."
            )

        # Retrieve error message; a bare ``assert cond`` carries no arguments.
        if not error.args:
            raise ValueError(f"Assertion {error=} must contain a message.")
        exception_message = error.args[0]

        # Retrieve error message components
        if isinstance(exception_message, str):
            error_handling = ErrorHandling.IGNORE
            returned_error = InvalidDataError(exception_message, step)
        elif isinstance(exception_message, abc.Iterable):
            if len((args := list(exception_message))) != 2:
                raise ValueError(
                    "If the provided exception assertion message is an iterable. It "
                    "ought to strictly contain two elements: a string error message "
                    f"and an ``ErrorHandling`` action amongst {list(ErrorHandling)}."
                )
            returned_error = InvalidDataError(args[0], step)
            error_handling = ErrorHandling(args[1])
        else:
            raise ValueError(
                "The exception message accompanying the Assertion error must either "
                "be a string message or an iterable containing both a string message "
                f"and an ``ErrorHandling`` action amongst {list(ErrorHandling)}."
            )

        return cls(step=step, error=returned_error, error_handling=error_handling)

    @classmethod
    def from_flag(
        cls,
        flag: Flag,
        step: "ProcessingStep",
        targets: Iterable[EntityType],
        level: Optional[Level] = None,
    ):
        """Initialize processing control result from a flag.

        Parameters
        ----------
        flag
            The flag from which the control processing result is to be created.
        step
            The associated processing step.
        targets
            The flag target entities.
        level
            The level corresponding to the
            :class:`~dispel.processing.level.LevelProcessingControlResult`.

        Returns
        -------
        ProcessingControlResult
            The initialized processing control result object. Its error is a
            :class:`FlagError` and its handling is derived from
            ``flag.stop_processing``.
        """
        assert level is None, "Level must not be provided."

        return cls(
            step=step,
            targets=targets,
            error=FlagError(flag, step),
            error_handling=ErrorHandling.from_bool(flag.stop_processing),
        )
# Generator type returned by the ``process``/``process_reading`` methods of
# processing steps: it yields regular results as well as control results.
ProcessResultType = Generator[
    Union[ProcessingResult, ProcessingControlResult], None, None
]
class Parameter(Generic[ParameterType]):
    """A parameter defining aspects of processing.

    Every created parameter is stored in a class-level registry under a full
    id built from the module (and, if applicable, class) in which it was
    defined plus the given ``id_``.

    Parameters
    ----------
    id_
        The name of the parameter used to identify the configurable entity.
    default_value
        A default value to be used if the user does not specify otherwise.
    validator
        A callable that accepts values and raises an exception for any
        unexpected value.
    description
        A description of the parameter explaining its usage and influence on
        the affected processing steps.
    """

    _registry: Dict[str, "Parameter"] = {}

    def __new__(cls, id_: str, *_args, **_kwargs):
        """Create a new Parameter object and register it under its full id.

        Raises
        ------
        RuntimeError
            If the current or the calling frame cannot be inspected.
        KeyError
            If a parameter with the same full id is already registered.
        """
        obj = super().__new__(cls)

        # inspect frame to obtain processing step defining parameter
        if (current_frame := inspect.currentframe()) is None:
            raise RuntimeError("Failed to inspect current frame")
        if (parent_frame := current_frame.f_back) is None:
            raise RuntimeError("Failed to inspect parent frame")

        location_spec = parent_frame.f_globals["__name__"]

        # check if we were defined inside a class
        if "__qualname__" in parent_frame.f_locals:
            location_spec += "." + parent_frame.f_locals["__qualname__"]

        full_id = f"{location_spec}.{id_}"

        if full_id in cls._registry:
            raise KeyError(f"Parameter already registered: {full_id}")

        cls._registry[full_id] = obj
        obj._full_id = full_id

        return obj

    def __init__(
        self,
        id_: str,
        default_value: Optional[ParameterType] = None,
        validator: Optional[Callable[[Any], None]] = None,
        description: Optional[str] = None,
    ):
        # ``_full_id`` is assigned in ``__new__``; only declare its type here.
        self._full_id: Optional[str]
        self._local_id = id_
        self._value = default_value
        self.default_value = default_value
        self.validator = validator
        self.description = description

    @property
    def id(self):
        """Get the ID of the parameter.

        The id will be set automatically based on the context of the creation
        of the parameter.

        Returns
        -------
        str
            The ID of the parameter. It is comprised by the name of the frame
            in which it was defined and the specified ``id_``.
        """
        return self._full_id

    @property
    def value(self) -> ParameterType:
        """Get the value set for the parameter.

        Raises
        ------
        RuntimeError
            If neither a default value nor an explicit value was set, i.e. the
            stored value is ``None``.
        """
        # Compare against ``None`` explicitly: falsy values such as ``0``,
        # ``False`` or an empty string are legitimate parameter values.
        if self._value is None:
            raise RuntimeError(f"Parameter was not set: {self.id}")
        return self._value

    @value.setter
    def value(self, value: ParameterType):
        """Set the value for the parameter after running the validator."""
        if self.validator is not None:
            self.validator(value)
        self._value = value

    @classmethod
    def has_parameter(cls, full_id: str) -> bool:
        """Check if a parameter was set."""
        return full_id in cls._registry

    @classmethod
    def set_value(cls, full_id: str, value: Any):
        """Set the value of a parameter.

        Raises
        ------
        KeyError
            If no parameter is registered under ``full_id``.
        """
        if not cls.has_parameter(full_id):
            raise KeyError(f"Unknown parameter: {full_id}")
        cls._registry[full_id].value = value
class ProcessingStep:
    r"""A processing step in a processing sequence.

    :class:`ProcessingStep` is the basic entity through which
    :class:`~dispel.data.core.Reading`\ s are processed. The processing step's
    :meth:`process_reading` function is called with the reading and additional
    arguments passed to :func:`process`. Results from the process step are
    expected to be an instance of :class:`ProcessingResult`. For a comprehensive
    description see :ref:`measure-extraction`.

    The method :meth:`flag_reading` can be overwritten to ensure that the reading
    about to be processed is valid, and return
    :class:`~dispel.data.flags.Flag`\ s if that is not the case.

    Examples
    --------
    .. testsetup:: processing-step

        >>> import pandas as pd
        >>> import numpy as np

        >>> from dispel.data.core import Reading
        >>> from dispel.data.levels import Level
        >>> from dispel.data.raw import (RawDataSet, RawDataSetDefinition,
        ...                              RawDataValueDefinition)

        >>> reading = Reading(
        ...     evaluation=None,
        ...     levels=[
        ...         Level(id_='my-level', start=0, end=1, raw_data_sets=[
        ...             RawDataSet(
        ...                 RawDataSetDefinition('my-data-set', None, [
        ...                     RawDataValueDefinition('dummy', 'dummy')
        ...                 ]),
        ...                 pd.DataFrame({'dummy': list(range(6))})
        ...             )
        ...         ])
        ...     ])

    .. doctest:: processing-step

        >>> from dispel.data.measures import MeasureValue
        >>> from dispel.data.values import ValueDefinition
        >>> from dispel.processing import process
        >>> from dispel.processing.core import ProcessingResult, ProcessingStep
        >>> class MyStep(ProcessingStep):
        ...     def process_reading(self, reading, **kwargs):
        ...         level = reading.get_level('my-level')
        ...         raw_data_set = level.get_raw_data_set('my-data-set')
        ...         data = raw_data_set.data
        ...         yield ProcessingResult(
        ...             step=self,
        ...             sources=raw_data_set,
        ...             result=MeasureValue(
        ...                 ValueDefinition('my-measure-id','max value'),
        ...                 data.max().max()
        ...             )
        ...         )
        >>> _ = process(reading, MyStep())
        >>> reading.measure_set.get_raw_value('my-measure-id')
        5
    """

    def __init__(self):
        # Neighbours in a processing chain; populated by ``set_previous`` and
        # ``set_next``.
        self.predecessor = None
        self.successor = None

    def process(self, reading: Reading, **kwargs) -> ProcessResultType:
        """Flag, validate, and then process the reading.

        All flags from :meth:`flag_reading` are yielded first as control
        results. Afterwards :meth:`assert_valid_reading` is run; a failing
        assertion is yielded as a control result instead of processing the
        reading.

        Parameters
        ----------
        reading
            The reading to be processed
        kwargs
            Additional arguments passed by :func:`process`.

        Yields
        ------
        ProcessResultType
            The results from processing readings.
        """
        for flag in self.flag_reading(reading, **kwargs):
            flag_targets = self.get_reading_flag_targets(reading, **kwargs)
            yield ProcessingControlResult.from_flag(
                flag=flag,
                step=self,
                targets=flag_targets,
            )

        try:
            self.assert_valid_reading(reading, **kwargs)
        except AssertionError as error:
            yield ProcessingControlResult.from_assertion_error(step=self, error=error)
            return

        yield from self.process_reading(reading, **kwargs)

    def assert_valid_reading(self, reading: Reading, **kwargs):
        """Assert that reading is valid.

        The default implementation accepts every reading.
        """

    def flag_reading(self, reading: Reading, **kwargs) -> Generator[Flag, None, None]:
        """Flag the provided reading.

        The default implementation yields no flags.

        Parameters
        ----------
        reading
            The reading to be flagged.
        kwargs
            Additional arguments passed by :func:`~dispel.processing.process`.

        Yields
        ------
        Flag
            The resulted flags.
        """
        # pylint: disable=unused-argument
        yield from ()

    def get_reading_flag_targets(
        self, reading: Reading, **kwargs
    ) -> Iterable[EntityType]:
        """Get the reading flag targets.

        Parameters
        ----------
        reading
            The reading that is concerned with flagging.
        kwargs
            Additional keyword arguments eventually used for flag targets
            extraction.

        Returns
        -------
        Iterable[EntityType]
            An iterable of entities that are flagged. By default, this is the
            reading itself.
        """
        # pylint: disable=unused-argument
        return [reading]

    @abstractmethod
    def process_reading(self, reading: Reading, **kwargs) -> ProcessResultType:
        """Process the provided reading.

        Parameters
        ----------
        reading
            The reading to be processed
        kwargs
            Additional arguments passed by :func:`~dispel.processing.process`.

        Yields
        ------
        ProcessResultType
            The results from processing readings.
        """
        yield NotImplemented

    def set_previous(self, step: "ProcessingStep"):
        """Set the previous step in a processing chain of this step.

        A :class:`UserWarning` is issued if a predecessor was already set.
        """
        current = self.predecessor
        if current is not None:
            warnings.warn(
                "Changing predecessors can lead to side-effects. Previous predecessor "
                f"was {current}",
                UserWarning,
            )
        self.predecessor = step

    def set_next(self, step: "ProcessingStep"):
        """Set the next step in a processing chain of this step.

        A :class:`UserWarning` is issued if a successor was already set.
        """
        current = self.successor
        if current is not None:
            warnings.warn(
                "Changing successors can lead to side-effects. Previous successor was "
                f"{current}",
                UserWarning,
            )
        self.successor = step

    def chain(self, successor: "ProcessingStep") -> "ProcessingStep":
        """Chain this step with the successor step.

        Both steps are linked to each other and wrapped into a group that
        processes them in order.
        """
        assert isinstance(successor, ProcessingStep), "Can only chain processing steps"

        self.set_next(successor)
        successor.set_previous(self)

        chained_steps = [self, successor]
        return _ChainedProcesses(chained_steps)

    def __and__(self, other):
        """See :meth:`ProcessingStep.chain`."""
        return self.chain(other)

    def get_parameters(self) -> List[Tuple[str, Parameter]]:
        """Get all parameters defined by the processing step.

        Returns
        -------
        List[Tuple[str, Parameter]]
            A list of tuples of parameter name and :class:`Parameter` objects
            defined by the processing step.
        """

        def _is_parameter(member: Any) -> bool:
            return isinstance(member, Parameter)

        return inspect.getmembers(self, _is_parameter)
class CoreProcessingStepGroup(ProcessingStep):
    r"""A group of processing steps.

    The :class:`CoreProcessingStepGroup` allows to provide additional named
    arguments to the :meth:`~dispel.processing.process` function of the grouped
    :class:`~dispel.processing.core.ProcessingStep`\ s. The primary use case for
    this is to provide additional arguments to
    :class:`~dispel.processing.extract.ExtractStep` that use
    :class:`~dispel.data.values.ValueDefinitionPrototype`\ s.

    Parameters
    ----------
    steps
        The processing steps of the group
    kwargs
        Additional arguments that are passed to the
        :meth:`~dispel.processing.core.ProcessingStep.process` function of each
        step. This allows to provide additional values, such as placeholder
        values in value definitions to the actual processing function.

    Examples
    --------
    >>> from dispel.data.values import ValueDefinitionPrototype
    >>> from dispel.processing.core import CoreProcessingStepGroup
    >>> from dispel.processing.extract import ExtractStep
    >>> steps = [
    ...     CoreProcessingStepGroup(
    ...         [
    ...             ExtractStep(
    ...                 'data-set',
    ...                 lambda data_set: 5,
    ...                 ValueDefinitionPrototype(
    ...                     id_='x',
    ...                     measure_name='{placeholder} x'
    ...                 )
    ...             ),
    ...         ],
    ...         placeholder='A name'
    ...     ),
    ...     ...
    ... ]

    The above extract step will result in a measure value with the name
    ``A name x``. For further applications of :class:`CoreProcessingStepGroup`
    see also :ref:`measure-extraction`.
    """

    steps: List[ProcessingStep]

    kwargs: Dict[str, Any] = {}

    def __new__(cls, *args, **kwargs):
        """Instantiate a new ProcessingStepGroup.

        Steps declared on the class are deep-copied onto the new instance so
        instances do not share step objects with the class.
        """
        instance = super().__new__(cls)
        declared_steps = getattr(cls, "steps", None)
        if declared_steps:
            instance.steps = copy.deepcopy(declared_steps)
        return instance

    def __init__(self, steps: Optional[List[ProcessingStep]] = None, **kwargs):
        # Fall back to what is already available on the instance/class when
        # nothing (or something empty) is passed explicitly.
        if not steps:
            steps = self.get_steps()
        self.set_steps(steps)

        if not kwargs:
            kwargs = self.get_kwargs()
        self.set_kwargs(**kwargs)

        super().__init__()

    def set_kwargs(self, **kwargs):
        """Set the keyword arguments to be added to the processing."""
        self.kwargs = kwargs

    def get_kwargs(self) -> Dict[str, Any]:
        """Get keyword arguments to be added to the processing."""
        return self.kwargs

    def set_steps(self, steps: List[ProcessingStep]):
        """Set processing steps part of the group.

        Parameters
        ----------
        steps
            The steps contained in the processing group.
        """
        self.steps = steps

    def get_steps(self) -> List[ProcessingStep]:
        """Get processing steps within this group."""
        return self.steps

    def process_reading(self, reading: Reading, **kwargs) -> ProcessResultType:
        """See :meth:`dispel.processing.core.ProcessingStep.process`."""
        # The group's own keyword arguments take precedence over passed ones.
        combined_kwargs = dict(kwargs)
        combined_kwargs.update(self.get_kwargs())

        for grouped_step in self.get_steps():
            yield from grouped_step.process(reading, **combined_kwargs)
class _ChainedProcesses(CoreProcessingStepGroup):
    """A technical class to chain process steps inside a group."""

    def chain(self, successor: "ProcessingStep") -> "ProcessingStep":
        """Add a processing step to the steps.

        The successor is linked to the last step of the group and appended to
        the group's steps. The group itself is returned so that chaining can
        be continued.
        """
        # Bind outside of the assert: assert statements are stripped when
        # running with ``-O``, which would leave ``steps`` undefined below.
        steps = self.get_steps()
        assert len(steps) > 0, "No steps to chain in group"

        last_step = steps[-1]
        last_step.set_next(successor)
        successor.set_previous(last_step)

        steps.append(successor)
        self.set_steps(steps)
        return self
class FlagReadingStep(FlagStepMixin, ProcessingStep):
    """A reading flag class.

    Parameters
    ----------
    task_name
        An optional abbreviated name value of the task used for the flag.
        See :class:`~dispel.processing.flags.FlagStepMixin`.
    flag_name
        An optional abbreviated name value of the considered flag.
        See :class:`~dispel.processing.flags.FlagStepMixin`.
    flag_type
        An optional flag type.
        See :class:`~dispel.data.flags.FlagType`.
    flag_severity
        An optional flag severity.
        See :class:`~dispel.data.flags.FlagSeverity`.
    reason
        An optional string reason of the considered flag.
        See :class:`~dispel.processing.flags.FlagStepMixin`.
    stop_processing
        An optional boolean that specifies whether the flag is stop_processing,
        i.e., raises an error or not.
        See :class:`~dispel.processing.flags.FlagStepMixin`.
    flagging_function
        An optional flagging function applied to
        :class:`~dispel.data.core.Reading`.
        See :class:`~dispel.processing.flags.FlagStepMixin`.

    Examples
    --------
    Assuming you want to flag reading information such as whether the user has
    finished the evaluation properly, you can create the following flag step:

    >>> from dispel.data.flags import FlagSeverity, FlagType
    >>> from dispel.data.values import AbbreviatedValue as AV
    >>> from dispel.processing.core import FlagReadingStep
    >>> step = FlagReadingStep(
    ...     task_name = AV('Pinch test', 'pinch'),
    ...     flag_name = AV('unfinished evaluation', 'ua'),
    ...     reason = 'The evaluation has not been finished by the user.',
    ...     stop_processing=False,
    ...     flag_type=FlagType.TECHNICAL,
    ...     flag_severity=FlagSeverity.INVALIDATION,
    ...     flagging_function=lambda reading: reading.evaluation.finished,
    ... )

    The flagging function will be called with the corresponding reading as
    argument.

    Another common scenario is to define a class that can be reused.

    >>> from dispel.data.flags import FlagType
    >>> from dispel.processing.core import FlagReadingStep
    >>> class UnfinishedEvaluation(FlagReadingStep):
    ...     task_name = AV('Pinch test', 'pinch')
    ...     flag_name = AV('unfinished evaluation', 'ua')
    ...     flag_type = FlagType.BEHAVIORAL
    ...     flag_severity = FlagSeverity.INVALIDATION
    ...     reason = 'The evaluation has not been finished by the user.'
    ...     stop_processing = False
    ...     flagging_function = lambda reading: reading.evaluation.finished

    Another convenient way to provide the flagging function is to use the
    ``@flag`` decorator:

    >>> from dispel.data.core import Reading
    >>> from dispel.processing.core import FlagReadingStep
    >>> from dispel.processing.flags import flag
    >>> class UnfinishedEvaluation(FlagReadingStep):
    ...     task_name = AV('Pinch test', 'pinch')
    ...     flag_name = AV('unfinished evaluation', 'ua')
    ...     flag_type = 'behavioral'
    ...     flag_severity = FlagSeverity.INVALIDATION
    ...     reason = 'The evaluation has not been finished by the user.'
    ...     stop_processing = False
    ...
    ...     @flag
    ...     def _unfinished_evaluation(self, reading: Reading) -> bool:
    ...         return reading.evaluation.finished

    Note that the ``@flag`` decorator can take keyword arguments. These kwargs
    are merged with any keyword arguments that come from processing step groups
    in order to format the flag ``reason``. Also, one can use multiple flag
    decorators in the same flag class.
    """

    def __init__(
        self,
        task_name: Optional[Union[AV, str]] = None,
        flag_name: Optional[Union[AV, str]] = None,
        flag_type: Optional[Union[FlagType, str]] = None,
        flag_severity: Optional[Union[FlagSeverity, str]] = None,
        reason: Optional[Union[AV, str]] = None,
        stop_processing: bool = False,
        flagging_function: Optional[Callable[..., bool]] = None,
    ):
        # Hand every option on unchanged to the next ``__init__`` in the MRO.
        flag_options = {
            "task_name": task_name,
            "flag_name": flag_name,
            "flag_type": flag_type,
            "flag_severity": flag_severity,
            "reason": reason,
            "stop_processing": stop_processing,
            "flagging_function": flagging_function,
        }
        super().__init__(**flag_options)

    def process_reading(self, reading: Reading, **kwargs) -> ProcessResultType:
        """Process the provided reading.

        A flag step produces no processing results of its own, hence nothing
        is yielded.
        """
        yield from ()

    def get_reading_flag_targets(
        self, reading: Reading, **kwargs
    ) -> Iterable[EntityType]:
        """Get flag targets for reading flag."""
        targets = self.get_flag_targets(reading, **kwargs)
        return targets

    def get_flag_targets(
        self, reading: Reading, level: Optional[Level] = None, **kwargs
    ) -> Iterable[EntityType]:
        """Get flag targets for reading flag.

        The reading itself is the only target; ``level`` and further keyword
        arguments are not used.
        """
        return [reading]

    def flag_reading(self, reading: Reading, **kwargs) -> Generator[Flag, None, None]:
        """Flag the provided reading.

        A flag is yielded for every flagging function that returns a falsy
        value for the reading. The keyword arguments attached to the flagging
        function override the ones passed to this method when creating the
        flag.
        """
        for flagging_func, flagging_kwargs in self.get_flagging_functions():
            if flagging_func(reading, **kwargs):
                continue

            flag_kwargs = kwargs.copy()
            flag_kwargs.update(flagging_kwargs)
            yield self.get_flag(**flag_kwargs)