"""A module dedicated to drawing intersection detections."""
from datetime import datetime
from typing import List, Optional, Tuple
import numpy as np
import pandas as pd
from bentley_ottmann.planar import segments_intersect
from ground.core import geometries
from dispel.providers.generic.tasks.draw.shapes import get_valid_path
from dispel.signal.core import euclidean_distance, euclidean_norm
[docs]
class Point(geometries.Point):
"""Wrap Point object in a custom class."""
@property
def coordinates(self) -> Tuple[float, float]:
"""Return the coordinates of the ``Point`` object."""
return self.x, self.y
[docs]
def to_numpy(self) -> np.ndarray:
"""Transform point to numpy array."""
return np.array(self.coordinates)
def __sub__(self, other):
x = self.x - other.x
y = self.y - other.y
return Point(x, y)
[docs]
class Segment(geometries.Segment):
"""Wrap Segment object in a custom class."""
[docs]
def __init__(self, start: Point, end: Point): # pylint: disable=W0231
self._start, self._end = start, end
@property
def segment(self) -> Tuple[Tuple[float, float], Tuple[float, float]]:
"""Return the coordinates of the two points forming the segment."""
return self._start.coordinates, self._end.coordinates
@property
def distance(self) -> float:
"""Return the segment length."""
return euclidean_distance(*self.segment)
[docs]
def diff(self) -> Point:
"""Compute the difference between segment points."""
return self._end - self._start
def _create_segment(row: pd.Series) -> Segment:
"""Create a Segment object for model path."""
return Segment(Point(row["x"], row["y"]), Point(row["x-1"], row["y-1"]))
def _create_dist(row: pd.Series) -> float:
"""Extract distances from a specific model segment object."""
return row["seg"].distance
[docs]
def get_segment(data: pd.DataFrame) -> pd.DataFrame:
"""Get all `Segment` of all data points."""
new_data = data.copy()
new_data[["x-1", "y-1"]] = new_data[["x", "y"]].shift(1)
series = {"seg": _create_segment, "dist": _create_dist}
for ser, func in series.items():
new_data[ser] = new_data.apply(func, axis=1)
new_data["tot_length"] = new_data["dist"].cumsum()
return new_data[["seg", "dist", "tot_length"]]
[docs]
def get_ratio(data: pd.DataFrame) -> pd.Series:
"""Add the traveled distance ratio of a segment.
This get the traveled distance ratio of all user and model segments. This
ratio corresponds to the 'traveled' distance from zero to the segment over
the total distance of the drawn/ground truth shape.
Parameters
----------
data
A pandas data frame containing at least `dist` and `tot_length` pandas
Series.
Returns
-------
pandas.Series
The pandas series corresponding to the `ratio` defined as the total
length of a specific point from the origin of the draw over the total
length of the draw.
"""
return pd.Series(data["tot_length"] / data["dist"].sum(), name="ratio")
[docs]
def get_intersection_data(
user: pd.DataFrame, ref: pd.DataFrame
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""Transform `{level_id}_user_paths` data to study intersections.
This transform the `{level_id}_user_paths` data in order to get the
proper data to capture the number of intersections between the user path
and the model path.
Parameters
----------
user
The `{level_id}_user_paths` data frame.
ref
The reference trajectory corresponding to the current shape.
Returns
-------
Tuple[pandas.DataFrame, pandas.DataFrame]
The proper user data frame to compute tremor-related measures and the
proper reference data to compute tremor-related measures.
"""
new_user = user[["x", "y", "tsTouch"]].copy()
new_ref = ref.copy()
new_user = new_user.drop_duplicates(
subset=["x", "y"], keep="last", ignore_index=True
)
#: Get the euclidean relative norm of position-related model and user data.
new_user = new_user.join(euclidean_norm(new_user[["x", "y"]]).rename("norm"))
new_ref = new_ref.join(euclidean_norm(new_ref[["x", "y"]]).rename("norm"))
#: Get Segments, segment lengths and traveled distance for each pairs of
# consecutive points.
new_user = new_user.join(get_segment(new_user))
new_ref = new_ref.join(get_segment(new_ref))
#: Get the traveled distance over total length of a shape ratios for each
# point of the user and model paths.
new_ref = new_ref.join(get_ratio(new_ref))
new_user = new_user.join(get_ratio(new_user))
columns = ["seg", "norm", "ratio"]
columns_user = ["tsTouch", "seg", "norm", "ratio"]
return new_user[columns_user], new_ref[columns]
def _get_ratio_flag(
user: pd.DataFrame,
ref: pd.DataFrame,
index_user,
index_model,
threshold: Optional[float] = 0.30,
) -> float:
"""Check whether the intersection occurs in a meaningful area of shapes."""
return abs(ref["ratio"][index_model] - user["ratio"][index_user]) < threshold
def _get_angle_flag(
seg_user: Segment, seg_model: Segment, threshold: Optional[float] = 0.1
) -> bool:
"""Check if the segments involved in the intersection are collinear."""
v = seg_model.diff().to_numpy()
u = seg_user.diff().to_numpy()
c = np.dot(u, v) / np.linalg.norm(u) / np.linalg.norm(v)
angle = np.degrees(np.arccos(np.clip(c, -1, 1)))
return angle > threshold
def _data_filter_selection(
user: pd.DataFrame,
ref: pd.DataFrame,
user_index: int,
ratio_filter: Optional[float] = 0.1,
norm_filter: Optional[float] = 20.0,
) -> pd.DataFrame:
"""Apply filter on the norm and the distance ratio to get selected data."""
#: Compute the norm of the barycenter of the segment
norm_bary = (user["norm"][user_index] + user["norm"][user_index - 1]) / 2
ratio_user = user["ratio"][user_index]
#: Use the norm of the barycenter of the segment and the traveled
# distance ratio as filters to select only relevant segments to
# compare to.
model_data = ref.loc[
(ref["norm"] > norm_bary - norm_filter)
& (ref["norm"] < norm_bary + norm_filter)
& (
(ref["ratio"] < ratio_user + ratio_filter)
& (ref["ratio"] > ratio_user - ratio_filter)
),
"seg",
]
return model_data
[docs]
def get_intersection_measures(
user: pd.DataFrame,
ref: pd.DataFrame,
) -> pd.DataFrame:
"""
Compute the number of intersections between user and model paths.
Parameters
----------
user
The `{level_id}_intersection_detection` data frame.
ref
The reference trajectory corresponding to the current shape.
Returns
-------
pandas.DataFrame
A pandas data frame containing tremor measures related to path
intersection.
"""
def _check_intersections(model_seg: Segment, user_seg: Segment) -> bool:
"""Check if two given segments intersects."""
return segments_intersect([user_seg, model_seg])
intersections: List[datetime] = []
#: Loop over all segment (index == 0 belongs only to the first segment)
for id_user in range(1, user["norm"].size - 1):
seg_user = user["seg"][id_user]
model_data = _data_filter_selection(user, ref, user_index=id_user)
#: loop over all the selected model segments to compare to the
# current user segment.
checked = model_data.apply(lambda x, y=seg_user: _check_intersections(x, y))
if checked.any():
#: Get the index of the intersected model segment
id_model_seg = checked.loc[checked].index.values
else:
continue
for intersection in id_model_seg:
#: Check if the intersection is time consistent
ratio = _get_ratio_flag(user, ref, id_user, intersection)
#: Check if the user and model segments are
# non-collinear (approximated to angle < 0.1 degree)
angle = _get_angle_flag(seg_user, ref["seg"][intersection])
#: Check if the two previous conditions are respected.
if angle and ratio:
#: Store the first intersection.
if len(intersections) == 0:
intersections.append(user["tsTouch"][id_user])
#: Store the current intersection only if it is
# consistent with the sampling rate (50Hz ?).
# (Avoid non consistent intersections to be
# considered)
elif len(intersections) != 0 and (
(user["tsTouch"][id_user] - intersections[-1]).total_seconds()
> 0.02
):
intersections.append(user["tsTouch"][id_user])
#: Store the results into a pandas data frame.
res = pd.DataFrame(columns=["tsRaw", "tsDiff", "cross_per_sec", "freqDiff"])
res.tsRaw = pd.Series(intersections)
#: If no crossing, returns res with only 0 as number of intersection per
# second, np.nan for the rest
if len(res) == 0:
res.loc[0, "cross_per_sec"] = 0
return res
res["tsDiff"] = (res["tsRaw"].diff()).dt.total_seconds()
res["freqDiff"] = 1 / res["tsDiff"]
res["cross_per_sec"] = (
len(intersections) / (user.tsTouch.max() - user.tsTouch.min()).total_seconds()
)
return res
[docs]
def compute_intersection_analysis(
user: pd.DataFrame, reference: pd.DataFrame
) -> pd.DataFrame:
"""Compute the tremor-related measures according to intersections.
First get only the valid user trajectory, then format the data needed, and
then extract the measures.
Parameters
----------
user
A pandas data frame obtained via a
:class:`dispel.providers.generic.tasks.draw.touch.DrawShape`
reference
The reference trajectory corresponding to the current shape.
Returns
-------
pandas.DataFrame
The proper pandas data frame to compute tremor measures.
"""
new_data = get_valid_path(user)
new_user, new_ref = get_intersection_data(new_data, reference)
return get_intersection_measures(new_user, new_ref)