Source code for dispel.providers.generic.sensor

"""Generic functionality for signal processing steps."""
from functools import partial
from typing import Iterable, List, Optional, Tuple, Union

import numpy as np
import pandas as pd

from dispel.data.core import Reading
from dispel.data.levels import Level
from dispel.data.measures import MeasureValueDefinitionPrototype
from dispel.data.raw import (
    ACCELEROMETER_COLUMNS,
    DEFAULT_COLUMNS,
    GRAVITY_COLUMNS,
    RawDataValueDefinition,
)
from dispel.data.values import AbbreviatedValue as AV
from dispel.processing.assertions import NotEmptyDataSetAssertionMixin
from dispel.processing.data_set import transformation
from dispel.processing.extract import ExtractMultipleStep, ExtractStep
from dispel.processing.level import LevelFilterType
from dispel.processing.modalities import SensorModality
from dispel.processing.transform import Apply, TransformStep
from dispel.providers.bdh.data import BDHReading
from dispel.signal.accelerometer import (
    GRAVITY_CONSTANT,
    apply_rotation_matrices,
    compute_rotation_matrices_quaternion,
    remove_gravity_component,
    remove_gravity_component_ori,
)
from dispel.signal.core import (
    amplitude,
    discretize_sampling_frequency,
    energy,
    entropy,
    euclidean_norm,
    peak,
)
from dispel.signal.sensor import SENSOR_UNIT, find_zero_crossings

# Define expected sampling frequencies
FREQ_20HZ = 20
FREQ_50HZ = 50
FREQ_60HZ = 60
FREQ_100HZ = 100  # SensorLog can sample at 100Hz
FREQ_128HZ = 128  # APDM files are sampled at 128Hz

VALID_FREQ_LIST = [FREQ_20HZ, FREQ_50HZ, FREQ_100HZ, FREQ_128HZ]


