"""Functionality to read Mobilize-D YAR files."""
from collections.abc import MutableMapping
from typing import Dict, Tuple
import numpy as np
import pandas as pd
import scipy.io
from dispel.data.core import Evaluation, Reading
from dispel.data.epochs import EpochDefinition
from dispel.data.levels import Context, Level
from dispel.data.raw import (
RawDataSet,
RawDataSetDefinition,
RawDataSetSource,
RawDataValueDefinition,
)
from dispel.data.values import Value, ValueDefinition
# Define required CONSTANTS
# Recording-level keys that carry contextual metadata, not sensor signals.
RECORDING_CONTEXT_KEYS = {"StartDateTime", "TimeZone"}
# Per-sensor entries describing the signal (sampling rate, presence flag)
# rather than measurement columns; excluded when building dataframes.
SET_META_INFO = {"Fs", "Presence"}
# Sensors stored as a single column.
SET_UNICOLUMN = {"Timestamp", "Bar"}
# Multi-column measures whose channels get numbered column names.
SET_REMAINING_MEASURES = {"Distance", "NormalizedPressure"}
# Tri-axial sensors expanded into _x/_y/_z columns.
SET_XYZ = {"Acc", "Gyr", "Mag"}
# Functions to read and unwrap matlab yar files
def unwrap_mat(data: np.ndarray):
    """Recursively convert a scipy ``loadmat`` struct array into a dict.

    Plain arrays are returned unchanged; structured arrays are turned into
    dictionaries keyed by field name, unwrapping each field recursively.
    """
    names = data.dtype.names
    if names is None:
        # Not a MATLAB struct: return the array as-is.
        return data
    # Struct fields are wrapped in (1, 1) arrays; index into each one.
    unwrapped = {}
    for field in names:
        unwrapped[field] = unwrap_mat(data[field][0, 0])
    return unwrapped
def read_matlab_file(path: str) -> dict:
    """Load a ``.mat`` file and format its contents as a dictionary.

    Raises
    ------
    NotImplementedError
        If ``path`` does not end in ``.mat``.
    """
    if not path.endswith(".mat"):
        raise NotImplementedError("Only .mat files are supported.", path)
    mat = scipy.io.loadmat(path)
    # Keep the MATLAB metadata entries verbatim; unwrap the nested "data"
    # struct into plain dictionaries.
    return {
        "__header__": mat["__header__"],
        "__version__": mat["__version__"],
        "__globals__": mat["__globals__"],
        "data": unwrap_mat(mat["data"]),
    }
# Function to create a Context from dictionary format
def flatten(dictionary, parent_key="", separator="_"):
    """Flatten a nested dictionary, joining nested keys with ``separator``."""
    flat = {}
    for key, value in dictionary.items():
        compound_key = f"{parent_key}{separator}{key}" if parent_key else key
        if isinstance(value, MutableMapping):
            # Recurse into nested mappings and merge their flattened items.
            flat.update(flatten(value, compound_key, separator=separator))
        else:
            flat[compound_key] = value
    return flat
def parse_context(context: Dict) -> Context:
    """Parse the context information available.

    Parameters
    ----------
    context
        A dictionary extracted from a mobilize-D .mat file

    Returns
    -------
    Context
        The context representation of the passed ``data``.
    """
    values = []
    for key, value in context.items():
        # Each key doubles as both the identifier and the name of the value.
        values.append(Value(ValueDefinition(key, key), value))
    return Context(values)
def context_from_dict(dictionary: Dict) -> Context:
    """Flatten a nested dictionary and convert it into a ``Context``."""
    flat_dictionary = flatten(dictionary, separator=".")
    return parse_context(flat_dictionary)
