"""Core functionality for measure extraction based on continuous wavelet transform.
Part of this module is based on a modified version of GaitPy.
References
----------
.. [1] Czech M et al. (2019) GaitPy: An Open-Source Python Package for Gait Analysis
Using an Accelerometer on the Lower Back.
"""
import warnings
from enum import IntEnum
from typing import Iterable, List, Tuple
import numpy as np
import pandas as pd
import pywt
from scipy import integrate
from scipy.signal import find_peaks
from dispel.data.levels import Context, Level
from dispel.data.measures import MeasureValueDefinitionPrototype
from dispel.data.raw import DEFAULT_COLUMNS, RawDataValueDefinition
from dispel.data.validators import BETWEEN_MINUS_ONE_AND_ONE, GREATER_THAN_ZERO
from dispel.data.values import AbbreviatedValue as AV
from dispel.processing import ProcessingStep
from dispel.processing.data_set import transformation
from dispel.processing.extract import (
    DEFAULT_AGGREGATIONS,
    DEFAULT_AGGREGATIONS_Q95,
    DEFAULT_AGGREGATIONS_Q95_CV,
    AggregateRawDataSetColumn,
    ExtractStep,
)
from dispel.processing.level import ProcessingStepGroup
from dispel.processing.transform import TransformStep
from dispel.providers.generic.preprocessing import Detrend, Resample, RotateFrame
from dispel.providers.generic.sensor import AddGravityAndScale
from dispel.providers.generic.tasks.gait.bout_strategy import BoutStrategyModality
from dispel.providers.generic.tasks.gait.core import (
    DEF_BOUT_ID,
    DetectStepsProcessingBase,
    DetectStepsWithoutBoutsBase,
    ExtractPowerBoutDivSteps,
    ExtractStepIntensityAll,
    ExtractStepPowerAll,
    ExtractStepRegularity,
    ExtractStrideRegularity,
    FootUsed,
    GaitBoutAggregateStep,
    GaitBoutExtractStep,
    StepEvent,
    power_bout_div_steps,
)
from dispel.signal.core import get_sampling_rate_idx
from dispel.signal.filter import butterworth_low_pass_filter
from dispel.signal.sensor import detrend_signal
INITIAL_CONTACT_PROMINENCE = 5
r"""Prominence of a vertical acceleration peak indicating a heel strike or
foot initial contact."""
FINAL_CONTACT_PROMINENCE = 10
r"""Prominence of a vertical acceleration peak indicating a toe off or
foot final contact."""
STEP_LENGTH_HEIGHT_RATIO = 0.41
r"""Average step length relative to subject height."""
SENSOR_HEIGHT_RATIO = 0.53
r"""Height of the sensor relative to subject height."""
DEFAULT_SUBJECT_HEIGHT = 1.7
r"""A default height that we will use to compute spatial measures until we
have access to the correct height."""
DEFAULT_SENSOR_HEIGHT = SENSOR_HEIGHT_RATIO * DEFAULT_SUBJECT_HEIGHT
r"""A default sensor height given sensor_height ratio and subject height."""
GAIT_CYCLE_FORWARD_IC = 2.25
r"""Maximum allowable time (seconds) between initial contact of same foot"""
LOADING_FORWARD_IC = GAIT_CYCLE_FORWARD_IC * 0.2
r"""Maximum time (seconds) for loading phase."""
STANCE_FORWARD_IC = (GAIT_CYCLE_FORWARD_IC / 2) + LOADING_FORWARD_IC
r"""Maximum time (seconds) for stance phase."""
MIN_SAMPLES_HEIGHT_CHANGES_COM = 15
r"""The minimum number of samples to compute changes of height of the center
of mass."""
MIN_FREQ_TO_FILTER = 40
r"""The minimum sampling frequency at which we start low pass filter before
computing the wavelets."""
IC_DEF = RawDataValueDefinition(
    id_="IC",
    name="Initial Contact",
    data_type="datetime64[ms]",
    description="Initial contact timestamp.",
)
FC_DEF = RawDataValueDefinition(
    id_="FC",
    name="Final Contact",
    data_type="datetime64[ms]",
    description="Final contact timestamp.",
)
FC_OPP_FOOT_DEF = RawDataValueDefinition(
    id_="FC_opp_foot",
    name="Final Contact of the opposite foot",
    data_type="datetime64[ms]",
    description="Final contact of the opposite foot to the one "
    "performing the initial contact timestamp.",
)
GAIT_CYCLE_DEF = RawDataValueDefinition(
    id_="Gait_Cycle",
    name="Gait Cycle",
    data_type="bool",
    description="Gait Cycle",
)
STEPS_DEF = RawDataValueDefinition(
    id_="steps",
    name="steps",
    description="Number of steps",
    data_type="int",
)
STRIDE_DUR_DEF = RawDataValueDefinition(
    id_="stride_dur",
    name="stride duration",
    description="Stride duration is defined as the time elapsed "
    "between the first contact of two consecutive "
    "footsteps of the same foot.",
    data_type="float64",
    unit="s",
    precision=3,
)
STRIDE_DUR_ASYM_DEF = RawDataValueDefinition(
    id_="stride_dur_asym",
    name="stride duration asymmetry",
    data_type="float64",
    description="Stride duration asymmetry is defined as the absolute "
    "difference of the stride duration between the right "
    "and left foot.",
    unit="s",
    precision=3,
)
STRIDE_DUR_ASYM_LN_DEF = RawDataValueDefinition(
    id_="stride_dur_asym_ln",
    name="stride duration asymmetry (ln)",
    data_type="float64",
    description="Stride duration asymmetry ln is defined as the natural "
    "logarithm of the ratio of the minimum duration to the "
    "maximum duration of two consecutive strides.",
    precision=3,
)
STEP_DUR_DEF = RawDataValueDefinition(
    id_="step_dur",
    name="step dur",
    data_type="float64",
    description="Step duration is the time elapsed between two "
    "consecutive footsteps. More specifically between two "
    "consecutive initial contact events.",
    unit="s",
    precision=3,
)
STEP_DUR_ASYM_DEF = RawDataValueDefinition(
    id_="step_dur_asym",
    name="step duration asymmetry",
    data_type="float64",
    description="Step duration asymmetry is defined as the absolute "
    "difference of the step duration between the right "
    "and left foot.",
    unit="s",
    precision=3,
)
STEP_DUR_ASYM_LN_DEF = RawDataValueDefinition(
    id_="step_dur_asym_ln",
    name="step duration asymmetry (ln)",
    data_type="float64",
    description="Step duration asymmetry ln is defined as the natural "
    "logarithm of the ratio of the minimum duration to the "
    "maximum duration of two consecutive steps.",
    precision=3,
)
CADENCE_DEF = RawDataValueDefinition(
    id_="cadence",
    name="cadence",
    data_type="float64",
    description="Cadence is defined as the instantaneous number of "
    "steps per minutes.",
    precision=3,
)
INIT_DOUBLE_SUPP_DEF = RawDataValueDefinition(
    id_="init_double_supp",
    name="initial double support",
    data_type="float64",
    description="Initial double support phase is the sub-phase "
    "between heel contact of the phase to contralateral "
    "foot-off.",
    unit="s",
    precision=3,
)
INIT_DOUBLE_SUPP_ASYM_DEF = RawDataValueDefinition(
    id_="init_double_supp_asym",
    name="initial double support asymmetry",
    data_type="float64",
    description="Initial double support asymmetry is defined as the "
    "absolute difference of the initial double support "
    "duration between the right and left foot.",
    unit="s",
    precision=3,
)
TERM_DOUBLE_SUPP_DEF = RawDataValueDefinition(
    id_="term_double_supp",
    name="terminal double support",
    data_type="float64",
    description="Terminal double support phase is the  sub-phase "
    "from contralateral foot-on to the toe-off.",
    unit="s",
    precision=3,
)
TERM_DOUBLE_SUPP_ASYM_DEF = RawDataValueDefinition(
    id_="term_double_supp_asym",
    name="terminal double support asymmetry",
    data_type="float64",
    description="Terminal double support asymmetry is defined "
    "as the absolute difference of the terminal"
    "double support duration between the right and"
    "left foot.",
    unit="s",
    precision=3,
)
DOUBLE_SUPP_DEF = RawDataValueDefinition(
    id_="double_supp",
    name="double support",
    data_type="float64",
    description="Double support phase is defined as the sum of"
    "the initial and terminal double support "
    "phase.",
    unit="s",
    precision=3,
)
DOUBLE_SUPP_ASYM_DEF = RawDataValueDefinition(
    id_="double_supp_asym",
    name="double support asymmetry",
    data_type="float64",
    description="Double support asymmetry is defined "
    "as the absolute difference of the double "
    "support duration between the right and left "
    "foot.",
    unit="s",
    precision=3,
)
SINGLE_LIMB_SUPP_DEF = RawDataValueDefinition(
    id_="single_limb_supp",
    name="single limb support",
    data_type="float64",
    description="Single limb support represents a phase in "
    "the gait cycle when the body weight is "
    "entirely supported by one limb, while the "
    "contra-lateral limb swings forward.",
    unit="s",
    precision=3,
)
SINGLE_LIMB_SUPP_ASYM_DEF = RawDataValueDefinition(
    id_="single_limb_supp_asym",
    name="single limb support asymmetry",
    data_type="float64",
    description="Single limb support asymmetry is defined "
    "as the absolute difference of the single "
    "limb support duration between the right "
    "and left foot.",
    unit="s",
    precision=3,
)
STANCE_DEF = RawDataValueDefinition(
    id_="stance",
    name="stance",
    data_type="float64",
    description="The stance phase of gait begins when the "
    "foot first touches the ground and ends when "
    "the same foot leaves the ground.",
    unit="s",
    precision=3,
)
STANCE_ASYM_DEF = RawDataValueDefinition(
    id_="stance_asym",
    name="stance asymmetry",
    data_type="float64",
    description="Stance asymmetry is defined as the absolute "
    "difference of the stance duration between "
    "the right and left foot.",
    unit="s",
    precision=3,
)
SWING_DEF = RawDataValueDefinition(
    id_="swing",
    name="swing",
    data_type="float64",
    description="The swing phase of gait begins when the foot "
    "first leaves the ground and ends when the "
    "same foot touches the ground again.",
    unit="s",
    precision=3,
)
SWING_ASYM_DEF = RawDataValueDefinition(
    id_="swing_asym",
    name="swing asymmetry",
    data_type="float64",
    description="Swing asymmetry is defined as the absolute "
    "difference of the swing duration between "
    "the right and left foot.",
    unit="s",
    precision=3,
)
STEP_LEN_DEF = RawDataValueDefinition(
    id_="step_len",
    name="step length",
    data_type="float64",
    description="Step length is the distance between the "
    "point of initial contact of one foot and the "
    "point of initial contact of the opposite "
    "foot.",
    unit="m",
    precision=3,
)
STEP_LEN_ASYM_DEF = RawDataValueDefinition(
    id_="step_len_asym",
    name="step length asymmetry",
    data_type="float64",
    description="Step length asymmetry is defined as the "
    "absolute difference of the step length "
    "between the right and left foot.",
    unit="m",
    precision=3,
)
STEP_LEN_ASYM_LN_DEF = RawDataValueDefinition(
    id_="step_len_asym_ln",
    name="step length asymmetry ln",
    data_type="float64",
    description="Step length asymmetry ln is defined as the natural logarithm"
    "of the ratio in the length between consecutive steps.",
    precision=3,
)
STRIDE_LEN_DEF = RawDataValueDefinition(
    id_="stride_len",
    name="stride length",
    data_type="float64",
    description="Stride length is the distance covered when "
    "you take two steps, one with each foot.",
    unit="m",
    precision=3,
)
STRIDE_LEN_ASYM_DEF = RawDataValueDefinition(
    id_="stride_len_asym",
    name="stride length asymmetry",
    data_type="float64",
    description="Stride length asymmetry is defined as the "
    "absolute difference of the stride length "
    "between the right and left foot.",
    unit="m",
    precision=3,
)
STRIDE_LEN_ASYM_LN_DEF = RawDataValueDefinition(
    id_="stride_len_asym_ln",
    name="stride length asymmetry ln",
    data_type="float64",
    description="Stride length asymmetry ln is defined as the natural"
    "logarithm of the ratio in the length between consecutive "
    "strides.",
    precision=3,
)
GAIT_SPEED_DEF = RawDataValueDefinition(
    id_="gait_speed",
    name="gait speed",
    data_type="float64",
    description="Gait speed is defined as the instantaneous "
    "walking speed in m/s. It is computed as the "
    "length of a stride divided by the duration.",
    unit="m/s",
    precision=3,
)
CWT_MEASURES_DESC_MAPPING = {
    str(STEPS_DEF.id): STEPS_DEF.description,
    str(STRIDE_DUR_DEF.id): STRIDE_DUR_DEF.description,
    str(STRIDE_DUR_ASYM_DEF.id): STRIDE_DUR_ASYM_DEF.description,
    str(STRIDE_DUR_ASYM_LN_DEF.id): STRIDE_DUR_ASYM_LN_DEF.description,
    str(STEP_DUR_DEF.id): STEP_DUR_DEF.description,
    str(STEP_DUR_ASYM_DEF.id): STEP_DUR_ASYM_DEF.description,
    str(STEP_DUR_ASYM_LN_DEF.id): STEP_DUR_ASYM_LN_DEF.description,
    str(CADENCE_DEF.id): CADENCE_DEF.description,
    str(INIT_DOUBLE_SUPP_DEF.id): INIT_DOUBLE_SUPP_DEF.description,
    str(INIT_DOUBLE_SUPP_ASYM_DEF.id): INIT_DOUBLE_SUPP_ASYM_DEF.description,
    str(TERM_DOUBLE_SUPP_DEF.id): TERM_DOUBLE_SUPP_DEF.description,
    str(TERM_DOUBLE_SUPP_ASYM_DEF.id): TERM_DOUBLE_SUPP_ASYM_DEF.description,
    str(DOUBLE_SUPP_DEF.id): DOUBLE_SUPP_DEF.description,
    str(DOUBLE_SUPP_ASYM_DEF.id): DOUBLE_SUPP_ASYM_DEF.description,
    str(SINGLE_LIMB_SUPP_DEF.id): SINGLE_LIMB_SUPP_DEF.description,
    str(SINGLE_LIMB_SUPP_ASYM_DEF.id): SINGLE_LIMB_SUPP_ASYM_DEF.description,
    str(STANCE_DEF.id): STANCE_DEF.description,
    str(STANCE_ASYM_DEF.id): STANCE_ASYM_DEF.description,
    str(SWING_DEF.id): SWING_DEF.description,
    str(SWING_ASYM_DEF.id): SWING_ASYM_DEF.description,
    str(STEP_LEN_DEF.id): STEP_LEN_DEF.description,
    str(STEP_LEN_ASYM_DEF.id): STEP_LEN_ASYM_DEF.description,
    str(STEP_LEN_ASYM_LN_DEF.id): STEP_LEN_ASYM_LN_DEF.description,
    str(STRIDE_LEN_DEF.id): STRIDE_LEN_DEF.description,
    str(STRIDE_LEN_ASYM_DEF.id): STRIDE_LEN_ASYM_DEF.description,
    str(STRIDE_LEN_ASYM_LN_DEF.id): STRIDE_LEN_ASYM_LN_DEF.description,
    str(GAIT_SPEED_DEF.id): GAIT_SPEED_DEF.description,
}
r"""Mapping measures to description to help fill the description of cwt
measures when aggregating columns."""
CWT_MEASURES_PRECISION_MAPPING = {
    str(STEPS_DEF.id): STEPS_DEF.precision,
    str(STRIDE_DUR_DEF.id): STRIDE_DUR_DEF.precision,
    str(STRIDE_DUR_ASYM_DEF.id): STRIDE_DUR_ASYM_DEF.precision,
    str(STRIDE_DUR_ASYM_LN_DEF.id): STRIDE_DUR_ASYM_LN_DEF.precision,
    str(STEP_DUR_DEF.id): STEP_DUR_DEF.precision,
    str(STEP_DUR_ASYM_DEF.id): STEP_DUR_ASYM_DEF.precision,
    str(STEP_DUR_ASYM_LN_DEF.id): STEP_DUR_ASYM_LN_DEF.precision,
    str(CADENCE_DEF.id): CADENCE_DEF.precision,
    str(INIT_DOUBLE_SUPP_DEF.id): INIT_DOUBLE_SUPP_DEF.precision,
    str(INIT_DOUBLE_SUPP_ASYM_DEF.id): INIT_DOUBLE_SUPP_ASYM_DEF.precision,
    str(TERM_DOUBLE_SUPP_DEF.id): TERM_DOUBLE_SUPP_DEF.precision,
    str(TERM_DOUBLE_SUPP_ASYM_DEF.id): TERM_DOUBLE_SUPP_ASYM_DEF.precision,
    str(DOUBLE_SUPP_DEF.id): DOUBLE_SUPP_DEF.precision,
    str(DOUBLE_SUPP_ASYM_DEF.id): DOUBLE_SUPP_ASYM_DEF.precision,
    str(SINGLE_LIMB_SUPP_DEF.id): SINGLE_LIMB_SUPP_DEF.precision,
    str(SINGLE_LIMB_SUPP_ASYM_DEF.id): SINGLE_LIMB_SUPP_ASYM_DEF.precision,
    str(STANCE_DEF.id): STANCE_DEF.precision,
    str(STANCE_ASYM_DEF.id): STANCE_ASYM_DEF.precision,
    str(SWING_DEF.id): SWING_DEF.precision,
    str(SWING_ASYM_DEF.id): SWING_ASYM_DEF.precision,
    str(STEP_LEN_DEF.id): STEP_LEN_DEF.precision,
    str(STEP_LEN_ASYM_DEF.id): STEP_LEN_ASYM_DEF.precision,
    str(STEP_LEN_ASYM_LN_DEF.id): STEP_LEN_ASYM_LN_DEF.precision,
    str(STRIDE_LEN_DEF.id): STRIDE_LEN_DEF.precision,
    str(STRIDE_LEN_ASYM_DEF.id): STRIDE_LEN_ASYM_DEF.precision,
    str(STRIDE_LEN_ASYM_LN_DEF.id): STRIDE_LEN_ASYM_LN_DEF.precision,
    str(GAIT_SPEED_DEF.id): GAIT_SPEED_DEF.precision,
}
r"""Mapping measures to precision to map the precision of cwt
measures when aggregating columns."""
CWT_MEASURES_DEF = [
    STEPS_DEF,
    STRIDE_DUR_DEF,
    STRIDE_DUR_ASYM_DEF,
    STRIDE_DUR_ASYM_LN_DEF,
    STEP_DUR_DEF,
    STEP_DUR_ASYM_DEF,
    STEP_DUR_ASYM_LN_DEF,
    CADENCE_DEF,
    INIT_DOUBLE_SUPP_DEF,
    INIT_DOUBLE_SUPP_ASYM_DEF,
    TERM_DOUBLE_SUPP_DEF,
    TERM_DOUBLE_SUPP_ASYM_DEF,
    DOUBLE_SUPP_DEF,
    DOUBLE_SUPP_ASYM_DEF,
    SINGLE_LIMB_SUPP_DEF,
    SINGLE_LIMB_SUPP_ASYM_DEF,
    STANCE_DEF,
    STANCE_ASYM_DEF,
    SWING_DEF,
    SWING_ASYM_DEF,
    STEP_LEN_DEF,
    STEP_LEN_ASYM_DEF,
    STEP_LEN_ASYM_LN_DEF,
    STRIDE_LEN_DEF,
    STRIDE_LEN_ASYM_DEF,
    STRIDE_LEN_ASYM_LN_DEF,
    GAIT_SPEED_DEF,
]
def _detect_peaks(y: Iterable, prominence: float) -> Iterable[int]:
    """
    Find peaks inside a signal based on peak properties.
    This function takes a 1-D array and finds all local maxima by
    simple comparison of neighboring values. Then a subset of these
    peaks are selected by specifying conditions on the prominence of the peak.
    Parameters
    ----------
    y
        A signal with peaks.
    prominence
        The prominence of a peak measures how much a peak stands out from the
        surrounding baseline of the signal and is defined as the vertical
        distance between the peak and its lowest contour line.
    Returns
    -------
    Iterable[int]
        Indices of peaks in `y` that satisfy the prominence conditions.
    """
    peaks, _ = find_peaks(y, prominence=prominence)
    return peaks
