Source code for paat.wear_time

"""
Wear Time Module
----------------

*paat.wear_time* provides functions to infer wear and non wear times from
raw acceleration signals.

"""
import logging
import os
import sys

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

import numpy as np
import pandas as pd
# from tensorflow.keras import models
import keras

from . import preprocessing, features


def _find_candidate_non_wear_segments_from_raw(acc_data, std_threshold, hz, min_segment_length=1, sliding_window=1, use_vmu=False):
    """
    Find segements within the raw acceleration data that can potentially be non-wear time (finding the candidates)

    Parameters
    ----------
    acc_data : np.array(samples, axes)
        numpy array with acceleration data (typically YXZ)
    std_threshold : int or float
        the standard deviation threshold in g
    hz : int
        sample frequency of the acceleration data (could be 32hz or 100hz for example)
    min_segment_length : int (optional)
        minimum length of the segment to be candidate for non-wear time (default 1 minutes, so any shorter segments will not be considered non-wear time)
    sliding_window : int (optional)
        sliding window in minutes that will go over the acceleration data to find candidate non-wear segments
    use_vmu : bool
        indicates whether the algorithm should runon vector magnitude data

    Returns
    -------
    nw_vector_final: array_like
        a numpy array with candidate non-wear time periods
    """

    # adjust the sliding window to match the samples per second (this is encoded in the samplign frequency)
    sliding_window *= hz * 60
    # adjust the minimum segment lenght to reflect minutes
    min_segment_length *= hz * 60

    # define new non wear time vector that we initiale to all 1s, so we only have the change when we have non wear time as it is encoded as 0
    nw_vector = np.ones((len(acc_data), 1), dtype=np.uint8)
    nw_vector_final = np.ones((len(acc_data), 1), dtype=np.uint8)

    # loop over slices of the data
    for ii in range(0, len(acc_data), sliding_window):

        # slice the data
        data = acc_data[ii:ii + sliding_window]

        # calculate VMU if set to true
        if use_vmu:
            # calculate the VMU of XYZ
            data = features.calculate_vector_magnitude(data)

        # calculate the standard deviation of each column (YXZ)
        std = np.std(data, axis=0)

        # check if all of the standard deviations are below the standard deviation threshold
        if np.all(std <= std_threshold):

            # add the non-wear time encoding to the non-wear-vector for the correct time slices
            nw_vector[ii:ii + sliding_window] = 0

    # find all indexes of the numpy array that have been labeled non-wear time
    non_wear_indexes = np.where(nw_vector == 0)[0]

    # find the min and max of those ranges, and increase incrementally to find the edges of the non-wear time
    for row in _find_consecutive_index_ranges(non_wear_indexes):

        # check if not empty
        if row.size != 0:

            # define the start and end of the index range
            start_slice, end_slice = np.min(row), np.max(row)

            # backwards search to find the edge of non-wear time vector
            start_slice = _backward_search_non_wear_time(data=acc_data, start_slice=start_slice, end_slice=end_slice, std_max=std_threshold, hz=hz)
            # forward search to find the edge of non-wear time vector
            end_slice = _forward_search_non_wear_time(data=acc_data, start_slice=start_slice, end_slice=end_slice, std_max=std_threshold, hz=hz)

            # calculate the length of the slice (or segment)
            length_slice = end_slice - start_slice

            # minimum length of the non-wear time
            if length_slice >= min_segment_length:

                # update numpy array by setting the start and end of the slice to zero (this is a non-wear candidate)
                nw_vector_final[start_slice:end_slice] = 0

    # return non wear vector with 0= non-wear and 1 = wear
    return nw_vector_final


def _find_consecutive_index_ranges(vector, increment=1):
    """
    Find ranges of consequetive indexes in numpy array

    Parameters
    ----------
    vector: numpy vector
        numpy vector of integer values
    increment: int (optional)
        difference between two values (typically 1)

    Returns
    -------
    indexes: list
        list of ranges, for instance [1,2,3,4],[8,9,10], [44]
    """

    return np.split(vector, np.where(np.diff(vector) != increment)[0] + 1)


