# Source code for dispel.providers.bdh.io

"""Functionality to read files from BDH apps.

Examples
--------
To read a BDH json file and work with the contained data one can read the file:

.. testsetup:: bdh

    >>> import pkg_resources
    >>> path = pkg_resources.resource_filename('tests.providers.bdh',
    ...                                        '_resources/DRAW/uat_drawing.json')

.. doctest:: bdh

    >>> from dispel.providers.bdh.io import read_bdh
    >>> reading = read_bdh(path)

And access raw sensor data for the first level

.. doctest:: bdh
    :options: +NORMALIZE_WHITESPACE

    >>> id_ = reading.level_ids[0]
    >>> reading.get_level(id_).get_raw_data_set('screen').data.head().tsTouch
    0   2021-03-21 10:33:26.666
    1   2021-03-21 10:33:26.674
    2   2021-03-21 10:33:26.690
    3   2021-03-21 10:33:26.698
    4   2021-03-21 10:33:26.716
    Name: tsTouch, dtype: datetime64[ns]

For further details on :class:`~dispel.data.core.Reading` and the data model take a look at
:mod:`dispel.data.core`.
"""

from typing import Any, Callable, Dict, Iterable, List, Optional

from dispel.data.collections import MeasureSet
from dispel.data.core import Device, Reading, ReadingSchema, Session
from dispel.data.devices import AndroidPlatform, IOSPlatform, PlatformType, Screen
from dispel.data.epochs import EpochDefinition
from dispel.data.levels import Context, Level, LevelId
from dispel.data.measures import MeasureValue
from dispel.data.raw import RawDataSet, RawDataSetDefinition, RawDataValueDefinition
from dispel.data.values import Value, ValueDefinition
from dispel.io.utils import flatten, load_json
from dispel.providers.bdh.data import BDHEvaluation, BDHRawDataSetSource, BDHReading
from dispel.providers.bdh.io.core import (
    KEYS,
    get_level_id_two_hands,
    parse_epoch,
    parse_raw_data_sets,
)
from dispel.providers.bdh.io.cps import (
    convert_activity_sequence as convert_activity_sequence_cps,
)
from dispel.providers.bdh.io.cps import get_level_id as get_level_id_cps
from dispel.providers.bdh.io.cps import (
    translate_reference_table_type,
    translate_sequence_type,
)
from dispel.providers.bdh.io.drawing import convert_touch_events
from dispel.providers.bdh.io.drawing import get_level_id as get_level_id_drawing
from dispel.providers.bdh.io.gait import convert_gps
from dispel.providers.bdh.io.gait import get_level_id as get_level_id_gait
from dispel.providers.bdh.io.msis29 import (
    convert_activity_sequence as convert_activity_sequence_msis,
)
from dispel.providers.bdh.io.msis29 import create_levels as create_levels_msis
from dispel.providers.bdh.io.msis29 import get_level_id as get_level_id_msis
from dispel.providers.bdh.io.neuroqol import (
    convert_activity_sequence as convert_activity_sequence_neuroqol,
)
from dispel.providers.bdh.io.pinch import create_levels as create_levels_pinch
from dispel.providers.bdh.io.pinch import get_level_id as get_level_id_pinch
from dispel.providers.bdh.io.pinch import (
    update_raw_data_definition as update_raw_data_definition_pinch,
)
from dispel.providers.bdh.io.sbt_utt import convert_timestamp
from dispel.providers.bdh.io.sbt_utt import get_level_id as get_level_id_sbt
from dispel.providers.bdh.io.survey import convert_flagged_answers
from dispel.providers.bdh.io.survey import create_levels as create_levels_mood
from dispel.providers.bdh.io.survey import get_level_id as get_level_id_mood
from dispel.providers.bdh.io.voice import get_level_id as get_level_id_voice
from dispel.providers.generic.tasks.cps.utils import (
    EXPECTED_DURATION_D2D,
    EXPECTED_DURATION_S2D,
    LEVEL_DURATION_DEF,
)
from dispel.providers.registry import register_reader