def _detrend_and_low_pass(data: pd.Series, sampling_rate: float) -> pd.Series:
    """
    Detrend and eventually filter the data.
    Before computing the wavelet, in cwt library the authors decide
    to always detrend the vertical acceleration signal and eventually
    low-pass filter if the sampling frequency is greater than 40 Hz.
    Parameters
    ----------
    data
        Any pandas series regularly sampled.
    sampling_rate
        The sampling rate of the time series.
    Returns
    -------
    pandas.Series
        The same series detrended and eventually low-pass filtered if its
        sampling frequency is greater than 40 Hz.
    """
    # Detrend data
    detrended_data = detrend_signal(data)
    # Low pass filter if greater than 40 hz
    if sampling_rate >= MIN_FREQ_TO_FILTER:
        return butterworth_low_pass_filter(
            data=detrended_data, cutoff=20, order=4, freq=sampling_rate, zero_phase=True
        )
    return detrended_data
def _continuous_wavelet_transform(
    data: pd.Series, ic_prom: float, fc_prom: float
) -> Tuple[Iterable, Iterable, Iterable, Iterable]:
    """
    Find the indices of the Initial contact or Final contact of the foot.
    Parameters
    ----------
    data
        The vertical acceleration
    ic_prom
        Initial contact prominence. For more info, see _detect_peaks prominence
        definition.
    fc_prom
        Final contact prominence. For more info, see _detect_peaks prominence
        definition.
    Returns
    -------
    Tuple[Iterable, Iterable, Iterable, Iterable]:
        The indices of the initial contacts and final contacts in `y_accel` and
        the wavelet transforms.
    """
    # Get sampling rate
    sampling_rate = get_sampling_rate_idx(data)
    # CWT wavelet parameters from GaitPy
    # The smaller the scale factor, the more “compressed” the wavelet.
    # Conversely, the larger the scale, the more stretched the wavelet.
    scale_cwt_ic = float(sampling_rate) / 5.0
    scale_cwt_fc = float(sampling_rate) / 6.0
    wavelet_name = "gaus1"
    # Detrend and low pass
    filtered_data = _detrend_and_low_pass(data, sampling_rate)
    # Numerical integration of vertical acceleration
    integrated_data = integrate.cumtrapz(-filtered_data)
    # Gaussian continuous wavelet transform
    cwt_ic, _ = pywt.cwt(
        data=integrated_data, scales=scale_cwt_ic, wavelet=wavelet_name
    )
    differentiated_data = cwt_ic[0]
    # initial contact (heel strike) peak detection
    ic_peaks = _detect_peaks(pd.Series(-differentiated_data), ic_prom)
    # Gaussian continuous wavelet transform
    cwt_fc, _ = pywt.cwt(
        data=-differentiated_data, scales=scale_cwt_fc, wavelet=wavelet_name
    )
    re_differentiated_data = cwt_fc[0]
    # final contact (toe off) peak detection
    fc_peaks = _detect_peaks(pd.Series(re_differentiated_data), fc_prom)
    return ic_peaks, fc_peaks, differentiated_data, re_differentiated_data
