dispel.processing.extract module#

Extraction functionalities for the processing module.

dispel.processing.extract.AGGREGATION_CENTER_BASED = {'cv', 'cvi', 'kurtosis', 'skew', 'std'}#

A set of aggregations for which the validator on definitions is ignored

dispel.processing.extract.AGGREGATION_REGISTRY: Dict[str, Tuple[str | Callable[[Series], float], str]] = {'cv': (<function variation>, 'coefficient of variation'), 'cvi': (<function variation_increase>, 'coefficient of variation increase'), 'kurtosis': ('kurtosis', 'kurtosis'), 'max': ('max', 'maximum'), 'mean': ('mean', 'mean'), 'median': ('median', 'median'), 'min': ('min', 'minimum'), 'npcv': (<function npcv>, 'non parametric coefficient of variation'), 'q95': (functools.partial(<function Series.quantile>, q=0.95), '95th percentile'), 'skew': ('skew', 'skewness'), 'std': ('std', 'standard deviation')}#

A dictionary containing all aggregation methods
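
Entries map an aggregation key to a tuple of the aggregation function (or pandas method name) and a human-readable label. A short sketch of looking up an entry:

>>> from dispel.processing.extract import AGGREGATION_REGISTRY
>>> func, label = AGGREGATION_REGISTRY['q95']
>>> label
'95th percentile'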

class dispel.processing.extract.AggregateMeasures[source]#

Bases: MeasureDefinitionMixin, ProcessingStep

Aggregate multiple measures into a single one.

Parameters:
  • definition (dispel.data.values.ValueDefinition | dispel.data.values.ValueDefinitionPrototype | None) – The measure definition

  • measure_ids (List[dispel.data.values.DefinitionId | str]) – A list of measure ids to be considered for aggregation

  • aggregation_method – The method used to aggregate the measure values, np.mean by default.

  • fail_if_missing – If True and any of the measure_ids is not present, the processing fails.

  • yield_if_nan – If True, yield null values as measure values. Otherwise, processing will not return a measure value in case of a null result for the aggregation.
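
Examples

A minimal sketch that sums two measures into a single one (the measure ids and the definition are illustrative assumptions):

>>> import numpy as np
>>> from dispel.data.values import ValueDefinition
>>> from dispel.processing.extract import AggregateMeasures
>>> step = AggregateMeasures(
...     definition=ValueDefinition('total-time', 'Total time', unit='s'),
...     measure_ids=['level1-time', 'level2-time'],
...     aggregation_method=np.sum
... )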

__init__(definition=None, measure_ids=None, aggregation_method=None, fail_if_missing=None, yield_if_nan=None)[source]#
aggregate(values)[source]#

Aggregate measure values.

Parameters:

values (List[Any]) –

Return type:

Any

aggregation_method = None#
property error_handler: ErrorHandling#

Get error handler corresponding to the fail_if_missing arg.

fail_if_missing = False#
get_measure_ids(**kwargs)[source]#

Get the measure ids considered for aggregation.

Return type:

List[DefinitionId | str]

static get_measure_set(reading)[source]#

Get the measure set used for getting measure values for ids.

Parameters:

reading (Reading) –

Return type:

MeasureSet

get_measure_values(reading, **kwargs)[source]#

Get the measure values for aggregation.

Parameters:

reading (Reading) –

Return type:

List[Any]

get_measures(reading, **kwargs)[source]#

Get the measures for aggregation.

Parameters:

reading (Reading) –

Return type:

List[MeasureValue]

measure_ids: List[DefinitionId | str] = []#
process_reading(reading, **kwargs)[source]#

See process().

Parameters:

reading (Reading) –

Return type:

Generator[ProcessingResult | ProcessingControlResult, None, None]

yield_if_nan = False#
class dispel.processing.extract.AggregateModalities[source]#

Bases: AggregateMeasures

Aggregate measure values from different modalities.

This is a convenience step to address the common pattern of aggregating a measure from different modalities of the same root measure. The measure ids are derived from the provided definition and the modalities.
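
Examples

A minimal sketch of a reusable subclass (the definition and modalities are illustrative assumptions):

>>> from dispel.data.values import ValueDefinitionPrototype
>>> from dispel.processing.extract import AggregateModalities
>>> class AggregateHands(AggregateModalities):
...     definition = ValueDefinitionPrototype(
...         id_='pinch-time',
...         name='pinch time'
...     )
...     modalities = [['left hand'], ['right hand']]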

get_measure_ids(**kwargs)[source]#

Get measure ids based on modalities and base measure definition.

Return type:

List[DefinitionId | str]

get_modalities()[source]#

Get a list of modalities to be aggregated.

Return type:

List[List[str | AbbreviatedValue]]

modalities: List[List[str | AbbreviatedValue]] = []#

A list of modalities to use for aggregation

class dispel.processing.extract.AggregateRawDataSetColumn[source]#

Bases: ExtractStep

An extraction step that summarizes a column of a data set.

This processing step encapsulates the class ExtractMultipleStep and allows one to produce multiple MeasureValues derived from the same column of a data set.

Parameters:
  • data_set_id – A single data set id

  • column_id (str) – The column id of the dataset on which the transform function will be applied.

  • aggregations (AggregationsDefinitionType) –

    Either a list of tuples (func, label) where func consumes the data sets specified through data_set_id at the column column_id and returns a single value passed to MeasureValue. The label element of the tuple will be passed as aggregation keyword to create_definition(). The label can be either a string or an AbbreviatedValue. If it is a string, it is wrapped into an AbbreviatedValue using the string as label and the aggregation method as abbreviation.

    There are several predefined constants, such as BASIC_AGGREGATIONS, DEFAULT_AGGREGATIONS and EXTENDED_AGGREGATIONS, that can be used for common aggregation scenarios.

    The function is passed to pandas.Series.agg() and hence allows specifying default aggregation functions like 'mean' or 'std' without actually having to pass a callable.

  • definition – A ValueDefinitionPrototype that is used to create the MeasureValueDefinitions for the aggregation functions provided in aggregations.

  • level_filter – An optional LevelFilter to determine the levels for extraction. If no filter is provided, all levels will be considered. The level_filter also accepts str, LevelIds and lists of either and passes them to a LevelIdFilter for convenience.

Examples

To ease the generation of multiple similar measures for the same column of a data set, the AggregateRawDataSetColumn provides a convenient way to do so. Assume you want to compute both the median and standard deviation of a specific column of a data set; this can be achieved as follows:

>>> from dispel.data.values import ValueDefinitionPrototype
>>> from dispel.processing.extract import AggregateRawDataSetColumn
>>> step = AggregateRawDataSetColumn(
...     'data-set-id',
...     'column-name',
...     aggregations=[
...         ('median', 'median'),
...         ('std', 'standard deviation')
...     ],
...     definition=ValueDefinitionPrototype(
...         id_='measure-{method}',
...         name='{method} measure',
...         unit='s'
...     )
... )

This extraction step will result in two measure values, one for the median and one for the standard deviation of the column 'column-name' of the data set identified by 'data-set-id'.

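The predefined aggregation lists can be used in place of hand-written tuples. A hedged sketch using DEFAULT_AGGREGATIONS (the ids are illustrative; following the parameter description above, the label is assumed to be available as the aggregation placeholder):

>>> from dispel.data.values import ValueDefinitionPrototype
>>> from dispel.processing.extract import (
...     AggregateRawDataSetColumn,
...     DEFAULT_AGGREGATIONS
... )
>>> step = AggregateRawDataSetColumn(
...     'data-set-id',
...     'column-name',
...     aggregations=DEFAULT_AGGREGATIONS,
...     definition=ValueDefinitionPrototype(
...         id_='measure-{aggregation}',
...         name='{aggregation} measure',
...         unit='s'
...     )
... )

This would yield five measure values, one per entry in DEFAULT_AGGREGATIONS.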

__init__(data_set_id=None, column_id=None, aggregations=None, definition=None, level_filter=None)[source]#
aggregations: AggregationsDefinitionType#
column_id: str#
get_agg_func_and_kwargs(func, label)[source]#

Get the keyword arguments for the aggregation.

Return type:

Tuple[Callable[[DataFrame], float], Dict[str, Any]]

get_aggregations()[source]#

Get the aggregations to be performed on the specified column.

Return type:

Iterable[Tuple[str | Callable[[Series], float], str | AbbreviatedValue]]

get_column_id()[source]#

Get the id of the column to be aggregated.

Return type:

str

get_definition(**kwargs)[source]#

Get the value definition specific to the aggregation.

Return type:

ValueDefinition

get_transform_functions()[source]#

Get the functions to transform the specified column.

Return type:

Generator[Tuple[Callable, Dict[str, Any]], None, None]

dispel.processing.extract.BASIC_AGGREGATIONS: List[Tuple[str, str]] = [('mean', 'mean'), ('std', 'standard deviation')]#

A list of basic aggregation methods

dispel.processing.extract.DEFAULT_AGGREGATIONS: List[Tuple[str, str]] = [('mean', 'mean'), ('std', 'standard deviation'), ('median', 'median'), ('min', 'minimum'), ('max', 'maximum')]#

A list of commonly used aggregation methods
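
These lists are plain (func, label) tuples and can be inspected or extended like any list:

>>> from dispel.processing.extract import DEFAULT_AGGREGATIONS
>>> [label for _, label in DEFAULT_AGGREGATIONS]
['mean', 'standard deviation', 'median', 'minimum', 'maximum']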

dispel.processing.extract.DEFAULT_AGGREGATIONS_CV: List[Tuple[Callable[[Any], float] | str, str]] = [('mean', 'mean'), ('std', 'standard deviation'), ('median', 'median'), ('min', 'minimum'), ('max', 'maximum'), (<function variation>, 'coefficient of variation')]#

A list of commonly used aggregation methods plus coefficient of variation

dispel.processing.extract.DEFAULT_AGGREGATIONS_IQR: List[Tuple[Callable[[Any], float] | str, str]] = [('mean', 'mean'), ('std', 'standard deviation'), ('median', 'median'), ('min', 'minimum'), ('max', 'maximum'), (<function iqr>, 'iqr')]#

A list of commonly used aggregation methods plus inter-quartile range

dispel.processing.extract.DEFAULT_AGGREGATIONS_Q95: List[Tuple[Callable[[Any], float] | str, str]] = [('mean', 'mean'), ('std', 'standard deviation'), ('median', 'median'), ('min', 'minimum'), ('max', 'maximum'), (functools.partial(<function Series.quantile>, q=0.95), '95th percentile')]#

A list of commonly used aggregation methods plus 95th percentile

dispel.processing.extract.EXTENDED_AGGREGATIONS: List[Tuple[str, str]] = [('mean', 'mean'), ('std', 'standard deviation'), ('median', 'median'), ('min', 'minimum'), ('max', 'maximum'), ('skew', 'skewness'), ('kurtosis', 'kurtosis')]#

An extended list of commonly used aggregation methods

class dispel.processing.extract.ExtractMultipleStep[source]#

Bases: ExtractStep

A measure extraction processing step for multiple measures.

This processing step allows one to produce multiple MeasureValues by providing a list of functions and a ValueDefinitionPrototype to create the ValueDefinitions from.

Parameters:
  • data_set_ids – An optional list of data set ids to be used for the transformation. See DataSetProcessingStepMixin.

  • transform_functions (Iterable[Dict[str, Any]]) – An optional list of dictionaries containing at least the processing function under the key func, which consumes the specified data sets through data_set_ids as positional arguments and returns a measure value passed to MeasureValue. Additional keywords will be passed to create_definition(). If no functions are provided, the transform_functions class variable will be used.

  • definition – A ValueDefinitionPrototype that is used to create the MeasureValueDefinitions for the transformation functions provided in transform_functions.

  • level_filter – An optional filter to limit the levels being processed. See LevelProcessingStep.

  • yield_if_nan – If True, yield null values as measure values. Otherwise, processing will not return a measure value in case of a null result for the extraction.

Examples

To ease the generation of multiple similar measures, the ExtractMultipleStep provides a convenient way to do so. Assume you want to compute both the mean and median of a data set; this can be achieved as follows:

>>> import numpy as np
>>> from dispel.data.values import ValueDefinitionPrototype
>>> from dispel.processing.extract import ExtractMultipleStep
>>> step = ExtractMultipleStep(
...     'data-set-id',
...     [
...         {'func': np.mean, 'method': 'average'},
...         {'func': np.median, 'method': 'median'}
...     ],
...     ValueDefinitionPrototype(
...         id_='measure-{method}',
...         name='{method} measure',
...         unit='s'
...     )
... )

This extraction step will result in two measure values, one for the mean and one with the median.

__init__(data_set_ids=None, transform_functions=None, definition=None, level_filter=None, yield_if_nan=None)[source]#
get_transform_functions()[source]#

Get the transform functions applied to the data sets.

Return type:

Generator[Tuple[Callable, Dict[str, Any]], None, None]

transform_functions: Iterable[Dict[str, Any]]#
class dispel.processing.extract.ExtractStep[source]#

Bases: MeasureDefinitionMixin, TransformStepChainMixIn, MutateDataSetProcessingStepBase

A measure extraction processing step.

This class provides a convenient way to extract a measure from one or more data sets by specifying their ids, their level_ids or a level filter, a transformation function, and a measure value definition.

Parameters:
  • data_set_ids – An optional list of data set ids to be used for the transformation. See DataSetProcessingStepMixin.

  • transform_function – An optional function to be applied to the data sets. See MutateDataSetProcessingStepBase.

  • definition – An optional value definition or prototype. See MeasureDefinitionMixin.

  • level_filter – An optional filter to limit the levels being processed. See LevelProcessingStep.

  • yield_if_nan (bool) – If True, yield null values as measure values. Otherwise, processing will not return a measure value in case of a null result for the extraction.

Examples

Assuming we want to compute the maximum value of a raw data set, we can create the following step:

>>> from dispel.data.values import ValueDefinition
>>> from dispel.processing.extract import ExtractStep
>>> step = ExtractStep(
...     'data-set-id',
...     lambda data: data.max(axis=0),
...     ValueDefinition('maximum', 'Maximum value')
... )

A common approach is to define a processing step for re-use and leveraging the @transformation decorator to specify the transformation function:

>>> import pandas as pd
>>> from dispel.data.values import ValueDefinition
>>> from dispel.processing.extract import ExtractStep
>>> from dispel.processing.data_set import transformation
>>> class MyExtractStep(ExtractStep):
...     data_set_ids = 'data-set-id'
...     definition = ValueDefinition('maximum', 'Maximum value')
...
...     @transformation
...     def _max(self, data: pd.DataFrame) -> float:
...         return data.max(axis=0)

Often one wants to extract multiple measures from one data set. This can be achieved by using prototypes and optional named arguments with @transformation:

>>> import pandas as pd
>>> from dispel.data.values import ValueDefinitionPrototype
>>> from dispel.processing.extract import ExtractStep
>>> from dispel.processing.data_set import transformation
>>> class MyExtractStep(ExtractStep):
...     data_set_ids = 'data-set-id'
...     definition = ValueDefinitionPrototype(
...         id_='id-{agg_abbr}',
...         name='{agg} value'
...     )
...
...     @transformation(agg='Maximum', agg_abbr='max')
...     def _max(self, data: pd.DataFrame) -> float:
...         return data.max(axis=0)
...
...     @transformation(agg='Minimum', agg_abbr='min')
...     def _min(self, data: pd.DataFrame) -> float:
...         return data.min(axis=0)
__init__(data_set_ids=None, transform_function=None, definition=None, level_filter=None, yield_if_nan=None)[source]#
wrap_result(res, level, reading, **kwargs)[source]#

Wrap the result from the processing function into a class.

Parameters:
  • res (Any) – Any result returned by the extraction step. If res is a WrappedResult, the flag contained in the object will be automatically added to the MeasureValue, hence flagged wrapped results will always translate into a flagged MeasureValue.

  • level (Level) – The current level

  • reading (Reading) – The current reading

  • kwargs (Any) – Additional kwargs

Yields:

LevelProcessingResult – The processing result

Return type:

Generator[LevelProcessingResult | RawDataSetProcessingResult, None, None]

yield_if_nan: bool = False#
class dispel.processing.extract.MeasureDefinitionMixin[source]#

Bases: object

A mixin class for processing steps producing measure values.

Parameters:

definition (dispel.data.values.ValueDefinition | dispel.data.values.ValueDefinitionPrototype | None) – An optional value definition. If no value definition is provided, the definition class variable will be used. Alternatively, one can overwrite get_definition() to provide the definition.

__init__(*args, **kwargs)[source]#
definition: ValueDefinition | ValueDefinitionPrototype | None = None#

The specification of the measure definition

get_definition(**kwargs)[source]#

Get the measure definition.

Parameters:

kwargs – Optional parameters that will be passed along to the creation of measure definitions from prototypes. See create_definition()

Returns:

The definition of the value

Return type:

ValueDefinition

get_value(value, **kwargs)[source]#

Get a measure value based on the definition.

Parameters:
Returns:

The value wrapped with the definition from get_definition().

Return type:

MeasureValue
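
A minimal sketch via ExtractStep, which incorporates this mixin (the ids are illustrative assumptions):

>>> from dispel.data.values import ValueDefinition
>>> from dispel.processing.extract import ExtractStep
>>> step = ExtractStep(
...     'data-set-id',
...     definition=ValueDefinition('maximum', 'Maximum value')
... )
>>> measure_value = step.get_value(42.0)  # a MeasureValue wrapping 42.0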

class dispel.processing.extract.MeasureFlagStep[source]#

Bases: FlagStepMixin, ProcessingStep

A processing step for flagging measure values.

Examples

Assuming you want to flag the step count measure value, you can create the following flag step:

>>> from dispel.data.values import AbbreviatedValue as AV
>>> from dispel.data.flags import FlagSeverity
>>> from dispel.processing.extract import MeasureFlagStep
>>> step = MeasureFlagStep(
...     measure_ids='6mwt-step_count',
...     task_name=AV('Six-minute walk test', '6mwt'),
...     flag_name=AV('step count threshold', 'sct'),
...     flag_type='behavioral',
...     flag_severity=FlagSeverity.DEVIATION,
...     reason='The step count value exceeds 1000 steps.',
...     flagging_function=lambda value: value < 1000,
... )

The flagging function will be called with the measure value corresponding to the provided measure id. If the function has named parameters matching level or reading, the respective level and reading will be passed to the flagging function.

Another common scenario is to define a class that can be reused.

>>> from dispel.data.flags import FlagType
>>> from dispel.processing.extract import MeasureFlagStep
>>> class StepCountThreshold(MeasureFlagStep):
...     measure_ids = '6mwt-step_count'
...     task_name = AV('Six-minute walk test', '6mwt')
...     flag_name = AV('step count threshold', 'sct')
...     flag_type = FlagType.BEHAVIORAL
...     flag_severity = FlagSeverity.DEVIATION
...     reason = 'The step count value exceeds 1000 steps.'
...     stop_processing = False
...     flagging_function = lambda value: value < 1000

Another convenient way to provide the flagging function is to use the @flag decorator. One can also define multiple flags for the same class as follows:

>>> from dispel.processing.extract import MeasureFlagStep
>>> from dispel.processing.flags import flag
>>> class StepCountThreshold(MeasureFlagStep):
...     measure_ids = '6mwt-step_count'
...     task_name = AV('Six-minute walk test', '6mwt')
...     flag_name = AV('step count threshold', 'sct')
...     flag_type = 'behavioral'
...     reason = 'The step count value exceeds {threshold} steps.'
...     stop_processing = False
...
...     @flag(threshold=1000, flag_severity=FlagSeverity.INVALIDATION)
...     def _threshold_1000(self, value: float) -> bool:
...         return value < 1000
...
...     @flag(threshold=800, flag_severity=FlagSeverity.DEVIATION)
...     def _threshold_800(self, value: float) -> bool:
...         return value < 800

Note that the @flag decorator can take keyword arguments. These kwargs are merged with any keyword arguments that come from processing step groups in order to format the flag reason.

__init__(measure_ids=None, task_name=None, flag_name=None, flag_type=None, flag_severity=None, reason=None, stop_processing=False, flagging_function=None, target_ids=None)[source]#
flag_measure_values(measure_values, reading, **kwargs)[source]#

Flag the provided measure values.

Return type:

Generator[Flag, None, None]

get_flag_targets(reading, level=None, **kwargs)[source]#

Get flag targets for measures.

Return type:

Iterable[Reading | Level | RawDataSet | MeasureValue | LevelEpoch]

get_measure_ids()[source]#

Get the measure ids to be flagged.

Return type:

Iterable[DefinitionId | str]

get_measure_values(reading)[source]#

Get the raw measure values used for flagging.

Return type:

Iterable[Any]

get_measures(reading)[source]#

Get the measure value classes used for flagging.

Parameters:

reading (Reading) –

Return type:

Iterable[Any]

get_target_ids()[source]#

Get the ids of the target data sets to be flagged.

Returns:

The identifiers of the target data sets.

Return type:

Iterable[str]

measure_ids: DefinitionId | str#
process_reading(reading, **kwargs)[source]#

Process the provided reading.

Parameters:
  • reading (Reading) – The reading to be processed

  • kwargs – Additional arguments passed by process().

Yields:

ProcessResultType – The results from processing readings.

Return type:

Generator[ProcessingResult | ProcessingControlResult, None, None]

target_ids: str | Iterable[str] | None = None#
dispel.processing.extract.agg_column(column, method)[source]#

Create a function to apply an aggregation function on a column.

Parameters:
  • column (str) – The column to be aggregated

  • method (str | Callable[[Series], float]) – A function to apply on the column

Returns:

A function that aggregates one column of a pandas.DataFrame.

Return type:

Callable[[pandas.DataFrame], float]
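
A short usage sketch (the column name and data are illustrative):

>>> import pandas as pd
>>> from dispel.processing.extract import agg_column
>>> data = pd.DataFrame({'speed': [1.0, 2.0, 3.0]})
>>> mean_speed = agg_column('speed', 'mean')(data)  # 2.0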