# strftime/strptime pattern for second-resolution timestamps with a literal
# trailing "Z". NOTE(review): not referenced by the code visible in this
# module -- presumably kept for importers; confirm before removing.
DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
# Translates the ``drawing_figure_name`` entry of a level configuration into
# the value stored under the ``levelType`` context key (see ``get_context``).
MAPPING_SHAPE_LEVEL_TYPE = {
    "infinity_loop": "infinity",
    "rectangle_counter_clockwise": "squareCounterClock",
    "rectangle_clockwise": "squareClock",
    "spiral": "spiral",
}


def _convert_activity_sequence(
    data: Dict[str, Any], definition: RawDataSetDefinition
) -> RawDataSet:
    """Convert an ``activity_sequence`` data set with the matching converter.

    The converter is chosen from the keys present in ``data``:
    ``presented_symbol_timestamp`` selects the CPS converter,
    ``form_item_id`` selects the NeuroQoL converter, and anything else falls
    back to the MSIS29 converter.

    Parameters
    ----------
    data
        The raw ``activity_sequence`` payload of a level.
    definition
        The raw data set definition passed on to the converter.

    Returns
    -------
    RawDataSet
        The data set produced by the selected converter.
    """
    if "presented_symbol_timestamp" in data:
        converter = convert_activity_sequence_cps
    elif "form_item_id" in data:
        converter = convert_activity_sequence_neuroqol
    else:
        # Neither marker key is present: treat the payload as MSIS29.
        converter = convert_activity_sequence_msis
    return converter(data, definition)


# Converters applied by ``parse_level`` on top of the generic raw data parsing.
# Keys are raw data set ids; each callable is invoked as
# ``function(level_raw_data[key], raw_data_definitions[key])`` and its result
# replaces the generically parsed data set with that id.
# NOTE(review): the ``calibrated_*`` entries pass the un-prefixed sensor name
# to ``convert_timestamp`` -- this looks deliberate, confirm against
# ``dispel.providers.bdh.io.sbt_utt.convert_timestamp``.
TO_CONVERT: Dict[str, Callable] = {
    "touch_events": convert_touch_events,
    "activity_sequence": _convert_activity_sequence,
    "validated_answers": convert_flagged_answers,
    "accelerometer": lambda x, y: convert_timestamp("accelerometer", x, y),
    "raw_accelerometer": lambda x, y: convert_timestamp("raw_accelerometer", x, y),
    "calibrated_accelerometer": lambda x, y: convert_timestamp("accelerometer", x, y),
    "gyroscope": lambda x, y: convert_timestamp("gyroscope", x, y),
    "raw_gyroscope": lambda x, y: convert_timestamp("raw_gyroscope", x, y),
    "calibrated_gyroscope": lambda x, y: convert_timestamp("gyroscope", x, y),
    "gravity": lambda x, y: convert_timestamp("gravity", x, y),
    "gps": convert_gps,
}


def parse_schema(data: Dict) -> ReadingSchema:
    """Build the reading schema from its dictionary representation.

    Parameters
    ----------
    data
        Mapping whose entries are forwarded as keyword arguments to
        :class:`~dispel.data.core.ReadingSchema`.

    Returns
    -------
    ReadingSchema
        The schema constructed from ``data``.
    """
    schema = ReadingSchema(**data)
    return schema
def update_raw_data_definition(
    data: Dict[str, Any], schema: ReadingSchema
) -> Dict[str, Any]:
    """Apply schema-specific adjustments to the raw data definitions.

    Only schemas whose name contains ``"pinch"`` are adjusted; every other
    schema gets ``data`` back unchanged.

    Parameters
    ----------
    data
        Raw data definitions.
    schema
        The schema of the reading.

    Returns
    -------
    Dict[str, Any]
        The (possibly updated) raw data definitions.
    """
    is_pinch = "pinch" in schema.name
    return update_raw_data_definition_pinch(data) if is_pinch else data