[docs]
def compute_asymmetry(data: pd.Series) -> pd.Series:
    """Compute absolute asymmetry.
    Parameters
    ----------
    data
        The cyclic property dataset to be analyzed.
    Returns
    -------
    pandas.Series
        A pandas Series containing the absolute difference between consecutive
        items of the dataset.
    """
    return abs(data.shift(-1) - data) 
[docs]
def compute_ln_asymmetry(data: pd.Series) -> pd.Series:
    """Compute natural logarithm of asymmetry ratio.
    Parameters
    ----------
    data
        The cyclic property dataset to be analyzed.
    Returns
    -------
    pandas.Series
        A pandas Series containing the natural logarithm of the ratio of the
        minimum and maximum cyclic property, denoting normalized asymmetry.
    """
    shifted = pd.concat([data.shift(-1), data], axis=1)
    min_ = shifted.min(axis=1)
    max_ = shifted.max(axis=1)
    return pd.Series(np.log(min_ / max_), index=data.index) 
[docs]
def cwt_step_detection(data: pd.DataFrame) -> pd.DataFrame:
    """
    Detect Initial and Final contact of the foot in vertical acceleration.
    This method is based on CWT algorithm which consists in using wavelets
    transforms to decompose the vertical acceleration signal and the velocity
    to detect initial and final contacts. It is then formatted to the step
    data set format.
    Parameters
    ----------
    data
        The vertical acceleration.
    Returns
    -------
    pandas.DataFrame
        A data frame of the different gait event annotated under the step
        data set format with three columns:`event`, `foot` and `timestamp`.
    """
    ic_peaks_grav, fc_peaks_grav, *_ = _continuous_wavelet_transform(
        data["acceleration_x"],
        ic_prom=INITIAL_CONTACT_PROMINENCE,
        fc_prom=FINAL_CONTACT_PROMINENCE,
    )
    df_ic = pd.DataFrame(
        {
            "event": StepEvent.INITIAL_CONTACT,
            "foot": FootUsed.UNKNOWN,
            "timestamp": data.iloc[ic_peaks_grav].index,
        }
    ).set_index(keys="timestamp")
    df_fc = pd.DataFrame(
        {
            "event": StepEvent.FOOT_OFF,
            "foot": FootUsed.UNKNOWN,
            "timestamp": data.iloc[fc_peaks_grav].index,
        }
    ).set_index(keys="timestamp")
    return pd.concat([df_ic, df_fc]).sort_index() 
