"""Level processing functionalities."""
import inspect
import warnings
from abc import ABC, ABCMeta, abstractmethod
from dataclasses import dataclass
from types import MethodType
from typing import Callable, Generator, Iterable, List, Optional, Set, Union, cast
from dispel.data.core import EntityType, Reading
from dispel.data.flags import Flag, FlagSeverity, FlagType
from dispel.data.levels import Level, LevelId
from dispel.data.values import AbbreviatedValue as AV
from dispel.processing.core import (
CoreProcessingStepGroup,
ProcessingControlResult,
ProcessingResult,
ProcessingStep,
ProcessResultType,
)
from dispel.processing.flags import FlagStepMixin
MultipleLevelIdsType = Union[str, LevelId, List[str], List[LevelId]]
@dataclass(frozen=True)
class LevelProcessingResultBase:
"""The processing result base of a level processing step."""
#: The level where the processing result is stored.
level: Level
def __post_init__(self):
# Check that the level is not None
if self.level is None:
raise ValueError("level value cannot be null.")
@dataclass(frozen=True)
class LevelProcessingResult(ProcessingResult, LevelProcessingResultBase):
"""The processing result of a level processing step."""
@dataclass(frozen=True)
class LevelProcessingControlResult(ProcessingControlResult, LevelProcessingResultBase):
"""The processing result of an error in a level."""
@classmethod
def from_assertion_error(
cls,
step: "ProcessingStep",
error: AssertionError,
level: Optional[Level] = None,
):
"""Initialize object from a caught assertion error.
Parameters
----------
step
The processing step from where the assertion error has been caught.
error
The assertion error that has been caught.
level
The level corresponding to the :class:`LevelProcessingControlResult`.
Returns
-------
LevelProcessingControlResult
The initialized level processing control result object.
"""
assert level is not None, "Level cannot be null."
res = ProcessingControlResult.from_assertion_error(step, error)
return cls(
step=res.step,
targets=res.targets,
error=res.error,
level=level,
error_handling=res.error_handling,
)
@classmethod
def from_flag(
cls,
flag: Flag,
step: "ProcessingStep",
targets: Iterable[EntityType],
level: Optional[Level] = None,
):
"""Initialize processing control result from an flag.
Parameters
----------
flag
The flag from which the control processing result is to be created.
step
The associated processing step.
targets
The flag target entities.
level
The level corresponding to the :class:`LevelProcessingControlResult`.
Returns
-------
LevelProcessingControlResult
The initialized level processing control result object.
"""
res = ProcessingControlResult.from_flag(flag, step, targets)
assert isinstance(level, Level), "Level cannot be null."
return cls(
step=res.step,
targets=res.targets,
error=res.error,
error_handling=res.error_handling,
level=level,
)
def _intersection(a, b):
return a.intersection(b)
def _union(a, b):
return a.union(b)
class LevelFilter(ABC):
"""A base class to filter levels during processing.
:class:`LevelFilter` provides a central mechanism to differentiate processing steps
in combination with :class:`LevelProcessingStep` and
:class:`~dispel.processing.transform.ConcatenateLevels`. Each filter implementation
must provide a :meth:`~LevelFilter.filter` function that consumes a container of
levels and returns a set of levels containing those that should be processed.
Furthermore, the method :meth:`~LevelFilter.repr` provides a hook to create a
readable representation of the filter.
Filters can also be combined by using logical operators ``&`` and ``|``.
Examples
--------
Each level filter has to implement the methods ``filter`` and ``repr``. Assuming we
want to create a filter that inspects some variables in the context of each level,
we can do the following:
>>> from typing import Any, Iterable, Set
>>> from dispel.data.levels import Level
>>> from dispel.processing.level import LevelFilter
>>> class MyContextLevelFilter(LevelFilter):
... def __init__(self, variable: str, value: Any):
... self.variable = variable
... self.value = value
... def filter(self, levels: Iterable[Level]) -> Set[Level]:
... return set(filter(
... lambda level: level.context.get_raw_value(
... self.variable) == self.value, levels))
... def repr(self) -> str:
... return f'{self.variable} equals "{self.value}"'
Given this filter one can now process levels with a specific context value by
creating a filter like ``MyContextLevelFilter('usedHand', 'right')``.
Since :class:`LevelFilter` can be used with logical operators one can create more
complex filters by simply combining them as such:
>>> MyContextLevelFilter('foo', 'bar') & MyContextLevelFilter('baz', 'bam')
<LevelFilter: (foo equals "bar" and baz equals "bam")>
This filter will now only consider levels where the context variable ``foo``
equals ``bar`` and ``baz`` equals ``bam``. The same works with or logic (``|``).
Applying the ``~`` operator to a level filter yields its inverse.
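The ``|`` and ``~`` operators follow the same pattern; given the combination
logic of this class, the filters from above would represent themselves as:
>>> MyContextLevelFilter('foo', 'bar') | MyContextLevelFilter('baz', 'bam')
<LevelFilter: (foo equals "bar" or baz equals "bam")>
>>> ~MyContextLevelFilter('foo', 'bar')
<LevelFilter: Inverse of foo equals "bar">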
"""
def __call__(self, levels: Iterable[Level]) -> Set[Level]:
"""Filter level."""
return self.filter(levels)
def __repr__(self) -> str:
return f"<{self.__class__.__name__}: {self.repr()}>"
def repr(self) -> str:
"""Get representation of the filter.
Raises
------
NotImplementedError
This method is not implemented since there is no unambiguous representation
of filters.
"""
raise NotImplementedError
def filter(self, levels: Iterable[Level]) -> Set[Level]:
"""Filter level.
Parameters
----------
levels
The levels to be inspected for filtering
Raises
------
NotImplementedError
This method is not implemented since there is no unambiguous definition of
filters.
"""
raise NotImplementedError
def _combined(
self, other: "LevelFilter", func: Callable[[Set, Set], Set]
) -> "LevelFilter":
if not isinstance(other, LevelFilter):
raise ValueError(f"Can only combine LevelFilters. Got: {type(other)}.")
# avoid nesting default filter
if isinstance(other, DefaultLevelFilter):
return self
if isinstance(self, DefaultLevelFilter):
return other
def _match(levels: Iterable[Level]) -> Set[Level]:
return func(self(levels), other(levels))
def _repr() -> str:
op_name = {_intersection: "and", _union: "or"}
return (
f"({self.repr()} " f"{op_name[func]} {other.repr()})"
) # pylint: disable=W0212
instance = LevelFilter()
setattr(instance, "filter", _match)
setattr(instance, "repr", _repr)
return instance
def __and__(self, other: "LevelFilter") -> "LevelFilter":
return self._combined(other, _intersection)
def __or__(self, other: "LevelFilter") -> "LevelFilter":
return self._combined(other, _union)
def __invert__(self) -> "LevelFilter":
def _inverted_filter(levels: Iterable[Level]) -> Set[Level]:
return set(levels) - self.filter(levels)
def _repr() -> str:
return f"Inverse of {self.repr()}" # pylint: disable=W0212
instance = LevelFilter()
setattr(instance, "filter", _inverted_filter)
setattr(instance, "repr", _repr)
return instance
LevelFilterType = Union[MultipleLevelIdsType, LevelFilter]
class LevelIdFilter(LevelFilter):
"""A level filter based on level ids.
Parameters
----------
level_ids
The level id(s) to be filtered for processing. The level id can be provided as
:class:`str`, :class:`~dispel.data.core.LevelId` or lists of either. Levels with
the respective provided ids will be considered during processing.
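Examples
--------
A minimal sketch, assuming the reading contains levels with the ids
``level_1`` and ``level_2`` (hypothetical ids used only for illustration):
>>> from dispel.processing.level import LevelIdFilter
>>> level_filter = LevelIdFilter(['level_1', 'level_2'])
>>> # ``reading`` is assumed to be an existing Reading instance
>>> selected = level_filter.filter(reading.levels)  # doctest: +SKIP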
"""
def __init__(self, level_ids: MultipleLevelIdsType):
if isinstance(level_ids, str):
level_ids = [LevelId(level_ids)]
if isinstance(level_ids, LevelId):
level_ids = [level_ids]
if isinstance(level_ids, list):
if all(isinstance(level_id, str) for level_id in level_ids):
level_ids = [LevelId(cast(str, level_id)) for level_id in level_ids]
elif any(not isinstance(level_id, LevelId) for level_id in level_ids):
raise ValueError(
"The list of level_ids has to be filled only by {str}s or {"
"LevelId}s, never both."
)
self.level_ids = level_ids
def repr(self) -> str:
"""Get representation of the filter."""
return f"level id in {self.level_ids}"
def filter(self, levels: Iterable[Level]) -> Set[Level]:
"""Filter levels being part of a predefined level id set."""
return set(filter(lambda x: x.id in self.level_ids, levels))
class DefaultLevelFilter(LevelFilter):
"""A default level filter that considers all levels."""
def repr(self) -> str:
"""Get representation of the filter."""
return "*"
def filter(self, levels: Iterable[Level]) -> Set[Level]:
"""Filter method returns a set of levels."""
return set(levels)
class LevelProcessingStepProtocol(metaclass=ABCMeta):
"""Protocol for level processing steps."""
@abstractmethod
def assert_valid_level(self, level: Level, reading: Reading, **kwargs):
"""Perform assertions that a given level can be processed.
Parameters
----------
level
The level to be tested for validity
reading
The parent reading of the level
kwargs
Additional keyword arguments passed to the processing function.
"""
@abstractmethod
def flag_level(
self, level: Level, reading: Reading, **kwargs
) -> Generator[Flag, None, None]:
"""Flag the provided level.
Parameters
----------
level
The level to be flagged.
reading
The reading associated to the provided level.
kwargs
Additional arguments passed by :func:`~dispel.processing.process`.
Yields
------
Flag
The resulting flags.
"""
yield NotImplemented
def get_level_flag_targets(
self, level: Level, reading: Reading, **kwargs
) -> Iterable[EntityType]:
"""Get the level flag targets.
Parameters
----------
level
The level to be flagged.
reading
The reading associated with the level flag.
kwargs
Additional keyword arguments that may be used to extract the flag
targets.
Returns
-------
Iterable[EntityType]
An iterable of entities that are flagged.
"""
# pylint: disable=unused-argument
return [level]
@abstractmethod
def process_level(
self, level: Level, reading: Reading, **kwargs
) -> ProcessResultType:
"""Process the provided Level.
Parameters
----------
level
The level to be processed
reading
The reading to be processed
kwargs
Additional arguments passed by :func:`~dispel.processing.process`.
Yields
------
ProcessResultType
Results from processing the level.
"""
yield NotImplemented
class LevelFilterProcessingStepMixin:
"""A mixin class for all processing steps using level filters.
Parameters
----------
level_filter
The filter to be used to select the levels to be processed.
"""
level_filter: LevelFilter = DefaultLevelFilter()
def __init__(self, *args, **kwargs):
level_filter = kwargs.pop("level_filter", None)
super().__init__(*args, **kwargs)
if level_filter is not None:
self.set_level_filter(level_filter)
def get_level_filter(self) -> LevelFilter:
"""Get the level filter to sub-select levels to be processed."""
return self.level_filter
def set_level_filter(self, level_filter: LevelFilterType):
"""Set a level filter to sub-select levels to be processed.
Parameters
----------
level_filter
The level filter to be used.
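Examples
--------
A short sketch, assuming ``step`` is a processing step using this mixin;
strings, level ids and lists thereof are wrapped into a :class:`LevelIdFilter`:
>>> step.set_level_filter('level_1')  # doctest: +SKIP
>>> step.set_level_filter(['level_1', 'level_2'])  # doctest: +SKIP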
"""
if isinstance(level_filter, (str, list, LevelId)):
level_filter = LevelIdFilter(level_filter)
self.level_filter = level_filter
def inject_level_filter_from_step(self, step: "LevelFilterProcessingStepMixin"):
"""Inject the level filter from a step into the filter of this step.
This function allows this processing step to depend on the level
filter of another step.
Parameters
----------
step
A level filter processing step of which the level filter is used
in this step too.
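Examples
--------
A minimal sketch; ``MyStep`` is a hypothetical concrete step defined here
only for illustration:
>>> from dispel.processing.level import LevelProcessingStep
>>> class MyStep(LevelProcessingStep):
...     def process_level(self, level, reading, **kwargs):
...         yield from []
>>> step_a = MyStep(level_filter='level_1')
>>> step_b = MyStep()
>>> step_b.inject_level_filter_from_step(step_a)
>>> # step_b now only considers levels that also pass step_a's filter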
"""
_func_cache_attr = "__original_get_level_filter"
_injected_step_attr = "__injected_step"
# only cache the original function once to avoid cascading filters
# from multiple injections
if not hasattr(self, _func_cache_attr):
setattr(self, _func_cache_attr, self.get_level_filter)
else:
if (old_step := getattr(self, _injected_step_attr)) is not step:
warnings.warn(
f"Re-assigning step {self} to a new {step} may lead to unintended "
f"side-effects with {old_step}.",
UserWarning,
)
def _get_level_filter(inner_self) -> LevelFilter:
level_filter = getattr(inner_self, _func_cache_attr)()
return level_filter & step.get_level_filter()
# See https://github.com/python/mypy/issues/2427
setattr(self, _injected_step_attr, step)
setattr(
self,
"get_level_filter",
MethodType(_get_level_filter, self), # type: ignore
)
class LevelProcessingStep(
LevelProcessingStepProtocol, LevelFilterProcessingStepMixin, ProcessingStep
):
r"""A level processing step is a processing step specific on levels.
The level processing steps' :meth:`LevelProcessingStepProtocol.process_level` method
is called with the level, reading and additional arguments passed to
:meth:`~dispel.processing.core.ProcessingStep.process`. Results from the process step
are expected to be instances of :class:`~dispel.processing.core.ProcessingResult`.
The :meth:`process_level` method is only called with levels that pass the provided
``level_filter`` (see :class:`LevelFilter`). Each level will be processed if no
level filter is provided. The ``level_filter`` also accepts :class:`str`,
:class:`~dispel.data.core.LevelId`\ s and lists of either and passes them to a
:class:`LevelIdFilter` for convenience.
Examples
--------
.. testsetup:: processing-step
>>> import pandas as pd
>>> import numpy as np
>>> from dispel.data.core import Reading
>>> from dispel.data.levels import Level
>>> from dispel.data.raw import (RawDataSet, RawDataSetDefinition,
... RawDataValueDefinition)
>>> reading = Reading(
... evaluation=None,
... levels=[
... Level(id_='my-level', start=0, end=1, raw_data_sets=[
... RawDataSet(
... RawDataSetDefinition('my-data-set', None, [
... RawDataValueDefinition('dummy', 'dummy')
... ]),
... pd.DataFrame({'dummy': list(range(6))})
... )
... ])
... ])
.. doctest:: processing-step
>>> from dispel.data.measures import MeasureValue
>>> from dispel.data.values import ValueDefinition
>>> from dispel.processing import process
>>> from dispel.processing.level import (LevelProcessingStep,
... LevelProcessingResult)
>>> class MyLevelStep(LevelProcessingStep):
... def process_level(self, level, reading, **kwargs):
... raw_data_set = level.get_raw_data_set('my-data-set')
... yield LevelProcessingResult(
... step=self,
... sources=raw_data_set,
... level=level,
... result=MeasureValue(
... ValueDefinition('my-measure-id', 'max value'),
... raw_data_set.data.max().max()
... )
... )
>>> _ = process(reading, MyLevelStep())
>>> reading.get_measure_set('my-level').get_raw_value('my-measure-id')
5
"""
def process_reading(self, reading: Reading, **kwargs) -> ProcessResultType:
"""Process all levels in reading.
Parameters
----------
reading
The reading to be processed. Each level of the reading is passed through
the ``level_filter``, and :meth:`process_level` is called for every level
that passes.
kwargs
Additional named arguments. This allows providing additional values, such
as placeholder values in value definitions, to the actual processing
function.
Yields
------
ProcessResultType
Passes through anything that is yielded from :meth:`process_level`.
"""
for level in self.get_level_filter()(reading.levels):
for flag in self.flag_level(level, reading, **kwargs):
yield LevelProcessingControlResult.from_flag(
flag=flag,
step=self,
targets=self.get_level_flag_targets(level, reading, **kwargs),
level=level,
)
try:
self.assert_valid_level(level, reading, **kwargs)
except AssertionError as exception:
yield LevelProcessingControlResult.from_assertion_error(
level=level, step=self, error=exception
)
else:
yield from self.process_level(level, reading, **kwargs)
def flag_level(
self, level: Level, reading: Reading, **kwargs
) -> Generator[Flag, None, None]:
"""Flag the provided level."""
yield from []
def assert_valid_level(self, level: Level, reading: Reading, **kwargs):
"""Perform assertions that a given level can be processed."""
@abstractmethod
def process_level(
self, level: Level, reading: Reading, **kwargs
) -> ProcessResultType:
"""Process the provided Level.
Parameters
----------
level
The level to be processed
reading
The reading to be processed
kwargs
Additional arguments passed by :func:`~dispel.processing.process`.
Yields
------
ProcessResultType
Results from processing the level.
"""
yield NotImplemented
class FlagLevelStep(FlagStepMixin, LevelProcessingStep):
"""A level flag class.
Parameters
----------
level_filter
An optional filter to limit the levels being processed. See
:class:`~dispel.processing.level.LevelProcessingStep`.
task_name
An optional abbreviated name value of the task used for the flag. See
:class:`~dispel.processing.flags.FlagStepMixin`.
flag_name
An optional abbreviated name value of the considered flag.
See :class:`~dispel.processing.flags.FlagStepMixin`.
flag_type
An optional flag type.
See :class:`~dispel.data.flags.FlagType`.
flag_severity
An optional flag severity.
See :class:`~dispel.data.flags.FlagSeverity`.
reason
An optional string reason of the considered flag.
See :class:`~dispel.processing.flags.FlagStepMixin`.
stop_processing
An optional boolean specifying whether the flag should stop the
processing, i.e., whether an error is raised. See
:class:`~dispel.processing.flags.FlagStepMixin`.
flagging_function
An optional flagging function to be applied to :class:`~dispel.data.core.Level`.
See :class:`~dispel.processing.flags.FlagStepMixin`.
Examples
--------
Assuming you want to flag the time frame duration of a given level, you can use
the following flag step:
>>> from dispel.data.values import AbbreviatedValue as AV
>>> from dispel.processing.level import FlagLevelStep
>>> step = FlagLevelStep(
... 'utt',
... AV('U-Turn test', 'utt'),
... AV('u-turn duration', 'utt_dur'),
... 'technical',
... 'The U-Turn test duration exceeds 5 minutes.',
... stop_processing=False,
... flagging_function=lambda level: \
...     level.effective_time_frame.duration.total_seconds() < 5 * 60,
... )
The flagging function will be called with the level ``'utt'`` as specified in the
``level_filter`` argument. If the function has a named parameter matching
``reading``, the reading will be passed to the flagging function.
Another common scenario is to define a class that can be reused.
>>> from dispel.data.flags import FlagSeverity, FlagType
>>> from dispel.processing.level import FlagLevelStep
>>> class UTTDuration(FlagLevelStep):
... task_name = AV('U-Turn test', 'utt')
... flag_name = AV('u-turn duration', 'utt_dur')
... flag_type = FlagType.TECHNICAL
... flag_severity = FlagSeverity.DEVIATION
... reason = 'The U-Turn test duration exceeds 5 minutes.'
... stop_processing = False
... flagging_function = lambda level: \
...     level.effective_time_frame.duration.total_seconds() < 5 * 60
Another convenient way to provide the flagging function is to use the
``@flag`` decorator. One can also attach multiple flags to the same class,
as follows:
>>> from dispel.data.levels import Level
>>> from dispel.data.core import Reading
>>> from dispel.processing.flags import flag
>>> from dispel.processing.level import FlagLevelStep
>>> class UTTDuration(FlagLevelStep):
... task_name = AV('U-Turn test', 'utt')
... flag_name = AV('u-turn duration', 'utt_dur')
... flag_type = 'technical'
... flag_severity = FlagSeverity.DEVIATION
... reason = 'The U-Turn test duration exceeds {duration} minutes.'
... stop_processing = False
...
... @flag(duration=5)
... def _duration_5(self, level: Level) -> bool:
... duration = level.duration
... return duration.total_seconds() < 5 * 60
...
... @flag(duration=4)
... def _duration_4(self, level: Level, reading: Reading) -> bool:
... evaluation_start = reading.evaluation.start
... level_start = level.start
... assert evaluation_start <= level_start
...
... duration = level.duration
... return duration.total_seconds() < 4 * 60
Note that the ``@flag`` decorator can take keyword arguments. These kwargs are
merged with any keyword arguments that come from processing step groups in order to
format the flag ``reason``.
"""
def __init__(
self,
level_filter: Optional[LevelFilterType] = None,
task_name: Optional[Union[AV, str]] = None,
flag_name: Optional[Union[AV, str]] = None,
flag_type: Optional[Union[FlagType, str]] = None,
flag_severity: Optional[Union[FlagSeverity, str]] = None,
reason: Optional[Union[AV, str]] = None,
stop_processing: bool = False,
flagging_function: Optional[Callable[..., bool]] = None,
):
super().__init__(
level_filter=level_filter,
task_name=task_name,
flag_name=flag_name,
flag_type=flag_type,
flag_severity=flag_severity,
reason=reason,
stop_processing=stop_processing,
flagging_function=flagging_function,
)
def process_level(
self, level: Level, reading: Reading, **kwargs
) -> ProcessResultType:
"""Process the provided level."""
yield from []
def get_level_flag_targets(
self, level: Level, reading: Reading, **kwargs
) -> Iterable[EntityType]:
"""Get flag targets for reading flag."""
return self.get_flag_targets(reading, level, **kwargs)
def get_flag_targets(
self, reading: Reading, level: Optional[Level] = None, **kwargs
) -> Iterable[EntityType]:
"""Get flag targets for level flag."""
assert level is not None, "Level cannot be null."
return [level]
def flag_level(
self, level: Level, reading: Reading, **kwargs
) -> Generator[Flag, None, None]:
"""Flag the provided level."""
for func, func_kwargs in self.get_flagging_functions():
new_kwargs = kwargs.copy()
if "reading" in inspect.getfullargspec(func).args:
new_kwargs["reading"] = reading
if not func(level, **new_kwargs):
(merged_kwargs := kwargs.copy()).update(func_kwargs)
yield self.get_flag(**merged_kwargs)
class ProcessingStepGroup(LevelFilterProcessingStepMixin, CoreProcessingStepGroup):
r"""A group of processing steps with an optional level filter.
For examples see :class:`dispel.processing.core.CoreProcessingStepGroup`. This class
ensures that level filters are injected into the steps of this group.
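Examples
--------
A minimal sketch, assuming the steps are passed as the first argument as in
:class:`~dispel.processing.core.CoreProcessingStepGroup` and that ``MyStep``
is a concrete :class:`LevelProcessingStep` (hypothetical name):
>>> group = ProcessingStepGroup(
...     [MyStep(), MyStep()],
...     level_filter='level_1',
... )  # doctest: +SKIP
Each contained step then inherits the group's level filter through
:meth:`set_steps`.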
"""
def set_steps(self, steps: List[ProcessingStep]):
"""Set processing steps part of the group.
This method ensures that steps added to the group inherit the level filter of
the group.
Parameters
----------
steps
The steps contained in the processing group.
"""
for step in steps:
if isinstance(step, LevelFilterProcessingStepMixin):
step.inject_level_filter_from_step(self)
super().set_steps(steps)
def inject_level_filter_from_step(self, step: LevelFilterProcessingStepMixin):
"""Inject level filter into group and steps in group."""
super().inject_level_filter_from_step(step)
for group_step in self.get_steps():
if isinstance(group_step, LevelFilterProcessingStepMixin):
group_step.inject_level_filter_from_step(self)