Source code for dispel.providers.bdh.io.sbt_utt
"""Function for converting sbt_utt BDH JSON files into a reading."""
from typing import Any, Dict
import pandas as pd
from dispel.data.levels import LevelId
from dispel.data.raw import RawDataSet, RawDataSetDefinition
from dispel.data.values import DefinitionId
from dispel.providers.bdh.io.core import convert_dataset, parse_raw_data_set
[docs]
def get_level_id(config: dict) -> LevelId:
"""Parse level id from level type and configuration.
Parameters
----------
config
The level configuration
Returns
-------
LevelId
Level id for the level.
Raises
------
NotImplementedError
If the given mode parsing has not been implemented.
"""
if config["mode"] == "static-balance":
return LevelId("sbt")
if config["mode"] == "u-turn":
return LevelId("utt")
raise NotImplementedError(f"Level Id is not implemented for mode: {config['mode']}")
[docs]
def convert_timestamp(
id_: str, data: Dict[str, Any], definition: RawDataSetDefinition
) -> RawDataSet:
"""Convert timestamp in ADS format."""
ref = {"timestamp": "ts"}
data["timestamp"] = pd.to_datetime(
data["timestamp"],
unit=definition.get_value_definition(DefinitionId("timestamp")).unit,
)
data, new_definitions = convert_dataset(data, definition, ref)
definition_acc = RawDataSetDefinition(
id_, definition.source, new_definitions, is_computed=True
)
return parse_raw_data_set(data, definition_acc)