[docs]
class CWTDetectStepsWithoutBout(DetectStepsWithoutBoutsBase):
    """
    Detect Initial and Final contact of the foot without walking bouts.
    For more information about the step detection methodology, see:
    :class:`CWTDetectSteps`.
    """
    new_data_set_id = "cwt"
    transform_function = cwt_step_detection 
[docs]
class CWTDetectSteps(DetectStepsProcessingBase):
    """
    Detect Initial and Final contact of the foot in vertical acceleration.
    This method is based on CWT algorithm which consists in using wavelets
    transforms to decompose the vertical acceleration signal and the velocity
    to detect initial and final contacts. It is ran on each of the detected
    walking bout and then aggregated in a common data frame with an extra
    columns named `bout_id` indicating the id of the walking bout.
    It expects two data set id, the first one linking to the acceleration
    dataset containing the acceleration with gravity rotated resampled and
    detrended. The second one linking to the walking bout data set.
    """
    new_data_set_id = "cwt_with_walking_bouts"
[docs]
    @staticmethod
    def step_detection_method(data: pd.DataFrame) -> pd.DataFrame:
        """Define and declare the step detection as a static method."""
        return cwt_step_detection(data) 
 
def _optimize_cwt_step_dataset(cwt_step: pd.DataFrame) -> pd.DataFrame:
    """
    Apply physiological constraints to filter impossible gait events.
    Several optimization steps are applied to the cwt step dataset. The
    first optimization implies constraint on times. Maximum allowable time are
    defined for: loading phases (each initial contact requires 1 forward final
    contact within 0.225 seconds) and stance phases (each initial contact
    requires at least 2 forward final contact's within 1.35 second) and entire
    gait cycles (each initial contact requires at least 2 initial contacts
    within 2.25 seconds after).
    Parameters
    ----------
    cwt_step
        The step dataset computed applying CWT wavelet transforms and
        peak detection.
    Returns
    -------
    pandas.DataFrame
        A data frame with optimized gait cycle.
    """
    ic_times = cwt_step.loc[cwt_step["event"] == StepEvent.INITIAL_CONTACT].index
    fc_times = cwt_step.loc[cwt_step["event"] == StepEvent.FOOT_OFF].index
    initial_contacts = []
    final_contacts = []
    final_contacts_opp_foot = []
    loading_forward_max_vect = ic_times + pd.Timedelta(
        LOADING_FORWARD_IC * 1000.0, unit="ms"
    )
    stance_forward_max_vect = ic_times + pd.Timedelta(
        STANCE_FORWARD_IC * 1000.0, unit="ms"
    )
    for i, current_ic in enumerate(ic_times):
        loading_forward_max = loading_forward_max_vect[i]
        stance_forward_max = stance_forward_max_vect[i]
        # Loading Response Constraint
        # Each IC requires 1 forward FC within 0.225 seconds
        # (opposite foot toe off)
        loading_forward_fcs = fc_times[
            (fc_times > current_ic) & (fc_times < loading_forward_max)
        ]
        # Stance Phase Constraint
        # Each IC requires at least 2 forward FC's within 1.35 second
        # (2nd FC is current IC's matching FC)
        stance_forward_fcs = fc_times[
            (fc_times > current_ic) & (fc_times < stance_forward_max)
        ]
        if len(loading_forward_fcs) == 1 and len(stance_forward_fcs) >= 2:
            initial_contacts.append(current_ic)
            final_contacts.append(stance_forward_fcs[1])
            final_contacts_opp_foot.append(stance_forward_fcs[0])
    res = pd.DataFrame(
        {
            "IC": initial_contacts,
            "FC": final_contacts,
            "FC_opp_foot": final_contacts_opp_foot,
        }
    ).reset_index(drop=True)
    if len(res) == 0:
        res["Gait_Cycle"] = False  # pylint: disable=E1137
        return res
    # Gait Cycles Constraint
    # Each ic requires at least 2 ics within 2.25 seconds after
    interval_1 = res.IC.diff(-1).dt.total_seconds().abs()
    interval_2 = res.IC.diff(-2).dt.total_seconds().abs()
    full_gait_cycle = (interval_1 <= GAIT_CYCLE_FORWARD_IC) & (
        interval_2 <= GAIT_CYCLE_FORWARD_IC
    )
    res["Gait_Cycle"] = full_gait_cycle  # pylint: disable=E1137
    return res