def _forward_search_non_wear_time(data, start_slice, end_slice, std_max, hz, time_step=60):
    """
    Increase the end_slice to obtain more non_wear_time (used when non-wear range has been found but due to window size, the actual non-wear time can be slightly larger)

    Parameters
    ----------
    data: numpy array of time x 3 axis
        raw log data
    start_slice: int
        start of known non-wear time range
    end_slice: int
        end of known non-wear time range
    std_max: int or float
        the standard deviation threshold in g
    hz : int
        sample frequency of the data (necessary when working with indexes)
    time_step: int (optional)
        value to add (or subtract in the backwards search) to find more non-wear time

    Returns
    -------
    end_slice: int
        index of the end of the non-wear time period
    """

    # adjust time step on number of samples per time step window
    time_step *= hz

    # define the end of the range
    end_of_data = len(data)

    # Do-while loop
    while True:

        # define temporary end_slice variable with increase by step
        temp_end_slice = end_slice + time_step

        # check condition range still contains non-wear time
        if temp_end_slice <= end_of_data and np.all(np.std(data[start_slice:temp_end_slice], axis=0) <= std_max):

            # update the end_slice with the temp end slice value
            end_slice = temp_end_slice

        else:
            # here we have found that the additional time we added is not non-wear time anymore, stop and break from the loop by returning the updated slice
            return end_slice


def _backward_search_non_wear_time(data, start_slice, end_slice, std_max, hz, time_step=60):
    """
    Decrease the start_slice to obtain more non_wear_time (used when non-wear range has been found but the actual non-wear time can be slightly larger, so here we try to find the boundaries)

    Parameters
    ----------
    data: numpy array of time x 3 axis
        raw log data
    start_slice: int
        start of known non-wear time range
    end_slice: int
        end of known non-wear time range
    std_max: int or float
        the standard deviation threshold in g
    hz : int
        sample frequency of the data (necessary when working with indexes)
    time_step: int (optional)
        value to add (or subtract in the backwards search) to find more non-wear time

    Returns
    -------
    start_slice: int
        index of the start of the non-wear time period
    """

    # adjust time step on number of samples per time step window
    time_step *= hz

    # Do-while loop
    while True:

        # define temporary end_slice variable with increase by step
        temp_start_slice = start_slice - time_step

        # logging.debug('Decreasing temp_start_slice to: {}'.format(temp_start_slice))

        # check condition range still contains non-wear time
        if temp_start_slice >= 0 and np.all(np.std(data[temp_start_slice:end_slice], axis=0) <= std_max):

            # update the start slice with the new temp value
            start_slice = temp_start_slice

        else:
            # here we have found that the additional time we added is not non-wear time anymore, stop and break from the loop by returning the updated slice
            return start_slice