def create_levels(data: Dict[str, Any], schema: ReadingSchema) -> Dict[str, Any]:
    """Split uni-level activity data into the levels ``dispel`` expects.

    The BDH format does not split data in levels in the same way the ADS
    format does. For example, in the mood test, the two questions are lumped
    in the same level. The first marker (``mood``, ``msis``, ``pinch``) found
    in the schema name selects the splitting function; data of any other
    schema is returned untouched.

    Parameters
    ----------
    data
        Level data.
    schema
        The schema of the reading.

    Returns
    -------
    Dict[str, Any]
        Level data.
    """
    # Order matters: the first marker contained in the schema name wins.
    splitters = (
        ("mood", create_levels_mood),
        ("msis", create_levels_msis),
        ("pinch", create_levels_pinch),
    )
    for marker, splitter in splitters:
        if marker in schema.name:
            return splitter(data)
    return data
def get_level_id(config: dict, schema: ReadingSchema) -> LevelId:
    """Derive the level id from the level configuration.

    The schema name is matched against a fixed, ordered list of markers; the
    parser belonging to the first marker contained in the name is called with
    ``config``.

    Parameters
    ----------
    config
        The level configuration.
    schema
        The schema from the data header.

    Returns
    -------
    LevelId
        Level id for the level.

    Raises
    ------
    NotImplementedError
        If no marker matches the schema name.
    """
    # Ordered (marker, parser) pairs; the first marker found in the name wins.
    resolvers = (
        ("drawing", get_level_id_drawing),
        ("cps", get_level_id_cps),
        ("sb", get_level_id_sbt),
        ("pinch", get_level_id_pinch),
        ("6mw", get_level_id_gait),
        ("mood", get_level_id_mood),
        ("msis", get_level_id_msis),
        ("sp-activity", get_level_id_two_hands),
        ("fingertap-activity", get_level_id_two_hands),
        ("voice-activity", get_level_id_voice),
    )
    resolver = next(
        (parser for marker, parser in resolvers if marker in schema.name), None
    )
    if resolver is None:
        raise NotImplementedError(f"{schema.name} is not implemented yet.")
    return resolver(config)
def _parse_value_definition(id_: str, data: Dict, cls):
    """Instantiate ``cls`` from a BDH value definition.

    ``id_`` is used both as id and as name; description and unit are read
    from ``data`` and default to ``None`` when absent.
    """
    description = data.get(KEYS.description)
    unit = data.get(KEYS.unit)
    return cls(id_=id_, name=id_, description=description, unit=unit)
def parse_measure_definition(id_: str, data: Dict) -> ValueDefinition:
    """Create the value definition of a measure.

    Parameters
    ----------
    id_
        The id of the measure value definition; also used as its name.
    data
        The measure definition in BDH json format.

    Returns
    -------
    ValueDefinition
        The created value definition.
    """
    return _parse_value_definition(id_=id_, data=data, cls=ValueDefinition)
def parse_raw_data_source(data: Dict) -> BDHRawDataSetSource:
    """Create the source description of a raw data set.

    Parameters
    ----------
    data
        The raw data source definition dictionary in BDH json format. The
        manufacturer entry is mandatory; chipset and reference are optional
        and default to ``None``.

    Returns
    -------
    BDHRawDataSetSource
        The created BDH format raw data set source.

    Raises
    ------
    KeyError
        If the manufacturer entry is missing from ``data``.
    """
    manufacturer = data[KEYS.manufacturer]
    chipset = data.get(KEYS.chipset)
    reference = data.get(KEYS.reference)
    return BDHRawDataSetSource(manufacturer, chipset, reference)
def parse_raw_data_value_definition(id_: str, data: Dict) -> RawDataValueDefinition:
    """Create the definition of a single raw data value.

    Parameters
    ----------
    id_
        The id of the raw data value definition; also used as its name.
    data
        The raw data value definition in BDH json format.

    Returns
    -------
    RawDataValueDefinition
        The created raw data value definition.
    """
    return _parse_value_definition(id_=id_, data=data, cls=RawDataValueDefinition)
