Source code for dispel.providers.bdh.io.sbt_utt

"""Function for converting sbt_utt BDH JSON files into a reading."""
from typing import Any, Dict

import pandas as pd

from dispel.data.levels import LevelId
from dispel.data.raw import RawDataSet, RawDataSetDefinition
from dispel.data.values import DefinitionId
from dispel.providers.bdh.io.core import convert_dataset, parse_raw_data_set


[docs] def get_level_id(config: dict) -> LevelId: """Parse level id from level type and configuration. Parameters ---------- config The level configuration Returns ------- LevelId Level id for the level. Raises ------ NotImplementedError If the given mode parsing has not been implemented. """ if config["mode"] == "static-balance": return LevelId("sbt") if config["mode"] == "u-turn": return LevelId("utt") raise NotImplementedError(f"Level Id is not implemented for mode: {config['mode']}")
[docs] def convert_timestamp( id_: str, data: Dict[str, Any], definition: RawDataSetDefinition ) -> RawDataSet: """Convert timestamp in ADS format.""" ref = {"timestamp": "ts"} data["timestamp"] = pd.to_datetime( data["timestamp"], unit=definition.get_value_definition(DefinitionId("timestamp")).unit, ) data, new_definitions = convert_dataset(data, definition, ref) definition_acc = RawDataSetDefinition( id_, definition.source, new_definitions, is_computed=True ) return parse_raw_data_set(data, definition_acc)