"""Transform module for transform step specific to io format.
The transform step below are steps only modifying a reading coming from a
BDH input.
"""
from abc import ABC
from typing import Any, Dict, List
import pandas as pd
from dispel.data.core import Reading
from dispel.data.levels import Level
from dispel.data.raw import DEFAULT_COLUMNS, RawDataValueDefinition
from dispel.processing.core import ProcessingStep, ProcessResultType
from dispel.processing.data_set import transformation
from dispel.processing.level import LevelFilter, LevelIdFilter, ProcessingStepGroup
from dispel.processing.transform import TransformStep
from dispel.providers.bdh.data import BDHReading
[docs]
class ProcessBDHOnlyMixIn(ABC, TransformStep):
"""An abstract class to process only if an instance is a BDH Reading."""
[docs]
def process_reading(self, reading: Reading, **kwargs) -> ProcessResultType:
"""Overwrite process reading."""
if isinstance(reading, BDHReading):
yield from super().process_reading(reading, **kwargs)
[docs]
class ProcessBDHProcessingStepGroupOnly(ABC):
"""An abstract class to process only if an instance is a BDH Reading."""
[docs]
def __init__(self, **kwargs):
self.kwargs = kwargs or self.kwargs
super().__init__(kwargs)
[docs]
def get_kwargs(self) -> Dict[str, Any]:
"""Get keyword arguments to be added to the processing."""
return self.kwargs
# pylint: disable=no-member
[docs]
def process_reading(self, reading: Reading, **kwargs) -> ProcessResultType:
"""Overwrite process_reading."""
(updated_kwargs := kwargs.copy()).update(self.get_kwargs())
if isinstance(reading, BDHReading):
# type: ignore
yield from super().process_reading( # type: ignore
reading, **updated_kwargs
)
[docs]
class TruncateBDHVoiceInstructions(ProcessBDHOnlyMixIn):
"""
Truncate voice instructions in UTT test for BDH format.
For BDH Format, IMUs are recorded during the entire session of SBT-UTT
test, even during the voice instructions. To match ADS Format of the UTT
and only get IMUs during the actual UTT, the data has to be truncated after
the third voice instruction.
"""
level_filter: LevelFilter = LevelIdFilter("utt")
[docs]
def get_new_data_set_id(self) -> str:
"""Overwrite new_data_set_id with current data_set_id."""
assert len(data_set_ids := list(self.get_data_set_ids())) == 1
return data_set_ids[0]
[docs]
@staticmethod
def get_voice_instruction_end(level: Level) -> pd.Timestamp:
"""Get the timestamp at which the voice instructions end."""
data = level.get_raw_data_set("voice_instructions").data
return pd.to_datetime(data["start_timestamp"].iloc[2], unit="ms")
[docs]
@transformation
def truncate(self, data: pd.DataFrame, level: Level):
"""Truncate the data frame removing voice instructions."""
return data[data.ts >= self.get_voice_instruction_end(level)].copy()
[docs]
class TruncateLastThirtySecondsBDH(ProcessBDHOnlyMixIn):
"""
Truncate BDH sensor data to keep the last 30 seconds.
For BDH Format, IMUs are recorded during the entire session of SBT-UTT
test, even during the voice instructions. To match ADS Format of the SBT
and only get IMUs during the actual SBT, the data has to be truncated to
keep the last 30 seconds.
"""
level_filter: LevelFilter = LevelIdFilter("sbt")
[docs]
def get_new_data_set_id(self) -> str:
"""Overwrite new_data_set_id with current data_set_id."""
assert len(data_set_ids := list(self.get_data_set_ids())) == 1
return data_set_ids[0]
[docs]
@transformation
def truncate_last_30_seconds(self, data: pd.DataFrame):
"""Truncate the data frame to keep 30 last Seconds."""
return data[data.ts >= data.iloc[-1]["ts"] - pd.Timedelta(30, unit="s")].copy()
[docs]
class TruncateFirstTwoMinBDH(ProcessBDHOnlyMixIn):
"""
Truncate BDH sensor data to keep only the first two minutes.
Due to the fact the SBT-UTT test does not have a maximum time allowed
by Konectom during recording this is to prevent that extremely long
recordings are being processed and takes into account only the first
two minutes. In practice, this also addresses a Konectom isssue where
there is a gap in timestamps in UTT that can span to hours.
"""
[docs]
def get_new_data_set_id(self) -> str:
"""Overwrite new_data_set_id with current data_set_id."""
assert len(data_set_ids := list(self.get_data_set_ids())) == 1
return data_set_ids[0]
[docs]
@transformation
def truncate_first_two_minutes(self, data: pd.DataFrame):
"""Truncate the data frame to keep first two minutes."""
return data[data.ts <= data.iloc[0]["ts"] + pd.Timedelta(120, unit="s")].copy()
DEFINITIONS = [
RawDataValueDefinition(column, column) for column in DEFAULT_COLUMNS + ["ts"]
]
TRUNCATE_SENSORS: List[ProcessingStep] = [
TruncateBDHVoiceInstructions(
data_set_ids="accelerometer", definitions=DEFINITIONS, storage_error="overwrite"
),
TruncateBDHVoiceInstructions(
data_set_ids="gyroscope", definitions=DEFINITIONS, storage_error="overwrite"
),
TruncateBDHVoiceInstructions(
data_set_ids="gravity", definitions=DEFINITIONS, storage_error="overwrite"
),
TruncateFirstTwoMinBDH(
data_set_ids="accelerometer", definitions=DEFINITIONS, storage_error="overwrite"
),
TruncateFirstTwoMinBDH(
data_set_ids="gyroscope", definitions=DEFINITIONS, storage_error="overwrite"
),
TruncateFirstTwoMinBDH(
data_set_ids="gravity", definitions=DEFINITIONS, storage_error="overwrite"
),
]