# Source code for dispel.providers.bdh.tasks.sbt_utt
"""Processing functionality for SBT/UTT task."""
from dispel.processing.assertions import AssertEvaluationFinished
from dispel.processing.level import ProcessingStepGroup
from dispel.providers.bdh.data import BDHReading
from dispel.providers.bdh.transform import TRUNCATE_SENSORS, TruncateSensorsSBT
from dispel.providers.generic.tasks.sbt_utt import TASK_NAME
from dispel.providers.generic.tasks.sbt_utt.sbt import (
    SBTProcessingSteps as GenericSBTProcessingSteps,
)
from dispel.providers.generic.tasks.sbt_utt.utt import (
    UTTProcessingSteps as GenericUTTProcessingSteps,
)
from dispel.providers.registry import process_factory
# [docs]
class SBTProcessingSteps(ProcessingStepGroup):
    """BDH-specific step group for the SBT part of the task.

    The group lists a ``TruncateSensorsSBT`` step followed by the generic
    (provider-independent) ``GenericSBTProcessingSteps`` group.
    """

    # Truncation is listed ahead of the generic steps; presumably the generic
    # processing is meant to work on the truncated sensor data -- TODO confirm.
    steps = [TruncateSensorsSBT(), GenericSBTProcessingSteps()]
# [docs]
class UTTProcessingSteps(ProcessingStepGroup):
    """BDH-specific step group for the UTT part of the task.

    The group lists every entry of ``TRUNCATE_SENSORS`` followed by the
    generic (provider-independent) ``GenericUTTProcessingSteps`` group.
    """

    # All shared truncation steps first, then the generic UTT processing.
    steps = list(TRUNCATE_SENSORS) + [GenericUTTProcessingSteps()]
# [docs]
class SBTUTTProcessingSteps(ProcessingStepGroup):
    """Top-level step group for the combined SBT/UTT task.

    The group lists an ``AssertEvaluationFinished`` assertion followed by the
    SBT step group and then the UTT step group.
    """

    steps = [
        # Assertion step listed before any processing group.
        AssertEvaluationFinished(),
        # SBT part of the task.
        SBTProcessingSteps(),
        # UTT part of the task.
        UTTProcessingSteps(),
    ]
# Processing entry point for the SBT/UTT task, produced by ``process_factory``
# from the combined step group for readings of type ``BDHReading``.
# NOTE(review): ``codes`` presumably lists the evaluation codes this processor
# is registered for -- confirm against ``process_factory``.
process_sbt_utt = process_factory(
    supported_type=BDHReading,
    codes=("sbtUtt-activity", "sbut-activity"),
    task_name=TASK_NAME,
    steps=SBTUTTProcessingSteps(),
)