"""All generic transformation steps related to the placement."""
from typing import List
import numpy as np
import pandas as pd
from dispel.data.raw import RawDataValueDefinition
from dispel.processing.transform import TransformStep
def _cos_deg(angle_deg: float) -> float:
    """Return the cosine of an angle expressed in degrees."""
    return np.cos(np.radians(angle_deg))


THRES_BELT_MIN_ABS_MEAN_GRAVITY_X_1 = _cos_deg(53)
"""Belt threshold 1: minimum absolute mean gravity along X, i.e. cos(53 deg),
which corresponds to the X axis being within [0, 53] deg of gravity."""

THRES_PANT_MIN_ABS_MEAN_GRAVITY_Y_1 = _cos_deg(45)
"""Pant threshold 1: minimum absolute mean gravity along Y, i.e. cos(45 deg),
which corresponds to the Y axis being within [0, 45] deg of gravity."""

THRES_TABL_MIN_ABS_MEAN_GRAVITY_Z_1 = _cos_deg(18)
"""Table threshold 1: minimum absolute mean gravity along Z, i.e. cos(18 deg).
It is meant to be combined with a low standard deviation of the norm."""

THRES_TABL_MIN_ABS_MEAN_GRAVITY_Z_2 = _cos_deg(8)
"""Table threshold 2: minimum absolute mean gravity along Z, i.e. cos(8 deg).
It is stricter than threshold 1 and is used on its own."""

THRES_HAND_MAX_MEAN_GRAVITY_Z_1 = -_cos_deg(50)
"""Handheld threshold 1: maximum signed mean gravity along Z, i.e. -cos(50 deg).
Together with the table thresholds this leaves the [8, 50] (or [18, 50]) deg
range around the Z axis."""

THRES_TABL_MAX_STD_ACC_NORM_1 = 0.01
"""Table threshold 1: maximum standard deviation of the acceleration norm."""

ROLLING_WINDOW_SIZE = "4s"
"""Size of the sliding window, i.e. the duration between the beginning and the
end of one window."""

ROLLING_STEP_SIZE = "100ms"
"""Step of the sliding window, i.e. how far the window is moved between two
consecutive evaluations."""
# Value definitions for the placement data set: one entry per column
# (start time, end time, duration and label of each merged time window).
PLACEMENT_DEFINITIONS: List[RawDataValueDefinition] = [
    RawDataValueDefinition(
        id_="start_time",
        name="start time",
        description="A series of the start time for each merged time window.",
        data_type="datetime64",
    ),
    RawDataValueDefinition(
        id_="end_time",
        name="end time",
        description="A series of the end time for each merged time window.",
        data_type="datetime64",
    ),
    RawDataValueDefinition(
        id_="duration",
        name="duration",
        description="A series of the duration for each merged time window.",
        data_type="float64",
    ),
    RawDataValueDefinition(
        id_="placement",
        name="detected placement",
        # The trailing space is required: adjacent string literals are joined
        # without any separator, which previously gave "predictedplacement".
        description="A string time series indicating the predicted "
        "placement for each merged time window.",
        data_type="str",
    ),
]
def placement_classification_one_window(window_measures: pd.Series) -> str:
    """Predict the placement label for the measures of a single window.

    The rules are evaluated in order and the first matching one wins.

    Parameters
    ----------
    window_measures
        A pd.Series holding the measures of one window. The entries
        ``mean_gravityX``, ``mean_gravityY``, ``mean_gravityZ`` and
        ``std_norm`` are read as needed.

    Returns
    -------
    str
        One of ``"belt"``, ``"pants"``, ``"table"``, ``"handheld"`` or
        ``"else"``.
    """
    # large absolute mean gravity along X -> belt
    if abs(window_measures.loc["mean_gravityX"]) > THRES_BELT_MIN_ABS_MEAN_GRAVITY_X_1:
        return "belt"

    # large absolute mean gravity along Y -> pants
    if abs(window_measures.loc["mean_gravityY"]) > THRES_PANT_MIN_ABS_MEAN_GRAVITY_Y_1:
        return "pants"

    abs_mean_z = abs(window_measures.loc["mean_gravityZ"])

    # gravity close to the Z axis while the acceleration norm barely
    # varies -> table
    close_to_z = abs_mean_z > THRES_TABL_MIN_ABS_MEAN_GRAVITY_Z_1
    barely_moving = window_measures.loc["std_norm"] < THRES_TABL_MAX_STD_ACC_NORM_1
    if close_to_z & barely_moving:
        return "table"

    # gravity very close to the Z axis, regardless of movement -> table
    if abs_mean_z > THRES_TABL_MIN_ABS_MEAN_GRAVITY_Z_2:
        return "table"

    # signed mean gravity along Z below the (negative) threshold -> handheld
    if window_measures.loc["mean_gravityZ"] < THRES_HAND_MAX_MEAN_GRAVITY_Z_1:
        return "handheld"

    # no rule matched
    return "else"
