# Source code for dispel.data.core

"""Core data model for the analysis library."""
from collections import defaultdict
from dataclasses import dataclass
from functools import singledispatchmethod
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union, ValuesView, cast

import pandas as pd

from dispel.data.devices import Device
from dispel.data.epochs import Epoch
from dispel.data.flags import Flag, FlagMixIn
from dispel.data.levels import Level, LevelEpoch, LevelId, LevelIdType
from dispel.data.measures import MeasureSet, MeasureValue
from dispel.data.raw import RawDataSet
from dispel.data.values import ValueDefinition
from dispel.utils import plural


@dataclass(frozen=True)
class ReadingSchema:
    """Schema definition for reading.

    Instances are immutable (the dataclass is frozen).

    Attributes
    ----------
    namespace
        The namespace of the schema
    name
        The name of the schema
    version
        The version of the schema
    """

    namespace: str
    name: str
    version: str
class Evaluation(Epoch):
    """Evaluation information for a :class:`Reading`.

    The evaluation corresponds to the json related task, whereas the session
    corresponds to the group of tasks that the evaluation finds itself in.

    Attributes
    ----------
    uuid
        The unique unified identifier of the evaluation
    finished
        ``True`` if the concerned task has been finished normally. ``False``
        otherwise. ``None`` if the information is not available.
    exit_reason
        The exit condition. It determines the type of interruption if the test
        was interrupted, as well as the reason for the end of the test if the
        test has been completed.
    user_id
        The identifier of the user
    """

    def __init__(
        self,
        *args,
        uuid: str,
        finished: Optional[bool] = None,
        exit_reason: Optional[str] = None,
        user_id: Optional[str] = None,
        **kwargs,
    ):
        """Create an evaluation.

        Parameters
        ----------
        args
            Positional arguments forwarded unchanged to the parent epoch.
        uuid
            The unique unified identifier of the evaluation
        finished
            Whether the task finished normally, if known.
        exit_reason
            The exit condition of the task, if known.
        user_id
            The identifier of the user, if known.
        kwargs
            Keyword arguments forwarded unchanged to the parent epoch.

        Raises
        ------
        ValueError
            If the epoch reports itself as incomplete after initialization.
        """
        super().__init__(*args, **kwargs)

        if self.is_incomplete:
            raise ValueError("Evaluation epoch must always be complete")

        self.uuid = uuid
        self.finished = finished
        self.exit_reason = exit_reason
        self.user_id = user_id

    def to_dict(self) -> Dict[str, Any]:
        """Retrieve values of evaluation as dictionary.

        Returns
        -------
        Dict[str, Any]
            The evaluation values. ``id``, ``start`` and ``end`` are exported
            as strings. A missing ``user_id`` or ``exit_reason`` is exported
            as an empty string. ``is_finished`` is an empty string only when
            the information is unknown (``None``).
        """
        return {
            "evaluation_code": str(self.id),
            "start_date": str(self.start),
            "end_date": str(self.end),
            "uuid": self.uuid,
            "user_id": self.user_id if self.user_id else "",
            # ``False`` is a meaningful value (task not finished normally) and
            # must not be collapsed into the "unknown" placeholder.
            "is_finished": self.finished if self.finished is not None else "",
            "exit_reason": self.exit_reason if self.exit_reason else "",
        }
class Session(Epoch):
    """Session information for a :class:`Reading`.

    A session is the group of tasks in which an evaluation takes place.

    FIXME: DOC

    Attributes
    ----------
    uuid
        The unique unified identifier of the session
    evaluation_codes
        An iterable of task types available in the session. Ordered by display
        order.
    """

    def __init__(
        self,
        *args,
        uuid: Optional[str] = None,
        evaluation_codes: Optional[Iterable[str]] = None,
        **kwargs,
    ):
        # Everything except the two session-specific keywords is handed to the
        # parent epoch untouched.
        super().__init__(*args, **kwargs)

        self.evaluation_codes = evaluation_codes
        self.uuid = uuid
