"""Core functionality for signal processing tasks."""
import math
import warnings
from typing import Iterable, List, Literal, Optional, Sequence, Tuple, Union
import numpy as np
import numpy.typing as npt
import pandas as pd
from gatspy.periodic import LombScargle
from scipy import signal
[docs]
def euclidean_norm(data: pd.DataFrame) -> pd.Series:
    """Calculate the euclidean norm of a pandas Data Frame.
    Parameters
    ----------
    data
        A pandas data frame for which to compute the euclidean norm
    Returns
    -------
    pandas.Series
        The euclidean norm of ``data``
    """
    return data.pow(2).sum(axis=1).apply(np.sqrt) 
[docs]
def euclidean_distance(point1: Iterable[float], point2: Iterable[float]) -> float:
    """Calculate the euclidean distance between two points.
    This particular algorithm is chosen based on question 37794849 on StackOverflow.
    Parameters
    ----------
    point1
        The coordinates of `point1`.
    point2
        The coordinates of `point2`.
    Returns
    -------
    float
        The euclidean distance between ``point1`` and ``point2``.
    """
    dists = [(a - b) ** 2 for a, b in zip(point1, point2)]
    dist = math.sqrt(sum(dists))
    return dist 
[docs]
def compute_rotation_matrix_3d(a: np.ndarray, b: np.ndarray) -> np.ndarray:
    """Compute a rotation matrix from unit vector ``a`` onto unit vector ``b``.
    Parameters
    ----------
    a
        The first unit vector
    b
        The second unit vector
    Returns
    -------
    numpy.ndarray
        The rotation matrix from unit vector ``a`` onto unit vector ``b``.
    See Also
    --------
        Implementation inspired by https://math.stackexchange.com/a/476311 and adapted
        to handle a series of rotations
    """
    assert a.shape == (3,)
    assert b.shape == (3,)
    a_norm = np.linalg.norm(a)
    b_norm = np.linalg.norm(b)
    # make sure we have norm vectors!
    if a_norm != 1:
        a = a / a_norm
    if b_norm != 1:
        b = b / b_norm
    v = np.cross(a, b)
    c = np.dot(a, b)
    i = np.identity(3)
    # the vectors are equal - no rotation needed
    if (a == b).all():
        return i
    # the vectors are in opposite direction
    if c == -a_norm * b_norm:
        return i * -1
    v_x = np.array([[0, -v[2], v[1]], [v[2], 0, -v[0]], [-v[1], v[0], 0]])
    r = i + v_x + np.matmul(v_x, v_x) * 1 / (1 + c)
    return r 
[docs]
def compute_rotation_matrix_2d(
    origin: list, point: list, angle: float
) -> Tuple[float, float]:
    """Compute a 2 dimension matrix rotation.
    Parameters
    ----------
    origin
        The origin of the referential.
    point
        The point to rotate.
    angle
        The angle rotation.
    Returns
    -------
    Tuple[float, float]
        The x and y coordinates after rotation.
    """
    origin_x, origin_y = origin
    point_x, point_y = point
    rotated_x = (
        origin_x
        + math.cos(angle) * (point_x - origin_x)
        - math.sin(angle) * (point_y - origin_y)
    )
    rotated_y = (
        origin_y
        + math.sin(angle) * (point_x - origin_x)
        + math.cos(angle) * (point_y - origin_y)
    )
    return rotated_x, rotated_y 
