# pylint: disable=duplicate-code
"""Drawing test related functionality.
This module contains functionality to extract measures for the *Drawing* test
(DRAW).
"""
import warnings
from abc import ABCMeta
from functools import partial
from itertools import product
from typing import (
Any,
Callable,
Dict,
Iterable,
List,
Sequence,
Set,
Tuple,
Union,
cast,
)
import numpy as np
import pandas as pd
from scipy.spatial.distance import cdist
from dispel.data.core import Reading
from dispel.data.flags import FlagSeverity, FlagType
from dispel.data.levels import Level
from dispel.data.measures import MeasureValue, MeasureValueDefinitionPrototype
from dispel.data.raw import (
DEFAULT_COLUMNS,
PRESSURE_VALIDATOR,
USER_ACC_MAP,
RawDataSet,
RawDataSetDefinition,
RawDataSetSource,
RawDataValueDefinition,
)
from dispel.data.validators import GREATER_THAN_ZERO, RangeValidator
from dispel.data.values import AbbreviatedValue as AV
from dispel.data.values import ValueDefinition
from dispel.io.raw import generate_raw_data_value_definition
from dispel.processing.core import ErrorHandling, ProcessingStep, ProcessResultType
from dispel.processing.data_set import RawDataSetProcessingResult, transformation
from dispel.processing.extract import (
BASIC_AGGREGATIONS,
DEFAULT_AGGREGATIONS,
DEFAULT_AGGREGATIONS_Q95_CV,
AggregateMeasures,
AggregateModalities,
AggregateRawDataSetColumn,
ExtractMultipleStep,
ExtractStep,
MeasureDefinitionMixin,
)
from dispel.processing.flags import flag
from dispel.processing.level import (
FlagLevelStep,
LevelFilter,
LevelIdFilter,
LevelProcessingResult,
LevelProcessingStep,
LevelProcessingStepProtocol,
ProcessingStepGroup,
)
from dispel.processing.level_filters import NotEmptyDatasetFilter
from dispel.processing.modalities import (
HandModality,
HandModalityFilter,
SensorModality,
)
from dispel.processing.transform import ConcatenateLevels, TransformStep
from dispel.providers.generic.activity.orientation import UpperLimbOrientationFlagger
from dispel.providers.generic.flags.ue_flags import OnlyOneHandPerformed
from dispel.providers.generic.sensor import (
FREQ_20HZ,
FREQ_60HZ,
RenameColumns,
Resample,
SetTimestampIndex,
TransformUserAcceleration,
)
from dispel.providers.generic.tasks.draw.modalities import (
AttemptModality,
AttemptModalityFilter,
ShapeModality,
ShapeModalityFilter,
)
from dispel.providers.generic.tasks.draw.shapes import get_user_path, get_valid_path
from dispel.providers.generic.tasks.draw.touch import DrawShape, DrawTouch
from dispel.providers.generic.tremor import TremorMeasures
from dispel.signal.core import euclidean_norm, sparc
from dispel.stats.core import variation
TASK_NAME = AV("Drawing test", "DRAW")
# The definition of the DrawShape raw data set.
_SHAPE_DEFINITION = [generate_raw_data_value_definition("shape")]
#: The new names for reference and user trajectories (x and y coordinates).
_TRAJECTORY = ["x", "y"]
_UNITS = ["float64", "float64"]
_TRAJECTORY_DEFINITIONS = list(
map(generate_raw_data_value_definition, _TRAJECTORY, _UNITS)
)
RADIUS = 8
r"""The radius of the circle used to consider if a point from the reference
path has been covered by the user trajectory. The value of 8 corresponds to
the size of a fingertip."""
SHAPE_TO_ABBR = {
"square_clock": "sc",
"square_counter_clock": "scc",
"infinity": "inf",
"spiral": "spi",
}
DRAWING_SIM_MEDIAN_Q95_MINUS_Q05 = {
"inf": 8.096,
"sc": 8.641,
"scc": 8.952,
"spi": 6.674,
}
r"""The distance between quantile 95 and quantile 5 of similarity median on
healthy population for the different shape."""
DRAWING_SIM_MEDIAN_MEAN = {"inf": 6.353, "sc": 5.753, "scc": 5.989, "spi": 5.821}
r"""Mean of similarity median on healthy population for the different shape."""
DRAWING_USER_DURATION_Q95_MINUS_Q05 = {
"inf": 6012,
"sc": 5699,
"scc": 5738,
"spi": 8671,
}
r"""The distance between quantile 95 and quantile 5 of user duration on
healthy population for the different shape."""
DRAWING_USER_DURATION_MEAN = {
"inf": 4004,
"sc": 3877,
"scc": 3985,
"spi": 5571,
}
r"""Mean of user duration on healthy population for the different shape."""
[docs]
def get_user_duration(data: pd.DataFrame) -> float:
"""Compute the duration of the total interaction of the user with the test.
Also compute the reaction time of the user between the beginning of the
test and his first interaction.
Parameters
----------
data
A pandas data frame composed of at least the user path and associated
timestamps as index.
Returns
-------
float
The total duration (in milliseconds) of the user drawing action.
"""
timestamps = data.index
return (timestamps.max() - timestamps.min()).total_seconds() * 1e3
[docs]
def get_instant_speed(data: pd.DataFrame) -> pd.DataFrame:
"""Compute the instantaneous speed of the drawing.
Parameters
----------
data
A pandas data frame composed of at least the user path and associated
timestamps as index.
Returns
-------
numpy.float64
The instantaneous speed of the drawing.
"""
# Get rid of duplicated timestamps
shape_data = data["shape"][0]
data = getattr(shape_data, "valid_data")
data = data[~data.index.duplicated(keep="last")]
dist = euclidean_norm(data[["x", "y"]].diff()).astype(float)
speed = dist / data.index.to_series().diff().dt.total_seconds()
return pd.DataFrame(dict(distance=dist, speed=speed))
[docs]
def get_speed_accuracy(data: pd.DataFrame, mean_dist: float) -> float:
"""Compute the speed accuracy of the user for a given level.
Parameters
----------
data
A pandas data frame corresponding to the
:class:`~dispel.providers.generic.tasks.draw.touch.DrawShape` data of the given
level.
mean_dist
The mean dtw minimum distance for the given level.
Returns
-------
float
The speed accuracy for the given level (unit: point-1.ms-1).
Raises
------
AssertionError
If ``speed * accuracy`` is equal to zero and ends up with a
ZeroDivisionError for the ratio: ``1 / (speed * accuracy)``.
"""
duration_params = get_user_duration(data)
# explicit time depending on accuracy
try:
speed_accuracy = 1 / (duration_params * mean_dist)
except ZeroDivisionError as exception:
raise AssertionError(
"``speed * accuracy`` cannot be equal to zero."
) from exception
return speed_accuracy
[docs]
def reaction_time(data: pd.DataFrame, level: Level) -> float:
"""Compute the reaction time.
The reaction time of the user between the shape appearance and the first
touch event.
Parameters
----------
data
pandas data frame containing at least 'tsTouch' pd Series.
level
The level to be processed.
Returns
-------
float
the user's reaction time for the given level (in milliseconds).
"""
first_touch = data.tsTouch.min()
level_start = level.start
return (first_touch - level_start).total_seconds() * 1e3
[docs]
def wrap_reaction_time(data: pd.DataFrame, level: Level) -> pd.Series:
"""Wrap reaction time in a Series for a better aggregation.
Parameters
----------
data
pandas data frame containing at least 'tsTouch' pd Series.
level
The level to be processed.
Returns
-------
pandas.Series
A pandas Series of the user's reaction time for the given level
(in milliseconds).
"""
return pd.Series({"reaction_time": reaction_time(data, level)})
[docs]
class CreateShapes(LevelProcessingStep):
"""A LevelProcessingStep to create a ``DrawShape`` per level."""
[docs]
def process_level(
self, level: Level, reading: Reading, **kwargs
) -> ProcessResultType:
"""Process the provided Level.
Parameters
----------
level
The level to be processed
reading
The reading to be processed
kwargs
Additional arguments passed by :meth:`process_level`.
Yields
------
ProcessResultType
Passes through anything that is yielded from the
:meth:`process_level` function.
"""
def _create_draw_shapes(
level: Level, reading: Reading
) -> Tuple[pd.DataFrame, bool]:
try:
shape_data = DrawShape.from_level(level, reading)
except AssertionError as assertion_error:
warnings.warn(
f"When creating shape for level {str(level.id)} the"
f"post init raised an error: {assertion_error}",
Warning,
)
return pd.DataFrame(columns=["shape"]), False
return pd.DataFrame({"shape": [shape_data]}), True
data, error = _create_draw_shapes(level, reading)
# second condition
level.context.set(
error,
ValueDefinition(
"is_creatable_shape",
"If a shape does not raise an error when creating from level.",
description="True if the shape is creatable False otherwise.",
),
)
raw_data_set = RawDataSet(
definition=RawDataSetDefinition(
id="shape",
source=RawDataSetSource("ads"),
value_definitions_list=_SHAPE_DEFINITION,
),
data=data,
)
yield RawDataSetProcessingResult(
step=self,
sources=level.get_raw_data_set("screen"),
level=level,
result=raw_data_set,
)
[docs]
class ValidPathAssertionMixin(LevelProcessingStepProtocol, metaclass=ABCMeta):
"""Assertion mixin to ensure a valid path is present."""
#: The error handling should no valid path be obtained
missing_path_error_handling = ErrorHandling.IGNORE
[docs]
def assert_valid_level(self, level: Level, reading: Reading, **kwargs):
"""Assert that there are valid paths."""
if not level.context.get_raw_value("is_valid_path"):
raise AssertionError("Invalid user path", self.missing_path_error_handling)
[docs]
class CreatableShape(LevelFilter):
"""A level filter to fetch level with creatable shapes only."""
[docs]
def repr(self) -> str:
"""Get representation of the filter."""
return "Creatable shapes"
[docs]
def filter(self, levels: Iterable[Level]) -> Union[Set, Set[Level]]:
"""Keep level with a creatable shape from level."""
out = set()
for level in levels:
if (
"is_creatable_shape" in level.context
and level.context.get_raw_value("is_creatable_shape") is True
):
out.add(level)
return out
def _flag_level_is_continuous(level: Level):
"""Return False if the level include a non-continuous shape."""
if not level.has_raw_data_set("screen"):
return True
screen = level.get_raw_data_set("screen").data
# Flag there is not several touchPathId
if "inEndZone" not in screen.columns:
return len(screen["touchPathId"].unique()) == 1
# Flag there is not several down touchAction
condition_down = (screen["touchAction"] == "down").sum() == 1
# Flag there is not several up touchAction
condition_up = (screen["touchAction"] == "up").sum() <= 1
return condition_down & condition_up
[docs]
class ContinuousLevel(LevelFilter):
"""Filter for continuous drawing shape."""
[docs]
def repr(self):
"""Get representation of the filter."""
return "only continuously drawn shapes"
[docs]
def filter(self, levels: Iterable[Level]) -> Set[Level]:
"""Filter levels with continuous drawn shapes."""
return set(filter(_flag_level_is_continuous, levels))
[docs]
class ValidUserPath(LevelFilter):
"""A level filter to fetch level with valid user path only."""
[docs]
def repr(self) -> str:
"""Get representation of the filter."""
return "valid user path"
[docs]
def filter(self, levels: Iterable[Level]) -> Union[Set, Set[Level]]:
"""Keep level with a valid user path."""
out = set()
for level in levels:
if "is_valid_shape" in level.context and level.context.get_raw_value(
"is_valid_shape"
):
out.add(level)
return out
[docs]
class FlagContinuousDrawing(FlagLevelStep):
"""Flag the user do not lift the finger while drawing."""
task_name = TASK_NAME
flag_name = AV("continous drawing", "continuous_drawing")
flag_type = FlagType.BEHAVIORAL
flag_severity = FlagSeverity.DEVIATION
reason = (
"The drawing is not continuous, the user has lifted the finger"
"during level {level_id}."
)
@flag
def _check_continuous_drawing(self, level: Level, **kwargs) -> bool:
self.set_flag_kwargs(level_id=level.id, **kwargs)
return _flag_level_is_continuous(level)
[docs]
class InfinityShapes(LevelFilter):
"""A level filter to fetch level from infinity shapes."""
[docs]
def repr(self) -> str:
"""Get representation of the filter."""
return "infinity shapes"
[docs]
def filter(self, levels: Iterable[Level]) -> Union[Set, Set[Level]]:
"""Get infinity shapes."""
out = set()
for level in levels:
if "inf" in str(level.id):
out.add(level)
return out
[docs]
class CreatableShapeFlag(FlagLevelStep):
"""A Transform step to determine if the shape is creatable in a level."""
[docs]
@staticmethod
def shape_is_creatable(level: Level, **_kwargs):
"""Assert that the shape can be created from the level."""
if "is_creatable_shape" in level.context:
return level.context.get_raw_value("is_creatable_shape")
return False
task_name = TASK_NAME
flag_name = AV("draw creatable shape", "draw_creatable_shape")
flag_type = FlagType.TECHNICAL
flag_severity = FlagSeverity.DEVIATION
reason = "The shape was impossible to create from the level. "
flagging_function = shape_is_creatable
[docs]
class AggregateInstantSpeed(AggregateRawDataSetColumn):
"""Extract instant speed measures."""
data_set_ids = "instantaneous_speed"
[docs]
def __init__(self) -> None:
definition = MeasureValueDefinitionPrototype(
measure_name=AV("instant speed", "speed"),
data_type="float64",
unit="point.s-1",
description="The {aggregation} of the instantaneous speed while "
"drawing.",
)
super().__init__(
data_set_id="instantaneous_speed",
column_id="speed",
aggregations=DEFAULT_AGGREGATIONS_Q95_CV,
definition=definition,
)
def _extract_speed_acc(
path: pd.DataFrame,
matches: pd.DataFrame,
agg: str,
) -> np.float64:
"""Extract speed accuracy."""
return get_speed_accuracy(path, float(_dtw_agg_dist(agg, matches))) # type: ignore
def _extract_dur_acc_normed_combined(
data: pd.DataFrame, matches: pd.DataFrame, shape: str
) -> float:
"""Extract duration accuracy normed and combined."""
shape_abbr = SHAPE_TO_ABBR[shape.split("-")[0]]
duration_params = get_user_duration(data)
normed_score_duration = (
duration_params / DRAWING_USER_DURATION_Q95_MINUS_Q05[shape_abbr]
)
similarity_median = float(_dtw_agg_dist("median", matches))
normed_sim_median = similarity_median / DRAWING_SIM_MEDIAN_Q95_MINUS_Q05[shape_abbr]
return normed_score_duration + normed_sim_median
def _dtw_agg_dist(agg: str, matches: pd.DataFrame):
"""Extract aggregated minimum distance."""
if agg == "mean":
return np.mean(matches["min_distance"])
if agg == "median":
return np.median(matches["min_distance"])
raise ValueError(f"Aggregation -{agg} is not defined for similarity")
[docs]
class DrawTremorMeasures(ProcessingStepGroup):
"""A group of drawing processing steps for tremor measures.
Parameters
----------
hand
The hand on which the tremor measures are to be computed.
sensor
The sensor on which the tremor measures are to be computed.
"""
[docs]
def __init__(self, hand: HandModality, sensor: SensorModality):
data_set_id = str(sensor)
steps = [
RenameColumns(data_set_id, **USER_ACC_MAP),
SetTimestampIndex(
f"{data_set_id}_renamed", DEFAULT_COLUMNS, duplicates="last"
),
Resample(
data_set_id=f"{data_set_id}_renamed_ts",
aggregations=["mean", "ffill"],
columns=DEFAULT_COLUMNS,
freq=FREQ_20HZ,
),
TremorMeasures(
sensor=sensor, data_set_id=f"{data_set_id}_renamed_ts_resampled"
),
]
super().__init__(
steps,
task_name=TASK_NAME,
modalities=[hand.av],
hand=hand,
level_filter=LevelIdFilter(f"{hand.abbr}-all")
& NotEmptyDatasetFilter(data_set_id),
)
[docs]
class DrawIntentionalTremorMeasures(ProcessingStepGroup):
"""A group of drawing processing steps for tremor measures."""
[docs]
def __init__(self) -> None:
data_set_id = "deceleration"
new_column_names = {
"x": "x_traj",
"y": "y_traj",
"min_distance": "diss",
"tsTouch": "ts",
}
steps = [
RenameColumns(data_set_id, **new_column_names),
SetTimestampIndex(
f"{data_set_id}_renamed",
["x_traj", "y_traj", "diss"],
duplicates="last",
),
Resample(
f"{data_set_id}_renamed_ts",
aggregations=["mean", "ffill"],
columns=["x_traj", "y_traj", "diss"],
freq=FREQ_60HZ,
),
TremorMeasures(
sensor=SensorModality.INTENTIONAL,
data_set_id=f"{data_set_id}_renamed_ts_resampled",
add_norm=False,
add_average_signal=False,
columns=["diss", "x_traj", "y_traj"],
),
]
super().__init__(steps, level_filter=NotEmptyDatasetFilter(data_set_id))
[docs]
class DRAWProcessingStepsGroupAll(ProcessingStepGroup):
"""Processing group for all aggregated levels."""
[docs]
def __init__(self) -> None:
steps = [ExtractPressureAll(), ExtractPressureAllCV(), ExtractReactionTimeAll()]
super().__init__(
steps, task_name=TASK_NAME, level_filter=LevelIdFilter("all_levels")
)
[docs]
class ExtractCornerMeanDistance(ExtractShapeMixIn, ValidPathAssertionMixin):
"""Extract c accuracy."""
[docs]
@staticmethod
def extract_mean_corner_max_dist(
distances: Tuple[float, float, float],
) -> np.float64:
"""Extract mean maximum corner Frechet distance."""
return np.mean(distances) # type: ignore
properties = "corners_max_dist"
extract = extract_mean_corner_max_dist
definition = MeasureValueDefinitionPrototype(
measure_name=AV("mean corner max distance", "corner"),
data_type="float64",
unit="point",
validator=RangeValidator(
lower_bound=-812,
upper_bound=812,
),
description="The mean maximum distances from corners of the subject"
"while drawing {shape} shape with their {hand} hand for "
"the {attempt} attempt.",
)
[docs]
class AggregateCornerMeanDistance(AggregateMeasures):
"""Aggregate mean corner max distance measure over all attempts."""
measure_ids = []
for shape in ShapeModality:
for attempt in AttemptModality:
for hand in HandModality:
measure_ids += [f"draw-{hand.abbr}_{shape.abbr}_{attempt.abbr}-corner"]
definition = MeasureValueDefinitionPrototype(
task_name=TASK_NAME,
measure_name=AV("mean corner max distance", "corner"),
data_type="float64",
unit="point",
aggregation="mean",
validator=RangeValidator(
lower_bound=-812,
upper_bound=812,
),
description="The mean maximum distances from corners of the subject"
"while drawing over all attempts.",
)
[docs]
class ExtractAxesMeanDistance(ExtractShapeMixIn, ValidPathAssertionMixin):
"""Extract mean overshoot distance."""
[docs]
@staticmethod
def extract_mean_axes_dist(
distances: Tuple[float, float, float],
) -> np.float64:
"""Extract mean overshoot distances."""
return np.mean(distances) # type: ignore
properties = "axis_overshoots"
extract = extract_mean_axes_dist
definition = MeasureValueDefinitionPrototype(
measure_name=AV("mean axes overshoots", "axes_over"),
data_type="float64",
unit="point",
validator=RangeValidator(
lower_bound=-812,
upper_bound=812,
),
description="The mean overshoot distance from axes of the subject "
"while drawing {shape} shape with their {hand} hand for "
"the {attempt}. To ensure a unbiased value distribution, "
"if the user does not go beyond an axis (no overshoot), "
"the value will be negative. A user performing the "
"drawing close to perfectly will thus present a average "
"score close to zero.",
)
[docs]
class AggregateAxesMeanDistance(AggregateMeasures):
"""Aggregate mean corner max distance measure over all attempts."""
measure_ids = []
for shape in ShapeModality:
for attempt in AttemptModality:
for hand in HandModality:
measure_ids += [
f"draw-{hand.abbr}_{shape.abbr}_{attempt.abbr}-axes_over"
]
definition = MeasureValueDefinitionPrototype(
task_name=TASK_NAME,
measure_name=AV("mean axes overshoots", "axes_over"),
data_type="float64",
unit="point",
aggregation="mean",
validator=RangeValidator(lower_bound=-812, upper_bound=812),
description="The mean overshoot distance from axes of the subject "
"while drawing over all attempts. To ensure a unbiased "
"value distribution, if the user does not go beyond an "
"axis (no overshoot), the value will be negative. A user "
"performing the drawing close to perfectly will thus "
"present a average score close to zero.",
)
[docs]
class DrawAggregateModalitiesByHand(AggregateModalities):
"""Base step to aggregate measures by hand for DRAW task.
From the definition of the measure, all the measures for the different
shapes and attempts are retrieved (see get_modalities).
"""
[docs]
def __init__(self, hand: HandModality):
self.hand = hand
super().__init__()
[docs]
def get_definition(self, **kwargs) -> ValueDefinition:
"""Get the definition."""
return cast(MeasureValueDefinitionPrototype, self.definition).create_definition(
modalities=[self.hand.av], hand=self.hand.av
)
[docs]
def get_modalities(self) -> List[List[Union[str, AV]]]:
"""Retrieve all modalities combinations for a given hand."""
ids = []
for shape in ShapeModality:
for attempt in AttemptModality:
ids.append([self.hand.av, shape.av, attempt.av])
return ids
[docs]
class DrawAggregateModalitiesByHandAndShape(AggregateModalities):
"""Base step to aggregate measures by hand and shape for DRAW task.
From the definition of the measure, all the measures for the different
attempts are retrieved (see get_modalities).
"""
[docs]
def __init__(self, hand: HandModality, shape: ShapeModality):
self.hand = hand
self.shape = shape
super().__init__()
[docs]
def get_definition(self, **kwargs) -> ValueDefinition:
"""Get the definition."""
return cast(MeasureValueDefinitionPrototype, self.definition).create_definition(
modalities=[self.hand.av, self.shape.av],
hand=self.hand.av,
shape=self.shape,
)
[docs]
def get_modalities(self) -> List[List[Union[str, AV]]]:
"""Retrieve all modalities combinations for a given hand and shape."""
ids = []
for attempt in AttemptModality:
ids.append([self.hand.av, self.shape.av, attempt.av])
return ids
[docs]
class AggregateSimilarityByHand(DrawAggregateModalitiesByHand):
"""Average similarity values by hand."""
definition = MeasureValueDefinitionPrototype(
task_name=TASK_NAME,
measure_name=AV("similarity", "sim"),
data_type="float64",
unit="point",
aggregation="mean",
validator=GREATER_THAN_ZERO,
description="The mean coupling distance between the ideal target "
"and the trajectory drawn with the {hand} hand for all "
"shapes and attempts. Coupling distance is measured "
"using dynamic time warping.",
)
[docs]
class AggregateSpeedSimilarityByHand(DrawAggregateModalitiesByHand):
"""Average speed/similarity values by hand."""
definition = MeasureValueDefinitionPrototype(
task_name=TASK_NAME,
measure_name=AV("accuracy-normalized duration", "dur_acc"),
data_type="float64",
unit="point-1.ms-1",
aggregation="mean",
validator=GREATER_THAN_ZERO,
description="The accuracy of the subject while drawing with their "
"{hand} hand for all attempts normalized by the time "
"spend between the first and last interaction of the "
"subject with the screen. Accuracy is one over the "
"dissimilarity between reference shape and drawn shape "
"measured using dynamic time warping. See "
":func:`~dispel.signal.dtw.get_dtw_distance`.",
)
[docs]
class AggregateSparcByHand(DrawAggregateModalitiesByHand):
"""Average sparc values by hand."""
definition = MeasureValueDefinitionPrototype(
task_name=TASK_NAME,
measure_name=AV("smoothness score", "smooth"),
data_type="float64",
validator=RangeValidator(upper_bound=0),
aggregation="mean",
description="A digital score of tremor using spectral arc length "
"(SPARC) measurement algorithm for all shapes and "
"attempts drawn with their {hand} hand.",
)
[docs]
class AggregateDurationByHand(DrawAggregateModalitiesByHand):
"""Average duration of drawing by hand."""
definition = MeasureValueDefinitionPrototype(
task_name=TASK_NAME,
measure_name=AV("drawing duration", "user_dur"),
data_type="float64",
unit="ms",
validator=GREATER_THAN_ZERO,
aggregation="mean",
description="The average time spend between the first and last "
"interaction of the subject with the screen while drawing "
"with their {hand} hand for all shapes and attempts.",
)
[docs]
class AggregateIntersectionsByHand(DrawAggregateModalitiesByHand):
"""Average total number of intersections by hand."""
definition = MeasureValueDefinitionPrototype(
task_name=TASK_NAME,
measure_name=AV("intersections", "cross"),
data_type="int32",
validator=GREATER_THAN_ZERO,
aggregation="mean",
description="The average number of times the user cross the shape "
"line with his finger while drawing with their {hand} "
"hand for all the shapes and attempts.",
)
[docs]
class AggregateIntersectionsPerSecondsByHand(DrawAggregateModalitiesByHand):
"""Average the mean number of intersections per second by hand."""
definition = MeasureValueDefinitionPrototype(
task_name=TASK_NAME,
measure_name=AV("intersections per second", "cross_per_sec"),
data_type="float64",
validator=GREATER_THAN_ZERO,
aggregation="mean",
description="The average number of intersection per second the "
"user performs with his finger while drawing all shapes "
"with their {hand} hand for all shapes and attempts.",
)
[docs]
class AggregateSimilarity(AggregateMeasures):
"""Average similarity values by hand."""
measure_ids = []
for shape in ShapeModality:
for attempt in AttemptModality:
for hand in HandModality:
measure_ids += [
f"draw-{hand.abbr}_{shape.abbr}_{attempt.abbr}-sim-mean"
]
definition = MeasureValueDefinitionPrototype(
task_name=TASK_NAME,
measure_name=AV("similarity", "sim"),
data_type="float64",
unit="point",
aggregation="mean",
validator=GREATER_THAN_ZERO,
description="The mean coupling distance between the ideal target "
"and the trajectory drawn with both hands for all "
"shapes and attempts. Coupling distance is measured "
"using dynamic time warping.",
)
[docs]
class AggregateSpeedSimilarity(AggregateMeasures):
"""Average speed/similarity globally."""
measure_ids = []
for shape in ShapeModality:
for attempt in AttemptModality:
for hand in HandModality:
measure_ids += [
f"draw-{hand.abbr}_{shape.abbr}_{attempt.abbr}-dur_acc-mean"
]
definition = MeasureValueDefinitionPrototype(
task_name=TASK_NAME,
measure_name=AV("accuracy-normalized duration", "dur_acc"),
data_type="float64",
unit="point-1.ms-1",
validator=GREATER_THAN_ZERO,
aggregation="mean",
description="The mean of the accuracy of the subject while drawing "
"with both hands for all shapes and attempts normalized "
"by the time spend between the first and last interaction "
"of the subject with the screen. Accuracy is one over the "
"dissimilarity between reference shape and drawn shape "
"measured using dynamic time warping. See "
":func:`~dispel.signal.dtw.get_dtw_distance`.",
)
[docs]
class AggregateSparc(AggregateMeasures):
"""Average smoothness scores globally."""
measure_ids = []
for shape in ShapeModality:
for attempt in AttemptModality:
for hand in HandModality:
measure_ids += [f"draw-{hand.abbr}_{shape.abbr}_{attempt.abbr}-smooth"]
definition = MeasureValueDefinitionPrototype(
task_name=TASK_NAME,
measure_name=AV("smoothness score", "smooth"),
data_type="float64",
validator=RangeValidator(upper_bound=0),
aggregation="mean",
description="A digital score of tremor using spectral arc length "
"(SPARC) measurement algorithm for all shapes and "
"attempts drawn with both hands.",
)
[docs]
class AggregateDuration(AggregateMeasures):
"""Average duration of drawing on all shapes, attempts and hands."""
measure_ids = []
for shape in ShapeModality:
for attempt in AttemptModality:
for hand in HandModality:
measure_ids += [
f"draw-{hand.abbr}_{shape.abbr}_{attempt.abbr}-user_dur"
]
definition = MeasureValueDefinitionPrototype(
task_name=TASK_NAME,
measure_name=AV("drawing duration", "user_dur"),
data_type="float64",
unit="ms",
validator=GREATER_THAN_ZERO,
aggregation="mean",
description="The mean time spend between the first and last "
"interaction of the subject with the screen while drawing "
"all shapes and attempts with both hands.",
)
[docs]
class AggregateIntersectionsPerSeconds(AggregateMeasures):
"""Average the number of intersections per second globally."""
measure_ids = []
for shape in ShapeModality:
for attempt in AttemptModality:
for hand in HandModality:
measure_ids += [
f"draw-{hand.abbr}_{shape.abbr}_{attempt.abbr}-cross_per_sec"
]
definition = MeasureValueDefinitionPrototype(
task_name=TASK_NAME,
measure_name=AV("intersections per second", "cross_per_sec"),
data_type="float64",
validator=GREATER_THAN_ZERO,
aggregation="mean",
description="The average number of intersection per second the "
"user performs with his finger while drawing with both "
"hands for all shapes and attempts.",
)
[docs]
class AggregateIntersections(AggregateMeasures):
"""Average the number of intersections globally."""
measure_ids = []
for shape in ShapeModality:
for attempt in AttemptModality:
for hand in HandModality:
measure_ids += [f"draw-{hand.abbr}_{shape.abbr}_{attempt.abbr}-cross"]
definition = MeasureValueDefinitionPrototype(
task_name=TASK_NAME,
measure_name=AV("intersections", "cross"),
data_type="int32",
validator=GREATER_THAN_ZERO,
aggregation="mean",
description="The number of times the user cross the shape line "
"with his finger while drawing with both hand for all the "
"attempts.",
)
[docs]
class AggregateDistanceThresholdByHandAndShape(DrawAggregateModalitiesByHandAndShape):
"""Aggregate Distance threshold flag values by hand and shape."""
definition = MeasureValueDefinitionPrototype(
task_name=TASK_NAME,
measure_name=AV("distance threshold", "dist_thresh"),
data_type="bool",
description="Whether the user completes at least 80% of the expected "
"shape {shape} with their {hand} hand for both attempts.",
)
[docs]
class DrawUserDistanceThreshold(FlagLevelStep):
"""Flag if a drawing distance ratio is within expected range."""
task_name = TASK_NAME
flag_name = AV("distance threshold", "dist_thresh")
flag_type = FlagType.BEHAVIORAL
flag_severity = FlagSeverity.DEVIATION
reason = (
"The user drawing distance ratio is outside the expected "
"range of 0.8 - 1.20 for the level {level_id}."
)
level_filter = ValidUserPath() & CreatableShape()
@flag
def _check_distance_threshold(self, level: Level, **_kwargs) -> bool:
self.set_flag_kwargs(level_id=level.id)
return (
level.get_raw_data_set("shape")
.data["shape"][0]
.check_dist_thresh(RangeValidator(lower_bound=0.8, upper_bound=1.20))
)
[docs]
class FlagCompleteDrawing(FlagLevelStep):
"""Flag the drawing is complete."""
task_name = TASK_NAME
flag_name = AV("complete drawing", "complete_drawing")
flag_type = FlagType.TECHNICAL
flag_severity = FlagSeverity.DEVIATION
reason = (
"The drawing is not complete, the user has not reached the"
"endZone or is not considered completed in the context during "
"level {level_id}."
)
@flag
def _check_complete_drawing(self, level: Level, **kwargs) -> bool:
self.set_flag_kwargs(level_id=level.id, **kwargs)
screen = level.get_raw_data_set("screen").data
if "inEndZone" in screen:
return screen.inEndZone.any()
return level.context.get_raw_value("drawing_figure_completed")
[docs]
class DrawingFlag(ProcessingStepGroup):
"""Processing group for all drawing flag."""
[docs]
def __init__(self) -> None:
steps = [
TransformUserAcceleration(),
UpperLimbOrientationFlagger(),
OnlyOneHandPerformed(task_name=TASK_NAME),
]
super().__init__(steps, task_name=TASK_NAME)
[docs]
def compute_pacman_score(
shape_dataset: pd.DataFrame,
) -> float:
"""Compute the pacman score for a level.
Parameters
----------
shape_dataset: pd.DataFrame
A data frame containing the DrawShape object
Returns
-------
float
The pacman score.
"""
# Get the up-sampled path and the reference from the shape dataset
user_path = shape_dataset["shape"][0].aggregate_valid_touches.valid_up_sampled_path
reference = shape_dataset["shape"][0].reference
user_x = user_path["x"].dropna().to_numpy()
user_y = user_path["y"].dropna().to_numpy()
ref_x = reference["x"].to_numpy()
ref_y = reference["y"].to_numpy()
path = np.column_stack((user_x, user_y))
ref = np.column_stack((ref_x, ref_y))
# Is the distance between target and ref points within the radius range
eaten_by_user = (cdist(path, ref) < RADIUS).any(axis=0)
return np.sum(eaten_by_user) / len(eaten_by_user)
[docs]
class AddRawPacmanScore(ExtractStep, ValidPathAssertionMixin):
"""
Add the raw pacman score.
A target's point is considered as 'eaten' if that point is within the
radius range of any other user path's point. The pacman score is the ratio
between the number of 'eaten' points and the total number of target's
point.
"""
[docs]
def __init__(self, level_filter: LevelFilter, **kwargs: object) -> None:
data_set_ids = "shape"
transform_function = compute_pacman_score
definition = MeasureValueDefinitionPrototype(
task_name=TASK_NAME,
measure_name=AV("raw pacman score", "raw_pacman_score"),
description="The raw pacman score is the ratio between the number "
"of eaten points and the total number of target points"
". A target point is considered eaten if that point is"
"within a radius range of any other point of the user "
"path. See Radius definition for more information.",
data_type="float",
**kwargs,
)
super().__init__(
data_set_ids=data_set_ids,
transform_function=transform_function,
definition=definition,
level_filter=level_filter,
)
[docs]
class DrawOppositeDirection(FlagLevelStep):
"""Flag infinity shape drawn clockwise."""
task_name = TASK_NAME
flag_name = AV("opposite direction", "opp_direction")
flag_type = FlagType.BEHAVIORAL
flag_severity = FlagSeverity.DEVIATION
reason = "The user is drawing in the opposite direction for the level {level_id}."
level_filter = InfinityShapes() & CreatableShape() & ContinuousLevel()
@flag
def _assess_valid_direction(self, level: Level, **_kwargs) -> bool:
self.set_flag_kwargs(level_id=level.id)
# This flag only applies to infinity shapes
if "inf" not in str(level.id):
raise ValueError("This flag should run for infinity shape only.")
# Get the shape
_shape = level.get_raw_data_set("shape").data["shape"][0]
# Compute the x coordinate of the user_path of the first quarter
all_data = _shape.all_data
x_quarter = all_data[: len(all_data) // 4]["x"].median()
# center_x
x_min = _shape.reference.x.min()
x_max = _shape.reference.x.max()
center_x = x_min + (x_max - x_min) / 2
return x_quarter < center_x
[docs]
class DrawOvershootRemoval(FlagLevelStep):
"""Flag drawing for which we removed more than 10% of user path."""
level_filter = CreatableShape() & ValidUserPath() & ContinuousLevel()
task_name = TASK_NAME
flag_name = AV("overshoot removal", "excessive_overshoot_removal")
flag_type = FlagType.TECHNICAL
flag_severity = FlagSeverity.DEVIATION
reason = (
"The algorithm detected an overshoot of more than 10% for"
" the level {level_id}."
)
@flag
def _assess_overshoot_size(self, level: Level, **_kwargs) -> bool:
self.set_flag_kwargs(level_id=level.id)
# Get the shape
_shape = level.get_raw_data_set("shape").data["shape"]
if len(_shape) == 0:
return False
_shape = _shape[0]
len_up_wo_overshoot = len(_shape.up_sampled_data_without_overshoot)
len_up = len(_shape.up_sampled_valid_data)
return (abs(len_up - len_up_wo_overshoot) / len_up) < 0.10
STEPS: List[ProcessingStep] = []
STEPS += [
FlagCompleteDrawing(),
CreateShapes(),
CreatableShapeFlag(),
TransformValidUserPath(),
FlagContinuousDrawing(),
DrawOvershootRemoval(),
DrawOppositeDirection(),
DrawUserDistanceThreshold(),
DrawingFlag(),
]
for _hand in HandModality:
for _shape, _attempt in product(ShapeModality, AttemptModality):
STEPS += [DrawTransformAndExtract(_hand, _shape, _attempt)]
STEPS += [
ConcatenateLevels(
new_level_id=f"{_hand.abbr}-all",
data_set_id=["accelerometer", "gyroscope"],
level_filter=HandModalityFilter(_hand)
& ValidUserPath()
& CreatableShape()
& ContinuousLevel(),
)
]
STEPS += [
DrawTremorMeasures(_hand, sensor)
for sensor in [SensorModality.ACCELEROMETER, SensorModality.GYROSCOPE]
]
STEPS += [
AggregateSimilarityByHand(_hand),
AggregateSpeedSimilarityByHand(_hand),
AggregateSparcByHand(_hand),
AggregateDurationByHand(_hand),
AggregateIntersectionsPerSecondsByHand(_hand),
AggregateIntersectionsByHand(_hand),
]
STEPS += [
AggregateSimilarity(),
AggregateSpeedSimilarity(),
AggregateSparc(),
AggregateDuration(),
AggregateIntersectionsPerSeconds(),
AggregateIntersections(),
]
for _hand in HandModality:
for _shape in ShapeModality:
STEPS += [AggregateDistanceThresholdByHandAndShape(_hand, _shape)]
STEPS += [
ConcatenateLevels(
new_level_id="all_levels",
data_set_id=["screen", "reaction-time"],
level_filter=ValidUserPath()
& CreatableShape()
& ContinuousLevel()
& (
ShapeModalityFilter(ShapeModality.SPIRAL)
| ShapeModalityFilter(ShapeModality.INFINITY)
| ShapeModalityFilter(ShapeModality.SQUARE)
| ShapeModalityFilter(ShapeModality.SQUARE_COUNTER_CLOCK)
),
),
DRAWProcessingStepsGroupAll(),
AggregateCornerMeanDistance(),
AggregateAxesMeanDistance(),
]
# Extra pacman score measures
for hand in HandModality:
for shape, attempt in product(ShapeModality, AttemptModality):
modalities = [hand.av, shape.av, attempt.av]
STEPS.append(
ProcessingStepGroup(
[
AddRawPacmanScore(
level_filter=HandModalityFilter(hand)
& ShapeModalityFilter(shape)
& AttemptModalityFilter(attempt)
& ValidUserPath()
& CreatableShape()
& ContinuousLevel()
)
],
modalities=[hand.av, shape.av, attempt.av],
hand=hand.av,
shape=shape.av,
attempt=attempt.av,
)
)