Source code for dispel.processing.transform
"""Transformation functionalities for processing module."""
from abc import ABCMeta
from typing import (
Any,
Callable,
Dict,
Generator,
Iterable,
List,
Literal,
Optional,
Union,
cast,
)
import numpy as np
import pandas as pd
from dispel.data.core import Reading
from dispel.data.levels import (
Context,
Level,
LevelId,
LevelIdType,
RawDataSetAlreadyExists,
)
from dispel.data.raw import (
DEFAULT_COLUMNS,
RawDataSet,
RawDataSetDefinition,
RawDataSetSource,
RawDataValueDefinition,
)
from dispel.data.values import ValueDefinition
from dispel.processing.core import ProcessingResult, ProcessingStep, ProcessResultType
from dispel.processing.data_set import (
DataSetProcessingStepProtocol,
MutateDataSetProcessingStepBase,
RawDataSetProcessingResult,
StorageError,
WrapResultGeneratorType,
)
from dispel.processing.level import LevelFilterProcessingStepMixin, LevelFilterType
class TransformStepChainMixIn(DataSetProcessingStepProtocol, metaclass=ABCMeta):
"""A mixin class that allows to chain transformation steps.
The basic idea is to leverage the new data set ids from the previous transform step
as the required data set ids for the current step. This avoids having to define the
`data_set_ids` attribute.
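For instance, when two transform steps are chained with the ``&`` operator, the
second step automatically processes the data set produced by the first one (a
minimal sketch with hypothetical step classes, both using this mixin):
.. code-block:: python
    steps = [
        FirstTransformStep(new_data_set_id='accelerometer_norm') &
        SecondTransformStep()  # picks up 'accelerometer_norm' as its input
    ]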
"""
def get_data_set_ids(self) -> Iterable[str]:
"""Get the data set ids to be processed.
This uses the new data set ids from a previous transform step if set. Otherwise,
falls back to the default behavior of returning the set data set ids from the
constructor or class variable.
Returns
-------
Iterable[str]
An iterable of data set ids.
"""
assert isinstance(
self, ProcessingStep
), "TransformStepChainMixIn must inherit from ProcessingStep"
# pylint: disable=no-member
if isinstance(self.predecessor, TransformStep):
return [self.predecessor.get_new_data_set_id()]
# pylint: enable=no-member
return super().get_data_set_ids() # type: ignore[safe-super]
class TransformStep(TransformStepChainMixIn, MutateDataSetProcessingStepBase):
r"""A raw data set transformation processing step.
This class provides a convenient way to transform one or more data sets by
specifying their ids, their level_ids or a level filter, a transformation function
and specifications of a new data set to be returned as result of the processing
step.
Parameters
----------
data_set_ids
An optional list of data set ids to be used for the transformation. See
:class:`~dispel.processing.data_set.DataSetProcessingStepMixin`.
transform_function
An optional function to be applied to the data sets. See
:class:`~dispel.processing.data_set.MutateDataSetProcessingStepBase`. The transform
function is expected to produce one or more columns of a data set according to
the specification in ``definitions``. The function can return one-dimensional
NumPy arrays, pandas Series, and pandas DataFrames.
new_data_set_id
An optional id used for the
:class:`~dispel.data.raw.RawDataSetDefinition`. If no id was provided, the
:data:`new_data_set_id` class variable will be used. Alternatively, one can
overwrite :meth:`get_new_data_set_id` to provide the new data set id.
definitions
An optional list of :class:`~dispel.data.raw.RawDataValueDefinition` whose length
has to match the number of columns returned by the :attr:`transform_function`. If
no definitions were provided, the :data:`definitions` class variable will be used.
Alternatively, one can overwrite :meth:`get_definitions` to provide the list of
definitions.
level_filter
An optional filter to limit the levels being processed. See
:class:`~dispel.processing.level.LevelProcessingStep`.
storage_error
This argument is only used when the given new data set id already exists. In that
case, the following options are available:
- ``'ignore'``: the transformation step is skipped for the concerned level.
- ``'overwrite'``: the existing data set is overwritten with the result of the
transform step computation.
- ``'concatenate'``: the existing data set is concatenated with the result of the
transform step computation.
- ``'raise'``: an error is raised if the new data set id already exists.
Examples
--------
Assuming you want to calculate the Euclidean norm of a data set ``'accelerometer'``
and name the new data set ``'accelerometer-norm'``, you can create the following
step:
>>> from dispel.data.raw import RawDataValueDefinition
>>> from dispel.processing.transform import TransformStep
>>> from dispel.signal.core import euclidean_norm
>>> step = TransformStep(
... 'accelerometer',
... euclidean_norm,
... 'accelerometer-norm',
... [RawDataValueDefinition('norm', 'Accelerometer norm', 'm/s^2')]
... )
The transformation function will be called with the specified data sets as
arguments. If the function has named parameters matching ``level`` or ``reading``,
the respective level and reading will be passed to the transformation function.
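A transformation function can, for instance, declare a ``level`` parameter to gain
access to the level being processed (a minimal sketch of such a function):
>>> def norm_with_level(data, level):
...     # ``data`` is the passed data set, ``level`` the level being processed
...     return data.pow(2).sum(axis=1).pow(0.5)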
Another common scenario is to define a class that can be reused.
>>> from dispel.data.raw import RawDataValueDefinition
>>> from dispel.processing.transform import TransformStep
>>> from dispel.signal.core import euclidean_norm
>>> class MyTransformStep(TransformStep):
... data_set_ids = 'accelerometer'
... transform_function = euclidean_norm
... new_data_set_id = 'accelerometer-norm'
... definitions = [
... RawDataValueDefinition('norm', 'Accelerometer norm', 'm/s^2')
... ]
Another convenient way to provide the transformation function is to use the
``@transformation`` decorator:
>>> import pandas as pd
>>> import numpy as np
>>> from dispel.data.raw import RawDataValueDefinition
>>> from dispel.processing.data_set import transformation
>>> from dispel.processing.transform import TransformStep
>>> class MyTransformStep(TransformStep):
... data_set_ids = 'accelerometer'
... new_data_set_id = 'accelerometer-norm'
... definitions = [
... RawDataValueDefinition('norm', 'Accelerometer norm', 'm/s^2')
... ]
...
... @transformation
... def _euclidean_norm(self, data: pd.DataFrame) -> pd.Series:
... return data.pow(2).sum(axis=1).apply(np.sqrt)
Note that the decorated functions can also use ``level`` and ``reading`` as
parameters to gain access to the respective level and reading being processed.
"""
new_data_set_id: str
definitions: List[RawDataValueDefinition]
storage_error: StorageError = StorageError.RAISE
def __init__(
self,
data_set_ids: Optional[Union[str, Iterable[str]]] = None,
transform_function: Optional[Callable[..., Any]] = None,
new_data_set_id: Optional[str] = None,
definitions: Optional[List[RawDataValueDefinition]] = None,
level_filter: Optional[LevelFilterType] = None,
storage_error: Optional[
Union[StorageError, Literal["raise", "ignore", "overwrite", "concatenate"]]
] = None,
):
super().__init__(
data_set_ids=data_set_ids,
transform_function=transform_function,
level_filter=level_filter,
)
if new_data_set_id:
self.new_data_set_id = new_data_set_id
if definitions:
self.definitions = definitions
if storage_error:
self.storage_error = StorageError(storage_error)
def get_new_data_set_id(self) -> str:
"""Get the id of the new data set to be created."""
return self.new_data_set_id
def get_definitions(self) -> List[RawDataValueDefinition]:
"""Get the definitions of the raw data set values."""
return self.definitions
def get_raw_data_set_definition(self):
"""Get the raw data set definition."""
return RawDataSetDefinition(
id=self.get_new_data_set_id(),
source=RawDataSetSource(self.__class__.__name__),
value_definitions_list=self.get_definitions(),
is_computed=True,
)
def wrap_result(
self, res: Any, level: Level, reading: Reading, **kwargs: Any
) -> WrapResultGeneratorType:
"""Wrap the result from the processing function into a class."""
# handle series should they be provided
if isinstance(res, (pd.Series, np.ndarray)):
# Wrap into series if it is numpy array
if isinstance(res, np.ndarray):
assert res.ndim == 1, "Cannot handle multidimensional arrays"
res = pd.Series(res)
def_ids = [
d.id for d in filter(lambda d: not d.is_index, self.get_definitions())
]
if len(def_ids) != 1:
raise ValueError(
"Processing returned a series but did not get single "
"RawDataValueDefinition"
)
res = res.to_frame(def_ids[0])
raw_data_set = RawDataSet(self.get_raw_data_set_definition(), res)
yield RawDataSetProcessingResult(
step=self,
sources=self.get_raw_data_sets(level),
result=raw_data_set,
level=level,
concatenate=self.storage_error.concatenate,
overwrite=self.storage_error.overwrite,
)
def process_level(
self, level: Level, reading: Reading, **kwargs
) -> ProcessResultType:
"""Process the provided Level."""
raw_data_set_exists = level.has_raw_data_set(self.get_new_data_set_id())
if raw_data_set_exists and self.storage_error == StorageError.RAISE:
raise RawDataSetAlreadyExists(
self.get_new_data_set_id(),
level.id,
'Please set `storage_error` to either "ignore" to ignore the '
'transformation if the data set already exists, "overwrite" to '
"overwrite the existing data set with the newly computed one, "
'"concatenate" to try and concatenate the two raw data sets or simply '
"change the name of the new data set to a valid one.",
)
if raw_data_set_exists and self.storage_error == StorageError.IGNORE:
pass
else:
yield from super().process_level(level, reading, **kwargs)
class SuffixBasedNewDataSetIdMixin(DataSetProcessingStepProtocol, metaclass=ABCMeta):
"""A transformation step that can be chained to a previous step.
In some scenarios it is desirable to simply name the new data set based on the input
of the transformation step with a suffix. This can be achieved by adding the mixin
:class:`SuffixBasedNewDataSetIdMixin` and using the ``&`` operator between the steps
to be chained.
Parameters
----------
suffix
The suffix to be added to the previous data set ids separated with an
underscore. Alternatively, one can overwrite :meth:`get_suffix` to provide a
dynamic suffix.
Examples
--------
Assuming you have two transform steps and an extract step:
.. code-block:: python
steps = [
InitialTransformStep(new_data_set_id='a'),
SecondTransformStep(data_set_ids='a', new_data_set_id='a_b'),
ExtractStep(data_set_ids='a_b')
]
If the transform steps leverage the :class:`SuffixBasedNewDataSetIdMixin`, the
same can be achieved by chaining the steps in the following way:
.. code-block:: python
steps = [
InitialTransformStep(new_data_set_id='a') &
SecondTransformStep(suffix='b') &
ExtractStep()
]
"""
suffix: str
def __init__(self, *args, **kwargs):
if suffix := kwargs.pop("suffix", None):
self.suffix = suffix
super().__init__(*args, **kwargs)
def get_suffix(self):
"""Get the suffix to be added to the previous data set id."""
return self.suffix
def get_new_data_set_id(self) -> str:
"""Get the new data set id based on the chained step's ids and suffix.
Returns
-------
str
The data set ids of the previous step are concatenated with underscores
(``_``) and combined with another underscore and the specified suffix
obtained from :meth:`get_suffix`.
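For example, a step with the data set ids ``['gyroscope', 'accelerometer']`` and
the suffix ``'norm'`` yields the new data set id ``'gyroscope_accelerometer_norm'``.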
"""
prefix = "_".join(self.get_data_set_ids())
return f"{prefix}_{self.get_suffix()}"
class ConcatenateLevels(LevelFilterProcessingStepMixin, ProcessingStep):
r"""A processing step that create a meta level.
The meta level is created concatenating the data and merging the context. The
contexts are merged by concatenating them with an extra ``_{k}`` in the name, ``k``
incrementing from 0. The effective time frame is created taking the start of the
first level and the end of the last one.
Parameters
----------
new_level_id
The new level id that will be set inside the reading.
data_set_id
The ids of the data sets that will be concatenated.
level_filter
An optional :class:`~dispel.processing.level.LevelFilter` to determine the levels
to be concatenated. If no filter is provided, all levels will be concatenated.
The ``level_filter`` also accepts :class:`str`,
:class:`~dispel.data.core.LevelId`\ s and lists of either and passes them to a
:class:`~dispel.processing.level.LevelIdFilter` for convenience.
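Examples
--------
A minimal sketch (assuming a reading whose levels each contain an
``'accelerometer'`` data set) that concatenates those data sets into a new level:
.. code-block:: python
    from dispel.processing.transform import ConcatenateLevels
    step = ConcatenateLevels(
        new_level_id='all_levels',
        data_set_id='accelerometer',
    )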
"""
def __init__(
self,
new_level_id: LevelIdType,
data_set_id: Union[str, List[str]],
level_filter: Optional[LevelFilterType] = None,
):
if isinstance(new_level_id, str):
new_level_id = cast(LevelId, LevelId.from_str(new_level_id))
if isinstance(data_set_id, str):
data_set_id = [data_set_id]
self.new_level_id = new_level_id
self.data_set_id = data_set_id
super().__init__(level_filter=level_filter)
@staticmethod
def _get_raw_data_sets(
levels: Iterable[Level], data_set_id: str
) -> Generator[RawDataSet, None, None]:
"""Get the raw data sets corresponding to the given id."""
def _filter_level(level_):
return level_.has_raw_data_set(data_set_id)
for level in filter(_filter_level, levels):
yield level.get_raw_data_set(data_set_id)
def get_levels(self, reading: Reading) -> Iterable[Level]:
"""Retrieve the levels used for level concatenation.
Parameters
----------
reading
The reading used for processing.
Returns
-------
Iterable[Level]
The levels used for concatenation after filtering.
"""
return sorted(
self.get_level_filter()(reading.levels), key=lambda level: level.start
)
def process_reading(self, reading: Reading, **kwargs) -> ProcessResultType:
"""Create the meta level from reading."""
# collect all matching level ids sorted by their start
levels = self.get_levels(reading)
# Check that the levels are not empty
if len(list(levels)) == 0:
return
# collect raw data sets from all levels
merged_raw_data_sets = []
level: Optional[Level] = None
raw_data_sets = []
for data_set in self.data_set_id:
data_set_definition = None
raw_data_frames = []
for raw_data_set in self._get_raw_data_sets(levels, data_set):
raw_data_sets.append(raw_data_set)
# assign first if not present
if data_set_definition is None:
data_set_definition = raw_data_set.definition
raw_data_frames.append(raw_data_set.data)
if not data_set_definition:
raise ValueError(f"No dataset definition for {data_set}.")
merged_raw_data_sets.append(
RawDataSet(data_set_definition, pd.concat(raw_data_frames))
)
# collect level information
merged_context = Context()
start = None
for index, level in enumerate(levels):
if start is None:
start = level.start
# combine context variables
for key in level.context:
definition = level.context[key].definition
new_definition = ValueDefinition(
id_=f"{definition.id}_{index}",
name=definition.name,
unit=definition.unit,
description=definition.description,
data_type=definition.data_type,
validator=definition.validator,
)
merged_context.set(level.context[key].value, new_definition)
merged_context.set(
level, ValueDefinition(id_=f"level_{index}", name=f"Level {index}")
)
if level is None:
raise ValueError("At least one level needs to be processed.")
end = level.end
new_level = Level(
id_=self.new_level_id,
start=start,
end=end,
context=merged_context,
raw_data_sets=merged_raw_data_sets,
)
# TODO: Implement support for measure set concatenation
yield ProcessingResult(step=self, sources=raw_data_sets, result=new_level)
class Apply(TransformStep):
r"""Apply a method onto columns of a raw data set.
Parameters
----------
data_set_id
The data set id of the data set on which the method is to be applied
method
The method in question. This can be any method that accepts a pandas Series and
returns an array of the same length. See also :meth:`pandas.DataFrame.apply`.
method_kwargs
Optional arguments required for the methods.
columns
The columns to be considered during the method application.
drop_nan
``True`` if NaN values are to be dropped after transformation.
level_filter
An optional :class:`~dispel.processing.level.LevelFilter` to determine the levels
to be transformed. If no filter is provided, all levels will be transformed. The
``level_filter`` also accepts :class:`str`, :class:`~dispel.data.core.LevelId`\ s
and lists of either and passes them to a
:class:`~dispel.processing.level.LevelIdFilter` for convenience.
new_data_set_id
The ``id`` used for the :class:`~dispel.data.raw.RawDataSetDefinition`.
Examples
--------
Assuming you want to low-pass filter your gyroscope data of a ``reading`` you can
create the following step to do so (note that the filtering expects a
time-index-based and constant frequency-based data frame, so you might have to
leverage :class:`~dispel.providers.generic.sensor.SetTimestampIndex` and
:class:`~dispel.providers.generic.sensor.Resample` first):
>>> from dispel.processing.transform import Apply
>>> from dispel.signal.filter import butterworth_low_pass_filter
>>> step = Apply(
... 'gyroscope_ts_resampled',
... butterworth_low_pass_filter,
... dict(cutoff=1.5, order=2),
... list('xyz'),
... )
This step will apply a 2nd order Butterworth low-pass filter to the columns ``x``,
``y``, and ``z`` with a cut-off frequency of 1.5 Hz.
"""
def __init__(
self,
data_set_id: str,
method: Callable[..., Any],
method_kwargs: Optional[Dict[str, Any]] = None,
columns: Optional[List[str]] = None,
new_data_set_id: Optional[str] = None,
drop_nan: Optional[bool] = False,
level_filter: Optional[LevelFilterType] = None,
):
method_kwargs = method_kwargs or {}
columns = columns or DEFAULT_COLUMNS
def _transform_function(data: pd.DataFrame) -> pd.DataFrame:
res = data[columns].apply(method, **method_kwargs)
if drop_nan:
return res.dropna()
return res
def _definition_factory(column: str) -> RawDataValueDefinition:
return RawDataValueDefinition(
column, f"{method.__name__} applied on {column}"
)
super().__init__(
data_set_id,
_transform_function,
new_data_set_id or f"{data_set_id}_{method.__name__}",
[_definition_factory(column) for column in columns],
level_filter=level_filter,
)
class Add(TransformStep):
r"""Add the results of a method onto the columns of a raw data set data.
Parameters
----------
data_set_id
The id of the data set to which the norm is added.
method
The method in question. It should output a pandas Series with the same length as
the pandas data frame that it is fed.
method_kwargs
Optional arguments required for the methods.
columns
The columns on which the method is to be applied.
level_filter
An optional :class:`~dispel.processing.level.LevelFilter` to determine the levels
to be transformed. If no filter is provided, all levels will be transformed.
The ``level_filter`` also accepts :class:`str`,
:class:`~dispel.data.core.LevelId`\ s and lists of either and passes them to a
:class:`~dispel.processing.level.LevelIdFilter` for convenience.
new_column
The name of the new column.
Examples
--------
Assuming you want to add the Euclidean norm of the accelerometer data as a new
column, you can achieve this with the following step:
.. doctest:: processing
>>> from dispel.processing import process
>>> from dispel.processing.transform import Add
>>> from dispel.signal.core import euclidean_norm
>>> step = Add(
... 'accelerometer',
... euclidean_norm,
... columns=list('xyz')
... )
This step will compute the Euclidean norm of the columns ``x``, ``y``, and ``z``
and add it as a new column ``xyz`` to the transformed data set.
"""
def __init__(
self,
data_set_id: str,
method: Callable[..., Any],
method_kwargs: Optional[Dict[str, Any]] = None,
columns: Optional[List[str]] = None,
level_filter: Optional[LevelFilterType] = None,
new_column: Optional[str] = None,
):
kwargs: Dict[str, Any] = method_kwargs or {}
old_columns: List[str] = columns or list("xyz")
new_column = new_column or "".join(old_columns)
def _transform_function(data: pd.DataFrame) -> pd.DataFrame:
data_copy = data.copy()
# use the resolved columns (falls back to x, y, z if none were provided)
data_copy[new_column] = method(data_copy[old_columns], **kwargs)
return data_copy
def _definition_factory(column: str) -> RawDataValueDefinition:
if column in old_columns:
return RawDataValueDefinition(column, column)
return RawDataValueDefinition(
column, f"{method.__name__} applied on {column}"
)
super().__init__(
data_set_id,
_transform_function,
f"{data_set_id}_{method.__name__}",
[_definition_factory(column) for column in [*old_columns, new_column]],
level_filter=level_filter,
)