[docs]
def sparc(
    movement: np.ndarray,
    sample_freq: float = 60.0,
    pad_level: int = 4,
    cut_off_freq: float = 10.0,
    amp_th: float = 0.05,
) -> Tuple[float, tuple, tuple]:
    """Compute the spectral arc length of a signal.
    Parameters
    ----------
    movement
        The given 1 dimensional signal (x, y or z axis).
    sample_freq
        The sampling rate.
    pad_level
        The padding level.
    cut_off_freq
        The frequency cut off.
    amp_th
        The amplitude threshold.
    Returns
    -------
    new_sal: float
        The spectral arc length value.
    (freq, mag_spec): tuple
        A tuple containing both frequencies and the normalized magnitude spectrum of the
        movement.
    (freq_sel, mag_spec_sel): tuple
        A tuple containing both frequencies (after applying a cutoff) and the normalized
        magnitude spectrum (after applying a cutoff) of the movement.
    """
    # Number of zeros to be padded.
    n_fft = int(pow(2, np.ceil(np.log2(len(movement))) + pad_level))
    # Frequency
    freq = np.arange(0, sample_freq, sample_freq / n_fft)
    # Normalized magnitude spectrum
    mag_spec = abs(np.fft.fft(movement, n_fft))
    mag_spec = mag_spec / max(mag_spec)
    # Indices to choose only the spectrum within the given cut-off frequency
    # Fc.
    # NOTE: This is a low pass filtering operation to get rid of high frequency
    # noise from affecting the next step (amplitude threshold based cut off for
    # arc length calculation).
    cut_off_freq_inx = ((freq <= cut_off_freq) * 1).nonzero()
    freq_sel = freq[cut_off_freq_inx]
    mag_spec_sel = mag_spec[cut_off_freq_inx]
    # Choose the amplitude threshold based cut-off frequency.
    # Index of the last point on the magnitude spectrum that is greater than
    # or equal to the amplitude threshold.
    inx = ((mag_spec_sel >= amp_th) * 1).nonzero()[0]
    cut_off_freq_inx_range = range(inx[0], inx[-1] + 1)
    freq_sel = freq_sel[cut_off_freq_inx_range]
    mag_spec_sel = mag_spec_sel[cut_off_freq_inx_range]
    # Calculate arc length
    new_sal = -sum(
        np.sqrt(
            pow(np.diff(freq_sel) / (freq_sel[-1] - freq_sel[0]), 2)
            + pow(np.diff(mag_spec_sel), 2)
        )
    )
    return new_sal, (freq, mag_spec), (freq_sel, mag_spec_sel) 
[docs]
def assert_time_series_has_frequency(data: pd.Series) -> None:
    """Check whether a time series contains frequency as index."""
    if data.index.name != "freq":
        raise ValueError("Missing frequencies in data index.") 
[docs]
def get_sampling_rate_idx(data: pd.Series) -> float:
    """Get sampling rate from time series index.
    Parameters
    ----------
    data
        A pandas series containing the signal data for which sampling frequency is to be
        extracted.
    Returns
    -------
    float
        The sampling frequency.
    Raises
    ------
    ValueError
        Raises a value error if data has not been resampled to a constant sampling rate.
    """
    if data.index.freq is None:
        raise ValueError(
            "One is trying to extract the sampling frequency on a time series that has "
            "not been resampled to a constant sampling rate."
        )
    return 1 / data.index.freq.delta.total_seconds() 
[docs]
def entropy(power_spectrum_: np.ndarray) -> float:
    """Compute the entropy of a signal.
    Parameters
    ----------
    power_spectrum_
        An array containing the power spectrum of the signal in question.
    Returns
    -------
    float
        The signal's entropy.
    """
    data = power_spectrum_ / np.sum(power_spectrum_)
    return -np.sum(data * np.log2(data)) 
[docs]
def peak(
    power_spectrum_: pd.Series,
) -> float:
    """Compute the peak frequency of a signal.
    Parameters
    ----------
    power_spectrum_
        An array containing the power spectrum of the signal in question.
    Returns
    -------
    float
        The signal's peak frequency.
    """
    assert_time_series_has_frequency(power_spectrum_)
    return power_spectrum_.idxmax() 
[docs]
def energy(
    power_spectrum_: pd.Series,
    lowcut: Optional[float] = None,
    highcut: Optional[float] = None,
) -> float:
    """Compute the energy of a signal.
    Parameters
    ----------
    power_spectrum_
        A pandas series containing the power spectrum of the signal in question and the
        frequencies in index.
    lowcut
        The lower bound of frequencies to filter.
    highcut
        The higher bound of frequencies to filter.
    Returns
    -------
    float
        The signal's energy.
    """
    assert_time_series_has_frequency(power_spectrum_)
    if lowcut is not None:
        mask = power_spectrum_.index.to_series().between(lowcut, highcut)
        windowed_data = power_spectrum_[mask]
    else:
        windowed_data = power_spectrum_
    return 0.5 * (windowed_data * windowed_data.index).sum() 
[docs]
def amplitude(power_spectrum_: np.ndarray) -> float:
    """Compute the amplitude of a signal.
    Parameters
    ----------
    power_spectrum_
        An array containing the power spectrum of the signal in question.
    Returns
    -------
    float
        The signal's amplitude.
    """
    return np.max(power_spectrum_) 
