"""
Features Module
---------------
*paat.features* provides functions to compute various features from the raw
acceleration signal.
"""
import numpy as np
import pandas as pd
from numpy.lib.stride_tricks import sliding_window_view
from scipy import signal
import resampy
from agcounts.extract import get_counts
BROND_COEFF_A = np.array([1, -4.1637, 7.5712, -7.9805, 5.385, -2.4636, 0.89238, 0.06361,
-1.3481, 2.4734, -2.9257, 2.9298, -2.7816, 2.4777, -1.6847,
0.46483, 0.46565, -0.67312, 0.4162, -0.13832, 0.019852])
BROND_COEFF_B = np.array([0.049109, -0.12284, 0.14356, -0.11269, 0.053804, -0.02023,
0.0063778, 0.018513, -0.038154, 0.048727, -0.052577, 0.047847,
-0.046015, 0.036283, -0.012977, -0.0046262, 0.012835, -0.0093762,
0.0034485, -0.00080972, -0.00019623])
[docs]
def calculate_vector_magnitude(data, minus_one=False, round_negative_to_zero=False, dtype=np.float32):
r"""
Calculate the vector magnitude of the acceleration data.
Parameters
----------
data : array_like
numpy array with acceleration data
minus_one : Boolean (optional)
If set to True, the calculate the vector magnitude minus one, also known as the ENMO (Euclidian Norm Minus One)
round_negative_to_zero : Boolean (optional)
If set to True, round negative values to zero
dtype : np.dtype (optional)
set the data type of the return array. Standard float 32, but can be set to better precision
Returns
-------
vector_magnitude : np.array (acceleration values, 1)(np.float)
numpy array with vector magnitude of the acceleration
Notes
-----
The vector magnitude of the acceleration is calculated as the Euclidian Norm.
.. math:: \sqrt{y^2 + x^2 + z^2}
if minus_one is set to True then it it is the Euclidian Norm Minus One.
.. math:: \sqrt{y^2 + x^2 + z^2} - 1
if round_negative_to_zero all negative values are clipped.
"""
# change dtype of array to float32 (also to hold scaled data correctly). The original unscaled data is stored as int16, but when we want to calculate the vector we exceed the values that can be stored in 16 bit
data = data.astype(dtype=np.float32)
# calculate the vector magnitude on the whole array
vector_magnitude = np.sqrt(np.sum(np.square(data), axis=1)).astype(dtype=dtype)
# check if minus_one is set to True, if so, we need to calculate the ENMO
if minus_one:
vector_magnitude -= 1
# if set to True, round negative values to zero
if round_negative_to_zero:
vector_magnitude = vector_magnitude.clip(min=0)
# reshape the array into number of acceleration values, 1 column
return vector_magnitude.reshape(data.shape[0], 1)
[docs]
def calculate_enmo(data, dtype=np.float32):
"""
Calculate the Euclidean norm minus one from raw acceleration data.
This function is a wrapper of `calculate_vector_magnitude`.
Parameters
----------
data : array_like
numpy array with acceleration data
dtype : np.dtype (optional)
set the data type of the return array. Standard float 32, but can be set to better precision
Returns
-------
vector_magnitude : np.array (acceleration values, 1)(np.float)
numpy array with the Eucledian Norm Minus One (ENMO) of the acceleration
"""
if isinstance(data, pd.DataFrame):
data = data[["Y", "X", "Z"]].values
return calculate_vector_magnitude(data, minus_one=True, round_negative_to_zero=True)
[docs]
def calculate_frequency_features(data, win_len=60, win_step=60, sample_rate=100, nfft=512, nfilt=40):
"""
Calculate frequency features from raw acceleration signal.
Parameters
----------
data : DataFrame
a DataFrame containg the raw acceleration data
win_len : int (optional)
an int indicating the window length in seconds
win_step : int (optional)
an int indicating the step size between windows in seconds
sample_rate : float (optional)
a float indicating the sampling rate in Hz
nfft: int (optional)
an int indicating the number of points for the Fourier transform
nfilt: int (optional)
an int indicating the number of triangular filters to use
Returns
-------
time : np.array (n_samples x 1)
a numpy array with time stamps for the observations in values
features : np.array (n_samples x 160)
a numpy array with the 160-dimensional feature vector per time step
"""
acceleration = data[["Y", "X", "Z"]].values
# Calculate Euclidian Norm Minus One for the three axis
emno = calculate_vector_magnitude(acceleration, minus_one=True).squeeze()
# Calculate filter banks per signal
_, fbanks_x = _calculate_filter_banks(acceleration[:, 1], sample_rate, win_len, win_step, nfft=nfft, nfilt=nfilt)
_, fbanks_y = _calculate_filter_banks(acceleration[:, 0], sample_rate, win_len, win_step, nfft=nfft, nfilt=nfilt)
_, fbanks_z = _calculate_filter_banks(acceleration[:, 2], sample_rate, win_len, win_step, nfft=nfft, nfilt=nfilt)
_, fbanks_emno = _calculate_filter_banks(emno, sample_rate, win_len, win_step, nfft=nfft, nfilt=nfilt)
# Create feature vector
features = np.hstack([fbanks_x, fbanks_y, fbanks_z, fbanks_emno])
return features
def _hz_to_mel(hz):
return (2595 * np.log10(1 + (hz / 2) / 700))
def _mel_to_hz(mel):
return (700 * (10**(mel / 2595) - 1))
def _calculate_filter_banks(signal, sample_rate, win_len, win_step, nfft=512, nfilt=40):
"""
Calculate filter banks for a signal.
See https://haythamfayek.com/2016/04/21/speech-processing-for-machine-learning.html
"""
# Calculate frames on second level to have more data points
frame_len, frame_step = int(win_len * sample_rate), int(win_step * sample_rate)
frames = sliding_window_view(signal, frame_len)[::frame_step].copy()
frames *= np.hamming(frame_len)
mag_frames = np.absolute(np.fft.rfft(frames, nfft))
pow_frames = (1.0 / nfft) * (mag_frames ** 2)
low_freq_mel = 0
high_freq_mel = _hz_to_mel(sample_rate)
mel_points = np.linspace(low_freq_mel, high_freq_mel, nfilt + 2)
hz_points = _mel_to_hz(mel_points)
bin = np.floor((nfft + 1) * hz_points / sample_rate)
fbank = np.zeros((nfilt, int(np.floor(nfft / 2 + 1))))
for mm in range(1, nfilt + 1):
f_m_minus = int(bin[mm - 1]) # left
f_m = int(bin[mm]) # center
f_m_plus = int(bin[mm + 1]) # right
for kk in range(f_m_minus, f_m):
fbank[mm - 1, kk] = (kk - bin[mm - 1]) / (bin[mm] - bin[mm - 1])
for kk in range(f_m, f_m_plus):
fbank[mm - 1, kk] = (bin[mm + 1] - kk) / (bin[mm + 1] - bin[mm])
filter_banks = np.dot(pow_frames, fbank.T)
# remove zeros for log
filter_banks = np.where(filter_banks == 0, np.finfo(float).eps, filter_banks)
filter_banks = 20 * np.log10(filter_banks) # dB
return hz_points, filter_banks
def _calc_one_axis_brond_counts(acc, sample_freq, epoch_length, deadband=0.068, peak=2.13, adcResolution=0.0164, A=BROND_COEFF_A, B=BROND_COEFF_B):
"""
Helper function to process one axis with the Brond algorithm.
Parameters
----------
acc : array_like
a numpy containg one axis of the raw acceleration data
sample_freq : int
an int indicating at which sampling frequency the data was recorded
epoch_length: int
an int indicating the length of the epochs to calculate in seconds
deadband: float (optional)
a float indicating the threshold that counts are being calculated. The
dead band threshold for newer ActiGraph models (GT3X+ and newer models)
is specified at 0.05g
peak: float
a float indicating the upper limit of recording. ActiLife truncates values
above 2.13g to ensure compatibility with the 8-bit analog to digital
conversion
adcResolution: float
a float indicating the conversion factor for 8bit
A, B: array_like
arrays with the filter's nominator and denominator
Returns
-------
counts: array_like
a numpy array containg the Brønd counts
"""
# Step 0: Downsample to 30hz if not already
target_hz = 30
if sample_freq != target_hz:
resampled_data = resampy.resample(acc, sample_freq, target_hz)
# Step 1: Aliasing filter (0.01-7hz)
B2, A2 = signal.butter(4, np.array([0.01, 7])/(target_hz/2), btype='bandpass')
dataf = signal.filtfilt(B2, A2, resampled_data)
# Step 2: ActiGraph filter
filtered = signal.lfilter(0.965 * B, A, dataf)
# Step 3, 4 & 5: Downsample to 10hz, clip at peak (2.13g) and rectify
rectified = np.abs(np.clip(filtered[::3], a_min=-1*peak, a_max=peak))
# Step 6 & 7: Dead-band and convert to 8bit resolution
downsampled = np.where(rectified < deadband, 0, rectified) // adcResolution
# If last epoch was not fully recorded, pad with zeros
values_per_epoch = 10 * epoch_length
missing_values = int(np.ceil(len(downsampled) / values_per_epoch) * values_per_epoch) - len(downsampled)
downsampled = np.pad(downsampled, (0, missing_values), 'constant', constant_values=0)
# Step 8: Accumulate
downsampled = downsampled.reshape([-1, 10 * epoch_length])
counts = np.where(downsampled >= 0, downsampled, 0).sum(axis=1)
return counts.astype(int)
[docs]
def calculate_brond_counts(data, sample_freq, epoch_length):
"""
Create Brønd counts from uniaxial acceleration data. The algorithm was described in
Brønd, J. C., Andersen, L. B., & Arvidsson, D. (2017). Generating ActiGraph Counts
from Raw Acceleration Recorded by an Alternative Monitor, Medicine & Science in
Sports & Exercise, doi: 10.1249/MSS.0000000000001344
Parameters
----------
data : DataFrame
a DataFrame containg the raw acceleration data
sample_freq : int
an int indicating at which sampling frequency the data was recorded
epoch_length: int
an int indicating the length of the epochs to calculate in seconds
Returns
-------
counts: DataFrame
a DataFrame containg the Brønd counts
"""
timestamps = data.resample(epoch_length).mean().index
if isinstance(epoch_length, str):
epoch_length = pd.Timedelta(epoch_length).seconds
counts = pd.DataFrame({"Y": _calc_one_axis_brond_counts(data["Y"].values, sample_freq, epoch_length),
"X": _calc_one_axis_brond_counts(data["X"].values, sample_freq, epoch_length),
"Z": _calc_one_axis_brond_counts(data["Z"].values, sample_freq, epoch_length)},
index=timestamps)
return counts
[docs]
def calculate_actigraph_counts(data, sample_freq, epoch_length):
"""
Wrapper function to create ActiGraph counts. A companion paper has been
submitted by ActiGraph and will be referenced here as soon as it is published.
Parameters
----------
data: array_like
a numpy array containing the uniaxial acceleration data
sample_freq: int
an int indicating at which sampling frequency the data was recorded
epoch_length: str
a string indicating the length of the epochs to calculate
Returns
-------
counts: array_like
a numpy array containg the ActiGraph counts
"""
sec_per_epoch = pd.Timedelta(epoch_length).seconds
counts = get_counts(data[["Y", "X", "Z"]].values, sample_freq, sec_per_epoch)
index = data.resample(epoch_length).mean(numeric_only=True).index
counts = pd.DataFrame(counts, columns=["Y", "X", "Z"], index=index[:len(counts)])
return counts
def _mad(chunk):
"""
Calculate the mean amplitude deviation (MAD) of the raw
acceleration chunk.
Parameters
----------
chunk : pd.Series
a chunk of a DataFrame
Returns
-------
m : float
the mean amplitude deviation (MAD) of the raw acceleration chunk
"""
r = np.sqrt(np.sum(np.square(chunk[["X", "Y", "Z"]].values), axis=1))
m = np.mean(np.abs(r - r.mean()))
return m
[docs]
def calculate_mad(data, freq="6s"):
"""
Calculate the mean amplitude deviation (MAD) of the raw
acceleration signal based on Vähä-Ypyä et al. (2015), by
.. math:: MAD = \\frac{1}{n} \sum_i | r_i - \\bar{r} |
with
.. math:: r = \\sqrt{y^2 + x^2 + z^2}
References
----------
Vähä-Ypyä, H., Vasankari, T., Husu, P., Suni, J., & Sievänen, H. (2015). A universal, accurate intensity-based classification of different physical activities using raw data of accelerometer. *Clinical Physiology and Functional Imaging*, 35(1), 64–70. https://doi.org/10.1111/cpf.12127
Parameters
----------
data : array_like
numpy array with acceleration data
freq : str
the sampling frequency on which the MAD values should be calculated
Returns
-------
mad : np.array (acceleration values, 1)(np.float)
numpy array with the Mean Amplitude Deviation (MAD) of the acceleration
"""
mad = mad_data = data.resample(freq).apply(_mad)
return mad