Source code for dispel.providers.mobilized.io

"""Functionality to read Mobilize-D YAR files."""

from collections.abc import MutableMapping
from typing import Dict, Tuple

import numpy as np
import pandas as pd
import scipy.io

from dispel.data.core import Evaluation, Reading
from dispel.data.epochs import EpochDefinition
from dispel.data.levels import Context, Level
from dispel.data.raw import (
    RawDataSet,
    RawDataSetDefinition,
    RawDataSetSource,
    RawDataValueDefinition,
)
from dispel.data.values import Value, ValueDefinition

# Define required constants
RECORDING_CONTEXT_KEYS = {"StartDateTime", "TimeZone"}
SET_META_INFO = {"Fs", "Presence"}
SET_UNICOLUMN = {"Timestamp", "Bar"}
SET_REMAINING_MEASURES = {"Distance", "NormalizedPressure"}
SET_XYZ = {"Acc", "Gyr", "Mag"}


# Functions to read and unwrap MATLAB YAR files
def unwrap_mat(data: np.ndarray):
    """Unwrap an array generated by scipy.io.loadmat."""
    if data.dtype.names is None:
        return data
    return {n: unwrap_mat(data[n][0, 0]) for n in data.dtype.names}
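
# Example: a minimal sketch of the recursion on a synthetic structured array
# (hypothetical data; ``scipy.io.loadmat`` wraps MATLAB structs as (1, 1)
# structured arrays, which ``unwrap_mat`` turns into nested dictionaries):
#
#     >>> rec = np.array([[(np.array([[100.0]]),)]], dtype=[("Fs", "O")])
#     >>> unwrap_mat(rec)
#     {'Fs': array([[100.]])}
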
def read_matlab_file(path: str) -> dict:
    """Format a MATLAB file into a dictionary."""
    if not path.endswith(".mat"):
        raise NotImplementedError("Only .mat files are supported.", path)
    mat = scipy.io.loadmat(path)
    res = {
        "__header__": mat["__header__"],
        "__version__": mat["__version__"],
        "__globals__": mat["__globals__"],
        "data": unwrap_mat(mat["data"]),
    }
    return res
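
# Example usage (the path is hypothetical; anything not ending in ".mat"
# raises ``NotImplementedError``):
#
#     >>> mat = read_matlab_file("sub-001/data.mat")  # doctest: +SKIP
#     >>> sorted(mat)  # doctest: +SKIP
#     ['__globals__', '__header__', '__version__', 'data']
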
# Functions to create a Context from a dictionary
def flatten(dictionary, parent_key="", separator="_"):
    """Flatten a dictionary."""
    items = []
    for key, value in dictionary.items():
        new_key = parent_key + separator + key if parent_key else key
        if isinstance(value, MutableMapping):
            items.extend(flatten(value, new_key, separator=separator).items())
        else:
            items.append((new_key, value))
    return dict(items)
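
# Example with hypothetical data: nested keys are joined with the separator,
# one segment per nesting level.
#
#     >>> flatten({"Standards": {"INDIP": {"Fs": 100}}, "TimeZone": "UTC"},
#     ...         separator=".")
#     {'Standards.INDIP.Fs': 100, 'TimeZone': 'UTC'}
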
def parse_context(context: Dict) -> Context:
    """Parse the context information available.

    Parameters
    ----------
    context
        A dictionary extracted from a Mobilize-D .mat file

    Returns
    -------
    Context
        The context representation of the passed dictionary.
    """
    values = [Value(ValueDefinition(item, item), context[item]) for item in context]
    return Context(values)
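
# Example with hypothetical values: each key becomes both the id and the
# name of a ``ValueDefinition``, wrapped in a ``Value``:
#
#     >>> parse_context(
#     ...     {"StartDateTime": "2023-01-01", "TimeZone": "UTC"}
#     ... )  # doctest: +SKIP
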
def context_from_dict(dictionary: Dict) -> Context:
    """Flatten a dictionary and cast it to a Context."""
    return parse_context(flatten(dictionary, separator="."))
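
# A note on the chain above (hypothetical input): with the dictionary from
# the ``flatten`` example, ``context_from_dict`` is equivalent to calling
# ``parse_context({"Standards.INDIP.Fs": 100, "TimeZone": "UTC"})``.
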
def df_from_source(sub_dict, sensor_location):
    """Create a dataframe from a nested dictionary and a sensor location."""
    # Define the sets of sensors, excluding meta information, and split them
    # into single-column and xyz sensors
    set_sensors = set(sub_dict[sensor_location].keys()) - SET_META_INFO
    set_sensors_uni = set_sensors & SET_UNICOLUMN
    set_sensors_xyz = set_sensors & SET_XYZ
    data_to_cat = [
        *[sub_dict[sensor_location][sensor_uni] for sensor_uni in set_sensors_uni],
        *[sub_dict[sensor_location][sensor_xyz] for sensor_xyz in set_sensors_xyz],
    ]
    columns = [
        *[sensor_uni for sensor_uni in set_sensors_uni],
        *[
            f"{sensor_xyz}_{axis}"
            for sensor_xyz in set_sensors_xyz
            for axis in ["x", "y", "z"]
        ],
    ]
    for remaining_measures in SET_REMAINING_MEASURES:
        incl_remaining = remaining_measures in set_sensors
        if incl_remaining:
            data_to_cat.append(sub_dict[sensor_location][remaining_measures])
            columns += [
                f"{remaining_measures}_{n}"
                for n in range(sub_dict[sensor_location][remaining_measures].shape[1])
            ]
    df = pd.DataFrame(np.concatenate(data_to_cat, axis=1), columns=columns)
    return df
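
# Example with synthetic values (real YAR data carries more sensors and
# samples): xyz sensors contribute three columns each, single-column sensors
# one. Sensor names come from the SET_* constants above; "SU" is a
# hypothetical sensor location.
#
#     >>> sub = {
#     ...     "SU": {
#     ...         "Fs": 100,
#     ...         "Timestamp": np.array([[0.0], [0.01]]),
#     ...         "Acc": np.array([[0.0, 0.0, 9.81], [0.0, 0.0, 9.81]]),
#     ...     }
#     ... }
#     >>> sorted(df_from_source(sub, "SU").columns)
#     ['Acc_x', 'Acc_y', 'Acc_z', 'Timestamp']
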
def pre_formatting_yar(dict_mat: Dict) -> Tuple[str, Dict]:
    """Pre-format a YAR file."""
    # Instantiate the data dictionary to use to create the reading
    data_t1 = dict_mat["data"]["TimeMeasure1"]
    # Give a name to the source; here we choose YAR
    source = "YAR"
    return source, data_t1
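
# Example with a hypothetical, already-unwrapped dictionary:
#
#     >>> src, data = pre_formatting_yar(
#     ...     {"data": {"TimeMeasure1": {"Recording1": {}}}}
#     ... )
#     >>> src, list(data)
#     ('YAR', ['Recording1'])
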
def parse_mobilized_yar(path: str, verbose: bool = True) -> Reading:
    """Create a reading from a Mobilize-D .mat YAR file."""
    # Read the .mat file
    dict_mat = read_matlab_file(path)
    # Instantiate the reading start and end; they will be updated with the
    # recording min and max timestamps
    reading_start = np.nan
    reading_end = np.nan
    # Instantiate the data dictionary and source
    source, data_t1 = pre_formatting_yar(dict_mat)
    # Instantiate an empty list of levels
    list_level = []
    # Go through the recordings
    for it_level, (level_name, recording) in enumerate(data_t1.items()):
        # Instantiate required variables
        start = np.nan
        end = np.nan
        context = {}
        raw_data_sets = []
        if verbose:
            print("___________")
            print(f"Reading Level {level_name}")
        # Go through assessments in the recording
        for assessment, item in recording.items():
            # If variables are contextual, add them to the context
            if assessment in RECORDING_CONTEXT_KEYS:
                context[assessment] = item.squeeze()
                continue
            # Else create a new level in the context to store information
            # linked to the assessment
            context[assessment] = {}
            if verbose:
                print("- - - - -")
                print(f"{level_name}: assessment {assessment}")
            # Specific case of Standards (here it is not about Acc, Gyr, Mag
            # but pressure)
            if assessment == "Standards":
                # Go through the sources
                for source in item.keys():
                    if verbose:
                        print(
                            f"{level_name}: assessment {assessment} - "
                            f"source {source}"
                        )
                    # Create a sub_dict at the level of the source
                    sub_dict = data_t1[level_name][assessment][source]
                    # Create a new level in the context to store information
                    # linked to the source
                    context[assessment][source] = {}
                    # Usual case
                    if source != "INDIP":
                        # Go through sensor locations
                        for sensor_location in sub_dict.keys():
                            if verbose:
                                print(
                                    f"{level_name}: assessment {assessment} - "
                                    f"source {source} - "
                                    f"sensor_location {sensor_location}"
                                )
                            # Store contextual sensor frequency information
                            context[assessment][source][sensor_location] = {}
                            context[assessment][source][sensor_location][
                                "Fs"
                            ] = sub_dict[sensor_location]["Fs"]
                            # Create a dataframe out of the source and sensor
                            # location
                            df = df_from_source(sub_dict, sensor_location)
                            # Create an identifier for the dataset
                            dataset_id = f"{assessment}-{source}-{sensor_location}"
                            # Create the definitions
                            definitions = [
                                RawDataValueDefinition(column, column.upper())
                                for column in df.columns
                            ]
                            # Create the raw_data_set objects
                            raw_data_sets += [
                                RawDataSet(
                                    definition=RawDataSetDefinition(
                                        dataset_id,
                                        RawDataSetSource(source),
                                        definitions,
                                    ),
                                    data=df,
                                )
                            ]
                    # Specific case of INDIP
                    elif source == "INDIP":
                        # Store contextual sensor frequency information
                        context[assessment][source]["Fs"] = sub_dict["Fs"]
                        # Go through the possible scenarios
                        for scenario in sub_dict.keys():
                            if scenario == "Fs":
                                continue
                            # Store contextual assessment, source and
                            # scenario information
                            context[assessment][source][scenario] = {}
                            # Go through the events of the specific scenario
                            # and add them to the context
                            for event in sub_dict[scenario].keys():
                                context[assessment][source][scenario][event] = data_t1[
                                    level_name
                                ][assessment][source][scenario][event].squeeze()
                    else:
                        raise NotImplementedError(f"{source} is not supported yet.")
                continue
            # Usual case, not Standards
            for sensor_location in item.keys():
                if verbose:
                    print(
                        f"{level_name}: assessment {assessment} - "
                        f"sensor_location {sensor_location}"
                    )
                # Create a new level in the context for the assessment and
                # sensor_location
                context[assessment][sensor_location] = {}
                # Create an identifier for the dataset
                dataset_id = f"{assessment}-{sensor_location}"
                # Create a dataframe out of the source and sensor location
                df = df_from_source(data_t1[level_name][assessment], sensor_location)
                # Update start and end for the level
                if "Timestamp" in df.columns:
                    start = min(df.Timestamp.min(), start)
                    end = max(df.Timestamp.max(), end)
                # Create the definitions
                definitions = [
                    RawDataValueDefinition(column, column.upper())
                    for column in df.columns
                ]
                # Create the raw_data_set objects
                raw_data_sets += [
                    RawDataSet(
                        definition=RawDataSetDefinition(
                            dataset_id, RawDataSetSource(source), definitions
                        ),
                        data=df,
                    )
                ]
        # Update the reading start and end
        if it_level == 0:
            reading_start = start
            reading_end = end
        else:
            reading_start = min(reading_start, start)
            reading_end = max(reading_end, end)
        # Flatten the level context with a "." separator and cast it to a
        # Context object
        formatted_context = context_from_dict(context)
        # Append the level with the raw_data_sets and the context to the
        # level list
        list_level.append(
            Level(
                id_=level_name,
                start=start,
                end=end,
                raw_data_sets=raw_data_sets,
                context=formatted_context,
            )
        )
    # Create the reading
    path_split = path.split("/Data/")[1].split("/")
    evaluation_code = path_split[0]
    uuid = "/".join(path_split)
    # Create the evaluation
    evaluation = Evaluation(
        start=reading_start,
        end=reading_end,
        finished=True,
        definition=EpochDefinition(id_=evaluation_code),
        uuid=uuid,
    )
    reading = Reading(evaluation, levels=list_level)
    return reading
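
# Example usage (the path is hypothetical; note the parser expects a
# "/Data/" segment in the path, from which it derives the evaluation code
# and uuid):
#
#     >>> reading = parse_mobilized_yar(
#     ...     "/study/Data/TVS-001/sub-001/data.mat", verbose=False
#     ... )  # doctest: +SKIP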