def df_from_source(sub_dict, sensor_location):
    """Create a dataframe from a nested dictionary and a sensor location.

    Parameters
    ----------
    sub_dict
        Nested dictionary mapping sensor locations to per-sensor arrays, as
        produced by :func:`unwrap_mat`.
    sensor_location
        Key of ``sub_dict`` identifying the sensor location to extract.

    Returns
    -------
    pandas.DataFrame
        One column per single-column sensor, three ``_x``/``_y``/``_z``
        columns per tri-axial sensor, and one numbered column per channel
        of each remaining multi-column measure.
    """
    location_data = sub_dict[sensor_location]
    # Define sensors with data columns (metadata entries excluded).
    set_sensors = set(location_data.keys()) - SET_META_INFO
    # Sort the sensor names so the column order is deterministic: iterating
    # a raw set of strings depends on hash randomization and would yield a
    # different column order between interpreter runs.
    sensors_uni = sorted(set_sensors & SET_UNICOLUMN)
    sensors_xyz = sorted(set_sensors & SET_XYZ)
    data_to_cat = [location_data[sensor] for sensor in sensors_uni]
    data_to_cat += [location_data[sensor] for sensor in sensors_xyz]
    columns = list(sensors_uni)
    columns += [
        f"{sensor_xyz}_{axis}"
        for sensor_xyz in sensors_xyz
        for axis in ("x", "y", "z")
    ]
    for measure in sorted(SET_REMAINING_MEASURES):
        if measure not in set_sensors:
            continue
        data_to_cat.append(location_data[measure])
        # One numbered column per channel of the multi-column measure.
        columns += [
            f"{measure}_{n}" for n in range(location_data[measure].shape[1])
        ]
    return pd.DataFrame(np.concatenate(data_to_cat, axis=1), columns=columns)