def _group_episodes(episodes, distance_in_min=3, correction=3, hz=100, training=False):
    """
    Group episodes that are very close together

    Parameters
    -----------
    episodes : pd.DataFrame()
        dataframe with episodes that need to be grouped
    distance_in_min : int
        maximum distance two episodes can be apart and need to be grouped together
    correction : int
        due to changing from 100hz to 32hz we need to allow for a small correction to capture full minutes
    hz : int
        sample frequency of the data (necessary when working with indexes)

    Returns
    -------
    grouped_episodes: pd.DataFrame()
        dataframe with grouped episodes
    """

    # check if there is only 1 episode in the episodes dataframe, if so, we need not to do anything since we cannot merge episodes if we only have 1
    if episodes.empty or len(episodes) == 1:
        # transpose back and return
        return episodes.T

    # create a new dataframe that will contain the grouped rows
    grouped_episodes = []

    # get all current values from the first row
    current_start = episodes.iloc[0]['start']
    current_start_index = episodes.iloc[0]['start_index']
    current_stop = episodes.iloc[0]['stop']
    current_stop_index = episodes.iloc[0]['stop_index']
    current_label = None if not training else episodes.iloc[0]['label']
    current_counter = episodes.iloc[0]['counter']

    # loop over each next row (note that we skip the first row)
    for _, row in episodes.iloc[1:].iterrows():

        # define all next values
        next_start = row.loc['start']
        next_start_index = row.loc['start_index']
        next_stop = row.loc['stop']
        next_stop_index = row.loc['stop_index']
        next_label = None if not training else row.loc['label']
        next_counter = row.loc['counter']

        # check if there are 'distance_in_min' minutes apart from current and next ( + correction for some adjustment)
        if next_start_index - current_stop_index <= hz * 60 * distance_in_min + correction:

            # here the two episodes are close to eachother, we update the values and continue the next row to see if we can group more. If it's the last row, we need to add it to the dataframe
            current_stop_index = next_stop_index
            current_stop = next_stop

            # check if row is the last row
            if next_counter == episodes.iloc[-1]['counter']:

                # create the counter label
                counter_label = f'{current_counter}-{next_counter}'

                # save to new dataframe
                grouped_episodes += [pd.Series({
                    'counter': counter_label,
                    'start_index': current_start_index,
                    'start': current_start,
                    'stop_index': current_stop_index,
                    'stop': current_stop,
                    'label': None if not training else current_label
                })]
        else:

            # create the counter label
            counter_label = current_counter if (next_counter - current_counter == 1) else f'{current_counter}-{next_counter - 1}'

            # save to new dataframe
            grouped_episodes += [pd.Series({
                'counter': counter_label,
                'start_index': current_start_index,
                'start': current_start,
                'stop_index': current_stop_index,
                'stop': current_stop,
                'label': None if not training else current_label
            })]

            # update tracker variables
            current_start = next_start
            current_start_index = next_start_index
            current_stop = next_stop
            current_stop_index = next_stop_index
            current_label = next_label
            current_counter = next_counter

            # check if last row then also include by itself
            if next_counter == episodes.iloc[-1]['counter']:

                # save to new dataframe
                grouped_episodes += [pd.Series({
                    'counter': next_counter,
                    'start_index': current_start_index,
                    'start': current_start,
                    'stop_index': current_stop_index,
                    'stop': current_stop,
                    'label': None if not training else current_label
                })]

    return pd.concat(grouped_episodes, axis=1)