[docs]
class OptimizeCwtStepDataset(TransformStep):
    """
    Apply physiological constraints to filter impossible gait events.
    It expects a unique data set id, linking to the cwt step data set.
    """
    definitions = [IC_DEF, FC_DEF, FC_OPP_FOOT_DEF, GAIT_CYCLE_DEF, DEF_BOUT_ID]
[docs]
    def get_new_data_set_id(self) -> str:
        """Overwrite new_data_set_id."""
        return f"{self.data_set_ids}_optimized" 
    @transformation
    def _transform_function(self, data: pd.DataFrame) -> pd.DataFrame:
        """Optimize cwt dataset for each walking bout."""
        bout_ids = data["bout_id"].unique()
        list_df = []
        for bout_id in bout_ids:
            res = _optimize_cwt_step_dataset(data[data.bout_id == bout_id])
            res["bout_id"] = bout_id  # pylint: disable=E1137
            if len(res) > 0:
                list_df.append(res)
        if len(list_df) > 0:
            return pd.concat(list_df).reset_index(drop=True).sort_values("IC")
        return pd.DataFrame(
            columns=["IC", "FC", "FC_opp_foot", "Gait_Cycle", "bout_id"]
        ) 
[docs]
class OptimizeCwtStepDatasetWithoutWalkingBout(TransformStep):
    """
    Apply physiological constraints to filter impossible gait events.
    It expects a unique data set id, linking to the cwt step data set
    without walking bouts.
    """
    definitions = [IC_DEF, FC_DEF, FC_OPP_FOOT_DEF, GAIT_CYCLE_DEF]
[docs]
    def get_new_data_set_id(self) -> str:
        """Overwrite new_data_set_id."""
        return f"{self.data_set_ids}_optimized" 
    transform_function = _optimize_cwt_step_dataset 
def _height_change_com(
    optimized_gait_ic: pd.Series, vertical_acc: pd.Series
) -> pd.Series:
    """Compute height change of the center of mass when walking.
    Parameters
    ----------
    optimized_gait_ic
        A series of the data frame with optimized gait initial contacts.
    vertical_acc
        A series of the vertical acceleration.
    Returns
    -------
    pandas.Series
        A pandas series of the center of mass height changes.
    """
    # Get the sampling rate
    sampling_rate = get_sampling_rate_idx(vertical_acc)
    # Initialize the height changes of the center of mass
    height_change_center_of_mass = [None] * len(optimized_gait_ic)
    # Scale the minimum number of samples to compute the height change of the
    # Center of Mass with the sampling frequency
    min_samples_hccom = MIN_SAMPLES_HEIGHT_CHANGES_COM * sampling_rate / 50
    # Changes in Height of the Center of Mass
    for i in range(0, len(optimized_gait_ic) - 1):
        ic_index = optimized_gait_ic.iloc[i]
        post_ic_index = optimized_gait_ic.iloc[i + 1]
        step_raw = vertical_acc[ic_index:post_ic_index]
        if len(step_raw) <= min_samples_hccom:
            continue
        # Filter and low pass
        step_filtered = _detrend_and_low_pass(step_raw, sampling_rate)
        # Compute vertical displacement
        step_integrate_1 = integrate.cumtrapz(step_filtered) / sampling_rate
        step_integrate_2 = integrate.cumtrapz(step_integrate_1) / sampling_rate
        # Keep the max
        height_change_center_of_mass[i] = max(step_integrate_2) - min(step_integrate_2)
    return pd.Series(height_change_center_of_mass)
[docs]
class HeightChangeCOM(TransformStep):
    """Compute the height changes of the center of mass.
    It expects two data set ids, first: the data set id mapping the output of
    FormatAccelerationCWT and second an id linking to the acceleration
    dataset containing the acceleration with gravity rotated resampled and
    detrended.
    """
    definitions = [
        RawDataValueDefinition(
            id_="height_com",
            name="height change center of mass",
            unit="m",
            data_type="float64",
        )
    ]
    @transformation
    def _transform_function(self, optimized_gait, vertical_acc):
        return _height_change_com(optimized_gait["IC"], vertical_acc["acceleration_x"]) 
[docs]
def compute_stride_duration(data: pd.DataFrame) -> pd.DataFrame:
    """Compute stride duration and stride duration asymmetry.
    Stride duration is defined as the time elapsed between the first contact
    of two consecutive footsteps of the same foot.
    The asymmetry is defined as the difference of the measure between the right
    and left foot. It is given as an absolute difference as we don't know
    which foot is which.
    Parameters
    ----------
    data
        The cwt_optimized with step events output of the TransformStep
        OptimizeCwtStepDataset.
    Returns
    -------
    pandas.DataFrame
        A data frame with stride duration and stride duration asymmetry for
        each valid gait cycle.
    """
    # gait cycle duration
    data.loc[data.Gait_Cycle, "stride_dur"] = (
        data.IC.shift(-2) - data.IC
    ).dt.total_seconds()
    # asymmetry
    data.loc[data.Gait_Cycle, "stride_dur_asym"] = compute_asymmetry(data["stride_dur"])
    # asymmetry ln
    data.loc[data.Gait_Cycle, "stride_dur_asym_ln"] = compute_ln_asymmetry(
        data["stride_dur"]
    )
    return data 
[docs]
def compute_step_duration_and_cadence(data: pd.DataFrame) -> pd.DataFrame:
    """Compute step duration, step duration asymmetry and cadence.
    Step duration is the time elapsed between two consecutive footsteps. More
    specifically between two consecutive initial contact events.
    The asymmetry is defined as the difference of the measure
    between the right and left foot. It is given as an absolute difference as
    we don't know which foot is which.
    Cadence is defined as the instantaneous number of steps per minutes.
    Parameters
    ----------
    data
        The cwt_optimized with step events output of the TransformStep
        OptimizeCwtStepDataset.
    Returns
    -------
    pandas.DataFrame
        A data frame with step duration, step duration asymmetry and cadence
        for each valid gait cycle.
    """
    # step duration
    data.loc[data.Gait_Cycle, "step_dur"] = (
        data.IC.shift(-1) - data.IC
    ).dt.total_seconds()
    # asymmetry
    data.loc[data.Gait_Cycle, "step_dur_asym"] = compute_asymmetry(data["step_dur"])
    # asymmetry ln
    data.loc[data.Gait_Cycle, "step_dur_asym_ln"] = compute_ln_asymmetry(
        data["step_dur"]
    )
    # cadence
    data.loc[data.Gait_Cycle, "cadence"] = 60 / data.step_dur
    return data 
[docs]
def compute_stance(data: pd.DataFrame) -> pd.DataFrame:
    """Compute stance and stance asymmetry.
    The stance phase of gait begins when the foot first touches the ground and
    ends when the same foot leaves the ground. The stance phase makes up
    approximately 60% of the gait cycle. It can also be seen as the time
    elapsed between the final contact and initial contact.
    The asymmetry is defined as the difference of the measure
    between the right and left foot. It is given as an absolute difference as
    we don't know which foot is which.
    Parameters
    ----------
    data
        The cwt_optimized with step events output of the TransformStep
        OptimizeCwtStepDataset.
    Returns
    -------
    pandas.DataFrame
        Input data frame with stride duration and stride duration asymmetry
        added for each valid gait cycle.
    """
    # stance
    data.loc[data.Gait_Cycle, "stance"] = (data.FC - data.IC).dt.total_seconds()
    # asymmetry
    data.loc[data.Gait_Cycle, "stance_asym"] = compute_asymmetry(data["stance"])
    return data 
