"""Keyboard functions to extract relevant properties for Typing."""
from difflib import SequenceMatcher
from typing import List, Optional, Union
import numpy as np
import pandas as pd
from unidecode import unidecode
from dispel.providers.bdh.tasks.typing.const import KeyState
[docs]
def compute_rt_slope(df_reaction_time: pd.DataFrame, col: str) -> float:
"""Compute the mean of the reaction time slope.
Parameters
----------
df_reaction_time : pd.DataFrame
The reaction time dataframe (first letter and first correct letter)
col : String
The column to be used : reaction_time or reaction_time_first_correct
Returns
-------
float
A float which is the mean slope of the reaction time (first letter
or first correct letter)
"""
return (
df_reaction_time[col].diff()
/ df_reaction_time.index.to_series().diff().dt.total_seconds()
).mean()
[docs]
def total_words_typed(word: pd.DataFrame) -> int:
"""Find total number of words typed.
The total number of words typed successfully is the number of True values
in the columns ``success``.
Parameters
----------
word: pd.DataFrame
A data frame containing the word information with the column
``success``.
Returns
-------
int
Number of words successfully typed.
"""
return word["success"].sum()
[docs]
def time_intervals(data: pd.DataFrame, interval_column: Optional[str]) -> pd.DataFrame:
"""Find time intervals between consecutive element of a data frame.
Parameters
----------
data: pd.DataFrame
Any data frame with a timestamp index.
interval_column: Optional[str]
When specified, indicates that the interval should be computed with
values from a column and not the index.
Returns
-------
pd.Series
Time intervals between samples.
"""
if not interval_column:
return data.index.to_series().diff().dt.total_seconds()
return data[interval_column].diff().dt.total_seconds()
[docs]
def word_duration(word: pd.DataFrame) -> pd.DataFrame:
"""Compute the time spent per word.
Parameters
----------
word: pd.DataFrame
A data frame containing the word information with the column
``success``.
Returns
-------
pd.Series
A data frame containing the time spent per word.
"""
return (
word["disappearance_timestamp"] - word["appearance_timestamp"]
).dt.total_seconds()
[docs]
def get_submission_state(word: pd.DataFrame, key_typed: pd.DataFrame) -> pd.DataFrame:
"""Compute the submission state.
Create a data frame that contains the state at each time a user types a
key. Available states are the following: ``Correct``, ``Incorrect``, and
``Correcting``. The state is ``Correct`` if the textbox is empty or if
it forms a subpart of the word displayed on the screen. The state is
incorrect whenever the user types a key (that is not backspace), leading
to a textbox that does not form a subpart of the word displayed on the
screen. Finally, the state is identified as correcting whenever the user
corrects its mistake by typing backspace; it can be seen as an
intermediate state between an incorrect and a correct state.
Parameters
----------
word : pd.DataFrame
A data frame listing all the words displayed on screen during the
test with the following columns: ``words``, ``appearance_timestamp``
and ``disappearance_timestamp``.
key_typed : pd.DataFrame
The dataframe with the list of key pressed by the user,
it should contain the columns: ``timestamp_out`` and ``key``.
Returns
-------
pd.DataFrame
A data frame that contains the user state after each keystroke.
"""
def _displayed_word(timestamp):
"""Find the word displayed on screen."""
return word["word"].values[
(timestamp < word["disappearance_timestamp"])
& (timestamp > word["appearance_timestamp"])
][0]
input_and_screen = pd.DataFrame(
{
"displayed_input": key_typed["displayed_input"],
"displayed_word": key_typed["timestamp_out"].apply(_displayed_word),
}
)
is_correct = input_and_screen.apply(
lambda x: unidecode(x["displayed_word"]).startswith(x["displayed_input"]),
axis=1,
)
backspace_pressed = key_typed["key"] == "Backspace"
word_changed = input_and_screen["displayed_word"] != input_and_screen[
"displayed_word"
].shift(1)
word_changed[0] = False
# update the states
submission_state: List[KeyState] = [KeyState.INIT] * len(is_correct)
for i in range(len(is_correct)):
if word_changed.iloc[i] or is_correct.iloc[i]:
submission_state[i] = KeyState.CORRECT
continue
if not is_correct.iloc[i - 1] and backspace_pressed.iloc[i]:
submission_state[i] = KeyState.CORRECTING
continue
if not is_correct.iloc[i]:
submission_state[i] = KeyState.INCORRECT
res = pd.DataFrame(
{
"submission_state": submission_state,
"displayed_word": input_and_screen["displayed_word"],
"displayed_input": input_and_screen["displayed_input"],
"key": key_typed["key"],
}
)
res["success"] = res["displayed_word"].apply(
lambda x: word.loc[word["word"] == x, "success"].values[0]
)
return res.set_index(key_typed["timestamp_out"])
[docs]
def detect_key_autocompletion(submission_state: pd.DataFrame) -> pd.DataFrame:
"""Detect the autocompletion at the key level.
Create a data frame that contains all the typed keys
the autocompletion status.
Parameters
----------
submission_state : pd.DataFrame
A data frame that contains the user state after each keystroke.
Returns
-------
pd.DataFrame
An new dataset with the displayed word, all the key typed and
autocompletion information.
"""
submission_state_copy = submission_state[["displayed_word", "key"]].copy()
# Detect the non autocompleted keys
to_keep = (submission_state_copy["key"].str.len() == 1) | (
submission_state_copy["key"].str.match("Backspace")
)
submission_state_copy["autocomplete_per_key"] = ~to_keep
return submission_state_copy
[docs]
def detect_word_autocompletion(keys_with_autocompletion: pd.DataFrame) -> pd.DataFrame:
"""Detect the autocompletion for all the words.
Parameters
----------
keys_with_autocompletion : pd.DataFrame
A data frame that .
Returns
-------
pd.DataFrame
The word dataset without the autocompleted words
"""
# Detect the words with autocompletion from the keys
per_word_autocompletion = keys_with_autocompletion.groupby(
"displayed_word", sort=False
).apply(lambda x: x["autocomplete_per_key"].any())
return pd.DataFrame(per_word_autocompletion, columns=["autocomplete_per_word"])
[docs]
def total_autocomplete(keys_with_autocompletion: pd.DataFrame) -> int:
"""Get the total number of autocompletions.
Parameters
----------
keys_with_autocompletion: pd.DataFrame
A dataframe indicating whether a key was autocompleted or not.
Returns
-------
int
Number of autocompletions performed by the user.
"""
return keys_with_autocompletion["autocomplete_per_key"].sum()
[docs]
def get_state_durations(
submission_state: pd.DataFrame,
key_intervals: pd.DataFrame,
word: pd.DataFrame,
) -> pd.DataFrame:
"""Return submission state duration.
To compute the state duration, we group by displayed word and
by submission state. Indeed, we want to compute statistics
per word on the time spent in a given state or between states.
Parameters
----------
submission_state : pd.DataFrame
Data frame containing the submission state each time a user
types a key.
key_intervals : pd.DataFrame
Time intervals between samples
word : pd.DataFrame
A data frame listing all the words displayed on screen during
the test with the following columns: ``words``,
``appearance_timestamp`` and ``disappearance_timestamp``.
Returns
-------
pd.DataFrame
A data frame that contains duration of each correcting state.
"""
# We group by displayed word and submission_state to keep track of
# whether the state corresponds to a successfully typed word or
# not. Otherwise, two correctly typed words would just regroup into
# one `Correct` state.
temp_df = submission_state.copy()
intervals = key_intervals["key_intervals"].values
temp_df["key_intervals"] = intervals
temp_df["success"] = submission_state["displayed_word"].apply(
lambda x: word.loc[word["word"] == x, "success"].values[0]
)
t_between_states = submission_state.index.to_series().diff().dt.total_seconds()
temp_df["t_between_states"] = t_between_states
temp_df["index"] = submission_state.index
groups = temp_df.groupby(
[
"displayed_word",
(
submission_state["submission_state"].shift()
!= submission_state["submission_state"]
).cumsum(),
]
)
res = pd.DataFrame(groups)
res["submission_state"] = groups.submission_state.apply(lambda x: x.iloc[0]).values
res["duration"] = groups.sum()["key_intervals"].values
res["success"] = groups.success.apply(lambda x: x.all()).values
res["t_between_states"] = groups.t_between_states.apply(lambda x: x.iloc[0]).values
res["displayed_word"] = groups.displayed_word.apply(lambda x: x.iloc[0]).values
res["index"] = groups.index.apply(lambda x: x.iloc[0]).values
# These columns were created by the groupby object being converted
# into a dataframe.
res.drop([0, 1], axis=1, inplace=True)
return res.set_index("index").sort_index()
[docs]
def get_reaction_duration(state_durations: pd.DataFrame) -> pd.DataFrame:
"""Compute the reaction duration.
The reaction duration is the time elapsed in an incorrect state
for a correct submission.
Parameters
----------
state_durations : pd.DataFrame
Data frame containing submission states and duration.
Returns
-------
pd.DataFrame
A data frame that contains reaction duration.
"""
return state_durations.loc[
state_durations["submission_state"] == KeyState.INCORRECT,
["duration", "success"],
]
[docs]
def get_correct_reaction_duration(
reaction_duration: pd.DataFrame,
) -> pd.DataFrame:
"""Return reaction durations for correct words.
Parameters
----------
reaction_duration : pd.DataFrame
Data frame containing reacting durations.
Returns
-------
pd.DataFrame
A data frame that contains reaction duration for correct
words.
"""
return reaction_duration.loc[reaction_duration["success"], "duration"]
[docs]
def get_correcting_duration(state_durations: pd.DataFrame) -> pd.DataFrame:
"""Compute correcting duration.
The Correcting Duration is the time elapsed in a correcting state
for a correct submission, in other words, the time spent correcting
a mistake.
Parameters
----------
state_durations : pd.DataFrame
Data frame containing submission states and duration.
Returns
-------
pd.DataFrame
A data frame that contains correcting duration.
"""
return state_durations.loc[
state_durations["submission_state"] == KeyState.CORRECTING,
["duration", "success"],
]
[docs]
def get_correct_correcting_duration(
correcting_duration: pd.DataFrame,
) -> pd.DataFrame:
"""Return correcting duration for correct words.
Parameters
----------
correcting_duration : pd.DataFrame
Data frame containing correcting durations.
Returns
-------
pd.DataFrame
A data frame that contains correcting duration for correct
words.
"""
return correcting_duration.loc[correcting_duration["success"], "duration"]
[docs]
def get_reacting_times(state_durations: pd.DataFrame) -> pd.DataFrame:
"""Return reacting times.
The reacting time is the time elapsed between an incorrect state
and a correcting state for a correct submission, in other words,
the time to react to a mistake.
Parameters
----------
state_durations : pd.DataFrame
Data frame containing submission states and duration.
Returns
-------
pd.DataFrame
A data frame containing reacting times.
"""
return state_durations.loc[
state_durations["submission_state"] == KeyState.CORRECTING,
["t_between_states", "success"],
]
[docs]
def get_correct_reacting_time(reacting_times: pd.DataFrame) -> pd.DataFrame:
"""Return reacting times for correct words.
Parameters
----------
reacting_times : pd.DataFrame
Data frame containing correction differences.
Returns
-------
pd.DataFrame
A data frame that contains reacting duration for correct words.
"""
return reacting_times.loc[reacting_times["success"], "t_between_states"]
[docs]
def find_consec_element_and_count(data: pd.DataFrame) -> pd.DataFrame:
"""Find consecutive elements and count them.
Parameters
----------
data : pd.DataFrame
Data frame to compute measure related to successfully written words
with(out) error, and also a streak of words with(out) error(s).
Returns
-------
pd.DataFrame
A data frame with three columns: the count, the submission state and if
the sequence was free of error
"""
res = data.copy()
# Assign a group to each consecutive elements
res["subgroup"] = (res["is_error_free"] != res["is_error_free"].shift(1)).cumsum()
group = res.groupby("subgroup")
return pd.DataFrame(
{
"count": group["is_error_free"].count(),
"success": group["success"].any(),
"is_error_free": group["is_error_free"].any(),
}
)
[docs]
def count_words_typed_in_row(data: pd.DataFrame, error_free: bool = True) -> int:
"""Count the successfully typed words with(out) error in a row.
The parameter ``error_free`` indicates if we look at consecutive words that has
been successfully typed and: without error (True) or with error (False).
Parameters
----------
data
Dataframe containing typing data
error_free
If True, only count words typed with no error
Returns
-------
Number of words typed in a row
"""
mask = data.is_error_free
if not error_free:
mask = ~mask
res = data.loc[data.success & mask, "count"]
if len(res) > 0:
return res.max()
return 0
[docs]
def similarity_ratio(x: Union[pd.Series, pd.DataFrame]) -> float:
"""Compute the similarity ratio."""
a, b = x.values
return SequenceMatcher(None, a, b).ratio()
[docs]
def apply_similarity_ratio(submission_state: pd.DataFrame) -> pd.Series:
"""Apply similarity ratio.
The similarity ratio is computed between the word being typed
``displayed_input`` and the word on screen ``displayed_word``.
Parameters
----------
submission_state
A data frame that contains the user state after each keystroke.
Returns
-------
Similarity ratio between the displayed input and
displayed_word.
"""
res = submission_state[["displayed_input", "displayed_word"]].copy()
res["similarity_ratio"] = res.apply(similarity_ratio, axis=1)
return res
[docs]
def count_incorrect_words(word: pd.DataFrame) -> float:
"""Count the number of incorrect words.
Parameters
----------
word : pd.DataFrame
A data frame of the words.
Returns
-------
int
The number of incorrect words.
"""
return (~word["success"]).sum()
[docs]
def count_key_pressed(key_pressed: pd.Series, alphabet: bool = False) -> int:
"""Count the number of keys pressed.
Parameters
----------
key_pressed
A pd.Series indicating the key pressed during the test.
alphabet
An optional argument indicating if the count should be done on alphabet
letters only.
Returns
-------
int
The number of keys pressed.
"""
if alphabet:
return key_pressed.apply(lambda x: x.isalpha()).sum()
return key_pressed.sum()
[docs]
def ratio_key_pressed(
submission_state: pd.DataFrame, word: pd.DataFrame
) -> pd.DataFrame:
"""Group by word and count the letters typed for completed words.
Parameters
----------
submission_state
A data frame that contains the user state after each keystroke.
word
A data frame listing all the words displayed on screen during the
test with the following columns: ``words``, ``appearance_timestamp``
and ``disappearance_timestamp``.
Returns
-------
pd.DataFrame
A data frame with a column ``letters_typed_over_length``
containing the following ratio: number of letters typed divided
by the length of the word (for completed words).
"""
res = (
submission_state[["displayed_word", "key"]]
.groupby(by="displayed_word")
.apply(lambda x: count_key_pressed(x["key"], True))
)
# keep the completed words only
completed_word = word.loc[word["success"], "word"]
res = res[completed_word]
# compute word length
word_length = res.index.to_series().apply(len)
# return the ratio letters typed over word length
return pd.DataFrame({"letters_typed_over_length": res / word_length}).set_index(
res.index
)
[docs]
def letter_interval(
key_is_letter: pd.Series,
key_intervals: pd.Series,
extra_mask: Optional[pd.Series] = None,
) -> pd.Series:
"""Compute the interval between two consecutive letters.
Parameters
----------
key_is_letter
A boolean series indicating if a key is a letter.
key_intervals
A series of float indicating the time between two consecutive keys.
extra_mask
An optional series of boolean that will be used as an extra mask
to filter intervals.
Returns
-------
Tuple[pd.Series, pd.Series]
The first element of the tuple is the time between each consecutive
letter keys. The second element is the mask that has been used to
filter the `key_intervals` to find the letters of interests.
"""
mask = key_is_letter & key_is_letter.shift(1)
if extra_mask is not None:
mask = mask & extra_mask
return key_intervals[mask], mask
[docs]
def keep_top_ten(data: pd.DataFrame) -> pd.Series:
"""Keep top 10% intervals.
Parameters
----------
data
A dataframe of letter intervals with the column `letter_intervals`.
Returns
-------
The top 10 percent letter intervals.
"""
res = data["letter_intervals"]
if res.empty:
return res
return res[res >= np.percentile(res, q=90)]
[docs]
def max_letter_interval_dev(data: pd.DataFrame) -> pd.Series:
"""Compute the maximum letter interval deviation from the mean."""
return ((data["letter_intervals"] - data["letter_intervals"].mean()) ** 2).max()
[docs]
def filter_intervals(data: pd.DataFrame, differentiate: bool) -> pd.Series:
"""Compute letter intervals prior a mistake within a word."""
mask = (data["submission_state"] == KeyState.CORRECT).to_numpy()
# Find first non-correct element if there is one
not_correct = np.nonzero(~mask)[0]
if len(not_correct) > 0:
# turn to False every Correct letters typed after a mistake
mask[not_correct[0] :] = False
mask = mask & data["is_letter"].to_numpy()
res = data.index.to_series().diff()
if differentiate:
res = res.diff()
return res[mask].dt.total_seconds()
[docs]
def interval_until_mistake(
submission_state: pd.DataFrame,
key_is_letter: pd.DataFrame,
differentiate: bool,
) -> pd.DataFrame:
"""Compute letter intervals prior a mistake for each word."""
col = "interval_until_mistake"
if differentiate:
col = f"derived_{col}"
res = submission_state[["displayed_word", "submission_state"]].copy()
res["is_letter"] = key_is_letter["key_is_letter"].values
multi_index_res = (
res.groupby(by="displayed_word")
.apply(lambda x: filter_intervals(x, differentiate))
.dropna()
)
if multi_index_res.empty:
return pd.DataFrame({col: []})
return pd.DataFrame({col: multi_index_res.values})
[docs]
def compute_typing_speed_slope(
word: pd.DataFrame, submission_state: pd.DataFrame
) -> float:
"""Compute the slope of the typing speed."""
def _compute_char_speed(group: pd.DataFrame):
# Return the mean interval between the characters of a word
return group.index.to_series().diff().dt.total_seconds().mean()
# Get the right columns and change index for the merge
df_word_filtered = word[["appearance_timestamp", "word"]].copy().set_index("word")
# Compute the typing speed for each word
char_speed = pd.DataFrame(
submission_state.groupby("displayed_word").apply(_compute_char_speed),
columns=["mean_char_speed"],
)
# Index based inner join to discard uncompleted words
res = pd.merge(df_word_filtered, char_speed, left_index=True, right_index=True)
# Return the mean slope
return (
res["mean_char_speed"].diff()
/ res["appearance_timestamp"].diff().dt.total_seconds()
).mean()