"""Transformation functionalities for processing module."""
from abc import ABCMeta
from typing import (
    Any,
    Callable,
    Dict,
    Generator,
    Iterable,
    List,
    Literal,
    Optional,
    Union,
    cast,
)

import numpy as np
import pandas as pd

from dispel.data.core import Reading
from dispel.data.levels import (
    Context,
    Level,
    LevelId,
    LevelIdType,
    RawDataSetAlreadyExists,
)
from dispel.data.raw import (
    DEFAULT_COLUMNS,
    RawDataSet,
    RawDataSetDefinition,
    RawDataSetSource,
    RawDataValueDefinition,
)
from dispel.data.values import ValueDefinition
from dispel.processing.core import ProcessingResult, ProcessingStep, ProcessResultType
from dispel.processing.data_set import (
    DataSetProcessingStepProtocol,
    MutateDataSetProcessingStepBase,
    RawDataSetProcessingResult,
    StorageError,
    WrapResultGeneratorType,
)
from dispel.processing.level import LevelFilterProcessingStepMixin, LevelFilterType
class TransformStepChainMixIn(DataSetProcessingStepProtocol, metaclass=ABCMeta):
    """A mixin class that allows to chain transformation steps.
    The basic idea is to leverage the new data set ids from the previous transform step
    as the required data set ids for the current step. This avoids having to define the
    `data_set_ids` attribute.
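
    Examples
    --------
    A minimal sketch of two chained steps (the step classes and data set ids below are
    illustrative; chaining assumes the ``&`` operator wires up the ``predecessor``
    attribute used by :meth:`get_data_set_ids`):

    .. code-block:: python

        steps = [
            FirstTransformStep(new_data_set_id='norm') &
            SecondTransformStep()  # consumes 'norm' without declaring data_set_ids
        ]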
    """
    def get_data_set_ids(self) -> Iterable[str]:
        """Get the data set ids to be processed.
        This uses the new data set ids from a previous transform step if set. Otherwise,
        falls back to the default behavior of returning the set data set ids from the
        constructor or class variable.
        Returns
        -------
        Iterable[str]
            An iterable of data set ids.
        """
        assert isinstance(
            self, ProcessingStep
        ), "TransformStepChainMixIn must inherit from ProcessingStep"
        # pylint: disable=no-member
        if isinstance(self.predecessor, TransformStep):
            return [self.predecessor.get_new_data_set_id()]
        # pylint: enable=no-member
        return super().get_data_set_ids()  # type: ignore[safe-super]
class TransformStep(TransformStepChainMixIn, MutateDataSetProcessingStepBase):
    r"""A raw data set transformation processing step.
    This class provides a convenient way to transform one or more data sets by
    specifying their ids, their level_ids or a level filter, a transformation function
    and specifications of a new data set to be returned as result of the processing
    step.
    Parameters
    ----------
    data_set_ids
        An optional list of data set ids to be used for the transformation. See
        :class:`~dispel.processing.data_set.DataSetProcessingStepMixin`.
    transform_function
        An optional function to be applied to the data sets. See
        :class:`~dispel.processing.data_set.MutateDataSetProcessingStepBase`. The transform
        function is expected to produce one or more columns of a data set according to
        the specification in `definitions`. The function can return one-dimensional
        NumPy arrays, pandas series, and data frames.
    new_data_set_id
        An optional id used for the
        :class:`~dispel.data.raw.RawDataSetDefinition`. If no id was provided, the
        :data:`new_data_set_id` class variable will be used. Alternatively, one can
        overwrite :meth:`get_new_data_set_id` to provide the new data set id.
    definitions
        An optional list of :class:`~dispel.data.raw.RawDataValueDefinition`\ s that have
        to match the number of columns returned by the :attr:`transform_function`. If no
        definitions were provided, the :data:`definitions` class variable will be used.
        Alternatively, one can overwrite :meth:`get_definitions` to provide the list of
        definitions.
    level_filter
        An optional filter to limit the levels being processed. See
        :class:`~dispel.processing.level.LevelProcessingStep`.
    storage_error
        This argument is only relevant when the given new data set id already exists,
        in which case the following options are available (see also the last example
        below):

            - ``'ignore'``: the computation of the transformation step for the
              concerned level will be skipped.
            - ``'overwrite'``: the existing data set will be overwritten by the
              result of the transform step computation.
            - ``'concatenate'``: the existing data set will be concatenated with the
              result of the transform step computation.
            - ``'raise'``: an error will be raised if the data set id already exists.

    Examples
    --------
    Assuming you want to calculate the Euclidean norm of a data set ``'accelerometer'``
    for a specific level ``'left-small'`` and then name the new data set
    ``'accelerometer-norm'``, you can create the following step:

    >>> from dispel.data.raw import RawDataValueDefinition
    >>> from dispel.processing.transform import TransformStep
    >>> from dispel.signal.core import euclidean_norm
    >>> step = TransformStep(
    ...     'accelerometer',
    ...     euclidean_norm,
    ...     'accelerometer-norm',
    ...     [RawDataValueDefinition('norm', 'Accelerometer norm', 'm/s^2')]
    ... )

    The transformation function will be called with the specified data sets as
    arguments. If the function has named parameters matching ``level`` or ``reading``,
    the respective level and reading will be passed to the transformation function.

    Another common scenario is to define a class that can be reused.

    >>> from dispel.data.raw import RawDataValueDefinition
    >>> from dispel.processing.transform import TransformStep
    >>> from dispel.signal.core import euclidean_norm
    >>> class MyTransformStep(TransformStep):
    ...     data_set_ids = 'accelerometer'
    ...     transform_function = euclidean_norm
    ...     new_data_set_id = 'accelerometer-norm'
    ...     definitions = [
    ...         RawDataValueDefinition('norm', 'Accelerometer norm', 'm/s^2')
    ...     ]

    Another convenient way to provide the transformation function is to use the
    ``@transformation`` decorator:

    >>> import pandas as pd
    >>> import numpy as np
    >>> from dispel.data.raw import RawDataValueDefinition
    >>> from dispel.processing.data_set import transformation
    >>> from dispel.processing.transform import TransformStep
    >>> class MyTransformStep(TransformStep):
    ...     data_set_ids = 'accelerometer'
    ...     new_data_set_id = 'accelerometer-norm'
    ...     definitions = [
    ...         RawDataValueDefinition('norm', 'Accelerometer norm', 'm/s^2')
    ...     ]
    ...
    ...     @transformation
    ...     def _euclidean_norm(self, data: pd.DataFrame) -> pd.Series:
    ...         return data.pow(2).sum(axis=1).apply(np.sqrt)

    Note that the decorated functions can also use ``level`` and ``reading`` as
    parameters to gain access to the respective level and reading being processed.
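
    When the new data set id may already exist on a level (e.g. when re-processing a
    reading), the ``storage_error`` argument controls how the conflict is resolved. A
    minimal sketch that overwrites any previously stored result (reusing the names from
    the first example):

    >>> step = TransformStep(
    ...     'accelerometer',
    ...     euclidean_norm,
    ...     'accelerometer-norm',
    ...     [RawDataValueDefinition('norm', 'Accelerometer norm', 'm/s^2')],
    ...     storage_error='overwrite'
    ... )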
    """
    new_data_set_id: str
    definitions: List[RawDataValueDefinition]
    storage_error: StorageError = StorageError.RAISE
    def __init__(
        self,
        data_set_ids: Optional[Union[str, Iterable[str]]] = None,
        transform_function: Optional[Callable[..., Any]] = None,
        new_data_set_id: Optional[str] = None,
        definitions: Optional[List[RawDataValueDefinition]] = None,
        level_filter: Optional[LevelFilterType] = None,
        storage_error: Optional[
            Union[StorageError, Literal["raise", "ignore", "overwrite", "concatenate"]]
        ] = None,
    ):
        super().__init__(
            data_set_ids=data_set_ids,
            transform_function=transform_function,
            level_filter=level_filter,
        )
        if new_data_set_id:
            self.new_data_set_id = new_data_set_id
        if definitions:
            self.definitions = definitions
        if storage_error:
            self.storage_error = StorageError(storage_error)
    def get_new_data_set_id(self) -> str:
        """Get the id of the new data set to be created."""
        return self.new_data_set_id
    def get_definitions(self) -> List[RawDataValueDefinition]:
        """Get the definitions of the raw data set values."""
        return self.definitions
    def get_raw_data_set_definition(self):
        """Get the raw data set definition."""
        return RawDataSetDefinition(
            id=self.get_new_data_set_id(),
            source=RawDataSetSource(self.__class__.__name__),
            value_definitions_list=self.get_definitions(),
            is_computed=True,
        )
    def wrap_result(
        self, res: Any, level: Level, reading: Reading, **kwargs: Any
    ) -> WrapResultGeneratorType:
        """Wrap the result from the processing function into a class."""
        # handle series should they be provided
        if isinstance(res, (pd.Series, np.ndarray)):
            # Wrap into series if it is numpy array
            if isinstance(res, np.ndarray):
                assert res.ndim == 1, "Cannot handle multidimensional arrays"
                res = pd.Series(res)
            def_ids = [
                d.id for d in self.get_definitions() if not d.is_index
            ]
            if len(def_ids) != 1:
                raise ValueError(
                    "Processing returned a series but did not get single "
                    "RawDataValueDefinition"
                )
            res = res.to_frame(def_ids[0])
        raw_data_set = RawDataSet(self.get_raw_data_set_definition(), res)
        yield RawDataSetProcessingResult(
            step=self,
            sources=self.get_raw_data_sets(level),
            result=raw_data_set,
            level=level,
            concatenate=self.storage_error.concatenate,
            overwrite=self.storage_error.overwrite,
        )
    def process_level(
        self, level: Level, reading: Reading, **kwargs
    ) -> ProcessResultType:
        """Process the provided Level."""
        raw_data_set_exists = level.has_raw_data_set(self.get_new_data_set_id())
        if raw_data_set_exists and self.storage_error == StorageError.RAISE:
            raise RawDataSetAlreadyExists(
                self.get_new_data_set_id(),
                level.id,
                'Please select for `storage_error` either "ignore" to ignore the '
                'transformation if the data set already exists, "overwrite" to '
                "overwrite the existing data set with the newly computed one, "
                '"concatenate" to try and concatenate the two raw data sets or simply '
                "change the name of the new data set to a valid one.",
            )
        if not (raw_data_set_exists and self.storage_error == StorageError.IGNORE):
            yield from super().process_level(level, reading, **kwargs)
class SuffixBasedNewDataSetIdMixin(DataSetProcessingStepProtocol, metaclass=ABCMeta):
    """A transformation step that can be chained to a previous step.
    In some scenarios it is desirable to simply name the new data set based on the input
    of the transformation step with a suffix. This can be achieved by adding the mixin
    :class:`SuffixBasedNewDataSetIdMixin` and using the ``&`` operator between the steps
    to be chained.
    Parameters
    ----------
    suffix
        The suffix to be added to the previous data set ids separated with an
        underscore. Alternatively, one can overwrite :meth:`get_suffix` to provide a
        dynamic suffix.

    Examples
    --------
    Assuming you have two transform steps and an extract step

    .. code-block:: python

        steps = [
            InitialTransformStep(new_data_set_id='a'),
            SecondTransformStep(data_set_ids='a', new_data_set_id='a_b'),
            ExtractStep(data_set_ids='a_b')
        ]

    If the transform steps leverage the :class:`SuffixBasedNewDataSetIdMixin`, the
    same can be achieved by chaining the steps in the following way:

    .. code-block:: python

        steps = [
            InitialTransformStep(new_data_set_id='a') &
            SecondTransformStep(suffix='b') &
            ExtractStep()
        ]
    """
    suffix: str
    def __init__(self, *args, **kwargs):
        if suffix := kwargs.pop("suffix", None):
            self.suffix = suffix
        super().__init__(*args, **kwargs)
    def get_suffix(self):
        """Get the suffix to be added to the previous data set id."""
        return self.suffix
    def get_new_data_set_id(self) -> str:
        """Get the new data set id based on the chained step's ids and suffix.
        Returns
        -------
        str
            The data set ids of the previous step are concatenated with underscores
            (``_``) and combined with another underscore and the specified suffix
            obtained from :meth:`get_suffix`.
        """
        prefix = "_".join(self.get_data_set_ids())
        return f"{prefix}_{self.get_suffix()}"
class ConcatenateLevels(LevelFilterProcessingStepMixin, ProcessingStep):
    r"""A processing step that create a meta level.
    The meta level is created concatenating the data and merging the context. The
    contexts are merged by concatenating them with an extra ``_{k}`` in the name, ``k``
    incrementing from 0. The effective time frame is created taking the start of the
    first level and the end of the last one.
    Parameters
    ----------
    new_level_id
        The new level id that will be set inside the reading.
    data_set_id
        The ids of the data sets that will be concatenated.
    level_filter
        An optional :class:`~dispel.processing.level.LevelFilter` to determine the levels
        to be concatenated. If no filter is provided, all levels will be concatenated.
        The ``level_filter`` also accepts :class:`str`,
        :class:`~dispel.data.core.LevelId`\ s and lists of either and passes them to a
        :class:`~dispel.processing.level.LevelIdFilter` for convenience.
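
    Examples
    --------
    A minimal sketch that concatenates the ``'accelerometer'`` data sets of all levels
    into a new level (the level and data set ids are illustrative):

    .. code-block:: python

        step = ConcatenateLevels(
            new_level_id='all-levels',
            data_set_id='accelerometer',
        )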
    """
    def __init__(
        self,
        new_level_id: LevelIdType,
        data_set_id: Union[str, List[str]],
        level_filter: Optional[LevelFilterType] = None,
    ):
        if isinstance(new_level_id, str):
            new_level_id = cast(LevelId, LevelId.from_str(new_level_id))
        if isinstance(data_set_id, str):
            data_set_id = [data_set_id]
        self.new_level_id = new_level_id
        self.data_set_id = data_set_id
        super().__init__(level_filter=level_filter)
    @staticmethod
    def _get_raw_data_sets(
        levels: Iterable[Level], data_set_id: str
    ) -> Generator[RawDataSet, None, None]:
        """Get the raw data sets corresponding to the given id."""
        def _filter_level(level_):
            return level_.has_raw_data_set(data_set_id)
        for level in filter(_filter_level, levels):
            yield level.get_raw_data_set(data_set_id)
    def get_levels(self, reading: Reading) -> Iterable[Level]:
        """Retrieve the levels used for level concatenation.
        Parameters
        ----------
        reading
            The reading used for processing.
        Returns
        -------
        Iterable[Level]
            The levels used for concatenation after filtering.
        """
        return sorted(
            self.get_level_filter()(reading.levels), key=lambda level: level.start
        )
    def process_reading(self, reading: Reading, **kwargs) -> ProcessResultType:
        """Create the meta level from reading."""
        # collect all matching level ids sorted by their start
        levels = self.get_levels(reading)
        # check that the levels are not empty
        if not levels:
            return
        # collect raw data sets from all levels
        merged_raw_data_sets = []
        level: Optional[Level] = None
        raw_data_sets = []
        for data_set in self.data_set_id:
            data_set_definition = None
            raw_data_frames = []
            for raw_data_set in self._get_raw_data_sets(levels, data_set):
                raw_data_sets.append(raw_data_set)
                # assign first if not present
                if data_set_definition is None:
                    data_set_definition = raw_data_set.definition
                raw_data_frames.append(raw_data_set.data)
            if not data_set_definition:
                raise ValueError(f"No dataset definition for {data_set}.")
            merged_raw_data_sets.append(
                RawDataSet(data_set_definition, pd.concat(raw_data_frames))
            )
        # collect level information
        merged_context = Context()
        start = None
        for index, level in enumerate(levels):
            if start is None:
                start = level.start
            # combine context variables
            for key in level.context:
                definition = level.context[key].definition
                new_definition = ValueDefinition(
                    id_=f"{definition.id}_{index}",
                    name=definition.name,
                    unit=definition.unit,
                    description=definition.description,
                    data_type=definition.data_type,
                    validator=definition.validator,
                )
                merged_context.set(level.context[key].value, new_definition)
            merged_context.set(
                level, ValueDefinition(id_=f"level_{index}", name=f"Level {index}")
            )
        if level is None:
            raise ValueError("At least one level needs to be processed.")
        end = level.end
        new_level = Level(
            id_=self.new_level_id,
            start=start,
            end=end,
            context=merged_context,
            raw_data_sets=merged_raw_data_sets,
        )
        # TODO: Implement support for measure set concatenation
        yield ProcessingResult(step=self, sources=raw_data_sets, result=new_level)
class Apply(TransformStep):
    r"""Apply a method onto columns of a raw data set.
    Parameters
    ----------
    data_set_id
        The data set id of the data set on which the method is to be applied
    method
        The method in question. This can be any method that accepts a pandas series and
        returns an array of same length. See also :meth:`pandas.DataFrame.apply`.
    method_kwargs
        Optional arguments required for the methods.
    columns
        The columns to be considered during the method application.
    drop_nan
        ```True`` if NaN values are to be droped after transformation.
    level_filter
        An optional :class:`~dispel.processing.level.LevelFilter` to determine the levels
        to be transformed. If no filter is provided, all levels will be transformed. The
        ``level_filter`` also accepts :class:`str`, :class:`~dispel.data.core.LevelId`\ s
        and lists of either and passes them to a
        :class:`~dispel.processing.level.LevelIdFilter` for convenience.
    new_data_set_id
        The ``id`` used for the :class:`~dispel.data.raw.RawDataSetDefinition`.

    Examples
    --------
    Assuming you want to low-pass filter your gyroscope data of a ``reading`` you can
    create the following step to do so (note that the filtering expects a
    time-index-based and constant frequency-based data frame, so you might have to
    leverage :class:`~dispel.providers.generic.sensor.SetTimestampIndex` and
    :class:`~dispel.providers.generic.sensor.Resample` first):

    >>> from dispel.processing.transform import Apply
    >>> from dispel.signal.filter import butterworth_low_pass_filter
    >>> step = Apply(
    ...     'gyroscope_ts_resampled',
    ...     butterworth_low_pass_filter,
    ...     dict(cutoff=1.5, order=2),
    ...     list('xyz'),
    ... )

    This step will apply a second-order Butterworth low-pass filter to the columns
    ``x``, ``y``, and ``z`` with a cut-off frequency of 1.5 Hz.
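
    Unless ``new_data_set_id`` is provided, the result is stored under the id of the
    input data set combined with the method name, here
    ``'gyroscope_ts_resampled_butterworth_low_pass_filter'``. A minimal sketch for
    retrieving the filtered data from a processed ``level`` (assuming the step has been
    applied to the reading beforehand, e.g. via :func:`dispel.processing.process`):

    .. code-block:: python

        filtered = level.get_raw_data_set(
            'gyroscope_ts_resampled_butterworth_low_pass_filter'
        ).data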
    """
    def __init__(
        self,
        data_set_id: str,
        method: Callable[..., Any],
        method_kwargs: Optional[Dict[str, Any]] = None,
        columns: Optional[List[str]] = None,
        new_data_set_id: Optional[str] = None,
        drop_nan: Optional[bool] = False,
        level_filter: Optional[LevelFilterType] = None,
    ):
        method_kwargs = method_kwargs or {}
        columns = columns or DEFAULT_COLUMNS

        def _transform_function(data: pd.DataFrame) -> pd.DataFrame:
            res = data[columns].apply(method, **method_kwargs)
            if drop_nan:
                return res.dropna()
            return res

        def _definition_factory(column: str) -> RawDataValueDefinition:
            return RawDataValueDefinition(
                column, f"{method.__name__} applied on {column}"
            )

        super().__init__(
            data_set_id,
            _transform_function,
            new_data_set_id or f"{data_set_id}_{method.__name__}",
            [_definition_factory(column) for column in columns],
            level_filter=level_filter,
        )
class Add(TransformStep):
    r"""Add the results of a method onto the columns of a raw data set data.
    Parameters
    ----------
    data_set_id
        The id of the data set to which the norm is added.
    method
        The method in question. It should output a pandas series with same length as the
        pandas data frame that it is fed.
    method_kwargs
        Optional arguments required for the methods.
    columns
        The columns on which the method is to be applied.
    level_filter
        An optional :class:`~dispel.processing.level.LevelFilter` to determine the levels
        to be transformed. If no filter is provided, all levels will be transformed.
        The ``level_filter`` also accepts :class:`str`,
        :class:`~dispel.data.core.LevelId`\ s and lists of either and passes them to a
        :class:`~dispel.processing.level.LevelIdFilter` for convenience.
    new_column
        The name of the new column.

    Examples
    --------
    Assuming you want to compute the Euclidean norm of the accelerometer data you can
    achieve this with the following step:

    .. doctest:: processing

        >>> from dispel.processing import process
        >>> from dispel.processing.transform import Add
        >>> from dispel.signal.core import euclidean_norm
        >>> step = Add(
        ...     'accelerometer',
        ...     euclidean_norm,
        ...     columns=list('xyz')
        ... )

    This step will compute the Euclidean norm of the columns ``x``, ``y``, and ``z``
    and add it as a new column ``xyz`` to the transformed data set.
    """
    def __init__(
        self,
        data_set_id: str,
        method: Callable[..., Any],
        method_kwargs: Optional[Dict[str, Any]] = None,
        columns: Optional[List[str]] = None,
        level_filter: Optional[LevelFilterType] = None,
        new_column: Optional[str] = None,
    ):
        kwargs: Dict[str, Any] = method_kwargs or {}
        old_columns: List[str] = columns or list("xyz")
        new_column = new_column or "".join(old_columns)

        def _transform_function(data: pd.DataFrame) -> pd.DataFrame:
            data_copy = data.copy()
            # use the resolved columns to avoid indexing with None when no columns
            # were provided
            data_copy[new_column] = method(data_copy[old_columns], **kwargs)
            return data_copy

        def _definition_factory(column: str) -> RawDataValueDefinition:
            if column in old_columns:
                return RawDataValueDefinition(column, column)
            return RawDataValueDefinition(
                column, f"{method.__name__} applied on {column}"
            )

        super().__init__(
            data_set_id,
            _transform_function,
            f"{data_set_id}_{method.__name__}",
            [_definition_factory(column) for column in [*old_columns, new_column]],
            level_filter=level_filter,
        )