def parse_screen(device: Dict[str, Any]) -> Screen:
    """Create the screen description from the device information.

    Pixel dimensions are mandatory. Density and point dimensions are added
    when the corresponding entries are available; if any of them raises a
    ``KeyError`` a screen with pixel dimensions only is returned.

    Parameters
    ----------
    device
        Device information.

    Returns
    -------
    Screen
        The screen information.

    Raises
    ------
    KeyError
        If ``screen_width_pixels`` or ``screen_height_pixels`` is missing.
    """
    resolution = {
        "width_pixels": int(device["screen_width_pixels"]),
        "height_pixels": int(device["screen_height_pixels"]),
    }
    try:
        # 25.4 mm per inch: pixels per millimetre -> pixels per inch.
        density_dpi = (resolution["width_pixels"] / device["screen_width_mm"]) * 25.4
        return Screen(
            **resolution,
            density_dpi=density_dpi,
            width_dp_pt=int(device["screen_width_pt_dp"]),
            height_dp_pt=int(device["screen_height_pt_dp"]),
        )
    except KeyError:
        return Screen(**resolution)
def _parse_platform(model_name: str) -> PlatformType:
    """Infer the platform from the model name.

    A model name containing ``"iPhone"`` yields the iOS platform; any other
    value, including ``None``, yields the Android platform.
    """
    is_ios = model_name is not None and "iPhone" in model_name
    return IOSPlatform() if is_ios else AndroidPlatform()
def parse_device(device_dict: dict) -> Device:
    """Create the device description from the device dictionary.

    Parameters
    ----------
    device_dict
        The device information dictionary. Model name, model number and OS
        version are optional and default to ``None``.

    Returns
    -------
    Device
        The device information, including platform and screen.
    """
    model_name = device_dict.get(KEYS.model_name)
    return Device(
        None,
        _parse_platform(model_name),
        model_name,
        device_dict.get(KEYS.model_number),
        device_dict.get(KEYS.os_version),
        screen=parse_screen(device_dict),
    )
def parse_session(header: dict) -> Session:
    """Create the session description from the reading header.

    Parameters
    ----------
    header
        The header dictionary. ``effective_time_frame`` and
        ``inclinic_mode`` are required; ``session_cluster_id`` and
        ``cluster_activity_codes`` are optional and default to ``None``.

    Returns
    -------
    Session
        Session data. The session definition id is ``"InClinic"`` when
        ``inclinic_mode`` is truthy and ``"Daily"`` otherwise.

    Raises
    ------
    KeyError
        If ``effective_time_frame`` or ``inclinic_mode`` is missing.
    """
    epoch = parse_epoch(header["effective_time_frame"])
    code = "InClinic" if header["inclinic_mode"] else "Daily"
    return Session(
        start=epoch.start,
        end=epoch.end,
        definition=EpochDefinition(id_=code),
        uuid=header.get("session_cluster_id"),
        evaluation_codes=header.get("cluster_activity_codes"),
    )
def parse_raw_data_set_definition(id_: str, data: Dict) -> RawDataSetDefinition:
    """Create the definition of a raw data set of a reading.

    Note that each value definition dictionary inside ``data`` is updated in
    place: its name entry is set to the key under which it is stored.

    Parameters
    ----------
    id_
        The id of the raw data set.
    data
        The definition of the raw data set in BDH json format.

    Returns
    -------
    RawDataSetDefinition
        The created raw data set definition.

    Raises
    ------
    ValueError
        If no source is defined for the given raw data source.
    ValueError
        If no values are defined for the given raw data source.
    KeyError
        If the computed flag is missing from ``data``.
    """
    if not data.get(KEYS.source):
        raise ValueError(f"No source defined for raw data source {id_}")
    source = parse_raw_data_source(data[KEYS.source])

    if KEYS.values not in data:
        raise ValueError(f"No values defined for raw data source {id_}")

    value_definitions = []
    for value_id, value_def in data[KEYS.values].items():
        # FIXME once values have ids too then they can be replaced
        value_def[KEYS.name] = value_id
        value_definitions.append(parse_raw_data_value_definition(value_id, value_def))

    return RawDataSetDefinition(
        id=id_,
        source=source,
        value_definitions_list=value_definitions,
        is_computed=data[KEYS.computed],
    )