def parse_mobilized_yar(path: str, verbose: bool = True) -> Reading:
    """Create a reading from mobilize-d .mat yar file.

    Parameters
    ----------
    path
        Path to the ``.mat`` YAR file. It is assumed to contain a ``/Data/``
        segment from which the evaluation code and uuid are derived.
    verbose
        If ``True``, print progress for each level, assessment, source and
        sensor location while parsing.

    Returns
    -------
    Reading
        A reading with one level per recording, each carrying its raw data
        sets and a flattened context.
    """
    # Read the .mat file
    dict_mat = read_matlab_file(path)
    # Instantiate the reading start and end, they will be updated with recording min
    # and max timestamps
    reading_start = np.nan
    reading_end = np.nan
    # Instantiate the data dictionary and source
    # NOTE(review): ``pre_formatting_yar`` is not defined in this view of the
    # file — presumably it returns a default source label and the recordings
    # keyed by level name; confirm its contract.
    source, data_t1 = pre_formatting_yar(dict_mat)
    # Instantiate an empty list of levels
    list_level = []
    # Go through the recordings
    for it_level, (level_name, recording) in enumerate(data_t1.items()):
        # Instantiate required variables
        start = np.nan
        end = np.nan
        context = {}
        raw_data_sets = []
        if verbose:
            print("___________")
            print(f"Reading Level {level_name}")
        # Go through assessments in the recording
        for assessment, item in recording.items():
            # If variable are contextual add them to context
            if assessment in RECORDING_CONTEXT_KEYS:
                context[assessment] = item.squeeze()
                continue
            # Else create a new level in the context to store information linked to
            # the assessment
            context[assessment] = {}
            if verbose:
                print("- - - - -")
                print(f"{level_name}: assessment {assessment}")
            # Specific case of Standards (here it is not about Acc, Gyr, Mag but
            # pressure)
            if assessment == "Standards":
                # Go through the sources
                # NOTE(review): this loop rebinds the outer ``source``; after a
                # Standards assessment the name keeps the LAST Standards source
                # and is later used for non-Standards data sets below — confirm
                # that is intended.
                for source in item.keys():
                    if verbose:
                        print(
                            f"{level_name}: assessment {assessment} - source {source}"
                        )
                    # Create a sub_dict at the level of the source
                    sub_dict = data_t1[level_name][assessment][source]
                    # create a new level in the context to store information linked
                    # to the source
                    context[assessment][source] = {}
                    # Usual case
                    if source != "INDIP":
                        # Go through sensor locations
                        for sensor_location in sub_dict.keys():
                            if verbose:
                                print(
                                    f"{level_name}: assessment {assessment} - "
                                    f"source {source} - "
                                    f"sensor_location {sensor_location}"
                                )
                            # Storing contextual sensor frequency information
                            context[assessment][source][sensor_location] = {}
                            context[assessment][source][sensor_location][
                                "Fs"
                            ] = sub_dict[sensor_location]["Fs"]
                            # Create a dataframe out of the source and sensor location
                            df = df_from_source(sub_dict, sensor_location)
                            # Create an identifier for the dataset
                            dataset_id = f"{assessment}-{source}-{sensor_location}"
                            # Create the definitions
                            definitions = [
                                RawDataValueDefinition(column, column.upper())
                                for column in df.columns
                            ]
                            # Create the raw_data_set objects
                            raw_data_sets += [
                                RawDataSet(
                                    definition=RawDataSetDefinition(
                                        dataset_id,
                                        RawDataSetSource(source),
                                        definitions,
                                    ),
                                    data=df,
                                )
                            ]
                    # Specific case of INDIP
                    elif source == "INDIP":
                        # Storing contextual sensor frequency information
                        context[assessment][source]["Fs"] = sub_dict["Fs"]
                        # Go through the possible scenarios
                        for scenario in sub_dict.keys():
                            if scenario == "Fs":
                                continue
                            # Storing contextual assessment, source and scenario
                            # information
                            context[assessment][source][scenario] = {}
                            # Go through the event of the specific scenario and add
                            # to context
                            for event in sub_dict[scenario].keys():
                                context[assessment][source][scenario][event] = data_t1[
                                    level_name
                                ][assessment][source][scenario][event].squeeze()
                    # NOTE(review): the if/elif above already covers every value
                    # of ``source``, so this branch is unreachable as written.
                    else:
                        raise NotImplementedError(f"{source} is not supported yet.")
                continue
            # Usual case not Standards
            for sensor_location in item.keys():
                if verbose:
                    print(
                        f"{level_name}: assessment {assessment} - "
                        f"sensor_location {sensor_location}"
                    )
                # create a new level in the context for assessment and sensor_location
                context[assessment][sensor_location] = {}
                # Create an identifier for the dataset
                dataset_id = f"{assessment}-{sensor_location}"
                # Create a dataframe out of the source and sensor location
                df = df_from_source(data_t1[level_name][assessment], sensor_location)
                # update start and end for the level
                # NOTE: ``min(x, nan)`` returns ``x`` (the first argument) in
                # Python, so the initial NaN start/end is replaced on the first
                # comparison — the argument order here is load-bearing.
                if "Timestamp" in df.columns:
                    start = min(df.Timestamp.min(), start)
                    end = max(df.Timestamp.max(), end)
                # Create the definitions
                definitions = [
                    RawDataValueDefinition(column, column.upper())
                    for column in df.columns
                ]
                # Create the raw_data_set objects
                # NOTE(review): ``source`` here is whatever was bound last —
                # the value from ``pre_formatting_yar`` or, if a "Standards"
                # assessment was processed earlier, its last source key.
                # Confirm which label these data sets should carry.
                raw_data_sets += [
                    RawDataSet(
                        definition=RawDataSetDefinition(
                            dataset_id, RawDataSetSource(source), definitions
                        ),
                        data=df,
                    )
                ]
        # Update reading start and end
        if it_level == 0:
            reading_start = start
            reading_end = end
        else:
            reading_start = min(reading_start, start)
            reading_end = max(reading_end, end)
        # Flatten the level context with . separator and cast it to a Context object
        formatted_context = context_from_dict(context)
        # Append the level with the raw_data_sets and the context to the level_list
        list_level.append(
            Level(
                id_=level_name,
                start=start,
                end=end,
                raw_data_sets=raw_data_sets,
                context=formatted_context,
            )
        )
    # Create the reading
    # NOTE(review): raises IndexError if ``path`` has no "/Data/" segment.
    path_split = path.split("/Data/")[1].split("/")
    evaluation_code = path_split[0]
    uuid = "/".join(path_split)
    # Create the evaluation
    evaluation = Evaluation(
        start=reading_start,
        end=reading_end,
        finished=True,
        definition=EpochDefinition(id_=evaluation_code),
        uuid=uuid,
    )
    reading = Reading(evaluation, levels=list_level)
    return reading