def placement_classification(
    data: pd.DataFrame,
    rolling_window_size: str = ROLLING_WINDOW_SIZE,
    rolling_step_size: str = ROLLING_STEP_SIZE,
) -> pd.DataFrame:
    """Compute rolling placement measures and classify each step.

    Parameters
    ----------
    data
        A pd.DataFrame with the columns ``gravityX``, ``gravityY``,
        ``gravityZ`` and ``norm``. If a ``ts`` column is present it is used
        as the index.
    rolling_window_size
        A string containing the size of the sliding window
        (duration between begin and end of sliding window)
    rolling_step_size
        A string containing the step of the sliding window, used as the
        resampling frequency of the rolling measures

    Returns
    -------
    pandas.DataFrame
        One row per step with the columns ``mean_gravityX``,
        ``mean_gravityY``, ``mean_gravityZ``, ``std_norm`` and the predicted
        ``placement``.
    """
    indexed = data.set_index("ts") if "ts" in data.columns else data

    # centered rolling mean of the gravity components
    gravity_mean = (
        indexed.loc[:, ["gravityX", "gravityY", "gravityZ"]]
        .rolling(window=rolling_window_size, center=True)
        .mean()
        .add_prefix("mean_")
    )
    # centered rolling standard deviation of the acceleration norm
    norm_std = (
        indexed.loc[:, ["norm"]]
        .rolling(window=rolling_window_size, center=True)
        .std()
        .add_prefix("std_")
    )

    # The rolling statistics exist for every sample. Resampling introduces
    # the step size of the sliding window: a single value is kept per step,
    # namely the first one.
    stepped = (
        pd.concat([gravity_mean, norm_std], axis=1)
        .resample(rolling_step_size)
        .first()
    )

    labelled = stepped.copy()
    labelled["placement"] = stepped.apply(
        placement_classification_one_window, axis=1
    )
    return labelled
def merge_adjacent_annotations_rolling(
    measures_predictions: pd.DataFrame, rolling_step_size: str = ROLLING_STEP_SIZE
) -> pd.DataFrame:
    """Collapse runs of identical consecutive labels into single segments.

    Parameters
    ----------
    measures_predictions
        A pd.DataFrame indexed by window start time that contains at least
        the ``placement`` column with the predicted class
    rolling_step_size
        A str indicating the step of the sliding window; it is added to the
        start time of each window to obtain its end time

    Returns
    -------
    pandas.DataFrame
        One row per continuous segment with its ``start_time``, ``end_time``,
        ``duration`` (in seconds) and ``placement``.
    """
    windows = measures_predictions[["placement"]].copy()
    windows["start_time"] = windows.index
    # every window ends one step after it starts
    windows["end_time"] = windows.index + pd.Timedelta(rolling_step_size)
    windows = windows.astype({"placement": "category"})

    # A change of category code between two consecutive windows opens a new
    # segment; the running count of changes serves as the segment index.
    label_changed = windows.placement.cat.codes.diff().abs() > 0
    windows["window_index_adjacent"] = label_changed.cumsum()

    segments = windows.groupby("window_index_adjacent")
    segment_start = segments.start_time.first()
    segment_end = segments.end_time.last()
    segment_duration = (segment_end - segment_start).dt.total_seconds()

    return pd.DataFrame(
        data={
            "start_time": segment_start,
            "end_time": segment_end,
            "duration": segment_duration.rename("duration"),
            "placement": segments.placement.first(),
        }
    )
def placement_classification_merged(
    acc_ts: pd.DataFrame, acc_ts_euclidean_norm: pd.DataFrame
):
    """Classify the placement and merge the result into continuous segments.

    Both inputs are concatenated column-wise, classified per sliding window
    with the default window and step sizes, and adjacent windows sharing the
    same label are merged.

    Parameters
    ----------
    acc_ts
        A pd.DataFrame containing the acceleration and gravity axis
    acc_ts_euclidean_norm
        A pd.DataFrame containing the norm of the acceleration

    Returns
    -------
    pandas.DataFrame
        contains start time, end time, duration and label of each continuous
        segment
    """
    combined = pd.concat([acc_ts, acc_ts_euclidean_norm], axis=1)
    per_window = placement_classification(combined)
    return merge_adjacent_annotations_rolling(per_window)
class ClassifyPlacement(TransformStep):
    """Classify the placement in 100 ms steps and merge them into bouts.

    The step is configured with the resampled accelerometer data set and its
    euclidean norm as inputs, ``placement_classification_merged`` as the
    transformation and ``placement_bouts`` as the id of the new data set,
    described by ``PLACEMENT_DEFINITIONS``.
    """

    # id of the data set created by this step
    new_data_set_id = "placement_bouts"
    # ids of the input data sets, in the order of the transform arguments
    data_set_ids = ["acc_ts_resampled", "acc_ts_resampled_euclidean_norm"]
    # value definitions of the columns of the new data set
    definitions = PLACEMENT_DEFINITIONS
    # function computing the merged placement segments
    transform_function = placement_classification_merged