def parse_measures(
    data: Dict[str, Any], definitions: Iterable[ValueDefinition]
) -> MeasureSet:
    """Create the measure set of a reading.

    Parameters
    ----------
    data
        The data dictionary for measures in BDH json format, keyed by the
        string form of the measure definition id.
    definitions
        The definitions for the measures.

    Returns
    -------
    MeasureSet
        The created measure set.

    Raises
    ------
    KeyError
        If a key of ``data`` has no matching definition.
    """
    # Index the definitions by the string form of their id for matching.
    definition_by_id = {str(definition.id): definition for definition in definitions}
    measure_values = [
        MeasureValue(definition_by_id[measure_id], value)
        for measure_id, value in data.items()
    ]
    return MeasureSet(measure_values)
def parse_evaluation(data: Dict) -> BDHEvaluation:
    """Create the evaluation information of a reading from its header.

    Besides the evaluation attributes, a flattened copy of the header
    (without the ``configuration`` and ``raw_data`` entries) is attached as
    ``header_meta``. In that copy ``cluster_activity_codes`` is reduced to
    its first element when non-empty and
    ``effective_time_frame_idle_times`` is converted to a string.

    Parameters
    ----------
    data
        The header of the BDH json file.

    Returns
    -------
    BDHEvaluation
        The evaluation information for the reading.

    Raises
    ------
    ValueError
        If the evaluation id, the user id, the effective time frame, or the
        schema/task information is missing from the data.
    KeyError
        If the completion entry is missing from the data.
    """
    # TODO support interruptions
    if KEYS.id not in data:
        raise ValueError("Missing evaluation id")
    if KEYS.user_id not in data:
        raise ValueError("Missing user id")
    if KEYS.effective_time_frame not in data:
        raise ValueError("Missing effective time frame")
    epoch = parse_epoch(data[KEYS.effective_time_frame])

    if not (KEYS.schema_id in data and KEYS.name in data[KEYS.schema_id]):
        raise ValueError("Missing task or schema information")
    task = data[KEYS.schema_id][KEYS.name]

    finished = data[KEYS.completion] == "completed"
    exit_reason = data.get(KEYS.interruption_reason)

    # flatten header information, leaving out the bulky sections
    excluded = ("configuration", "raw_data")
    header_meta_flat = flatten(
        {key: value for key, value in data.items() if key not in excluded}
    )

    codes_key = "cluster_activity_codes"
    if codes_key in header_meta_flat and len(header_meta_flat[codes_key]) > 0:
        header_meta_flat[codes_key] = header_meta_flat[codes_key][0]

    idle_key = "effective_time_frame_idle_times"
    if idle_key in header_meta_flat:
        header_meta_flat[idle_key] = str(header_meta_flat[idle_key])

    return BDHEvaluation(
        start=epoch.start,
        end=epoch.end,
        definition=EpochDefinition(id_=task),
        uuid=data[KEYS.id],
        finished=finished,
        user_id=data[KEYS.user_id],
        exit_reason=exit_reason,
        header_meta=header_meta_flat,
    )
def get_context(config: Optional[Dict[str, Any]], schema_name: str) -> Context:
    """Create a context from a config dictionary.

    Every configuration entry is stored as a context value whose definition
    id and name are both the entry key. In addition, the translated reference
    table type (``reference_table_type``, defaulting to ``2``) and the
    translated ``"random"`` sequence type are appended. When the
    configuration contains ``drawing_hand``, ``usedHand`` and ``levelType``
    values are appended as well.

    Parameters
    ----------
    config
        An optional dict that contains the raw information about the context.
    schema_name
        The name of the schema. Kept for interface compatibility; it is
        currently not used to build the context.

    Returns
    -------
    Context
        The parsed context; an empty context if ``config`` is empty or
        ``None``.

    Raises
    ------
    KeyError
        If ``drawing_hand`` is configured but ``drawing_figure_name`` is
        missing or is not a known figure name.
    """
    if not config:
        return Context()

    _context = [
        Value(ValueDefinition(key, key), value) for key, value in config.items()
    ]

    ref_table = config.get("reference_table_type", 2)
    _context += translate_reference_table_type(ref_table)
    _context += translate_sequence_type("random")

    if "drawing_hand" in config:
        _context.append(
            Value(ValueDefinition("usedHand", "usedHand"), config["drawing_hand"])
        )
        _context.append(
            Value(
                ValueDefinition("levelType", "levelType"),
                MAPPING_SHAPE_LEVEL_TYPE[config["drawing_figure_name"]],
            )
        )
    return Context(_context)