[docs]
def compute_initial_double_support(data: pd.DataFrame) -> pd.DataFrame:
    """
    Compute initial double support and initial double support asymmetry.
    Initial double support phase is the sub-phase between heel contact of the
    phase to contralateral foot-off. This phase makes up approximately 14-20%
    of the stance phase.
    Parameters
    ----------
    data
        The cwt_optimized with step events output of the TransformStep
        OptimizeCwtStepDataset.
    Returns
    -------
    pandas.DataFrame
        Input data frame with initial double support and initial double support
        asymmetry added for each valid gait cycle.
    """
    # Initial double support
    data.loc[data.Gait_Cycle, "init_double_supp"] = (
        data.FC_opp_foot - data.IC
    ).dt.total_seconds()
    # Asymmetry
    data.loc[data.Gait_Cycle, "init_double_supp_asym"] = compute_asymmetry(
        data["init_double_supp"]
    )
    return data 
[docs]
def compute_terminal_double_support(data: pd.DataFrame) -> pd.DataFrame:
    """
    Compute terminal double support and terminal double support asymmetry.
    Terminal double support phase is the sub-phase from contralateral foot-on
    to the toe-off. This phase makes up approximately 14-20% of the stance
    phase.
    Parameters
    ----------
    data
        The cwt_optimized with step events output of the TransformStep
        OptimizeCwtStepDataset.
    Returns
    -------
    pandas.DataFrame
        Input data frame with terminal double support and terminal double
        support asymmetry added for each valid gait cycle.
    """
    # terminal double support
    data.loc[data.Gait_Cycle, "term_double_supp"] = (
        data.FC - data.IC.shift(-1)
    ).dt.total_seconds()
    # asymmetry
    data.loc[data.Gait_Cycle, "term_double_supp_asym"] = compute_asymmetry(
        data["term_double_supp"]
    )
    return data 
[docs]
def compute_double_support(data: pd.DataFrame) -> pd.DataFrame:
    """
    Compute double support and double support asymmetry.
    Double support phase is defined as the sum of the initial and terminal
    double support phase. This makes up approximately 28-40% of the stance
    phase.
    Parameters
    ----------
    data
        The cwt_optimized with step events output of the TransformStep
        OptimizeCwtStepDataset.
    Returns
    -------
    pandas.DataFrame
        Input data frame with double support and double support asymmetry added
        for each valid gait cycle.
    """
    # double support
    data.loc[data.Gait_Cycle, "double_supp"] = (
        data.init_double_supp + data.term_double_supp
    )
    # asymmetry
    data.loc[data.Gait_Cycle, "double_supp_asym"] = compute_asymmetry(
        data["double_supp"]
    )
    return data 
[docs]
def compute_single_limb_support(data: pd.DataFrame) -> pd.DataFrame:
    """
    Compute single limb support and single limb support asymmetry.
    Single limb support represents a phase in the gait cycle when the body
    weight is entirely supported by one limb, while the contra-lateral limb
    swings forward.
    Parameters
    ----------
    data
        The cwt_optimized with step events output of the TransformStep
        OptimizeCwtStepDataset.
    Returns
    -------
    pandas.DataFrame
        Input data frame with single limb support and single limb support
        asymmetry added for each valid gait cycle.
    """
    # single limb support
    data.loc[data.Gait_Cycle, "single_limb_supp"] = (
        data.IC.shift(-1) - data.FC.shift(1)
    ).dt.total_seconds()
    # asymmetry
    data.loc[data.Gait_Cycle, "single_limb_supp_asym"] = compute_asymmetry(
        data["single_limb_supp"]
    )
    return data 
[docs]
def compute_swing(data: pd.DataFrame) -> pd.DataFrame:
    """
    Compute swing and swing asymmetry.
    The swing phase of gait begins when the foot first leaves the ground and
    ends when the same foot touches the ground again. The swing phase makes up
    40% of the gait cycle.
    Parameters
    ----------
    data
        The cwt_optimized with step events output of the TransformStep
        OptimizeCwtStepDataset.
    Returns
    -------
    pandas.DataFrame
        Input data frame with swing and swing asymmetry added for each valid
        gait cycle.
    """
    data.loc[data.Gait_Cycle, "swing"] = (
        data.IC.shift(-2) - data.FC
    ).dt.total_seconds()
    # asymmetry
    data.loc[data.Gait_Cycle, "swing_asym"] = compute_asymmetry(data["swing"])
    return data 
[docs]
def compute_step_length(
    data: pd.DataFrame, height_changes_com: pd.Series, sensor_height: float
) -> pd.DataFrame:
    """Compute step length and step length asymmetry.
    Step length is the distance between the point of initial contact of one
    foot and the point of initial contact of the opposite foot.
    Parameters
    ----------
    data
        The cwt_optimized with step events output of the TransformStep
        OptimizeCwtStepDataset.
    height_changes_com
        A panda series with the changes in height of the center of mass.
    sensor_height
        The height of the smartphone sensor in meters when located at the
        lumbar position.
    Returns
    -------
    pandas.DataFrame
        Input data frame with step length and step length asymmetry added for
        each valid gait cycle.
    """
    try:
        res = 2 * sensor_height * height_changes_com - height_changes_com**2
        # if negative set to nan to prevent errors in sqrt computation
        # this typically occurs due to drift due to long intervals introduced
        # by steps belonging to different walking bouts
        res[res < 0] = np.nan
        step_length = 2 * (np.sqrt(res))
    except TypeError:
        step_length = np.nan
    data.loc[data.Gait_Cycle, "step_len"] = step_length
    # asymmetry
    data.loc[data.Gait_Cycle, "step_len_asym"] = compute_asymmetry(data["step_len"])
    # asymmetry ln
    data.loc[data.Gait_Cycle, "step_len_asym_ln"] = compute_ln_asymmetry(
        data["step_len"]
    )
    return data 
[docs]
def compute_stride_length(data: pd.DataFrame) -> pd.DataFrame:
    """
    Compute stride length and stride length asymmetry.
    Stride length is the distance covered when you take two steps,
    one with each foot.
    Parameters
    ----------
    data
        The cwt_optimized with step events output of the TransformStep
        OptimizeCwtStepDataset.
    Returns
    -------
    pandas.DataFrame
        Input data frame with stride length and stride length asymmetry added
        for each valid gait cycle.
    """
    # stride length
    data.loc[data.Gait_Cycle, "stride_len"] = data.step_len.shift(-1) + data.step_len
    # asymmetry
    data.loc[data.Gait_Cycle, "stride_len_asym"] = compute_asymmetry(data["stride_len"])
    # asymmetry ln
    data.loc[data.Gait_Cycle, "stride_len_asym_ln"] = compute_ln_asymmetry(
        data["stride_len"]
    )
    return data 
[docs]
def compute_gait_speed(data: pd.DataFrame) -> pd.DataFrame:
    """
    Compute gait speed.
    Gait speed is defined as the instantaneous walking speed in m/s. It is
    computed as the length of a stride divided by the duration.
    Parameters
    ----------
    data
        The cwt_optimized with step events output of the TransformStep
        OptimizeCwtStepDataset.
    Returns
    -------
    pandas.DataFrame
        Input data frame with step velocity added for each valid gait cycle.
    """
    data.loc[data.Gait_Cycle, "gait_speed"] = data.stride_len / data.stride_dur
    return data 