[docs] def detect_non_wear_time_syed2021(data, sample_freq, cnn_model_file=None, std_threshold=0.004, distance_in_min=5, episode_window_sec=7, edge_true_or_false=True, start_stop_label_decision='and', min_segment_length=1, sliding_window=1, verbose=False): """ Infer non-wear time from raw 100Hz triaxial data based on the method proposed by Syed et al. (2021). Data at different sample frequencies will be resampled to 100hz. The steps are described in the paper but are summarized here too: Detect candidate non-wear episodes: Perform a forward pass through the raw acceleration signal and calculate the SD for each 1-minute interval and for each individual axis. If the standard deviation is <= 0.004 g for all axes, record this 1-minute interval as a candidate non-wear interval. After all 1-minute intervals have been processed, merge consecutive 1-minute intervals into candidate non-wear episodes and record their start and stop timestamps. Merge bordering candidate non-wear episodes: Merge candidate non-wear episodes that are no more than 5 minutes apart and record their new start and stop timestamps. This step is required to capture artificial movement that would typically break up two or more candidate non-wear episodes in close proximity. Detect the edges of candidate non-wear episodes: Perform a backward pass with a 1-second step size through the acceleration data from the start timestamp of a candidate non-wear episode and calculate the SD for each individual axis. The same is applied for the stop timestamps with a forward pass and a step size of 1 second. If the standard deviation of all axes is <= 0.004 g, include the 1-second interval into the candidate non-wear episode and record the new start or stop timestamp. Repeat until the standard deviation of the 1-second interval does not satisfy <= 0.004 g. As a result, the resolution of the edges is now recorded on a 1-second resolution. Classifying the start and stop windows: For each candidate non-wear episode, extract the start and stop segment with a window length of 3 seconds to create input features for the CNN classification model. For example, if a candidate non-wear episode has a start timestamp of tstart a feature matrix is created as (tstart – w , tstart) x 3 axes with w = 3 seconds, resulting in an input feature with dimensions (300 x 3) for 100Hz data. If both (i.e., logical ‘AND’) start and stop features are classified (through the CNN model) as non-wear time, the candidate non-wear episode can be considered true non-wear time. If tstart is at t = 0, or tend is at the end of the acceleration data—meaning that those candidate non-wear episodes do not have a preceding or following window to extract features from—classify the start or stop as non-wear time by default. References ---------- Syed, S., Morseth, B., Hopstock, L. A., & Horsch, A. (2021). A novel algorithm to detect non-wear time from raw accelerometer data using deep convolutional neural networks. *Scientific Reports*, 11(1), 8832. https://doi.org/10.1038/s41598-021-87757-z Parameters ---------- data : DataFrame a DataFrame containg the raw acceleration data sample_freq : int sample frequency of the data. The CNN model was trained for 100Hz of data. If the data is at a different sampling frequency it will be resampled to 100Hz cnn_model_file: os.path (optional) file location of the trained CNN model. On default, the corresponding pretrained model is used. std_threshold: float (optional) standard deviation threshold to find candidate non-wear episodes. Default 0.004 g distance_in_min: int (optional) causes two nearby candidate non-wear episodes not more than 'distance_in_min' apart to be grouped/merged together. This results in capturing artificial movement that would otherwise break up a longer candidate non-wear time. Defaults to 5 minutes. episode_window_sec: int (optional) length of the window to extract features from the start or the end of a candidate non-wear episode. If a non-wear episodes starts at time t, then a feature will be extracted from the raw data t-'episode_window_sec' to t. The same happens at the end of a candidate non-wear episode. So t-end untill t-end + 'episode_window_sec' Default to 7 seconds. Also note that a different value will need a different trained CNN model. edge_true_or_false: Bool (optional) default classification if a candidate non-wear episode starts at the start of the acceleration data, so at t=0, or ends at the end of the acceleration data. In such cases, we can't obtain the feature at t-'episode_window_sec' since there is no data before t=0. In these cases, the start or stop of the candidate non-wear episode will be defaulted to True (non-wear time) or False (wear-time). Default value is True start_stop_label_decision: string ('or','and') (optional) Logical operator OR or AND to determine if a candidate non-wear episode should be considered non-wear time if only one side, either the start or the stop parts, is inferred as non-wear time, or if both sides need to be inferred as non-wear time for the candidate non-wear time to be considered true non-wear time. Default to AND, meaning that both the start and the stop parts of the candidate non-wear time need to be inferred as non-wear time to allow the candidate non-wear time to be true non-wear time. In all other cases, the candidate non-wear time is then wear-time. min_segment_length: int (optional) minimum length of the segment to be candidate for non-wear time sliding_window: int (optional) sliding window in minutes that will go over the acceleration data to find candidate non-wear segments verbose: Bool (optional) set to True if debug messages should be printed to the console and log file. Default False. Returns ------- nw_vector: np.array(n_samples,) a numpy array indicating whether the values of the acceleration data are non-wear time Notes ----- - If the data is not 100hz, then it will be resampled to 100hz. However, how the inference of non-wear time is affected by this has not been investigated. - CNN models were trained with a hip worn accelerometer. """ raw_acc = data[["X", "Y", "Z"]].values # use one of the default models if no model file is given if cnn_model_file is None: cnn_model_file = os.path.join(os.path.pardir, os.path.dirname(__file__), 'models', f'cnn_v2_{str(episode_window_sec)}.pb') # check if data is triaxial if raw_acc.shape[1] != 3: logging.error('Acceleration data should be triaxial/3 axes. Number of axes found is %s', raw_acc.shape[1]) sys.exit() # check if data needs to be resampled to 100hz if sample_freq != 100: logging.info('Sampling frequency of the data is %s Hz, should be 100Hz, starting resampling....', sample_freq) # call resampling function raw_acc = preprocessing.resample_acceleration(data=raw_acc, from_hz=sample_freq, to_hz=100, verbose=verbose) logging.info('Data resampled to 100hz') # set sampling frequency to 100hz sample_freq = 100 # create new non-wear vector that is prepopulated with wear-time encoding. This way we only have to record the non-wear time nw_vector = np.zeros(raw_acc.shape[0], dtype=bool) # get candidate non-wear episodes (note that these are on a minute resolution). Also note that it returns wear time as 1 and non-wear time as 0 nw_episodes = _find_candidate_non_wear_segments_from_raw(acc_data=raw_acc, std_threshold=std_threshold, min_segment_length=min_segment_length, sliding_window=sliding_window, hz=sample_freq) # find all indexes of the numpy array that have been labeled non-wear time nw_indexes = np.where(nw_episodes == 0)[0] # find consecutive ranges non_wear_segments = _find_consecutive_index_ranges(nw_indexes) # empty dictionary where we can store the start and stop times dic_segments = {} # check if segments are found if len(non_wear_segments[0]) > 0: # find start and stop times (the indexes of the edges and find corresponding time) for ii, row in enumerate(non_wear_segments): # find start and stop start, stop = np.min(row), np.max(row) # add the start and stop times to the dictionary # note that start and stop timestamps are not given. dic_segments[ii] = {'counter': ii, 'start': start, 'start_index': start, 'stop': stop, 'stop_index': stop} # create dataframe from segments episodes = pd.DataFrame.from_dict(dic_segments) # Merge episodes that are close to each other grouped_episodes = _group_episodes(episodes=episodes.T, distance_in_min=distance_in_min, correction=3, hz=sample_freq, training=False).T # load CNN model cnn_model = keras.layers.TFSMLayer(cnn_model_file, call_endpoint='serving_default') # For each episode, extend the edges, create features and infer label for _, row in grouped_episodes.iterrows(): start_index = int(row.loc['start_index']) stop_index = int(row.loc['stop_index']) if verbose: logging.debug('Processing episode start_index: %s, stop_index: %s', start_index, stop_index) # forward search to extend stop index stop_index = _forward_search_episode(raw_acc, stop_index, hz=sample_freq, max_search_min=5, std_threshold=std_threshold, verbose=verbose) # backwar search to extend start index start_index = _backward_search_episode(raw_acc, start_index, hz=sample_freq, max_search_min=5, std_threshold=std_threshold, verbose=verbose) # get start episode start_episode = raw_acc[start_index - (episode_window_sec * sample_freq): start_index] # get stop episode stop_episode = raw_acc[stop_index: stop_index + (episode_window_sec * sample_freq)] # default label for start and stop combined. The first False will turn into True if the start of the episode is inferred as non-wear time. The same happens to the # second False when the end is inferred as non-weaer time start_stop_label = [False, False] # Start episode if start_episode.shape[0] == episode_window_sec * sample_freq: # reshape into num feature x time x axes start_episode = start_episode.reshape(1, start_episode.shape[0], start_episode.shape[1]) # get binary class from model start_label = (cnn_model(start_episode)["output_0"].numpy().squeeze() >= .5).astype("int32") # if the start label is 1, this means that it is wear time, and we set the first start_stop_label to 1 if start_label == 1: start_stop_label[0] = True else: # there is an episode right at the start of the data, since we cannot obtain a full epsisode_window_sec array # here we say that True for nw-time and False for wear time start_stop_label[0] = edge_true_or_false # Stop episode if stop_episode.shape[0] == episode_window_sec * sample_freq: # reshape into num feature x time x axes stop_episode = stop_episode.reshape(1, stop_episode.shape[0], stop_episode.shape[1]) # get binary class from model stop_label = (cnn_model(stop_episode)["output_0"].numpy().squeeze() >= .5).astype("int32") # if the start label is 1, this means that it is wear time, and we set the first start_stop_label to 1 if stop_label == 1: start_stop_label[1] = True else: # there is an episode right at the END of the data, since we cannot obtain a full epsisode_window_sec array # here we say that True for nw-time and False for wear time start_stop_label[1] = edge_true_or_false # check the start_stop_label. if start_stop_label_decision == 'or': # use logical OR to determine if episode is true non-wear time if any(start_stop_label): # true non-wear time, record start and stop in nw-vector nw_vector[start_index:stop_index] = True # verbose if verbose: logging.info('Found non-wear time: start_index: %s, Stop_index: %s', start_index, stop_index) elif start_stop_label_decision == 'and': # use logical and to determine if episode is true non-wear time if all(start_stop_label): # true non-wear time, record start and stop in nw-vector nw_vector[start_index:stop_index] = True # verbose if verbose: logging.info('Found non-wear time: start_index: %s, Stop_index: %s', start_index, stop_index) else: logging.error('Start/Stop decision unknown, can only use or/and, given: %s', start_stop_label_decision) sys.exit() return nw_vector
[docs] def detect_non_wear_time_hees2011(data, sample_freq, min_non_wear_time_window=60, window_overlap=15, std_mg_threshold=3.0, std_min_num_axes=2, value_range_mg_threshold=50.0, value_range_min_num_axes=2): """ Estimation of non-wear time periods based on Van Hees et al. (2011, 2013) Accelerometer non-wear time was estimated on the basis of the standard deviation and the value range of each accelerometer axis, calculated for consecutive blocks of 30 minutes. A block was classified as non-wear time if the standard deviation was less than 3.0 mg (1 mg = 0.00981 m·s−2) for at least two out of the three axes or if the value range, for at least two out of three axes, was less than 50 mg. Important to note that the default encoding for this function of non-wear time = 0, and that of wear time is 1. References ---------- Van Hees, V. T., Renström, F., Wright, A., Gradmark, A., Catt, M., Chen, K. Y., Löf, M., Bluck, L., Pomeroy, J., Wareham, N. J., Ekelund, U., Brage, S., & Franks, P. W. (2011). Estimation of Daily Energy Expenditure in Pregnant and Non-Pregnant Women Using a Wrist-Worn Tri-Axial Accelerometer. *PLOS ONE*, 6(7), 7. https://doi.org/10.1371/journal.pone.0022922 Hees, V. T. van, Gorzelniak, L., León, E. C. D., Eder, M., Pias, M., Taherian, S., Ekelund, U., Renström, F., Franks, P. W., Horsch, A., & Brage, S. (2013). Separating Movement and Gravity Components in an Acceleration Signal and Implications for the Assessment of Human Daily Physical Activity. *PLOS ONE*, 8(4), e61691. https://doi.org/10.1371/journal.pone.0061691 Parameters ---------- data : DataFrame a DataFrame containg the raw acceleration data sample_freq : int sample frequency in hertz. Indicates the number of samples per 1 second. Default to 100 for 100hz. The sample frequency is necessary to know how many samples there are in a specific window. So let's say we have a window of 15 minutes, then there are hz * 60 * 15 samples min_non_wear_time_window: int (optional) minimum window length in minutes to be classified as non-wear time window_overlap: int (optional) basically the sliding window that progresses over the acceleration data. Defaults to 15 minutes. std_mg_threshold: float (optional) standard deviation threshold in mg. Acceleration axes values below or equal this threshold can be considered non-wear time. Defaults to 3.0g. Note that within the code we convert mg to g. std_min_num_axes: int (optional) minimum numer of axes used to check if acceleration values are below the std_mg_threshold value. Defaults to 2 axes; meaning that at least 2 axes need to have values below a threshold value to be considered non wear time value_range_mg_threshold: float (optional) value range threshold value in mg. If the range of values within a window is below this threshold (meaning that there is very little change in acceleration over time) then this can be considered non wear time. Default to 50 mg. Note that within the code we convert mg to g value_range_min_num_axes: int (optional) minimum numer of axes used to check if acceleration values range are below the value_range_mg_threshold value. Defaults to 2 axes; meaning that at least 2 axes need to have a value range below a threshold value to be considered non wear time Returns ------- nw_vector: np.array(n_samples,) a numpy array indicating whether the values of the acceleration data are non-wear time """ raw_acc = data[["X", "Y", "Z"]].values # number of data samples in 1 minute num_samples_per_min = sample_freq * 60 # define the correct number of samples for the window and window overlap min_non_wear_time_window *= num_samples_per_min window_overlap *= num_samples_per_min # convert the standard deviation threshold from mg to g std_mg_threshold /= 1000 # convert the value range threshold from mg to g value_range_mg_threshold /= 1000 # new array to record non-wear time. The default behavior is 0 = non-wear time, and 1 = wear time. Since we create a new array filled with wear time encoding, we only have to # deal with non-wear time, since everything else is already set as wear-time. nw_vector = np.zeros(raw_acc.shape[0], dtype=bool) # loop over the data, start from the beginning with a step size of window overlap for ii in range(0, len(raw_acc), window_overlap): # define the start of the sequence start = ii # define the end of the sequence end = ii + min_non_wear_time_window # slice the data from start to end subset_data = raw_acc[start:end] # check if the data sequence has been exhausted, meaning that there are no full windows left in the data sequence (this happens at the end of the sequence) # comment out if you want to use all the data if len(subset_data) < min_non_wear_time_window: break # calculate the standard deviation of each column (YXZ) std = np.std(subset_data, axis=0) # check if the standard deviation is below the threshold, and if the number of axes the standard deviation is below equals the std_min_num_axes threshold if (std < std_mg_threshold).sum() >= std_min_num_axes: # at least 'std_min_num_axes' are below the standard deviation threshold of 'std_min_num_axes', now set this subset of the data to the non-wear time encoding. # Note that the full 'new_wear_vector' is pre-populated with the wear time encoding, so we only have to set the non-wear time. nw_vector[start:end] = True # calculate the value range (difference between the min and max) (here the point-to-point numpy method is used) for each column value_range = np.ptp(subset_data, axis=0) # check if the value range, for at least 'value_range_min_num_axes' (e.g. 2) out of three axes, was less than 'value_range_mg_threshold' (e.g. 50) mg if (value_range < value_range_mg_threshold).sum() >= value_range_min_num_axes: # set the non wear vector to non-wear time for the start to end slice of the data nw_vector[start:end] = True return nw_vector
[docs] def detect_non_wear_time_naive(data, sample_freq, std_threshold, min_interval, use_vmu=False, min_segment_length=1, sliding_window=1): """ Calculate non-wear time from raw acceleration data by finding intervals in which the acceleration standard deviation is below a std_threshold value Parameters ---------- data : DataFrame a DataFrame containg the raw acceleration data sample_freq : int sample frequency of the data std_threshold: int or float the standard deviation threshold in g min_interval: int or float minimal interval to consider period as non-wear time use_vmu: bool indicates whether the algorithm should runon vector magnitude data min_segment_length: int (optional) minimum length of the segment to be candidate for non-wear time (default 1 minutes, so any shorter segments will not be considered non-wear time) sliding_window: int (optional) sliding window in minutes that will go over the acceleration data to find candidate non-wear segments Returns ------- nw_vector: np.array(n_samples,) a numpy array indicating whether the values of the acceleration data are non-wear time """ raw_acc = data[["X", "Y", "Z"]].values # make sure hz is int sample_freq = int(sample_freq) # create an new non-wear vector that we propoulate with wear-time encoding. This way we only have to update the vector with non-wear time nw_vector = np.zeros(raw_acc.shape[0], dtype=bool) """ FIND CANDIDATE NON-WEAR SEGMENTS ACTIGRAPH ACCELERATION DATA """ # get candidate non-wear episodes (note that these are on a minute resolution) nw_episodes = _find_candidate_non_wear_segments_from_raw(acc_data=raw_acc, std_threshold=std_threshold, min_segment_length=min_segment_length, sliding_window=sliding_window, hz=sample_freq, use_vmu=use_vmu) """ GET START AND END TIME OF NON WEAR SEGMENTS """ # find all indexes of the numpy array that have been labeled non-wear time. Note that the function _find_candidate_non_wear_segments_from_raw returns # non-wear episodes as 1, and wear time as 1 nw_indexes = np.where(nw_episodes == 0)[0] # find consecutive ranges non_wear_segments = _find_consecutive_index_ranges(nw_indexes) # check if segments are found if len(non_wear_segments[0]) > 0: # find start and stop times (the indexes of the edges and find corresponding time) for _, row in enumerate(non_wear_segments): # find start and stop start, stop = np.min(row), np.max(row) # calculate lenght of episode in minutes length = int((stop - start) / sample_freq / 60) # check if length exceeds threshold, if so, then this is non-wear time if length >= min_interval: # now update nw vector nw_vector[start:stop] = True # return values return nw_vector
def _forward_search_episode(acc_data, index, hz, max_search_min, std_threshold, verbose=False): """ When we have an episode, this was created on a minute resolution, here we do a forward search to find the edges of the episode with a second resolution """ # calculate max slice index max_slice_index = acc_data.shape[0] for ii in range(hz * 60 * max_search_min): # create new slices new_start_slice = index new_stop_slice = index + hz if verbose: logging.info('i: %s, new_start_slice: %s, new_stop_slice: %s', ii, new_start_slice, new_stop_slice) # check if the new stop slice exceeds the max_slice_index if new_stop_slice > max_slice_index: if verbose: logging.info('Max slice index reached: %s', max_slice_index) break # slice out new activity data slice_data = acc_data[new_start_slice:new_stop_slice] # calculate the standard deviation of each column (YXZ) std = np.std(slice_data, axis=0) # check if all of the standard deviations are below the standard deviation threshold if np.all(std <= std_threshold): # update index index = new_stop_slice else: break if verbose: logging.info('New index: %s, number of loops: %s', index, ii) return index def _backward_search_episode(acc_data, index, hz, max_search_min, std_threshold, verbose=False): """ When we have an episode, this was created on a minute resolution, here we do a backward search to find the edges of the episode with a second resolution """ # calculate min slice index min_slice_index = 0 for ii in range(hz * 60 * max_search_min): # create new slices new_start_slice = index - hz new_stop_slice = index if verbose: logging.info('i: %s, new_start_slice: %s, new_stop_slice: %s', ii, new_start_slice, new_stop_slice) # check if the new start slice exceeds the max_slice_index if new_start_slice < min_slice_index: if verbose: logging.debug('Minimum slice index reached: %s', min_slice_index) break # slice out new activity data slice_data = acc_data[new_start_slice:new_stop_slice] # calculate the standard deviation of each column (YXZ) std = np.std(slice_data, axis=0) # check if all of the standard deviations are belowii the standard deviation threshold if np.all(std <= std_threshold): # update index index = new_start_slice else: break if verbose: logging.info('New index: %s, number of loops: %s', index, ii) return index