[docs] class RenameColumns(TransformStep): r"""Rename and select columns of a raw data set. Parameters ---------- data_set_id The data set id of the time series to be renamed. level_filter An optional :class:`~dispel.processing.level.LevelFilter` to determine the levels to be transformed. If no filter is provided, all levels will be transformed. The ``level_filter`` also accepts :class:`str`, :class:`~dispel.data.core.LevelId`\ s and lists of either and passes them to a :class:`~dispel.processing.level.LevelFilter` for convenience. kwargs All arguments passed into this class will serve as a renaming mapping for the raw data set. """
[docs] def __init__( self, data_set_id: str, level_filter: Optional[LevelFilterType] = None, **kwargs ): def _transform_function(data: pd.DataFrame) -> pd.DataFrame: data_ = data.rename(columns=kwargs) return data_[kwargs.values()] super().__init__( data_set_id, _transform_function, f"{data_set_id}_renamed", [RawDataValueDefinition(column, column) for column in kwargs.values()], level_filter=level_filter, )
[docs] class SetTimestampIndex(TransformStep): r"""Create a new time series based on a date time or time delta column. Parameters ---------- data_set_id The data set id of the time series to be transformed. columns The columns to consider in the new raw data set. time_stamp_column The time series column name to use as index. level_filter An optional :class:`dispel.processing.level.LevelFilter` to determine the levels to be transformed. If no filter is provided, all levels will be transformed. The ``level_filter`` also accepts :class:`str`, :class:`~dispel.data.core.LevelId`\ s and lists of either and passes them to a :class:`~dispel.processing.level.LevelFilter` for convenience. duplicates The strategy used to handle duplicates. Has to be one of ``ignore``, ``raise``, ``first``, ``last``. """
[docs] def __init__( self, data_set_id: str, columns: List[str], time_stamp_column: str = "ts", level_filter: Optional[LevelFilterType] = None, duplicates: Optional[str] = None, ): def _transform_function( data: pd.DataFrame, rm_duplicate: Optional[str] ) -> pd.DataFrame: if rm_duplicate is None: return data.set_index(time_stamp_column)[columns].copy() res = data.set_index(time_stamp_column)[columns].copy() return res[~res.index.duplicated(keep=duplicates)] super().__init__( data_set_id, lambda x: _transform_function(x, duplicates), f"{data_set_id}_ts", [RawDataValueDefinition(column, column) for column in columns], level_filter=level_filter, )
[docs] class Trim(TransformStep): """Trim a sensor signal at the beginning and/or end. Parameters ---------- trim_left The amount of data to trim from the left side of the sensor readings. trim_right The amount of data to trim from the right side of the sensor readings. ts_column The column id to be used in the provided raw data set through ``data_set_ids``. If no column is provided, the data set is expected to have a time-based index that is used to trim the data set. """ trim_left = pd.Timedelta(0) trim_right = pd.Timedelta(0) ts_column: Optional[str] = None
[docs] def __init__(self, *args, **kwargs): if (left := kwargs.pop("trim_left", None)) is not None: self.trim_left = left if (right := kwargs.pop("trim_right", None)) is not None: self.trim_right = right if (column := kwargs.pop("ts_column", None)) is not None: self.ts_column = column super().__init__(*args, **kwargs)
@transformation def _trim(self, data: pd.DataFrame) -> pd.DataFrame: ts_col = data.index if self.ts_column is None else data[self.ts_column] if self.trim_left > pd.Timedelta(0): data = data[ts_col > ts_col.min() + self.trim_left] if self.trim_right > pd.Timedelta(0): data = data[ts_col < ts_col.max() - self.trim_right] return data.copy()
[docs] class Resample(NotEmptyDataSetAssertionMixin, TransformStep): r"""Resample a time-based raw data set to a specific sampling frequency. The resampling creates a new raw data set which is accessible via the data set comprised of the original one concatenated with ``_resampled``. Parameters ---------- data_set_id The data set to be resampled. This has to be a data set that uses a time-based index. You might first have to apply the :class:`SetTimestampIndex` processing step before you can apply this step. aggregations A list of resampling methods to be applied in order. Each can be any method that is also accepted by :meth:`pandas.DataFrame.agg`. columns The columns to be considered during the resampling. freq The frequency to resample to. See also :meth:`pandas.DataFrame.resample` for details. If freq is not provided the frequency is estimated automatically taking the median frequency. max_frequency_distance An optional integer specifying the maximum accepted distance between the expected frequency and the estimated frequency above which we raise an error. level_filter An optional :class:`dispel.processing.level.LevelFilter` to determine the levels to be transformed. If no filter is provided, all levels will be transformed. The ``level_filter`` also accepts :class:`str`, :class:`~dispel.data.core.LevelId`\ s and lists of either and passes them to a :class:`~dispel.processing.level.LevelFilter` for convenience. """
[docs] def __init__( self, data_set_id: str, aggregations: Iterable[str], columns: Iterable[str], freq: Optional[Union[float, str]] = None, max_frequency_distance: Optional[int] = None, level_filter: Optional[LevelFilterType] = None, ): def _resample( data: pd.DataFrame, sampling_frequency: Optional[Union[float, str]] = None ) -> pd.DataFrame: # Check if a sampling frequency is provided # If not, we discretized the sampling frequency if sampling_frequency is None: discretize_args = [data, VALID_FREQ_LIST] if max_frequency_distance: discretize_args.append(max_frequency_distance) sampling_frequency = discretize_sampling_frequency(*discretize_args) # Convert the float sampling frequency to a Timedelta format if not isinstance(sampling_frequency, str): sampling_frequency = pd.Timedelta(1 / sampling_frequency, unit="s") resample_obj = data[columns].resample(sampling_frequency) for method in aggregations: resample_obj = resample_obj.agg(method) return resample_obj def _definition_factory(column: str) -> RawDataValueDefinition: return RawDataValueDefinition( column, f"{column} resampled with {aggregations}" ) super().__init__( data_set_id, partial(_resample, sampling_frequency=freq), f"{data_set_id}_resampled", [_definition_factory(column) for column in columns], level_filter=level_filter, )
[docs] class Upsample(Apply): r"""Upsample a time-based raw data set to a specific sampling frequency. The upsampling creates a new raw data set which is an upsampled version of the original data set identified by data_set_id. The upsampled data set is accessible via the new_data_set_id which is a concatenation of the original data_set_id and a suffix ``_upsampled``. Parameters ---------- interpolation_method Interpolation technique to use to fill NaN values. It should be a method that is also accepted by :meth:`pandas.DataFrame.interpolate`. freq The frequency to upsample to. See also :meth:`pandas.DataFrame.resample` for details. """
[docs] def get_new_data_set_id(self) -> str: """Overwrite new_data_set_id.""" return f"{self.get_data_set_ids()[0]}_upsampled" # type: ignore
[docs] def __init__(self, interpolation_method: str, freq: Union[float, str], **kwargs): def _upsample( data: pd.DataFrame, sampling_frequency: Union[float, str] ) -> pd.DataFrame: """Upsample a dataframe to a given sampling frequency.""" # Convert the float sampling frequency to a Timedelta format if not isinstance(sampling_frequency, str): sampling_frequency = pd.Timedelta(1 / sampling_frequency, unit="s") resample_obj = data.resample(sampling_frequency) return resample_obj.interpolate(interpolation_method) super().__init__( method=_upsample, method_kwargs={"sampling_frequency": freq}, **kwargs )
[docs] class ExtractAverageSignalEnergy(NotEmptyDataSetAssertionMixin, ExtractStep): r"""An average signal energy extraction step. Parameters ---------- sensor The type of sensor on which the extraction is to be performed. data_set_id The data set id on which the extraction is to be performed. columns The columns onto which the signal energy is to be computed. level_filter An optional :class:`~dispel.processing.level.LevelFilter` to determine the levels to be transformed. If no filter is provided, all levels will be transformed. The ``level_filter`` also accepts :class:`str`, :class:`~dispel.data.core.LevelId`\ s and lists of either and passes them to a :class:`~dispel.processing.level.LevelFilter` for convenience. """
[docs] def __init__( self, sensor: SensorModality, data_set_id: str, columns: List[str], level_filter: Optional[LevelFilterType] = None, ): def _average_signal(data: pd.DataFrame): return np.linalg.norm(data[columns], ord=2) super().__init__( data_set_id, _average_signal, definition=MeasureValueDefinitionPrototype( measure_name=AV(f"average {sensor} energy", f"{sensor.abbr}_sig_ene"), data_type="float64", description=f"The average {sensor} energy of the " f'{"".join(columns)} columns of the signal.', unit=SENSOR_UNIT[sensor.abbr], ), level_filter=level_filter, )
[docs] class ExtractPowerSpectrumMeasures(NotEmptyDataSetAssertionMixin, ExtractMultipleStep): r"""A measure extraction processing step for power spectrum measures. Parameters ---------- sensor The type of sensor on which the extraction is to be performed. data_set_id The data set id on which the extraction is to be performed. columns The columns onto which the power spectrum measures are to be extracted. lower_bound The lower bound of frequencies below which the signal is filtered. upper_bound The higher bound of frequencies above which the signal is filtered. level_filter An optional :class:`~dispel.processing.level.LevelFilter` to determine the levels to be transformed. If no filter is provided, all levels will be transformed. The ``level_filter`` also accepts :class:`str`, :class:`~dispel.data.core.LevelId`\ s and lists of either and passes them to a :class:`~dispel.processing.level.LevelFilter` for convenience. """
[docs] def __init__( self, sensor: SensorModality, data_set_id: str, columns: List[str], lower_bound: Optional[float] = None, upper_bound: Optional[float] = None, level_filter: Optional[LevelFilterType] = None, ): unit = sensor.unit(order=2) atomic_functions = [ { "func": partial(energy, lowcut=lower_bound, highcut=upper_bound), "name": AV("energy", "ene"), "description": "The power spectrum energy summed between the " f"frequencies ({lower_bound}, {upper_bound}) " f"of the {{axis}} axis for the {sensor} " f"signal.", "unit": unit, "outcome_uuid": "99ef9a8d-a925-4eb0-9e80-be58cd4a9ac9", }, { "func": peak, "name": AV("peak", "peak"), "description": f"The frequency at which the power spectrum of " "the {axis} axis reaches its maximum value for " f"the {sensor} signal.", "unit": "Hz", "outcome_uuid": "87512c93-3a5b-4c9e-9575-fd9ed19649ca", }, { "func": entropy, "name": AV("entropy", "ent"), "description": "The power spectrum entropy of the {axis} axis " f"for the {sensor} signal.", "unit": unit, "outcome_uuid": "6726bb5a-8084-49f5-a53e-6a28a8f27695", }, { "func": amplitude, "name": AV("amplitude", "amp"), "description": "The power spectrum amplitude (i.e. the maximum" " value) of the {axis} axis for the " f"{sensor} signal.", "unit": unit, "outcome_uuid": "bde2c1f9-abf7-41e7-91f8-e0ddddf34a5c", }, ] def _function_factory(atomic_function, axis): return dict( func=lambda x: atomic_function["func"](x[axis]), description=atomic_function["description"].format(axis=axis), unit=atomic_function["unit"], measure_name=AV( f'{sensor} power spectrum {atomic_function["name"]} {axis}' f" axis", f'{sensor.abbr}_ps_{atomic_function["name"].abbr}_{axis}', ), ) functions = [ _function_factory(atomic_function, axis) for atomic_function in atomic_functions for axis in columns ] super().__init__( data_set_id, functions, definition=MeasureValueDefinitionPrototype(data_type="float64"), level_filter=level_filter, )
[docs] class ComputeGravityRotationMatrices(TransformStep): r"""Compute a series of rotation matrices to align sensors to gravity. This transformation step creates a series of rotation matrices based on the gravity information contained in the accelerometer sensor. This allows to rotate other sensors on a desired orientation related to gravity. This is in particular of interest if we want to measure physical interactions with devices around the plane perpendicular to gravity. Parameters ---------- target_gravity The target gravity vector, e.g. ``(-1, 0, 0)`` to create rotation matrices that rotate the x-axis of a device onto gravity. level_filter An optional :class:`~dispel.processing.level.LevelFilter` to determine the levels to be transformed. If no filter is provided, all levels will be transformed. The ``level_filter`` also accepts :class:`str`, :class:`~dispel.data.core.LevelId`\ s and lists of either and passes them to a :class:`~dispel.processing.level.LevelFilter` for convenience. """
[docs] def __init__( self, data_set_id: str, target_gravity: Tuple[float, float, float], **kwargs ): def _transform_function(data: pd.DataFrame) -> pd.Series: return compute_rotation_matrices_quaternion( data[GRAVITY_COLUMNS], target_gravity ) super().__init__( data_set_id, _transform_function, "gravity_rotation_matrices", [RawDataValueDefinition("rotation_matrix", "Rotation Matrix")], **kwargs, )
[docs] class RotateSensorWithGravityRotationMatrices(TransformStep): r"""Apply a series of rotation matrices to a sensor. This is a complementary step to :class:`ComputeGravityRotationMatrices` and applies the rotation matrices to the specified sensor. Parameters ---------- data_set_id The id of the sensor data set to be rotated. columns The columns of the sensor data set to be considered in the rotation. level_filter An optional :class:`~dispel.processing.level.LevelFilter` to determine the levels to be transformed. If no filter is provided, all levels will be transformed. The ``level_filter`` also accepts :class:`str`, :class:`~dispel.data.core.LevelId`\ s and lists of either and passes them to a :class:`~dispel.processing.level.LevelFilter` for convenience. Examples -------- Assuming you want to rotate the gyroscope vector onto gravity you can achieve this by chaining the following steps: .. doctest:: processing >>> from dispel.data.raw import DEFAULT_COLUMNS >>> from dispel.processing import process >>> from dispel.providers.generic.sensor import ( ... ComputeGravityRotationMatrices, ... RotateSensorWithGravityRotationMatrices ... ) >>> cols = DEFAULT_COLUMNS >>> steps = [ ... ComputeGravityRotationMatrices('accelerometer', (-1, 0, 0)), ... RotateSensorWithGravityRotationMatrices('gyroscope', cols) ... ] >>> _ = process(reading, steps) # doctest: +SKIP The results of the roation are available in the raw data set with the id ``<data_set_id>_rotated``: .. doctest:: processing :options: +NORMALIZE_WHITESPACE >>> level = reading.get_level(level_id) # doctest: +SKIP >>> level.get_raw_data_set('gyroscope').data.head() # doctest: +SKIP x y z ts 0 0.035728 -0.021515 0.014879 2020-05-04 17:31:38.574 1 -0.012046 0.005010 -0.009029 2020-05-04 17:31:38.625 2 0.006779 0.000761 -0.003253 2020-05-04 17:31:38.680 3 0.032636 -0.020272 -0.021915 2020-05-04 17:31:38.729 4 0.007495 -0.014061 0.012886 2020-05-04 17:31:38.779 >>> level.get_raw_data_set( ... 'gyroscope_rotated' ... ).data.head() # doctest: +SKIP x y z 0 -0.002309 -0.042509 -0.012182 1 -0.003754 0.014983 0.003624 2 -0.002237 -0.002116 -0.006901 3 -0.030461 -0.021654 -0.023656 4 0.001203 -0.019580 0.005924 """
[docs] def __init__( self, data_set_id: str, columns: Iterable[str], level_filter: Optional[LevelFilterType] = None, ): def _transform_function( sensor_df: pd.DataFrame, matrices: pd.DataFrame ) -> pd.DataFrame: return apply_rotation_matrices( matrices["rotation_matrix"], sensor_df[columns] ) def _definition_factory(column: str) -> RawDataValueDefinition: return RawDataValueDefinition(column, f"{column} rotated") super().__init__( [data_set_id, "gravity_rotation_matrices"], _transform_function, f"{data_set_id}_rotated", [_definition_factory(column) for column in columns], level_filter=level_filter, )
[docs] class TransformUserAcceleration(TransformStep): r"""Format accelerometer data to ADS format if not already the case. Prior to formatting, linear acceleration and gravity are decoupled from acceleration. Parameters ---------- level_filter An optional :class:`dispel.processing.level.LevelFilter` to determine the levels to be transformed. If no filter is provided, all levels will be transformed. The ``level_filter`` also accepts :class:`str`, :class:`~dispel.data.core.LevelId`\ s and lists of either and passes them to a :class:`~dispel.processing.level.LevelFilter` for convenience. """ data_set_ids = "accelerometer" new_data_set_id = "acc" definitions = ( [ RawDataValueDefinition( f"userAcceleration{axis}", f"Linear Acceleration along the {axis} axis.", data_type="float", ) for axis in "XYZ" ] + [ RawDataValueDefinition( f"gravity{axis}", f"gravity component along the {axis} axis.", data_type="float", ) for axis in "XYZ" ] + [RawDataValueDefinition("ts", "time index")] )
[docs] @staticmethod def add_gravity( accelerometer: pd.DataFrame, level: Level, gravity: Optional[pd.DataFrame] = None, ) -> pd.DataFrame: """Format gravity data to ADS format.""" if gravity is None: cols = ["x", "y", "z"] raw_acc = level.get_raw_data_set("raw_accelerometer").data accelerometer = raw_acc if level.has_raw_data_set("attitude"): ori = level.get_raw_data_set("attitude").data ori_cols = ["w", "x", "y", "z"] lin_accelerometer, gravity = remove_gravity_component_ori( accelerometer[cols].values, ori[ori_cols].values ) lin_accelerometer = pd.DataFrame(lin_accelerometer, columns=cols) gravity = pd.DataFrame(gravity, columns=cols) else: lin_accelerometer, gravity = remove_gravity_component( accelerometer[cols] ) res = pd.DataFrame( { "userAccelerationX": lin_accelerometer["x"], "userAccelerationY": lin_accelerometer["y"], "userAccelerationZ": lin_accelerometer["z"], } ) res["gravityX"] = gravity["x"] res["gravityY"] = gravity["y"] res["gravityZ"] = gravity["z"] res["ts"] = accelerometer["ts"] else: # Merging on the timestamps vs. on the indexes acc_renamed = accelerometer.rename( mapper={ "x": "userAccelerationX", "y": "userAccelerationY", "z": "userAccelerationZ", }, axis=1, ) gravity_renamed = gravity.rename( mapper={"x": "gravityX", "y": "gravityY", "z": "gravityZ"}, axis=1 ) merged = acc_renamed.merge(gravity_renamed, how="outer") merged = merged.set_index("ts") merged_sorted = merged.sort_index() merged_sorted_interpolated = merged_sorted.interpolate( method="nearest", limit_direction="both" ) res = merged_sorted_interpolated.loc[acc_renamed.ts].reset_index() return res.dropna()
@staticmethod @transformation def _reformat(accelerometer: pd.DataFrame, level: Level) -> pd.DataFrame: target_cols = { f"{sensor}{axis}" for sensor in ("userAcceleration", "gravity") for axis in "XYZ" } if not target_cols.issubset(accelerometer.columns): try: return TransformUserAcceleration.add_gravity( accelerometer, level, level.get_raw_data_set("gravity").data ) except ValueError: # Happens in BDH pinch return TransformUserAcceleration.add_gravity(accelerometer, level) return accelerometer
[docs] class TransformGyroscope(TransformStep): r"""Format gyroscope data to ADS format if not already the case. On ADS format, the gyroscope is synchronized with the accelerometer. Here we make sure gyroscope is synchronized with the acc data set. Parameters ---------- level_filter An optional :class:`dispel.processing.level.LevelFilter` to determine the levels to be transformed. If no filter is provided, all levels will be transformed. The ``level_filter`` also accepts :class:`str`, :class:`~dispel.data.core.LevelId`\ s and lists of either and passes them to a :class:`~dispel.processing.level.LevelFilter` for convenience. """ data_set_ids = ["acc", "gyroscope"] new_data_set_id = "gyroscope" definitions = [ RawDataValueDefinition( axis, f"Rotation speed along the {axis} axis.", data_type="float" ) for axis in "xyz" ] + [RawDataValueDefinition("ts", "time index")] @staticmethod @transformation def _synchronize_gyroscope( accelerometer: pd.DataFrame, gyroscope: pd.DataFrame, reading: Reading ) -> pd.DataFrame: if isinstance(reading, BDHReading): # Merging on the timestamps vs. on the indexes acc_renamed = accelerometer.rename( mapper={ "x": "userAccelerationX", "y": "userAccelerationY", "z": "userAccelerationZ", }, axis=1, ) return pd.merge_asof(acc_renamed, gyroscope, on="ts", direction="nearest")[ ["ts", "x", "y", "z"] ] return gyroscope
[docs] class EuclideanNorm(TransformStep): r"""Compute euclidean norm of the specified columns of a raw data set. Parameters ---------- data_set_id The data set id of the data set on which the method is to be applied columns The columns to be considered during the method application. drop_nan ```True`` if NaN values are to be dropped after transformation. level_filter An optional :class:`dispel.processing.level.LevelFilter` to determine the levels to be transformed. If no filter is provided, all levels will be transformed. The ``level_filter`` also accepts :class:`str`, :class:`~dispel.data.core.LevelId`\ s and lists of either and passes them to a :class:`dispel.processing.level.LevelIdFilter` for convenience. """
[docs] def __init__( self, data_set_id: str, columns: Optional[List[str]] = None, drop_nan: bool = False, level_filter: Optional[LevelFilterType] = None, ): columns = columns or DEFAULT_COLUMNS def _transform_function(data: pd.DataFrame) -> pd.Series: res = euclidean_norm(data[columns]) if drop_nan: return res.dropna() return res definition = RawDataValueDefinition( "norm", f"euclidean_norm computed on {columns}" ) super().__init__( data_set_id, _transform_function, f"{data_set_id}_euclidean_norm", [definition], level_filter=level_filter, )
[docs] class AddGravityAndScale(TransformStep): """Add gravity to userAcceleration and scale to m/s^2. The step expects a unique data set id for `data_set_ids` pointing to a data frame containing both acceleration and gravity with a :class:`pandas.DatetimeIndex` index. """ definitions = [ RawDataValueDefinition(f"acceleration_{ax}", f"acceleration_{ax}") for ax in DEFAULT_COLUMNS ] @transformation def _transform(self, data) -> pd.DataFrame: acc = {} for i, ax in enumerate(DEFAULT_COLUMNS): acc[f"acceleration_{ax}"] = ( data[ACCELEROMETER_COLUMNS[i]] + data[GRAVITY_COLUMNS[i]] ) return pd.DataFrame(acc, index=data.index) * GRAVITY_CONSTANT
[docs] def get_new_data_set_id(self): """Overwrite new_data_set_id.""" return f"{self.data_set_ids}_g"
[docs] class TransformFindZeroCrossings(TransformStep): """Find zero crossings in the signal. To find the zeros, the function identifies the sign change in the signal by differentiating `data > 0`. Attributes ---------- column The column to be used to find the zero crossings in the data set Parameters ---------- column See :data:FindZeroCrossings.column`. """ column: str definitions: List[RawDataValueDefinition] = [ RawDataValueDefinition( "zero_crossings", "Zero-crossings of the signal.", data_type="float" ) ]
[docs] def __init__(self, *args, column: Optional[str] = None, **kwargs): super().__init__(*args, **kwargs) self.column = column or self.column
@transformation def _find_zero_crossings(self, data: pd.DataFrame) -> pd.DataFrame: return find_zero_crossings(data, self.column)