def _cwt_measure_extraction(
    data: pd.DataFrame, height_changes_com: pd.DataFrame, sensor_height: float
) -> pd.DataFrame:
    """
    Compute several temporal and spatial measures for each gait cycle.
    Parameters
    ----------
    data
        The data frame of the cwt step dataset after optimization, with
        step annotated with IC, FC, FC_opp_foot and GaitCycle.
    height_changes_com
        A data frame with the changes in height of the center of mass.
    sensor_height
        The height of the smartphone sensor in meters when located at the
        lumbar position.
    Returns
    -------
    pandas.DataFrame
        The optimized gait steps data frame with temporal and spatial measures
        computed for each gait cycle.
    """
    def _compute_cwt_measures(
        data: pd.DataFrame, height_changes_com: pd.Series
    ) -> pd.DataFrame:
        """Compute all the cwt measures."""
        # work on a copy of the dataframe
        res = data.copy()
        # steps
        res["steps"] = len(res)
        # gait cycle duration measures
        res = compute_stride_duration(res)
        # step time and cadence
        res = compute_step_duration_and_cadence(res)
        # initial double support
        res = compute_initial_double_support(res)
        # terminal double support
        res = compute_terminal_double_support(res)
        # double support
        res = compute_double_support(res)
        # single limb support
        res = compute_single_limb_support(res)
        # stance
        res = compute_stance(res)
        # swing
        res = compute_swing(res)
        # step length
        res = compute_step_length(res, height_changes_com, sensor_height)
        # stride length
        res = compute_stride_length(res)
        # step velocity
        res = compute_gait_speed(res)
        return res
    if "bout_id" not in data.columns:
        measures = _compute_cwt_measures(data, height_changes_com["height_com"])
        return measures.set_index("FC", drop=False)
    list_df_measures = []
    for bout_id in data.bout_id.unique():
        # Filter on bout
        bout_data = data[data.bout_id == bout_id]
        bout_data = _compute_cwt_measures(
            bout_data, height_changes_com.loc[data.bout_id == bout_id, "height_com"]
        )
        # append to the list of df_measures
        list_df_measures.append(bout_data)
    if len(list_df_measures) > 0:
        return pd.concat(list_df_measures).set_index("FC", drop=False)
    column_names = ["IC", "FC", "FC_opp_foot", "Gait_Cycle", "bout_id"]
    column_names += list(CWT_MEASURES_DESC_MAPPING)
    return pd.DataFrame(columns=column_names)
[docs]
def get_subject_height(context: Context) -> float:
    """Get subject height from context if available else return default."""
    if "subject_height" in context:
        return context["subject_height"].value
    warnings.warn(
        "Subject height is not available in context "
        "using DEFAULT_SENSOR_HEIGHT: "
        f"{DEFAULT_SUBJECT_HEIGHT}."
    )
    return DEFAULT_SUBJECT_HEIGHT 
[docs]
def get_sensor_height(context: Context) -> float:
    """Get sensor height from context if available else return default."""
    if "subject_height" in context:
        return SENSOR_HEIGHT_RATIO * context["subject_height"].value
    warnings.warn(
        "Subject height is not available in context to compute "
        "sensor_height: using DEFAULT_SENSOR_HEIGHT: "
        f"{DEFAULT_SENSOR_HEIGHT}."
    )
    return DEFAULT_SENSOR_HEIGHT 
[docs]
class AggregateCWTMeasure(GaitBoutAggregateStep):
    """An Aggregation of cwt measures on walking bouts."""
[docs]
    def __init__(self, data_set_id: str, column_id: str, **kwargs):
        # type: ignore
        description = (
            f"The {{aggregation}} of {column_id} "
            "with the bout strategy {bout_strategy_repr}. "
            f"{CWT_MEASURES_DESC_MAPPING[column_id]} ."
        )
        definition = MeasureValueDefinitionPrototype(
            measure_name=AV(column_id.replace("_", " "), column_id),
            data_type="float64",
            description=description,
            precision=CWT_MEASURES_PRECISION_MAPPING[column_id],
        )
        super().__init__(
            ["walking_placement_no_turn_bouts", data_set_id],
            column_id,
            DEFAULT_AGGREGATIONS_Q95_CV,
            definition,
            **kwargs,
        ) 
 
[docs]
class AggregateCWTMeasureWithoutBout(AggregateRawDataSetColumn):
    """An Aggregation of cwt measures not on walking bouts."""
[docs]
    def __init__(self, data_set_id: str, column_id: str, **kwargs):
        description = (
            f"The {{aggregation}} of {column_id}."
            f"{CWT_MEASURES_DESC_MAPPING[column_id]} ."
        )
        definition = MeasureValueDefinitionPrototype(
            measure_name=AV(column_id.replace("_", " "), column_id),
            data_type="float64",
            description=description,
            precision=CWT_MEASURES_PRECISION_MAPPING[column_id],
        )
        super().__init__(
            data_set_id, column_id, DEFAULT_AGGREGATIONS_Q95_CV, definition, **kwargs
        ) 
 
[docs]
class CWTCountSteps(GaitBoutExtractStep):
    """Extract the total number of steps counted with cwt."""
[docs]
    def __init__(self, data_set_id, **kwargs):
        definition = MeasureValueDefinitionPrototype(
            measure_name=AV("step count", "sc"),
            data_type="int16",
            validator=GREATER_THAN_ZERO,
            description="The number of steps detected with cwt algorithm "
            "with the bout strategy {bout_strategy_repr}.",
        )
        super().__init__(
            ["walking_placement_no_turn_bouts", data_set_id], len, definition, **kwargs
        ) 
 
[docs]
class CWTPowerBoutDivSteps(ExtractPowerBoutDivSteps):
    """Extract step power from CWT with walking bouts."""
[docs]
    def __init__(self, **kwargs):
        data_set_ids = [
            "walking_placement_no_turn_bouts",
            "vertical_acceleration",
            "cwt_with_walking_bouts",
        ]
        super().__init__(data_set_ids=data_set_ids, **kwargs) 
 
[docs]
class CWTPowerBoutDivStepsWithoutBout(ExtractStep):
    """Extract step power with CWT dataset without walking bouts."""
[docs]
    def __init__(self, **kwargs):
        data_set_ids = ["vertical_acceleration", "cwt"]
        definition = MeasureValueDefinitionPrototype(
            measure_name=AV("step power", "sp"),
            data_type="int16",
            validator=GREATER_THAN_ZERO,
            description="The integral of the centered acceleration magnitude "
            "between the first and last step divided by the "
            "number of steps computed with the CWT algorithm.",
        )
        super().__init__(
            data_set_ids=data_set_ids,
            transform_function=power_bout_div_steps,
            definition=definition,
            **kwargs,
        ) 
 
[docs]
class CWTStepCountWithoutBout(ExtractStep):
    """Extract step count with CWT dataset without walking bouts."""
[docs]
    def __init__(self, **kwargs):
        data_set_ids = "cwt_measures"
        description = (
            "The number of steps detected with cwt"
            " algorithm not using walking bouts."
        )
        definition = MeasureValueDefinitionPrototype(
            measure_name=AV("step count", "sc"),
            data_type="int16",
            validator=GREATER_THAN_ZERO,
            description=description,
        )
        super().__init__(
            data_set_ids=data_set_ids,
            transform_function=len,
            definition=definition,
            **kwargs,
        ) 
 
