# Source code for dispel.providers.generic.activity.placement

"""All generic transformation steps related to the placement."""
from typing import List

import numpy as np
import pandas as pd

from dispel.data.raw import RawDataValueDefinition
from dispel.processing.transform import TransformStep

THRES_BELT_MIN_ABS_MEAN_GRAVITY_X_1 = np.cos(np.radians(53))
"""belt threshold 1: [0, 53] deg X axes max."""
THRES_PANT_MIN_ABS_MEAN_GRAVITY_Y_1 = np.cos(np.radians(45))
"""pant threshold 1: [0, 45] deg Y axes max."""
THRES_TABL_MIN_ABS_MEAN_GRAVITY_Z_1 = np.cos(np.radians(18))
"""table threshold 1: [0, 18] deg Z axis max (to be combined with low std)."""
THRES_TABL_MIN_ABS_MEAN_GRAVITY_Z_2 = np.cos(np.radians(8))
"""table threshold 2: [0, 8] deg Z axes max."""
THRES_HAND_MAX_MEAN_GRAVITY_Z_1 = -np.cos(np.radians(50))
"""handheld threshold 1: [8, 50] (or [18, 50]) deg Z axis max."""
THRES_TABL_MAX_STD_ACC_NORM_1 = 0.01
"""table threshold 1: standard deviation of acc norm."""

ROLLING_WINDOW_SIZE = "4s"
"""This is the size of the sliding window, denoting the duration between begin
 and end of sliding window."""

ROLLING_STEP_SIZE = "100ms"
"""This is the step of the sliding window, denoting the size of the "sliding"
 action, which is the length of sequence you move between each window."""


PLACEMENT_DEFINITIONS: List[RawDataValueDefinition] = [
    RawDataValueDefinition(
        id_="start_time",
        name="start time",
        description="A series of the start time for each merged time window.",
        data_type="datetime64",
    ),
    RawDataValueDefinition(
        id_="end_time",
        name="end time",
        description="A series of the end time for each merged time window.",
        data_type="datetime64",
    ),
    RawDataValueDefinition(
        id_="duration",
        name="duration",
        description="A series of the duration for each merged time window.",
        data_type="float64",
    ),
    RawDataValueDefinition(
        id_="placement",
        name="detected placement",
        description="A string time series indicating the predicted"
        "placement for each merged time window.",
        data_type="str",
    ),
]


[docs] def placement_classification_one_window(window_measures: pd.Series) -> str: """Classify placement for a window of measure data. Parameters ---------- window_measures A pd.Series containing the measure for a specific time Returns ------- str The label of the predicted class """ if abs(window_measures.loc["mean_gravityX"]) > THRES_BELT_MIN_ABS_MEAN_GRAVITY_X_1: # if phone placed roughly landscape -> belt res = "belt" elif ( abs(window_measures.loc["mean_gravityY"]) > THRES_PANT_MIN_ABS_MEAN_GRAVITY_Y_1 ): # if phone placed landscape -> pants res = "pants" elif ( abs(window_measures.loc["mean_gravityZ"]) > THRES_TABL_MIN_ABS_MEAN_GRAVITY_Z_1 ) & (window_measures.loc["std_norm"] < THRES_TABL_MAX_STD_ACC_NORM_1): # if phone placed face up/down and not moving -> table res = "table" elif ( abs(window_measures.loc["mean_gravityZ"]) > THRES_TABL_MIN_ABS_MEAN_GRAVITY_Z_2 ): # if phone placed precisely face up/down -> table res = "table" elif window_measures.loc["mean_gravityZ"] < THRES_HAND_MAX_MEAN_GRAVITY_Z_1: # if phone placed face up with a tilt -> handheld res = "handheld" else: # in all other cases -> else res = "else" return res
[docs] def placement_classification( data: pd.DataFrame, rolling_window_size: str = ROLLING_WINDOW_SIZE, rolling_step_size: str = ROLLING_STEP_SIZE, ) -> pd.DataFrame: """Extract measures and predict labels given the gravity and norm data. Parameters ---------- data A pd.DataFrame containing the gravity and accelerometer norm data rolling_window_size A string containing the size of the sliding window (duration between begin and end of sliding window) rolling_step_size A string containing the step of the sliding window (duration between centers of consecutive windows) Returns ------- pandas.DataFrame A dataframe containing the extracted measures and label per window """ if "ts" in data.columns: data = data.set_index("ts") # extract placement measures mean = ( data.loc[:, ["gravityX", "gravityY", "gravityZ"]] .rolling(window=rolling_window_size, center=True) .mean() ) std = data.loc[:, ["norm"]].rolling(window=rolling_window_size, center=True).std() # rename columns mean.columns = "mean_" + mean.columns std.columns = "std_" + std.columns measures = pd.concat([mean, std], axis=1) # downsampling here to introduce step size in the sliding window # The sliding window computes a moving mean and moving standard deviation # for each sample. We need to have a single measure value for each step # in the overlapping sliding window process so we take the first. measures = measures.resample(rolling_step_size).first() predictions = measures.apply(placement_classification_one_window, axis=1) measures_predictions = measures.copy() measures_predictions["placement"] = predictions return measures_predictions
[docs] def merge_adjacent_annotations_rolling( measures_predictions: pd.DataFrame, rolling_step_size: str = ROLLING_STEP_SIZE ) -> pd.DataFrame: """Merge adjacent annotations into segments of continuous labels. Parameters ---------- measures_predictions A pd.DataFrame containing the measures and predicted class rolling_step_size A str indicating the size of the sliding window Returns ------- pandas.DataFrame contains start time, end time and label of each continuous segment """ annotations = measures_predictions[["placement"]].copy() annotations["start_time"] = annotations.index # add the end times as the start times plus rolling step size annotations["end_time"] = annotations.index + pd.Timedelta(rolling_step_size) annotations = annotations.astype({"placement": "category"}) # in the fixed window dataframe we look if there are differences between # consecutive windows and if so we derive a new adjacent window index annotations["window_index_adjacent"] = ( annotations.placement.cat.codes.diff().abs() > 0 ).cumsum() start = annotations.groupby("window_index_adjacent").start_time.first() end = annotations.groupby("window_index_adjacent").end_time.last() duration = (end - start).dt.total_seconds() duration = duration.rename("duration") placement = annotations.groupby("window_index_adjacent").placement.first() # create the final dataframe annotations_adjacent = pd.DataFrame( data={ "start_time": start, "end_time": end, "duration": duration, "placement": placement, } ) return annotations_adjacent
[docs] def placement_classification_merged( acc_ts: pd.DataFrame, acc_ts_euclidean_norm: pd.DataFrame ): """Concatenate accelerometer and norm data and call placement function. Parameters ---------- acc_ts A pd.DataFrame containing the acceleration and gravity axis acc_ts_euclidean_norm A pd.DataFrame containing the norm of the acceleration Returns ------- pandas.DataFrame contains start time, end time and label of each continuous segment """ data = pd.concat([acc_ts, acc_ts_euclidean_norm], axis=1) return merge_adjacent_annotations_rolling(placement_classification(data))
[docs] class ClassifyPlacement(TransformStep): """Classify placement in 100 ms windows.""" data_set_ids = ["acc_ts_resampled", "acc_ts_resampled_euclidean_norm"] transform_function = placement_classification_merged definitions = PLACEMENT_DEFINITIONS new_data_set_id = "placement_bouts"