Source code for dispel.providers.generic.flags.generic

"""A module to store the technical flags."""

from dispel.data.flags import FlagSeverity, FlagType
from dispel.data.levels import Level
from dispel.data.values import AbbreviatedValue as AV
from dispel.processing.flags import flag
from dispel.processing.level import FlagLevelStep

MAX_TIME_INCR = 2
r"""Maximum time increment allowed in data in seconds."""
MIN_FREQ_GAIT = 40
r"""Minimum sampling frequency allowed in gait data in Hz."""
MIN_FREQ_SBT = 40
r"""Minimum sampling frequency allowed in SBT data in Hz."""


[docs] def detect_fs_below_threshold(level: Level, min_freq: int, **kwargs) -> bool: """Detect a sampling frequency violation.""" acc = level.get_raw_data_set("accelerometer").data fs = 1 / acc.ts.diff().dt.total_seconds().median() return fs > min_freq
[docs] class FrequencyLowerThanThres(FlagLevelStep): """Flag record with sampling rate less than a threshold.""" flag_name = AV("Median frequency lower than {min_freq}Hz", "freq_low_{min_freq}Hz") flag_type = FlagType.TECHNICAL flag_severity = FlagSeverity.INVALIDATION reason = "The median frequency is lower than {min_freq}Hz."
[docs] class FrequencyLowerThanGaitThres(FrequencyLowerThanThres): """Flag gait record with sampling rate less than a threshold.""" @flag(min_freq=MIN_FREQ_GAIT) def _median_instant_freq_lower_than_thres( self, level: Level, **kwargs # pylint: disable=all ) -> bool: self.set_flag_kwargs(min_freq=MIN_FREQ_GAIT) return detect_fs_below_threshold(level, min_freq=MIN_FREQ_GAIT)
[docs] class FrequencyLowerThanSBTThres(FrequencyLowerThanThres): """Flag SBT record with sampling rate less than a threshold.""" @flag(min_freq=MIN_FREQ_SBT) def _median_instant_freq_lower_than_thres( self, level: Level, **kwargs # pylint: disable=all ) -> bool: self.set_flag_kwargs(min_freq=MIN_FREQ_SBT) return detect_fs_below_threshold(level, min_freq=MIN_FREQ_SBT)
[docs] class MaxTimeIncrement(FlagLevelStep): """Flag record with maximum time increment larger than threshold.""" flag_name = AV( f"Maximum time increment larger than {MAX_TIME_INCR} seconds", f"max_time_inc_greater_than_{MAX_TIME_INCR}_sec", ) flag_type = FlagType.TECHNICAL flag_severity = FlagSeverity.INVALIDATION reason = f"The maximum time increment is higher than " f"{MAX_TIME_INCR} seconds." @flag def _max_time_increment_larger_than( self, level: Level, **kwargs # pylint: disable=all ) -> bool: acc = level.get_raw_data_set("accelerometer").data time_inc = acc.ts.diff().dt.total_seconds().max() return time_inc < MAX_TIME_INCR