[docs]
class AggregateAllCWTMeasures(ProcessingStepGroup):
    """A group of gait processing steps for cwt measures on walking bouts."""
[docs]
    def __init__(self, data_set_id: str, **kwargs):
        measure_ids = [
            "stride_dur",
            "stride_dur_asym",
            "step_dur",
            "step_dur_asym",
            "stride_dur_asym_ln",
            "step_dur_asym_ln",
            "cadence",
            "init_double_supp",
            "init_double_supp_asym",
            "term_double_supp",
            "term_double_supp_asym",
            "double_supp",
            "double_supp_asym",
            "single_limb_supp",
            "single_limb_supp_asym",
            "stance",
            "stance_asym",
            "swing",
            "swing_asym",
            "step_len",
            "step_len_asym",
            "stride_len",
            "stride_len_asym",
            "gait_speed",
            "stride_len_asym_ln",
            "step_len_asym_ln",
        ]
        steps = [
            AggregateCWTMeasure(data_set_id, measure_id, **kwargs)
            for measure_id in measure_ids
        ]
        super().__init__(steps=steps, **kwargs) 
 
[docs]
class AggregateAllCWTMeasuresWithoutBout(ProcessingStepGroup):
    """Group of gait processing steps for cwt measures not on walking bout."""
[docs]
    def __init__(self, data_set_id: str, **kwargs):
        measure_ids = [
            "stride_dur",
            "stride_dur_asym",
            "step_dur",
            "step_dur_asym",
            "stride_dur_asym_ln",
            "step_dur_asym_ln",
            "cadence",
            "init_double_supp",
            "init_double_supp_asym",
            "term_double_supp",
            "term_double_supp_asym",
            "double_supp",
            "double_supp_asym",
            "single_limb_supp",
            "single_limb_supp_asym",
            "stance",
            "stance_asym",
            "swing",
            "swing_asym",
            "step_len",
            "step_len_asym",
            "stride_len",
            "stride_len_asym",
            "gait_speed",
            "stride_len_asym_ln",
            "step_len_asym_ln",
        ]
        steps = [
            AggregateCWTMeasureWithoutBout(data_set_id, measure_id, **kwargs)
            for measure_id in measure_ids
        ]
        super().__init__(steps=steps, **kwargs) 
 
[docs]
class CWTMeasures(ProcessingStepGroup):
    """Extract Del Din measures based on CWT Steps and a bout strategy."""
[docs]
    def __init__(self, bout_strategy: BoutStrategyModality, **kwargs):
        data_set_id = "cwt_measures_with_walking_bouts"
        steps: List[ProcessingStep] = [
            AggregateAllCWTMeasures(
                data_set_id=data_set_id, bout_strategy=bout_strategy.bout_cls
            ),
            CWTCountSteps(
                data_set_id=data_set_id, bout_strategy=bout_strategy.bout_cls
            ),
            CWTStepPower(bout_strategy=bout_strategy.bout_cls),
            CWTStepIntensity(bout_strategy=bout_strategy.bout_cls),
            CWTStepRegularity(bout_strategy=bout_strategy.bout_cls),
            CWTStrideRegularity(bout_strategy=bout_strategy.bout_cls),
        ]
        super().__init__(steps, **kwargs) 
 
[docs]
class CWTMeasuresWithoutBout(ProcessingStepGroup):
    """Extract Del Din measures based on CWT Steps and a bout strategy."""
[docs]
    def __init__(self, **kwargs):
        data_set_id = "cwt_measures"
        steps: List[ProcessingStep] = [
            AggregateAllCWTMeasuresWithoutBout(data_set_id=data_set_id),
            CWTStepCountWithoutBout(),
            CWTStepPowerWithoutBout(),
            CWTStepIntensityWithoutBout(),
            CWTStepRegularityWithoutBout(),
            CWTStrideRegularityWithoutBout(),
        ]
        super().__init__(steps, **kwargs) 
 
[docs]
class CWTStepPowerWithoutBout(AggregateRawDataSetColumn):
    """Extract step power without walking bout."""
    data_set_ids = "cwt_optimized_step_vigor"
    column_id = "step_power"
    aggregations = DEFAULT_AGGREGATIONS_Q95
    definition = MeasureValueDefinitionPrototype(
        measure_name=AV("step power", "sp"),
        data_type="float64",
        unit="m^2/s^3",
        validator=GREATER_THAN_ZERO,
        description="The {aggregation} step power across detected steps.",
    ) 
[docs]
class CWTStepPower(ExtractStepPowerAll):
    """Extract step power for cwt."""
[docs]
    def __init__(self, **kwargs):
        super().__init__(
            data_set_ids=[
                "walking_placement_no_turn_bouts",
                "cwt_with_walking_bouts_optimized_step_vigor",
            ],
            **kwargs,
        ) 
 
[docs]
class CWTStepIntensityWithoutBout(AggregateRawDataSetColumn):
    """Extract step intensity without walking bout."""
    data_set_ids = "cwt_optimized_step_vigor"
    column_id = "step_intensity"
    aggregations = DEFAULT_AGGREGATIONS
    definition = MeasureValueDefinitionPrototype(
        measure_name=AV("step intensity", "si"),
        data_type="float64",
        unit="m/s^2",
        validator=GREATER_THAN_ZERO,
        description="The {aggregation} step intensity across detected steps.",
    ) 
[docs]
class CWTStepIntensity(ExtractStepIntensityAll):
    """Extract step intensity for cwt."""
[docs]
    def __init__(self, **kwargs):
        super().__init__(
            data_set_ids=[
                "walking_placement_no_turn_bouts",
                "cwt_with_walking_bouts_optimized_step_vigor",
            ],
            **kwargs,
        ) 
 
[docs]
class CWTStepRegularityWithoutBout(AggregateRawDataSetColumn):
    """Extract step regularity without walking bout."""
    data_set_ids = "cwt_optimized_gait_regularity"
    column_id = "step_regularity"
    aggregations = DEFAULT_AGGREGATIONS
    definition = MeasureValueDefinitionPrototype(
        measure_name=AV("step regularity", "step_regularity"),
        data_type="float64",
        unit="s",
        validator=BETWEEN_MINUS_ONE_AND_ONE,
        description="The {aggregation} step regularity across all steps.",
    ) 
[docs]
class CWTStrideRegularityWithoutBout(AggregateRawDataSetColumn):
    """Extract stride regularity without walking bout."""
    data_set_ids = "cwt_optimized_gait_regularity"
    column_id = "stride_regularity"
    aggregations = DEFAULT_AGGREGATIONS
    definition = MeasureValueDefinitionPrototype(
        measure_name=AV("stride regularity", "stride_regularity"),
        data_type="float64",
        unit="s",
        validator=BETWEEN_MINUS_ONE_AND_ONE,
        description="The {aggregation} stride regularity across all steps.",
    ) 
[docs]
class CWTStepRegularity(ExtractStepRegularity):
    """Extract step regularity for cwt."""
[docs]
    def __init__(self, **kwargs):
        super().__init__(
            data_set_ids=[
                "walking_placement_no_turn_bouts",
                "cwt_with_walking_bouts_optimized_gait_regularity",
            ],
            **kwargs,
        ) 
 
[docs]
class CWTStrideRegularity(ExtractStrideRegularity):
    """Extract stride regularity for cwt."""
[docs]
    def __init__(self, **kwargs):
        super().__init__(
            data_set_ids=[
                "walking_placement_no_turn_bouts",
                "cwt_with_walking_bouts_optimized_gait_regularity",
            ],
            **kwargs,
        )