def enrich_context(schema, level_id, context):
    """Enrich the context with level specific information.

    Only schemas whose name contains ``"cps"`` are enriched: the expected
    level duration matching ``level_id`` is stored in ``context`` under
    ``LEVEL_DURATION_DEF``. Other schemas leave ``context`` untouched.

    Raises
    ------
    ValueError
        If the schema is a CPS schema and ``level_id`` is neither
        ``digit_to_digit`` nor ``symbol_to_digit``.
    """
    if "cps" not in schema.name:
        return

    if level_id == "digit_to_digit":
        expected_duration = EXPECTED_DURATION_D2D
    elif level_id == "symbol_to_digit":
        expected_duration = EXPECTED_DURATION_S2D
    else:
        raise ValueError(f"Unknown level id: {level_id}")
    context.set(expected_duration, LEVEL_DURATION_DEF)
def parse_level(
    data: dict,
    schema: ReadingSchema,
    measure_definitions: List[ValueDefinition],
    raw_data_definitions: Dict[Any, RawDataSetDefinition],
) -> Level:
    """Create a level from the dictionary containing the level info.

    The level id is derived from the level configuration when one is
    present; otherwise the level ``name`` is used (``6mw`` is renamed to
    ``6mwt``). Raw data sets are parsed generically first, then every data
    set that has an entry in ``TO_CONVERT`` and a definition is replaced by
    its converted version.

    Parameters
    ----------
    data
        Dictionary containing level info.
    schema
        The schema from the data header.
    measure_definitions
        List of measure definitions (currently unused).
    raw_data_definitions
        Dict of raw data definitions.

    Returns
    -------
    Level
        The parsed level.
    """
    # pylint: disable=unused-argument
    # TODO investigate how to use measure_definitions
    config = data.get(KEYS.configuration)
    if config:
        id_ = get_level_id(config, schema)
    else:
        id_ = data["name"]
        if id_ == "6mw":
            id_ = "6mwt"

    # Only NeuroQol ships measures that were already computed on the device.
    measure_set = None
    if schema.name == "neuroqol-activity":
        computed = data["mobile_computed_measures"]
        measure_set = MeasureSet(
            values=[
                float(computed["t_score"]),
                float(computed["standard_error"]),
            ],
            definitions=[
                ValueDefinition(
                    f"mobile_computed_theta_score_{id_}",
                    f"The mobile computed theta score of the subtest {id_}.",
                ),
                ValueDefinition(
                    f"mobile_computed_standard_error_{id_}",
                    f"The mobile computed standard error of the subtest {id_}.",
                ),
            ],
        )

    raw_data_sets = []
    if KEYS.raw_data in data:
        level_raw_data = data[KEYS.raw_data]
        raw_data_sets = parse_raw_data_sets(level_raw_data, raw_data_definitions)
        for key, convert in TO_CONVERT.items():
            if key not in raw_data_definitions:
                continue
            converted = convert(level_raw_data[key], raw_data_definitions[key])
            # Replace the generically parsed data set by the converted one.
            raw_data_sets = [x for x in raw_data_sets if x.id != key]
            raw_data_sets.append(converted)

    epoch = parse_epoch(data[KEYS.effective_time_frame])
    context = get_context(config, schema.name)
    enrich_context(schema, id_, context)

    return Level(
        id_=id_,
        start=epoch.start,
        end=epoch.end,
        context=context,
        raw_data_sets=raw_data_sets,
        measure_set=measure_set,
    )
