dispel.processing.extract module#
Extraction functionality for the processing module.
- dispel.processing.extract.AGGREGATION_CENTER_BASED = {'cv', 'cvi', 'kurtosis', 'skew', 'std'}#
A set of aggregations for which the validator on definitions is ignored
- dispel.processing.extract.AGGREGATION_REGISTRY: Dict[str, Tuple[str | Callable[[Series], float], str]] = {'cv': (<function variation>, 'coefficient of variation'), 'cvi': (<function variation_increase>, 'coefficient of variation increase'), 'kurtosis': ('kurtosis', 'kurtosis'), 'max': ('max', 'maximum'), 'mean': ('mean', 'mean'), 'median': ('median', 'median'), 'min': ('min', 'minimum'), 'npcv': (<function npcv>, 'non parametric coefficient of variation'), 'q95': (functools.partial(<function Series.quantile>, q=0.95), '95th percentile'), 'skew': ('skew', 'skewness'), 'std': ('std', 'standard deviation')}#
A dictionary containing all aggregation methods
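For instance, the registry maps an aggregation key to a function (or pandas method name) and a human-readable label:

>>> from dispel.processing.extract import AGGREGATION_REGISTRY
>>> func, label = AGGREGATION_REGISTRY['cv']
>>> label
'coefficient of variation'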
- class dispel.processing.extract.AggregateMeasures[source]#
Bases: MeasureDefinitionMixin, ProcessingStep
Aggregate multiple measures into a single one.
- Parameters:
definition (dispel.data.values.ValueDefinition | dispel.data.values.ValueDefinitionPrototype | None) – The measure definition.
measure_ids (List[dispel.data.values.DefinitionId | str]) – A list of measure ids to be considered for aggregation.
aggregation_method – The method used to aggregate the measure values, np.mean by default.
fail_if_missing – If True and any of the measure_ids is not present, the processing fails.
yield_if_nan – If True, yield null values as measure values. Otherwise, processing will not return a measure value in case of a null result for the aggregation.
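For example, a minimal sketch of aggregating two previously extracted measures into their mean (the measure ids and definition are hypothetical):

>>> import numpy as np
>>> from dispel.data.values import ValueDefinition
>>> from dispel.processing.extract import AggregateMeasures
>>> step = AggregateMeasures(
...     definition=ValueDefinition('measures-mean', 'Mean of measures'),
...     measure_ids=['measure-a', 'measure-b'],
...     aggregation_method=np.mean,
...     fail_if_missing=False,
... )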
- __init__(definition=None, measure_ids=None, aggregation_method=None, fail_if_missing=None, yield_if_nan=None)[source]#
- Parameters:
definition (MeasureValueDefinition | None) –
measure_ids (List[DefinitionId | str] | None) –
fail_if_missing (bool | None) –
yield_if_nan (bool | None) –
- aggregation_method = None#
- property error_handler: ErrorHandling#
Get the error handler corresponding to the fail_if_missing argument.
- fail_if_missing = False#
- get_measure_ids(**kwargs)[source]#
Get the measure ids considered for aggregation.
- Return type:
List[DefinitionId | str]
- static get_measure_set(reading)[source]#
Get the measure set used for getting measure values for ids.
- Parameters:
reading (Reading) –
- Return type:
- get_measures(reading, **kwargs)[source]#
Get the measures for aggregation.
- Parameters:
reading (Reading) –
- Return type:
- measure_ids: List[DefinitionId | str] = []#
- process_reading(reading, **kwargs)[source]#
See process().
- Parameters:
reading (Reading) –
- Return type:
Generator[ProcessingResult | ProcessingControlResult, None, None]
- yield_if_nan = False#
- class dispel.processing.extract.AggregateModalities[source]#
Bases: AggregateMeasures
Aggregate measure values from different modalities.
This is a convenience step to address the common pattern of aggregating a measure from different modalities of the same root measure. The measure ids are derived from the provided definition and the modalities.
- get_measure_ids(**kwargs)[source]#
Get measure ids based on modalities and base measure definition.
- Return type:
List[DefinitionId | str]
- get_modalities()[source]#
Get a list of modalities to be aggregated.
- Return type:
List[List[str | AbbreviatedValue]]
- modalities: List[List[str | AbbreviatedValue]] = []#
A list of modalities to use for aggregation
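A minimal sketch of the pattern, assuming MeasureValueDefinition is importable from dispel.data.measures and accepts task and measure names (the task, measure, and modality values are hypothetical):

>>> from dispel.data.measures import MeasureValueDefinition
>>> from dispel.data.values import AbbreviatedValue as AV
>>> from dispel.processing.extract import AggregateModalities
>>> class AggregateReactionTime(AggregateModalities):
...     definition = MeasureValueDefinition(
...         AV('Example task', 'task'),
...         AV('reaction time', 'rt'),
...     )
...     modalities = [
...         [AV('left hand', 'left')],
...         [AV('right hand', 'right')],
...     ]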
- class dispel.processing.extract.AggregateRawDataSetColumn[source]#
Bases: ExtractStep
An extraction step that summarises a column of a data set.
This processing step encapsulates the class ExtractMultipleStep and allows producing multiple MeasureValues derived from the same column of a data set.
- Parameters:
data_set_id – A single data set id.
column_id (str) – The column id of the data set on which the transform function will be applied.
aggregations (AggregationsDefinitionType) – Either a list of tuples (func, label) where func consumes the data sets specified through data_set_id at the column column_id and returns a single value passed to MeasureValue. The label element of the tuple will be passed as aggregation keyword to create_definition(). The label can be either a string or an AbbreviatedValue. If it is a string, the label is wrapped with the label and aggregation method as abbreviation.
There are three constants BASIC_AGGREGATIONS, DEFAULT_AGGREGATIONS and EXTENDED_AGGREGATIONS that can be used for common aggregation scenarios.
The function is passed to pandas.Series.agg() and hence allows specifying some default aggregation functions like 'mean' or 'std' without actually having to pass a callable.
definition – A ValueDefinitionPrototype that is used to create the MeasureValueDefinitions for the aggregation functions provided in aggregations.
level_filter – An optional LevelFilter to determine the levels for extraction. If no filter is provided, all levels will be considered. The level_filter also accepts str, LevelIds, and lists of either, and passes them to a LevelIdFilter for convenience.
Examples
To ease the generation of multiple similar measures for the same column of a data set, the AggregateRawDataSetColumn provides a convenient way to do so. Assume you want to create both the median and standard deviation of a specific column of a data set; this can be achieved as follows:

>>> from dispel.data.values import ValueDefinitionPrototype
>>> from dispel.processing.extract import AggregateRawDataSetColumn
>>> step = AggregateRawDataSetColumn(
...     'data-set-id',
...     'column-name',
...     aggregations=[
...         ('median', 'median'),
...         ('std', 'standard deviation')
...     ],
...     definition=ValueDefinitionPrototype(
...         id_='measure-{method}',
...         name='{method} measure',
...         unit='s'
...     )
... )
This extraction step will result in two measure values, one for the median and one for the standard deviation of the column 'column-name' of the data set identified by 'data-set-id'. If the aggregations additionally include a variation increase entry, the step results in three measure values, one for the median, one for the standard deviation, and one for the variation increase; the median and variation increase measures will have the associated COI references as provided.
- __init__(data_set_id=None, column_id=None, aggregations=None, definition=None, level_filter=None)[source]#
- aggregations: AggregationsDefinitionType#
- dispel.processing.extract.BASIC_AGGREGATIONS: List[Tuple[str, str]] = [('mean', 'mean'), ('std', 'standard deviation')]#
A list of basic aggregation methods
- dispel.processing.extract.DEFAULT_AGGREGATIONS: List[Tuple[str, str]] = [('mean', 'mean'), ('std', 'standard deviation'), ('median', 'median'), ('min', 'minimum'), ('max', 'maximum')]#
A list of commonly used aggregation methods
- dispel.processing.extract.DEFAULT_AGGREGATIONS_CV: List[Tuple[Callable[[Any], float] | str, str]] = [('mean', 'mean'), ('std', 'standard deviation'), ('median', 'median'), ('min', 'minimum'), ('max', 'maximum'), (<function variation>, 'coefficient of variation')]#
A list of commonly used aggregation methods plus coefficient of variation
- dispel.processing.extract.DEFAULT_AGGREGATIONS_IQR: List[Tuple[Callable[[Any], float] | str, str]] = [('mean', 'mean'), ('std', 'standard deviation'), ('median', 'median'), ('min', 'minimum'), ('max', 'maximum'), (<function iqr>, 'iqr')]#
A list of commonly used aggregation methods plus interquartile range
- dispel.processing.extract.DEFAULT_AGGREGATIONS_Q95: List[Tuple[Callable[[Any], float] | str, str]] = [('mean', 'mean'), ('std', 'standard deviation'), ('median', 'median'), ('min', 'minimum'), ('max', 'maximum'), (functools.partial(<function Series.quantile>, q=0.95), '95th percentile')]#
A list of commonly used aggregation methods plus 95th percentile
- dispel.processing.extract.EXTENDED_AGGREGATIONS: List[Tuple[str, str]] = [('mean', 'mean'), ('std', 'standard deviation'), ('median', 'median'), ('min', 'minimum'), ('max', 'maximum'), ('skew', 'skewness'), ('kurtosis', 'kurtosis')]#
An extended list of commonly used aggregation methods
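These constants can be passed directly as the aggregations argument of AggregateRawDataSetColumn; a minimal sketch (the '{aggregation}' placeholder is an assumption based on the aggregation keyword passed to create_definition()):

>>> from dispel.data.values import ValueDefinitionPrototype
>>> from dispel.processing.extract import (
...     DEFAULT_AGGREGATIONS,
...     AggregateRawDataSetColumn,
... )
>>> step = AggregateRawDataSetColumn(
...     'data-set-id',
...     'column-name',
...     aggregations=DEFAULT_AGGREGATIONS,
...     definition=ValueDefinitionPrototype(
...         id_='measure-{aggregation}',
...         name='{aggregation} measure'
...     )
... )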
- class dispel.processing.extract.ExtractMultipleStep[source]#
Bases: ExtractStep
A measure extraction processing step for multiple measures.
This processing step allows producing multiple MeasureValues by providing a list of functions and a ValueDefinitionPrototype to create the ValueDefinitions from.
- Parameters:
data_set_ids – An optional list of data set ids to be used for the transformation. See DataSetProcessingStepMixin.
transform_functions (Iterable[Dict[str, Any]]) – An optional list of dictionaries containing at least the processing function under the key func, which consumes the specified data sets through data_set_ids as positional arguments and returns a measure value passed to MeasureValue. Additional keywords will be passed to create_definition(). If no functions are provided, the transform_functions class variable will be used.
definition – A ValueDefinitionPrototype that is used to create the MeasureValueDefinitions for the transformation functions provided in transform_functions.
level_filter – An optional filter to limit the levels being processed. See LevelProcessingStep.
yield_if_nan – If True, yield null values as measure values. Otherwise, processing will not return a measure value in case of a null result for the extraction.
Examples
To ease the generation of multiple similar measures, the ExtractMultipleStep provides a convenient way to do so. Assume you want to create both the mean and the median of a data set; this can be achieved as follows:

>>> import numpy as np
>>> from dispel.data.values import ValueDefinitionPrototype
>>> from dispel.processing.extract import ExtractMultipleStep
>>> step = ExtractMultipleStep(
...     'data-set-id',
...     [
...         {'func': np.mean, 'method': 'average'},
...         {'func': np.median, 'method': 'median'}
...     ],
...     ValueDefinitionPrototype(
...         id_='measure-{method}',
...         name='{method} measure',
...         unit='s'
...     )
... )
This extraction step will result in two measure values, one for the mean and one for the median.
- __init__(data_set_ids=None, transform_functions=None, definition=None, level_filter=None, yield_if_nan=None)[source]#
- static __new__(cls, *args, **kwargs)#
- class dispel.processing.extract.ExtractStep[source]#
Bases: MeasureDefinitionMixin, TransformStepChainMixIn, MutateDataSetProcessingStepBase
A measure extraction processing step.
This class provides a convenient way to extract a measure from one or more data sets by specifying their id, their level_ids or level filter, a transformation function and a measure value definition.
- Parameters:
data_set_ids – An optional list of data set ids to be used for the transformation. See DataSetProcessingStepMixin.
transform_function – An optional function to be applied to the data sets. See MutateDataSetProcessingStepBase.
definition – An optional value definition or prototype. See MeasureDefinitionMixin.
level_filter – An optional filter to limit the levels being processed. See LevelProcessingStep.
yield_if_nan (bool) – If True, yield null values as measure values. Otherwise, processing will not return a measure value in case of a null result for the extraction.
Examples
Assuming we wanted to compute the maximum value of a raw data set, we can create the following step:

>>> from dispel.data.values import ValueDefinition
>>> from dispel.processing.extract import ExtractStep
>>> step = ExtractStep(
...     'data-set-id',
...     lambda data: data.max(axis=0),
...     ValueDefinition('maximum', 'Maximum value')
... )
A common approach is to define a processing step for re-use, leveraging the @transformation decorator to specify the transformation function:

>>> import pandas as pd
>>> from dispel.data.values import ValueDefinition
>>> from dispel.processing.extract import ExtractStep
>>> from dispel.processing.data_set import transformation
>>> class MyExtractStep(ExtractStep):
...     data_set_ids = 'data-set-id'
...     definition = ValueDefinition('maximum', 'Maximum value')
...
...     @transformation
...     def _max(self, data: pd.DataFrame) -> float:
...         return data.max(axis=0)
Often one wants to extract multiple measures from one data set. This can be achieved by using prototypes and optional named arguments with @transformation:

>>> import pandas as pd
>>> from dispel.data.values import ValueDefinitionPrototype
>>> from dispel.processing.extract import ExtractStep
>>> from dispel.processing.data_set import transformation
>>> class MyExtractStep(ExtractStep):
...     data_set_ids = 'data-set-id'
...     definition = ValueDefinitionPrototype(
...         id_='id-{agg_abbr}',
...         name='{agg} value'
...     )
...
...     @transformation(agg='Maximum', agg_abbr='max')
...     def _max(self, data: pd.DataFrame) -> float:
...         return data.max(axis=0)
...
...     @transformation(agg='Minimum', agg_abbr='min')
...     def _min(self, data: pd.DataFrame) -> float:
...         return data.min(axis=0)
- __init__(data_set_ids=None, transform_function=None, definition=None, level_filter=None, yield_if_nan=None)[source]#
- Parameters:
definition (ValueDefinition | ValueDefinitionPrototype | None) –
level_filter (str | LevelId | List[str] | List[LevelId] | LevelFilter | None) –
yield_if_nan (bool | None) –
- wrap_result(res, level, reading, **kwargs)[source]#
Wrap the result from the processing function into a class.
- Parameters:
res (Any) – Any result returned by the extraction step. If res is a WrappedResult, the flag contained in the object will be automatically added to the MeasureValue; hence flagged wrapped results will always translate into flagged MeasureValues.
.level (Level) – The current level
reading (Reading) – The current reading
kwargs (Any) – Additional kwargs
- Yields:
LevelProcessingResult – The processing result
- Return type:
Generator[LevelProcessingResult | RawDataSetProcessingResult, None, None]
- class dispel.processing.extract.MeasureDefinitionMixin[source]#
Bases: object
A mixin class for processing steps producing measure values.
- Parameters:
definition (dispel.data.values.ValueDefinition | dispel.data.values.ValueDefinitionPrototype | None) – An optional value definition. If no value definition is provided, the definition class variable will be used. Alternatively, one can override get_definition() to provide the definition.
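A minimal sketch of overriding get_definition() instead of setting the class variable (the id and name are hypothetical):

>>> from dispel.data.values import ValueDefinition
>>> from dispel.processing.extract import ExtractStep
>>> class MyStep(ExtractStep):
...     data_set_ids = 'data-set-id'
...
...     def get_definition(self, **kwargs):
...         # hypothetical: build the definition dynamically
...         return ValueDefinition('maximum', 'Maximum value')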
- definition: ValueDefinition | ValueDefinitionPrototype | None = None#
The specification of the measure definition
- get_definition(**kwargs)[source]#
Get the measure definition.
- Parameters:
kwargs – Optional parameters that will be passed along to the creation of measure definitions from prototypes. See create_definition().
- Returns:
The definition of the value
- Return type:
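The kwargs are typically used to fill placeholders when the definition is a prototype; a sketch, assuming create_definition() formats the placeholders from keyword arguments:

>>> from dispel.data.values import ValueDefinitionPrototype
>>> prototype = ValueDefinitionPrototype(
...     id_='measure-{method}',
...     name='{method} measure'
... )
>>> definition = prototype.create_definition(method='mean')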
- get_value(value, **kwargs)[source]#
Get a measure value based on the definition.
- Parameters:
value (Any) – The value
kwargs – Optional arguments passed to get_definition().
- Returns:
The value wrapped with the definition from get_definition().
- Return type:
- class dispel.processing.extract.MeasureFlagStep[source]#
Bases: FlagStepMixin, ProcessingStep
A processing step for flagging measures.
- Parameters:
measure_ids (dispel.data.values.DefinitionId | str) – The identifier(s) of the measure(s) that are to be flagged.
task_name (dispel.data.values.AbbreviatedValue | str) – An optional abbreviated name value of the task used for the flag. See FlagStepMixin.
flag_name (dispel.data.values.AbbreviatedValue | str) – An optional abbreviated name value of the considered flag. See FlagStepMixin.
flag_type (dispel.data.flags.FlagType | str) – An optional flag type. See FlagType.
flag_severity (dispel.data.flags.FlagSeverity | str) – An optional flag severity. See FlagSeverity.
reason (str) – An optional string reason of the considered flag. See FlagStepMixin.
stop_processing (bool) – An optional boolean that specifies whether the flag stops processing, i.e., raises an error, or not. See FlagStepMixin.
flagging_function (Callable[[...], bool] | None) – An optional flagging function to be applied to a MeasureValue’s raw value. See FlagStepMixin.
target_ids (str | Iterable[str] | None) – Optional id(s) of the target measures to be flagged. If no targets are specified, the flagged measures themselves are used as targets.
Examples
Assuming you want to flag the step count measure value, you can create the following flag step:
>>> from dispel.data.flags import FlagSeverity
>>> from dispel.data.values import AbbreviatedValue as AV
>>> from dispel.processing.extract import MeasureFlagStep
>>> step = MeasureFlagStep(
...     measure_ids='6mwt-step_count',
...     task_name=AV('Six-minute walk test', '6mwt'),
...     flag_name=AV('step count threshold', 'sct'),
...     flag_type='behavioral',
...     flag_severity=FlagSeverity.DEVIATION,
...     reason='The step count value exceeds 1000 steps.',
...     flagging_function=lambda value: value < 1000,
... )
The flagging function will be called with the measure value corresponding to the provided measure id. If the function has named parameters matching level or reading, the respective level and reading will be passed to the flag function.
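For instance, a minimal sketch of a flagging function that receives the reading through its parameter name (the threshold and measure id mirror the example above):

>>> from dispel.data.values import AbbreviatedValue as AV
>>> from dispel.processing.extract import MeasureFlagStep
>>> def below_threshold(value, reading):
...     # 'reading' is injected because the parameter name matches
...     return value < 1000
>>> step = MeasureFlagStep(
...     measure_ids='6mwt-step_count',
...     flag_name=AV('step count threshold', 'sct'),
...     flag_type='behavioral',
...     reason='The step count value exceeds 1000 steps.',
...     flagging_function=below_threshold,
... )

Another common scenario is to define a class that can be reused.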
>>> from dispel.data.flags import FlagSeverity, FlagType
>>> from dispel.data.values import AbbreviatedValue as AV
>>> from dispel.processing.extract import MeasureFlagStep
>>> class StepCountThreshold(MeasureFlagStep):
...     measure_ids = '6mwt-step_count'
...     task_name = AV('Six-minute walk test', '6mwt')
...     flag_name = AV('step count threshold', 'sct')
...     flag_type = FlagType.BEHAVIORAL
...     flag_severity = FlagSeverity.DEVIATION
...     reason = 'The step count value exceeds 1000 steps.'
...     stop_processing = False
...     flagging_function = lambda value: value < 1000
Another convenient way to provide the flagging function is to use the @flag decorator; one can also use multiple flags for the same class as follows:

>>> from dispel.data.flags import FlagSeverity
>>> from dispel.data.values import AbbreviatedValue as AV
>>> from dispel.processing.extract import MeasureFlagStep
>>> from dispel.processing.flags import flag
>>> class StepCountThreshold(MeasureFlagStep):
...     measure_ids = '6mwt-step_count'
...     task_name = AV('Six-minute walk test', '6mwt')
...     flag_name = AV('step count threshold', 'sct')
...     flag_type = 'behavioral'
...     reason = 'The step count value exceeds {threshold} steps.'
...     stop_processing = False
...
...     @flag(threshold=1000, flag_severity=FlagSeverity.INVALIDATION)
...     def _threshold_1000(self, value: float) -> bool:
...         return value < 1000
...
...     @flag(threshold=800, flag_severity=FlagSeverity.DEVIATION)
...     def _threshold_800(self, value: float) -> bool:
...         return value < 800
Note that the @flag decorator can take keyword arguments. These kwargs are merged with any keyword arguments that come from processing step groups in order to format the flag reason.
- __init__(measure_ids=None, task_name=None, flag_name=None, flag_type=None, flag_severity=None, reason=None, stop_processing=False, flagging_function=None, target_ids=None)[source]#
- Parameters:
measure_ids (DefinitionId | str | None) –
task_name (str | AbbreviatedValue | None) –
flag_name (str | AbbreviatedValue | None) –
flag_severity (FlagSeverity | str | None) –
reason (str | AbbreviatedValue | None) –
stop_processing (bool) –
- get_flag_targets(reading, level=None, **kwargs)[source]#
Get flag targets for measures.
- Parameters:
reading (Reading) –
level (Level | None) –
- Return type:
Iterable[Reading | Level | RawDataSet | MeasureValue | LevelEpoch]
- get_target_ids()[source]#
Get the ids of the target data sets to be flagged.
- Returns:
The identifiers of the target data sets.
- Return type:
- measure_ids: DefinitionId | str#
- process_reading(reading, **kwargs)[source]#
Process the provided reading.
- Parameters:
reading (Reading) –
- Yields:
ProcessResultType – The results from processing readings.
- Return type:
Generator[ProcessingResult | ProcessingControlResult, None, None]
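To run the step, it can be passed to the processing entry point like any other processing step; a sketch, assuming a process() helper in dispel.processing and an existing reading (both names are assumptions here):

>>> from dispel.processing import process
>>> res = process(reading, StepCountThreshold())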