Source code for dispel.providers.generic.sensor
"""Generic functionality for signal processing steps."""
from functools import partial
from typing import Iterable, List, Optional, Tuple, Union
import numpy as np
import pandas as pd
from dispel.data.core import Reading
from dispel.data.levels import Level
from dispel.data.measures import MeasureValueDefinitionPrototype
from dispel.data.raw import (
    ACCELEROMETER_COLUMNS,
    DEFAULT_COLUMNS,
    GRAVITY_COLUMNS,
    RawDataValueDefinition,
)
from dispel.data.values import AbbreviatedValue as AV
from dispel.processing.assertions import NotEmptyDataSetAssertionMixin
from dispel.processing.data_set import transformation
from dispel.processing.extract import ExtractMultipleStep, ExtractStep
from dispel.processing.level import LevelFilterType
from dispel.processing.modalities import SensorModality
from dispel.processing.transform import Apply, TransformStep
from dispel.providers.bdh.data import BDHReading
from dispel.signal.accelerometer import (
    GRAVITY_CONSTANT,
    apply_rotation_matrices,
    compute_rotation_matrices_quaternion,
    remove_gravity_component,
    remove_gravity_component_ori,
)
from dispel.signal.core import (
    amplitude,
    discretize_sampling_frequency,
    energy,
    entropy,
    euclidean_norm,
    peak,
)
from dispel.signal.sensor import SENSOR_UNIT, find_zero_crossings
# Define expected sampling frequencies
FREQ_20HZ = 20
FREQ_50HZ = 50
FREQ_60HZ = 60
FREQ_100HZ = 100  # SensorLog can sample at 100Hz
FREQ_128HZ = 128  # APDM files are sampled at 128Hz
VALID_FREQ_LIST = [FREQ_20HZ, FREQ_50HZ, FREQ_100HZ, FREQ_128HZ]
class RenameColumns(TransformStep):
    r"""Rename and select columns of a raw data set.
    Parameters
    ----------
    data_set_id
        The data set id of the time series to be renamed.
    level_filter
        An optional :class:`~dispel.processing.level.LevelFilter` to determine
        the levels to be transformed. If no filter is provided, all levels
        will be transformed. The ``level_filter`` also accepts :class:`str`,
        :class:`~dispel.data.core.LevelId`\ s and lists of either and passes them
        to a :class:`~dispel.processing.level.LevelFilter` for convenience.
    kwargs
        All arguments passed into this class will serve as a renaming mapping
        for the raw data set.
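    Examples
    --------
    A minimal sketch, assuming a reading with an ``accelerometer`` data set
    (the renaming mapping is illustrative):
    .. doctest:: processing
        >>> from dispel.processing import process
        >>> from dispel.providers.generic.sensor import RenameColumns
        >>> step = RenameColumns('accelerometer', x='accX', y='accY', z='accZ')
        >>> _ = process(reading, [step])  # doctest: +SKIP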
    """
    def __init__(
        self, data_set_id: str, level_filter: Optional[LevelFilterType] = None, **kwargs
    ):
        def _transform_function(data: pd.DataFrame) -> pd.DataFrame:
            data_ = data.rename(columns=kwargs)
            return data_[kwargs.values()]
        super().__init__(
            data_set_id,
            _transform_function,
            f"{data_set_id}_renamed",
            [RawDataValueDefinition(column, column) for column in kwargs.values()],
            level_filter=level_filter,
        )
class SetTimestampIndex(TransformStep):
    r"""Create a new time series based on a date time or time delta column.
    Parameters
    ----------
    data_set_id
        The data set id of the time series to be transformed.
    columns
        The columns to consider in the new raw data set.
    time_stamp_column
        The time series column name to use as index.
    level_filter
        An optional :class:`dispel.processing.level.LevelFilter` to determine the
        levels to be transformed. If no filter is provided, all levels will be
        transformed. The ``level_filter`` also accepts :class:`str`,
        :class:`~dispel.data.core.LevelId`\ s and lists of either and passes them
        to a :class:`~dispel.processing.level.LevelFilter` for convenience.
    duplicates
        The strategy used to handle duplicates.
        Has to be one of ``ignore``, ``raise``, ``first``, ``last``.
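    Examples
    --------
    A minimal sketch, assuming an ``accelerometer`` data set with a ``ts``
    time stamp column (column names are illustrative):
    .. doctest:: processing
        >>> from dispel.providers.generic.sensor import SetTimestampIndex
        >>> step = SetTimestampIndex(
        ...     'accelerometer', ['x', 'y', 'z'], duplicates='first'
        ... )
        >>> _ = process(reading, [step])  # doctest: +SKIP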
    """
    def __init__(
        self,
        data_set_id: str,
        columns: List[str],
        time_stamp_column: str = "ts",
        level_filter: Optional[LevelFilterType] = None,
        duplicates: Optional[str] = None,
    ):
        def _transform_function(
            data: pd.DataFrame, rm_duplicate: Optional[str]
        ) -> pd.DataFrame:
            res = data.set_index(time_stamp_column)[columns].copy()
            if rm_duplicate is None:
                return res
            return res[~res.index.duplicated(keep=rm_duplicate)]
        super().__init__(
            data_set_id,
            lambda x: _transform_function(x, duplicates),
            f"{data_set_id}_ts",
            [RawDataValueDefinition(column, column) for column in columns],
            level_filter=level_filter,
        )
class Trim(TransformStep):
    """Trim a sensor signal at the beginning and/or end.
    Parameters
    ----------
    trim_left
        The amount of data to trim from the left side of the sensor readings.
    trim_right
        The amount of data to trim from the right side of the sensor readings.
    ts_column
        The column id to be used in the provided raw data set through
        ``data_set_ids``. If no column is provided, the data set is expected
        to have a time-based index that is used to trim the data set.
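    Examples
    --------
    A minimal sketch that drops the first second of a time-indexed data set
    (ids and definitions are illustrative; the remaining arguments follow
    :class:`~dispel.processing.transform.TransformStep`):
    .. doctest:: processing
        >>> import pandas as pd
        >>> from dispel.providers.generic.sensor import Trim
        >>> step = Trim(
        ...     data_set_ids='accelerometer_ts',
        ...     new_data_set_id='accelerometer_ts_trimmed',
        ...     definitions=definitions,
        ...     trim_left=pd.Timedelta(1, unit='s')
        ... )  # doctest: +SKIP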
    """
    trim_left = pd.Timedelta(0)
    trim_right = pd.Timedelta(0)
    ts_column: Optional[str] = None
    def __init__(self, *args, **kwargs):
        if (left := kwargs.pop("trim_left", None)) is not None:
            self.trim_left = left
        if (right := kwargs.pop("trim_right", None)) is not None:
            self.trim_right = right
        if (column := kwargs.pop("ts_column", None)) is not None:
            self.ts_column = column
        super().__init__(*args, **kwargs)
    @transformation
    def _trim(self, data: pd.DataFrame) -> pd.DataFrame:
        ts_col = data.index if self.ts_column is None else data[self.ts_column]
        if self.trim_left > pd.Timedelta(0):
            data = data[ts_col > ts_col.min() + self.trim_left]
        if self.trim_right > pd.Timedelta(0):
            data = data[ts_col < ts_col.max() - self.trim_right]
        return data.copy()
class Resample(NotEmptyDataSetAssertionMixin, TransformStep):
    r"""Resample a time-based raw data set to a specific sampling frequency.
    The resampling creates a new raw data set that is accessible via the data
    set id comprised of the original id suffixed with ``_resampled``.
    Parameters
    ----------
    data_set_id
        The data set to be resampled. This has to be a data set that uses a
        time-based index. You might first have to apply the
        :class:`SetTimestampIndex` processing step before you can apply
        this step.
    aggregations
        A list of resampling methods to be applied in order. Each can be any
        method that is also accepted by :meth:`pandas.DataFrame.agg`.
    columns
        The columns to be considered during the resampling.
    freq
        The frequency to resample to. See also
        :meth:`pandas.DataFrame.resample` for details. If ``freq`` is not
        provided, the frequency is estimated automatically using the median
        sampling frequency.
    max_frequency_distance
        An optional integer specifying the maximum accepted
        distance between the expected frequency and the estimated frequency
        above which we raise an error.
    level_filter
        An optional :class:`dispel.processing.level.LevelFilter` to determine the
        levels to be transformed. If no filter is provided, all levels will be
        transformed. The ``level_filter`` also accepts :class:`str`,
        :class:`~dispel.data.core.LevelId`\ s and lists of either and passes them
        to a :class:`~dispel.processing.level.LevelFilter` for convenience.
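    Examples
    --------
    A minimal sketch resampling a time-indexed data set to 50 Hz (the data
    set id and columns are illustrative):
    .. doctest:: processing
        >>> from dispel.providers.generic.sensor import Resample
        >>> step = Resample(
        ...     'accelerometer_ts',
        ...     aggregations=['mean', 'ffill'],
        ...     columns=['x', 'y', 'z'],
        ...     freq=50
        ... )
        >>> _ = process(reading, [step])  # doctest: +SKIP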
    """
    def __init__(
        self,
        data_set_id: str,
        aggregations: Iterable[str],
        columns: Iterable[str],
        freq: Optional[Union[float, str]] = None,
        max_frequency_distance: Optional[int] = None,
        level_filter: Optional[LevelFilterType] = None,
    ):
        def _resample(
            data: pd.DataFrame, sampling_frequency: Optional[Union[float, str]] = None
        ) -> pd.DataFrame:
            # Check if a sampling frequency is provided
            # If not, we discretize the sampling frequency
            if sampling_frequency is None:
                discretize_args = [data, VALID_FREQ_LIST]
                if max_frequency_distance:
                    discretize_args.append(max_frequency_distance)
                sampling_frequency = discretize_sampling_frequency(*discretize_args)
            # Convert the float sampling frequency to a Timedelta format
            if not isinstance(sampling_frequency, str):
                sampling_frequency = pd.Timedelta(1 / sampling_frequency, unit="s")
            resample_obj = data[columns].resample(sampling_frequency)
            for method in aggregations:
                resample_obj = resample_obj.agg(method)
            return resample_obj
        def _definition_factory(column: str) -> RawDataValueDefinition:
            return RawDataValueDefinition(
                column, f"{column} resampled with {aggregations}"
            )
        super().__init__(
            data_set_id,
            partial(_resample, sampling_frequency=freq),
            f"{data_set_id}_resampled",
            [_definition_factory(column) for column in columns],
            level_filter=level_filter,
        )
class Upsample(Apply):
    r"""Upsample a time-based raw data set to a specific sampling frequency.
    The upsampling creates a new raw data set which is an upsampled version
    of the original data set identified by ``data_set_id``. The upsampled
    data set is accessible via ``new_data_set_id``, which is the original
    ``data_set_id`` with the suffix ``_upsampled`` appended.
    Parameters
    ----------
    interpolation_method
        The interpolation technique used to fill NaN values. It can be any
        method that is also accepted by :meth:`pandas.DataFrame.interpolate`.
    freq
        The frequency to upsample to. See also
        :meth:`pandas.DataFrame.resample` for details.
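    Examples
    --------
    A minimal sketch upsampling a time-indexed data set to 100 Hz using
    linear interpolation (the data set id is illustrative):
    .. doctest:: processing
        >>> from dispel.providers.generic.sensor import Upsample
        >>> step = Upsample(
        ...     'linear', 100, data_set_ids='accelerometer_ts'
        ... )  # doctest: +SKIP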
    """
    def get_new_data_set_id(self) -> str:
        """Overwrite new_data_set_id."""
        return f"{self.get_data_set_ids()[0]}_upsampled"  # type: ignore
    def __init__(self, interpolation_method: str, freq: Union[float, str], **kwargs):
        def _upsample(
            data: pd.DataFrame, sampling_frequency: Union[float, str]
        ) -> pd.DataFrame:
            """Upsample a dataframe to a given sampling frequency."""
            # Convert the float sampling frequency to a Timedelta format
            if not isinstance(sampling_frequency, str):
                sampling_frequency = pd.Timedelta(1 / sampling_frequency, unit="s")
            resample_obj = data.resample(sampling_frequency)
            return resample_obj.interpolate(interpolation_method)
        super().__init__(
            method=_upsample, method_kwargs={"sampling_frequency": freq}, **kwargs
        )
class ExtractAverageSignalEnergy(NotEmptyDataSetAssertionMixin, ExtractStep):
    r"""An average signal energy extraction step.
    Parameters
    ----------
    sensor
        The type of sensor on which the extraction is to be performed.
    data_set_id
        The data set id on which the extraction is to be performed.
    columns
        The columns onto which the signal energy is to be computed.
    level_filter
        An optional :class:`~dispel.processing.level.LevelFilter` to determine the
        levels to be transformed. If no filter is provided, all levels will be
        transformed. The ``level_filter`` also accepts :class:`str`,
        :class:`~dispel.data.core.LevelId`\ s and lists of either and passes them
        to a :class:`~dispel.processing.level.LevelFilter` for convenience.
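    Examples
    --------
    A minimal sketch extracting the average accelerometer signal energy (the
    data set id and columns are illustrative):
    .. doctest:: processing
        >>> from dispel.processing.modalities import SensorModality
        >>> from dispel.providers.generic.sensor import (
        ...     ExtractAverageSignalEnergy
        ... )
        >>> step = ExtractAverageSignalEnergy(
        ...     SensorModality.ACCELEROMETER, 'accelerometer', ['x', 'y', 'z']
        ... )  # doctest: +SKIP
        >>> _ = process(reading, [step])  # doctest: +SKIP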
    """
    def __init__(
        self,
        sensor: SensorModality,
        data_set_id: str,
        columns: List[str],
        level_filter: Optional[LevelFilterType] = None,
    ):
        def _average_signal(data: pd.DataFrame):
            return np.linalg.norm(data[columns], ord=2)
        super().__init__(
            data_set_id,
            _average_signal,
            definition=MeasureValueDefinitionPrototype(
                measure_name=AV(f"average {sensor} energy", f"{sensor.abbr}_sig_ene"),
                data_type="float64",
                description=f"The average {sensor} energy of the "
                f'{"".join(columns)} columns of the signal.',
                unit=SENSOR_UNIT[sensor.abbr],
            ),
            level_filter=level_filter,
        )
class ExtractPowerSpectrumMeasures(NotEmptyDataSetAssertionMixin, ExtractMultipleStep):
    r"""A measure extraction processing step for power spectrum measures.
    Parameters
    ----------
    sensor
        The type of sensor on which the extraction is to be performed.
    data_set_id
        The data set id on which the extraction is to be performed.
    columns
        The columns onto which the power spectrum measures are to be extracted.
    lower_bound
        The lower bound of frequencies below which the signal is filtered.
    upper_bound
        The upper bound of frequencies above which the signal is filtered.
    level_filter
        An optional :class:`~dispel.processing.level.LevelFilter` to determine the
        levels to be transformed. If no filter is provided, all levels will be
        transformed. The ``level_filter`` also accepts :class:`str`,
        :class:`~dispel.data.core.LevelId`\ s and lists of either and passes them
        to a :class:`~dispel.processing.level.LevelFilter` for convenience.
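    Examples
    --------
    A minimal sketch extracting power spectrum measures between 0.5 and 4 Hz
    (bounds, data set id, and columns are illustrative):
    .. doctest:: processing
        >>> from dispel.processing.modalities import SensorModality
        >>> from dispel.providers.generic.sensor import (
        ...     ExtractPowerSpectrumMeasures
        ... )
        >>> step = ExtractPowerSpectrumMeasures(
        ...     SensorModality.GYROSCOPE, 'gyroscope_ts', ['x', 'y', 'z'],
        ...     lower_bound=0.5, upper_bound=4.0
        ... )  # doctest: +SKIP
        >>> _ = process(reading, [step])  # doctest: +SKIP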
    """
    def __init__(
        self,
        sensor: SensorModality,
        data_set_id: str,
        columns: List[str],
        lower_bound: Optional[float] = None,
        upper_bound: Optional[float] = None,
        level_filter: Optional[LevelFilterType] = None,
    ):
        unit = sensor.unit(order=2)
        atomic_functions = [
            {
                "func": partial(energy, lowcut=lower_bound, highcut=upper_bound),
                "name": AV("energy", "ene"),
                "description": "The power spectrum energy summed between the "
                f"frequencies ({lower_bound}, {upper_bound}) "
                f"of the {{axis}} axis for the {sensor} "
                f"signal.",
                "unit": unit,
                "outcome_uuid": "99ef9a8d-a925-4eb0-9e80-be58cd4a9ac9",
            },
            {
                "func": peak,
                "name": AV("peak", "peak"),
                "description": f"The frequency at which the power spectrum of "
                "the {axis} axis reaches its maximum value for "
                f"the {sensor} signal.",
                "unit": "Hz",
                "outcome_uuid": "87512c93-3a5b-4c9e-9575-fd9ed19649ca",
            },
            {
                "func": entropy,
                "name": AV("entropy", "ent"),
                "description": "The power spectrum entropy of the {axis} axis "
                f"for the {sensor} signal.",
                "unit": unit,
                "outcome_uuid": "6726bb5a-8084-49f5-a53e-6a28a8f27695",
            },
            {
                "func": amplitude,
                "name": AV("amplitude", "amp"),
                "description": "The power spectrum amplitude (i.e. the maximum"
                " value) of the {axis} axis for the "
                f"{sensor} signal.",
                "unit": unit,
                "outcome_uuid": "bde2c1f9-abf7-41e7-91f8-e0ddddf34a5c",
            },
        ]
        def _function_factory(atomic_function, axis):
            return dict(
                func=lambda x: atomic_function["func"](x[axis]),
                description=atomic_function["description"].format(axis=axis),
                unit=atomic_function["unit"],
                measure_name=AV(
                    f'{sensor} power spectrum {atomic_function["name"]} {axis}'
                    f" axis",
                    f'{sensor.abbr}_ps_{atomic_function["name"].abbr}_{axis}',
                ),
            )
        functions = [
            _function_factory(atomic_function, axis)
            for atomic_function in atomic_functions
            for axis in columns
        ]
        super().__init__(
            data_set_id,
            functions,
            definition=MeasureValueDefinitionPrototype(data_type="float64"),
            level_filter=level_filter,
        )
class ComputeGravityRotationMatrices(TransformStep):
    r"""Compute a series of rotation matrices to align sensors to gravity.
    This transformation step creates a series of rotation matrices based on
    the gravity information contained in the accelerometer sensor. This
    allows one to rotate other sensors into a desired orientation relative
    to gravity. This is of particular interest when measuring physical
    interactions with devices in the plane perpendicular to gravity.
    Parameters
    ----------
    data_set_id
        The id of the data set containing the gravity information used to
        compute the rotation matrices.
    target_gravity
        The target gravity vector, e.g. ``(-1, 0, 0)`` to create rotation
        matrices that rotate the x-axis of a device onto gravity.
    level_filter
        An optional :class:`~dispel.processing.level.LevelFilter` to determine the
        levels to be transformed. If no filter is provided, all levels will be
        transformed. The ``level_filter`` also accepts :class:`str`,
        :class:`~dispel.data.core.LevelId`\ s and lists of either and passes them
        to a :class:`~dispel.processing.level.LevelFilter` for convenience.
    """
    def __init__(
        self, data_set_id: str, target_gravity: Tuple[float, float, float], **kwargs
    ):
        def _transform_function(data: pd.DataFrame) -> pd.Series:
            return compute_rotation_matrices_quaternion(
                data[GRAVITY_COLUMNS], target_gravity
            )
        super().__init__(
            data_set_id,
            _transform_function,
            "gravity_rotation_matrices",
            [RawDataValueDefinition("rotation_matrix", "Rotation Matrix")],
            **kwargs,
        )
class RotateSensorWithGravityRotationMatrices(TransformStep):
    r"""Apply a series of rotation matrices to a sensor.
    This is a complementary step to :class:`ComputeGravityRotationMatrices` and
    applies the rotation matrices to the specified sensor.
    Parameters
    ----------
    data_set_id
        The id of the sensor data set to be rotated.
    columns
        The columns of the sensor data set to be considered in the rotation.
    level_filter
        An optional :class:`~dispel.processing.level.LevelFilter` to determine the
        levels to be transformed. If no filter is provided, all levels will be
        transformed. The ``level_filter`` also accepts :class:`str`,
        :class:`~dispel.data.core.LevelId`\ s and lists of either and passes them
        to a :class:`~dispel.processing.level.LevelFilter` for convenience.
    Examples
    --------
    Assuming you want to rotate the gyroscope vector onto gravity you can
    achieve this by chaining the following steps:
    .. doctest:: processing
        >>> from dispel.data.raw import DEFAULT_COLUMNS
        >>> from dispel.processing import process
        >>> from dispel.providers.generic.sensor import (
        ...     ComputeGravityRotationMatrices,
        ...     RotateSensorWithGravityRotationMatrices
        ... )
        >>> cols = DEFAULT_COLUMNS
        >>> steps = [
        ...     ComputeGravityRotationMatrices('accelerometer', (-1, 0, 0)),
        ...     RotateSensorWithGravityRotationMatrices('gyroscope', cols)
        ... ]
        >>> _ = process(reading, steps)  # doctest: +SKIP
    The results of the rotation are available in the raw data set with the
    id ``<data_set_id>_rotated``:
    .. doctest:: processing
        :options: +NORMALIZE_WHITESPACE
        >>> level = reading.get_level(level_id)  # doctest: +SKIP
        >>> level.get_raw_data_set('gyroscope').data.head()  # doctest: +SKIP
                  x         y         z                      ts
        0  0.035728 -0.021515  0.014879 2020-05-04 17:31:38.574
        1 -0.012046  0.005010 -0.009029 2020-05-04 17:31:38.625
        2  0.006779  0.000761 -0.003253 2020-05-04 17:31:38.680
        3  0.032636 -0.020272 -0.021915 2020-05-04 17:31:38.729
        4  0.007495 -0.014061  0.012886 2020-05-04 17:31:38.779
        >>> level.get_raw_data_set(
        ...     'gyroscope_rotated'
        ... ).data.head()  # doctest: +SKIP
                  x         y         z
        0 -0.002309 -0.042509 -0.012182
        1 -0.003754  0.014983  0.003624
        2 -0.002237 -0.002116 -0.006901
        3 -0.030461 -0.021654 -0.023656
        4  0.001203 -0.019580  0.005924
    """
    def __init__(
        self,
        data_set_id: str,
        columns: Iterable[str],
        level_filter: Optional[LevelFilterType] = None,
    ):
        def _transform_function(
            sensor_df: pd.DataFrame, matrices: pd.DataFrame
        ) -> pd.DataFrame:
            return apply_rotation_matrices(
                matrices["rotation_matrix"], sensor_df[columns]
            )
        def _definition_factory(column: str) -> RawDataValueDefinition:
            return RawDataValueDefinition(column, f"{column} rotated")
        super().__init__(
            [data_set_id, "gravity_rotation_matrices"],
            _transform_function,
            f"{data_set_id}_rotated",
            [_definition_factory(column) for column in columns],
            level_filter=level_filter,
        )
class TransformUserAcceleration(TransformStep):
    r"""Format accelerometer data to ADS format if not already the case.
    Prior to formatting, linear acceleration and gravity are decoupled
    from acceleration.
    Parameters
    ----------
    level_filter
        An optional :class:`dispel.processing.level.LevelFilter` to determine the
        levels to be transformed. If no filter is provided, all levels will be
        transformed. The ``level_filter`` also accepts :class:`str`,
        :class:`~dispel.data.core.LevelId`\ s and lists of either and passes them
        to a :class:`~dispel.processing.level.LevelFilter` for convenience.
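    Examples
    --------
    A minimal sketch, assuming a reading with an ``accelerometer`` data set:
    .. doctest:: processing
        >>> from dispel.providers.generic.sensor import (
        ...     TransformUserAcceleration
        ... )
        >>> _ = process(reading, [TransformUserAcceleration()])  # doctest: +SKIP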
    """
    data_set_ids = "accelerometer"
    new_data_set_id = "acc"
    definitions = (
        [
            RawDataValueDefinition(
                f"userAcceleration{axis}",
                f"Linear Acceleration along the {axis} axis.",
                data_type="float",
            )
            for axis in "XYZ"
        ]
        + [
            RawDataValueDefinition(
                f"gravity{axis}",
                f"gravity component along the {axis} axis.",
                data_type="float",
            )
            for axis in "XYZ"
        ]
        + [RawDataValueDefinition("ts", "time index")]
    )
    @staticmethod
    def add_gravity(
        accelerometer: pd.DataFrame,
        level: Level,
        gravity: Optional[pd.DataFrame] = None,
    ) -> pd.DataFrame:
        """Format gravity data to ADS format."""
        if gravity is None:
            cols = ["x", "y", "z"]
            raw_acc = level.get_raw_data_set("raw_accelerometer").data
            accelerometer = raw_acc
            if level.has_raw_data_set("attitude"):
                ori = level.get_raw_data_set("attitude").data
                ori_cols = ["w", "x", "y", "z"]
                lin_accelerometer, gravity = remove_gravity_component_ori(
                    accelerometer[cols].values, ori[ori_cols].values
                )
                lin_accelerometer = pd.DataFrame(lin_accelerometer, columns=cols)
                gravity = pd.DataFrame(gravity, columns=cols)
            else:
                lin_accelerometer, gravity = remove_gravity_component(
                    accelerometer[cols]
                )
            res = pd.DataFrame(
                {
                    "userAccelerationX": lin_accelerometer["x"],
                    "userAccelerationY": lin_accelerometer["y"],
                    "userAccelerationZ": lin_accelerometer["z"],
                }
            )
            res["gravityX"] = gravity["x"]
            res["gravityY"] = gravity["y"]
            res["gravityZ"] = gravity["z"]
            res["ts"] = accelerometer["ts"]
        else:
            # Merging on the timestamps vs. on the indexes
            acc_renamed = accelerometer.rename(
                mapper={
                    "x": "userAccelerationX",
                    "y": "userAccelerationY",
                    "z": "userAccelerationZ",
                },
                axis=1,
            )
            gravity_renamed = gravity.rename(
                mapper={"x": "gravityX", "y": "gravityY", "z": "gravityZ"}, axis=1
            )
            merged = acc_renamed.merge(gravity_renamed, how="outer")
            merged = merged.set_index("ts")
            merged_sorted = merged.sort_index()
            merged_sorted_interpolated = merged_sorted.interpolate(
                method="nearest", limit_direction="both"
            )
            res = merged_sorted_interpolated.loc[acc_renamed.ts].reset_index()
        return res.dropna()
    @staticmethod
    @transformation
    def _reformat(accelerometer: pd.DataFrame, level: Level) -> pd.DataFrame:
        target_cols = {
            f"{sensor}{axis}"
            for sensor in ("userAcceleration", "gravity")
            for axis in "XYZ"
        }
        if not target_cols.issubset(accelerometer.columns):
            try:
                return TransformUserAcceleration.add_gravity(
                    accelerometer, level, level.get_raw_data_set("gravity").data
                )
            except ValueError:
                # Happens in BDH pinch
                return TransformUserAcceleration.add_gravity(accelerometer, level)
        return accelerometer
class TransformGyroscope(TransformStep):
    r"""Format gyroscope data to ADS format if not already the case.
    In ADS format, the gyroscope is synchronized with the accelerometer. Here
    we make sure the gyroscope is synchronized with the ``acc`` data set.
    Parameters
    ----------
    level_filter
        An optional :class:`dispel.processing.level.LevelFilter` to determine the
        levels to be transformed. If no filter is provided, all levels will be
        transformed. The ``level_filter`` also accepts :class:`str`,
        :class:`~dispel.data.core.LevelId`\ s and lists of either and passes them
        to a :class:`~dispel.processing.level.LevelFilter` for convenience.
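    Examples
    --------
    A minimal sketch, applied after :class:`TransformUserAcceleration` so
    that the ``acc`` data set exists:
    .. doctest:: processing
        >>> from dispel.providers.generic.sensor import (
        ...     TransformGyroscope,
        ...     TransformUserAcceleration,
        ... )
        >>> steps = [TransformUserAcceleration(), TransformGyroscope()]
        >>> _ = process(reading, steps)  # doctest: +SKIP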
    """
    data_set_ids = ["acc", "gyroscope"]
    new_data_set_id = "gyroscope"
    definitions = [
        RawDataValueDefinition(
            axis, f"Rotation speed along the {axis} axis.", data_type="float"
        )
        for axis in "xyz"
    ] + [RawDataValueDefinition("ts", "time index")]
    @staticmethod
    @transformation
    def _synchronize_gyroscope(
        accelerometer: pd.DataFrame, gyroscope: pd.DataFrame, reading: Reading
    ) -> pd.DataFrame:
        if isinstance(reading, BDHReading):
            # Merging on the timestamps vs. on the indexes
            acc_renamed = accelerometer.rename(
                mapper={
                    "x": "userAccelerationX",
                    "y": "userAccelerationY",
                    "z": "userAccelerationZ",
                },
                axis=1,
            )
            return pd.merge_asof(acc_renamed, gyroscope, on="ts", direction="nearest")[
                ["ts", "x", "y", "z"]
            ]
        return gyroscope
class EuclideanNorm(TransformStep):
    r"""Compute euclidean norm of the specified columns of a raw data set.
    Parameters
    ----------
    data_set_id
        The data set id of the data set on which the method is to be applied
    columns
        The columns to be considered during the method application.
    drop_nan
        ``True`` if NaN values are to be dropped after the transformation.
    level_filter
        An optional :class:`dispel.processing.level.LevelFilter` to determine the
        levels to be transformed. If no filter is provided, all levels will be
        transformed. The ``level_filter`` also accepts :class:`str`,
        :class:`~dispel.data.core.LevelId`\ s and lists of either and passes them
        to a :class:`~dispel.processing.level.LevelFilter` for convenience.
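    Examples
    --------
    A minimal sketch computing the norm of the accelerometer axes (the data
    set id and columns are illustrative):
    .. doctest:: processing
        >>> from dispel.providers.generic.sensor import EuclideanNorm
        >>> step = EuclideanNorm('accelerometer', ['x', 'y', 'z'])
        >>> _ = process(reading, [step])  # doctest: +SKIP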
    """
    def __init__(
        self,
        data_set_id: str,
        columns: Optional[List[str]] = None,
        drop_nan: bool = False,
        level_filter: Optional[LevelFilterType] = None,
    ):
        columns = columns or DEFAULT_COLUMNS
        def _transform_function(data: pd.DataFrame) -> pd.Series:
            res = euclidean_norm(data[columns])
            if drop_nan:
                return res.dropna()
            return res
        definition = RawDataValueDefinition(
            "norm", f"euclidean_norm computed on {columns}"
        )
        super().__init__(
            data_set_id,
            _transform_function,
            f"{data_set_id}_euclidean_norm",
            [definition],
            level_filter=level_filter,
        )
class AddGravityAndScale(TransformStep):
    """Add gravity to userAcceleration and scale to m/s^2.
    The step expects a single data set id in ``data_set_ids``, pointing to a
    data frame that contains both acceleration and gravity columns with a
    :class:`pandas.DatetimeIndex` index.
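    Examples
    --------
    A minimal sketch, assuming a time-indexed data set that contains both
    user acceleration and gravity columns (the data set id is illustrative):
    .. doctest:: processing
        >>> from dispel.providers.generic.sensor import AddGravityAndScale
        >>> step = AddGravityAndScale(data_set_ids='acc_ts')  # doctest: +SKIP
        >>> _ = process(reading, [step])  # doctest: +SKIP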
    """
    definitions = [
        RawDataValueDefinition(f"acceleration_{ax}", f"acceleration_{ax}")
        for ax in DEFAULT_COLUMNS
    ]
    @transformation
    def _transform(self, data) -> pd.DataFrame:
        acc = {}
        for i, ax in enumerate(DEFAULT_COLUMNS):
            acc[f"acceleration_{ax}"] = (
                data[ACCELEROMETER_COLUMNS[i]] + data[GRAVITY_COLUMNS[i]]
            )
        return pd.DataFrame(acc, index=data.index) * GRAVITY_CONSTANT
    def get_new_data_set_id(self):
        """Overwrite new_data_set_id."""
        return f"{self.data_set_ids}_g"
class TransformFindZeroCrossings(TransformStep):
    """Find zero crossings in the signal.
    To find the zeros, the function identifies sign changes in the signal by
    differentiating ``data > 0``.
    Attributes
    ----------
    column
        The column to be used to find the zero crossings in the data set
    Parameters
    ----------
    column
        See :data:`TransformFindZeroCrossings.column`.
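    Examples
    --------
    A minimal sketch finding zero crossings in a norm signal (ids and the
    column are illustrative; ``definitions`` follows the class default):
    .. doctest:: processing
        >>> from dispel.providers.generic.sensor import (
        ...     TransformFindZeroCrossings
        ... )
        >>> step = TransformFindZeroCrossings(
        ...     data_set_ids='accelerometer_euclidean_norm',
        ...     new_data_set_id='zero_crossings',
        ...     column='norm'
        ... )  # doctest: +SKIP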
    """
    column: str
    definitions: List[RawDataValueDefinition] = [
        RawDataValueDefinition(
            "zero_crossings", "Zero-crossings of the signal.", data_type="float"
        )
    ]
    def __init__(self, *args, column: Optional[str] = None, **kwargs):
        super().__init__(*args, **kwargs)
        self.column = column or self.column
    @transformation
    def _find_zero_crossings(self, data: pd.DataFrame) -> pd.DataFrame:
        return find_zero_crossings(data, self.column)