def parse_levels(
    data: dict,
    schema: ReadingSchema,
    measure_definitions: List[ValueDefinition],
    raw_data_definitions: Dict[Any, RawDataSetDefinition],
) -> List[Level]:
    """Create all levels contained in the data body.

    Before parsing, the body is passed through ``create_levels`` and the raw
    data definitions through ``update_raw_data_definition`` so that
    schema-specific adjustments are applied.

    Parameters
    ----------
    data
        Dict containing the data body.
    schema
        The schema from the data header.
    measure_definitions
        List of measure definitions.
    raw_data_definitions
        Dict of raw data definitions.

    Returns
    -------
    List[Level]
        List of levels.

    Raises
    ------
    ValueError
        If a level property is missing in ``data``.
    """
    if KEYS.levels not in data:
        raise ValueError(f"Missing {KEYS.levels} property")

    body = create_levels(data, schema)
    definitions = update_raw_data_definition(raw_data_definitions, schema)

    levels = []
    for level_data in body[KEYS.levels]:
        levels.append(parse_level(level_data, schema, measure_definitions, definitions))
    return levels
def parsable_bdh_json(value: Any) -> bool:
    """Test if a value is a dictionary and contains BDH specific keys.

    Parameters
    ----------
    value
        The object to be tested.

    Returns
    -------
    bool
        ``True`` if ``value`` is a dictionary that has both a ``header`` and
        a ``body`` key, ``False`` otherwise.
    """
    if not isinstance(value, dict):
        return False
    # Logical ``and`` (not bitwise ``&``) keeps the intent explicit and
    # short-circuits; membership is tested on the dict directly.
    return "header" in value and "body" in value
@register_reader(parsable_bdh_json, BDHReading)
def parse_bdh_reading(data: Dict) -> BDHReading:
    """Get the class representation of a BDH record dictionary.

    Measure definitions found under the header's
    ``mobile_computed_measures`` are registered with a ``mobile_`` id prefix,
    those found directly under ``activity_measures`` with a
    ``pre-existing_`` prefix.

    Parameters
    ----------
    data
        The dictionary containing the information about the reading.

    Returns
    -------
    BDHReading
        The class representation of the record passed.

    Raises
    ------
    ValueError
        If header, schema or body information is missing in ``data``.
    """
    if KEYS.header not in data:
        raise ValueError("Missing header information")
    data_header = data[KEYS.header]

    if KEYS.schema_id not in data_header:
        raise ValueError("Missing schema information")
    schema = parse_schema(data_header[KEYS.schema_id])
    evaluation = parse_evaluation(data_header)

    measure_definitions = []
    if KEYS.measures in data_header:
        header_measures = data_header[KEYS.measures]
        if "mobile_computed_measures" in header_measures:
            mobile_measures = header_measures["mobile_computed_measures"]
            if "activity_measures" in mobile_measures:
                measure_definitions.extend(
                    parse_measure_definition("mobile_" + id_, data_def)
                    for id_, data_def in mobile_measures["activity_measures"].items()
                )
        if "activity_measures" in header_measures:
            measure_definitions.extend(
                parse_measure_definition("pre-existing_" + id_, data_def)
                for id_, data_def in header_measures["activity_measures"].items()
            )

    raw_data_definitions = {}
    if KEYS.raw_data in data_header:
        raw_data_definitions = {
            id_: parse_raw_data_set_definition(id_, data_def)
            for id_, data_def in data_header[KEYS.raw_data].items()
        }

    if KEYS.body not in data:
        raise ValueError("Missing body")

    device = parse_device(data_header[KEYS.acquisition_provenance][KEYS.source_device])
    parsed_levels = parse_levels(
        data[KEYS.body], schema, measure_definitions, raw_data_definitions
    )
    session = parse_session(data_header)

    return BDHReading(
        evaluation=evaluation,
        schema=schema,
        levels=parsed_levels,
        measure_set=None,
        date=data_header.get(KEYS.creation_date_time),
        device=device,
        session=session,
    )
def read_bdh(path: str) -> Reading:
    """Read a *BDH* data record from disk.

    The file is loaded as UTF-8 encoded json and handed to
    ``parse_bdh_reading``.

    Parameters
    ----------
    path
        The path to the reading to be parsed.

    Returns
    -------
    Reading
        The class representation of the record parsed.
    """
    return parse_bdh_reading(load_json(path, encoding="utf-8"))