[docs]
def get_cartesian(
    lat: Sequence[float], lon: Sequence[float]
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Transform latitude longitude to cartesian coordinates in meters."""
    latitude, longitude = np.deg2rad(np.array(lat)), np.deg2rad(np.array(lon))
    earth_r = 6371000  # radius of the earth in meters
    x = earth_r * np.cos(latitude) * np.cos(longitude)
    y = earth_r * np.cos(latitude) * np.sin(longitude)
    z = earth_r * np.sin(latitude)
    return x - x[0], y - y[0], z - z[0] 
[docs]
def integrate_time_series(data: pd.Series) -> np.ndarray:
    """Compute the integral of a time series.
    Parameters
    ----------
    data
        Input series to integrate. Must have a DatetimeIndex.
    Returns
    -------
    numpy.ndarray
        Definite integral as approximated by trapezoidal rule.
    Raises
    ------
    TypeError
        Raises a type error if the index of the series to integrate is not a
        DateTimeIndex.
    """
    index = data.index
    if not isinstance(index, pd.DatetimeIndex):
        raise TypeError(
            "The time series to integrate must have a "
            f"DateTimeIndex. But index is of type {type(index)}."
        )
    return np.trapz(data.to_numpy().squeeze(), pd.to_numeric(index) / 10**9) 
[docs]
def index_time_diff(data: Union[pd.Series, pd.DataFrame]) -> pd.Series:
    """Get the time difference from the index in seconds.
    Parameters
    ----------
    data
        The series or data frame with a time-based index
    Returns
    -------
    pandas.Series
        A series containing the time difference in seconds between each row based on the
        index.
    """
    assert isinstance(
        data.index, (pd.DatetimeIndex, pd.TimedeltaIndex)
    ), "Index must be a pandas DatetimeIndex or TimedeltaIndex"
    return data.index.to_series().diff().dt.total_seconds() 
[docs]
def derive_time_series(data: pd.Series) -> pd.Series:
    """Derive a series based on its time-based index.
    Parameters
    ----------
    data
        The series for which to derive the values based on time. The series must have a
        time-based index. See :func:`index_time_diff`.
    Returns
    -------
    pandas.Series
        The time derivative of the values of ``data``.
    """
    return data.diff() / index_time_diff(data) 
[docs]
def derive_time_data_frame(data: pd.DataFrame) -> pd.DataFrame:
    """Derive a data frame based on its time-based index.
    This method is preferably used for data frames for which one wants to derive for
    each column (instead of using ``data.apply(derive_time_series)``).
    Parameters
    ----------
    data
        The pandas data frame for which to derive the values based on time. The data
        frame must have a time-based index. See :func:`index_time_diff`.
    Returns
    -------
    pandas.DataFrame
        The time derivative of the values of ``data``.
    """
    delta_t = index_time_diff(data)
    return (data.diff().T / delta_t).T 
[docs]
def discretize_sampling_frequency(
    data: pd.Series, fs_expected: List[int], max_frequency_distance: int = 5
) -> int:
    """Discretize the sampling frequency from a time series.
    First we extract the median sampling frequency of data, then return the closest
    expected frequency if the estimated sampling frequency is close enough
    (``np.abs(fs_expected, fs_estimate) < 5``) to one of the expected sampling
    frequencies.
    Parameters
    ----------
    data
        Any pandas series with a time series as index.
    fs_expected
        An iterable of expected sampling frequency in Hz.
    max_frequency_distance
        An optional integer specifying the maximum accepted distance between the
        expected frequency and the estimated frequency above which we raise an error.
    Returns
    -------
    int
        Discretized sampling frequency.
    Raises
    ------
    ValueError
        If estimated sampling frequency is too far
        (abs distance > max_frequency_distance) from all the expected sampling frequency
        in ``fs_expected``.
    """
    # Estimate sampling_frequency
    fs_estimate = extract_sampling_frequency(data)
    # Compute the distance to expected frequencies
    frequency_distance = np.abs(np.array(fs_expected) - fs_estimate)
    # Check if we are close enough otherwise raise a warning
    if min(frequency_distance) > max_frequency_distance:
        raise ValueError(
            f"Estimated sampling frequency {fs_estimate} is further than 5 Hz from "
            f"expected frequencies: {fs_expected}."
        )
    if min(frequency_distance) > 1:
        warnings.warn(
            f"Estimated sampling frequency {fs_estimate} is further than 1 Hz from "
            f"expected frequencies: {fs_expected}."
        )
    return fs_expected[int(np.argmin(frequency_distance))] 
[docs]
def cross_corr(
    data_1: npt.NDArray[np.float64], data_2: npt.NDArray[np.float64]
) -> Tuple[np.ndarray, np.ndarray]:
    """Compute cross-correlation between two signals.
    Uses cross-correlation with the same signal as input twice.
    Parameters
    ----------
    data_1
        A first signal passed as an iterable of floats.
    data_2
        A second signal passed as an iterable of floats.
    Returns
    -------
    Tuple[numpy.ndarray, numpy.ndarray]
        A Tuple containing temporal delays and auto-correlation array.
    """
    n_samples = len(list(data_1))
    corr = np.correlate(data_1, data_2, mode="full")
    lags = np.arange(-(n_samples - 1), n_samples)
    return lags, corr 
[docs]
def autocov(data: npt.NDArray[np.float64]) -> Tuple[np.ndarray, np.ndarray]:
    """Compute auto-covariance of a signal.
    Uses autocovariance from as autocorrelation of demeaned signal.
    Parameters
    ----------
    data
        A signal passed as an iterable of floats.
    Returns
    -------
    Tuple[numpy.ndarray, numpy.ndarray]
        A Tuple containing temporal delays  and auto-covariance array.
    """
    # compute autocovariance
    return autocorr(data - np.mean(data)) 
[docs]
def autocorr(data: npt.NDArray[np.float64]) -> Tuple[np.ndarray, np.ndarray]:
    """Compute auto-correlation of a signal.
    Uses cross-correlation with the same signal as input twice.
    Parameters
    ----------
    data
        A signal passed as an iterable of floats.
    Returns
    -------
    Tuple[numpy.ndarray, numpy.ndarray]
        A Tuple containing temporal delays  and auto-correlation array.
    """
    return cross_corr(data, data) 
[docs]
def scale_corr(
    corr: np.ndarray,
    n_samples: int,
    phase_shift: np.ndarray,
    method: Literal["biased", "unbiased"],
) -> np.ndarray:
    """Scale auto-correlation.
    Parameters
    ----------
    corr
        An iterable of floats containing the correlations for each delay.
    n_samples
        The number of samples of the input signal (denoted N in equation).
    phase_shift
        The phase shift in number of samples (denoted m in equation).
    method
        the method to be used (biased or unbiased)
    Returns
    -------
    numpy.ndarray
        An array containing the scaled auto-correlation values.
    """
    if method == "biased":
        corr = corr / n_samples
    elif method == "unbiased":
        corr /= n_samples - abs(phase_shift)
    return corr 
[docs]
def scaled_autocorr(
    data: npt.NDArray[np.float64],
    method: Literal["unbiased"] = "unbiased",
    do_autocov: bool = True,
) -> Tuple[np.ndarray, np.ndarray]:
    """Compute scaled auto-correlation function of a signal.
    Parameters
    ----------
    data
        An iterable signal of floats.
    method
        A string defining the scaling method to be used.
    do_autocov
        A boolean denoting whether autocovariance should be used for calculation of the
        autocorrelation function.
    Returns
    -------
    Tuple[numpy.ndarray, numpy.ndarray]
        A Tuple containing lags and auto-correlation output..
    """
    if do_autocov:
        lags, corr = autocov(data)
    else:
        lags, corr = autocorr(data)
    corr = scale_corr(corr, len(list(data)), lags, method)
    return lags, corr 
[docs]
def signal_duration(data: Union[pd.Series, pd.DataFrame]) -> float:
    """Get signal duration from time-based indices.
    Parameters
    ----------
    data
        The signal of which we want to compute the duration based on its index. The
        index has to be either a TimedeltaIndex or DatetimeIndex.
    Returns
    -------
    float
        The duration of the signal (in seconds) from the index.
    """
    assert isinstance(data.index, (pd.TimedeltaIndex, pd.DatetimeIndex))
    return (data.index.max() - data.index.min()).total_seconds()