Source code for dispel.providers.bdh.tasks.sbt_utt

"""Processing functionality for SBT/UTT task."""

from dispel.processing.assertions import AssertEvaluationFinished
from dispel.processing.level import ProcessingStepGroup
from dispel.providers.bdh.data import BDHReading
from dispel.providers.bdh.transform import TRUNCATE_SENSORS, TruncateSensorsSBT
from dispel.providers.generic.tasks.sbt_utt import TASK_NAME
from dispel.providers.generic.tasks.sbt_utt.sbt import (
    SBTProcessingSteps as GenericSBTProcessingSteps,
)
from dispel.providers.generic.tasks.sbt_utt.utt import (
    UTTProcessingSteps as GenericUTTProcessingSteps,
)
from dispel.providers.registry import process_factory


[docs] class SBTProcessingSteps(ProcessingStepGroup): """SBT processing steps.""" steps = [ TruncateSensorsSBT(), GenericSBTProcessingSteps(), ]
[docs] class UTTProcessingSteps(ProcessingStepGroup): """UTT processing steps.""" steps = [ *TRUNCATE_SENSORS, GenericUTTProcessingSteps(), ]
[docs] class SBTUTTProcessingSteps(ProcessingStepGroup): """Combined SBT and UTT processing steps.""" steps = [ AssertEvaluationFinished(), SBTProcessingSteps(), UTTProcessingSteps(), ]
process_sbt_utt = process_factory( task_name=TASK_NAME, steps=SBTUTTProcessingSteps(), codes=("sbtUtt-activity", "sbut-activity"), supported_type=BDHReading, )