class Reading(FlagMixIn):
    """A data capture from an experiment.

    Attributes
    ----------
    evaluation
        The evaluation information for this reading
    session
        The session information for this reading
    measure_set
        A list of measures already processed on the device
    schema
        The schema of the reading
    date
        The time the reading was recorded
    device
        The device that captured the reading

    Parameters
    ----------
    evaluation
        The evaluation information for this reading
    session
        The session information for this reading
    levels
        An iterable of Level
    measure_set
        A list of measures already processed on the device
    schema
        The schema of the reading
    date
        The time the reading was recorded
    device
        The device that captured the reading
    """

    def __init__(
        self,
        evaluation: Evaluation,
        session: Optional[Session] = None,
        levels: Optional[Iterable[Level]] = None,
        measure_set: Optional[MeasureSet] = None,
        schema: Optional[ReadingSchema] = None,
        date: Any = None,
        device: Optional[Device] = None,
    ):
        super().__init__()
        self.evaluation = evaluation
        self.session = session
        # fall back to a fresh measure set when none (or a falsy one) is given
        self.measure_set: MeasureSet = measure_set if measure_set else MeasureSet()
        self.schema = schema
        if date:
            self.date = pd.Timestamp(date)
        else:
            self.date = None
        self.device = device
        # number of times a level id has been registered, keyed by id string
        self._attempt: Dict[str, int] = defaultdict(int)

        # The evaluation only has to lie within the session when a session is
        # given and that session is complete.
        time_frames_compatible = (
            not self.session
            or self.session.is_incomplete
            or self.session.contains(self.evaluation)
        )
        if not time_frames_compatible:
            raise ValueError("Evaluation start and end must be within session")

        # levels are stored by their id and registered through ``set``
        self._levels: Dict[LevelId, Level] = {}
        for level in levels or ():
            self.set(level)

    def get_level(self, level_id: Optional[LevelIdType] = None) -> Level:
        """Get level for a given level_id.

        Parameters
        ----------
        level_id
            The id identifying the level.

        Returns
        -------
        Level
            The level identified by ``level_id``. If no level id is provided
            and the reading contains only one level it will be returned.
            Otherwise, the function will raise a :class:`ValueError`.

        Raises
        ------
        ValueError
            If the given id does not match any existing level within the
            reading.
        ValueError
            If no id has been provided, and there are multiple levels within
            the reading.
        """
        if not level_id:
            # without an id the choice is only unambiguous for a single level
            if len(self._levels) == 1:
                return next(iter(self._levels.values()))
            raise ValueError(
                f"There are {len(self._levels)} levels, please provide a level_id in"
                f" {self._levels.keys()}"
            )

        # string ids are converted before the lookup
        if isinstance(level_id, str):
            level_id = LevelId.from_str(level_id)  # type: ignore

        if level_id not in self._levels:
            raise ValueError(
                f"{level_id=} does not match any Level in {self._levels.keys()}"
            )
        return self._levels[level_id]  # type: ignore

    def __repr__(self) -> str:
        return f'<Reading: {plural("level", len(self))} ({self.flag_count_repr})>'

    def __iter__(self) -> Iterable[Tuple[LevelIdType, Level]]:
        for id_and_level in self._levels.items():
            yield id_and_level

    def __len__(self) -> int:
        return len(self._levels)

    @property
    def empty(self) -> bool:
        """Check whether the reading is empty."""
        return not len(self)

    @property
    def levels(self) -> ValuesView[Level]:
        """Get a view of all Level in the reading."""
        return self._levels.values()

    @property
    def level_ids(self) -> List[LevelId]:
        """Get the list of level_id keys."""
        return [entry.id for entry in self.levels]

    def has_raw_data_set(
        self,
        data_set_id: str,
        level_id: LevelIdType,
    ) -> bool:
        """Check whether the reading contains the desired raw data set.

        Parameters
        ----------
        data_set_id
            The id of the raw data set that will be searched for.
        level_id
            The level id in which the raw data set is to be searched for.

        Returns
        -------
        bool
            ``True`` if the raw data set exists inside the given level.
            ``False`` otherwise.
        """
        target_level = self.get_level(level_id)
        return target_level.has_raw_data_set(data_set_id)

    def get_raw_data_set(
        self,
        data_set_id: str,
        level_id: LevelIdType,
    ) -> RawDataSet:
        """Get the raw data set for a given data set id and a level.

        Parameters
        ----------
        data_set_id
            The id of the raw data set that will be retrieved.
        level_id
            The level id from which the raw data set is to be retrieved.

        Returns
        -------
        RawDataSet
            The raw data set with the matching id.
        """
        target_level = self.get_level(level_id)
        return target_level.get_raw_data_set(data_set_id)

    def get_measure_set(self, level_id: Optional[LevelIdType] = None) -> MeasureSet:
        """Get measure_set from level identified with level_id.

        The measure set of the reading itself is returned when no level id is
        given.
        """
        if level_id:
            return self.get_level(level_id).measure_set
        return self.measure_set

    def get_merged_measure_set(self) -> MeasureSet:
        """Get a measure set containing all the reading's measure values."""
        # Start from an empty set and add the reading's own set followed by
        # every level's set; plain ``+`` keeps the operands untouched.
        merged = MeasureSet()
        merged = merged + self.measure_set
        for level in self.levels:
            merged = merged + level.measure_set
        return merged

    @singledispatchmethod
    def set(self, value, **kwargs):
        """Set a value inside a reading.

        Dispatches on the type of ``value``; types without a registered
        handler are rejected.

        Raises
        ------
        TypeError
            If no handler is registered for the type of ``value``.
        """
        raise TypeError(f"Unsupported set type: {type(value)}")

    def _get_level(self, level: Optional[Union[LevelIdType, Level]] = None) -> Level:
        """Get level from id or level itself."""
        return level if isinstance(level, Level) else self.get_level(level)

    @set.register(MeasureSet)
    def _measure_set(
        self,
        value: MeasureSet,
        level: Optional[Union[LevelIdType, Level]] = None,
    ):
        # with a level, the measure set is delegated to that level
        if level is not None:
            self._get_level(level).set(value)
            return
        self.measure_set += value

    @set.register(MeasureValue)
    def _measure_value(
        self,
        value: MeasureValue,
        level: Optional[Union[LevelIdType, Level]] = None,
        epoch: Optional[LevelEpoch] = None,
    ):
        # an epoch takes precedence over both the level and the reading
        if epoch is not None:
            epoch.set(value)
            return

        if level is None:
            target = self.measure_set
        else:
            target = self._get_level(level).measure_set
        target.set(value)

    @set.register(RawDataSet)
    def _raw_data_set(
        self,
        value: RawDataSet,
        level: Union[LevelIdType, Level],
        concatenate: bool = False,
        overwrite: bool = False,
    ):
        target_level = self._get_level(level)
        target_level.set(value, concatenate=concatenate, overwrite=overwrite)

    @set.register(LevelEpoch)
    def _epoch_measure_set(self, value: LevelEpoch, level: Union[LevelIdType, Level]):
        target_level = self._get_level(level)
        target_level.set(value)

    @set.register(Level)
    def _level(self, value: Level):
        """Set a level.

        The first level registered under an id keeps that id. A level whose id
        was registered before is re-identified as ``<id>-<attempt>`` with the
        attempt number zero-padded to two digits. The attempt number is then
        written to the level's context under the id ``attempt``.
        """
        base_id = str(value.id)
        seen_before = base_id in self._attempt

        # bump the attempt counter once if the id was already registered and
        # a stored level id starts with it
        if seen_before and any(
            str(known).startswith(base_id) for known in self._levels
        ):
            self._attempt[base_id] += 1

        if seen_before:
            suffix = str(self._attempt[base_id]).zfill(2)
            value.id = cast(LevelId, LevelId.from_str("-".join([base_id, suffix])))
            self._levels[value.id] = value
        else:
            first_id = LevelId.from_str(base_id)
            self._levels[first_id] = value  # type: ignore
            self._attempt[str(first_id.id)] = 1

        # TODO: use sorting by effective time frame to ensure orders to
        #  attempts :
        #  level_ids = sorted(level_ids, key=lambda x:
        #  reading.get_level(x).effective_time_frame.start )
        registered = self._levels[value.id]
        attempt_number = self._attempt[base_id]
        registered.context.set(
            value=attempt_number,
            definition=ValueDefinition(
                id_="attempt", name=f"The attempt number: {attempt_number}"
            ),
        )

    @set.register(Flag)
    def _set_flag(self, value: Flag):
        self.add_flag(value)
#: Type alias covering a reading, a level, a raw data set, a measure value,
#: or a level epoch.
EntityType = Union[Reading, Level, RawDataSet, MeasureValue, LevelEpoch]