"""
Input & Output Module
---------------------
*paat.io* provides loading and saving functions to process the raw
acceleration signals.
"""
import os
import logging
import zipfile
from struct import unpack
import tempfile
import sys
import numpy as np
import pandas as pd
from bitstring import Bits
from pygt3x.reader import FileReader
from . import preprocessing
def _unzip_gt3x_file(file, save_location=None, delete_source_file=False, metadata_only=False):
"""
Unzip the .gt3x file
Parameters
---------
file : string
file location of the .gt3x file
save_location : string (optional)
location where the unzipped files should be saved, if not given, files are saved within the same folder as f
delete_source_file : Boolean (Optional)
if True, then the original .gt3x file will be deleted after it is unzipped
metadata_only : Boolean (optional)
if True, only the metadata info.txt file is unzipped. This saves a lot of time if only the metadata should loaded
Returns
-------
log_bin : string
location where the log.bin file is stored
info_txt : string
location where the info.txt file is stored
"""
# if save location is not given, then save in the same folder
# as where the file resides with the folder name equal the name of the file
if save_location is None:
save_location = os.path.splitext(file)[0]
# if folder does not exist, create it
if not os.path.exists(save_location):
os.makedirs(save_location)
# create the path locations where the log.bin and info.txt files are stored
log_bin = os.path.join(save_location, 'log.bin')
info_txt = os.path.join(save_location, 'info.txt')
if metadata_only:
with zipfile.ZipFile(file, 'r') as myzip:
myzip.extract("info.txt", path=save_location)
return info_txt
# check if file already exists
if not os.path.exists(log_bin) and not os.path.exists(info_txt):
try:
# unzip the file
with zipfile.ZipFile(file, 'r') as myzip:
myzip.extractall(save_location)
except Exception as msg:
logging.error('Error unpacked file: %s', msg)
return None, None
finally:
# delete source file if delete_source_file parameter is set to True
if delete_source_file:
os.remove(file)
else:
logging.debug('file already unpacked: %s', file)
# return location of the files
return log_bin, info_txt
def _extract_info(info_txt):
"""
Extract the content from the info.txt file that was unzipped from the raw .gt3x file
Example:
{'Battery_Voltage': '3,83', 'Acceleration_Scale': '256.0', 'Download_Date': '635672143370000000', 'Unexpected_Resets': '0', 'Stop_Date': '635664672000000000',
'Firmware': '1.3.0', 'Acceleration_Min': '-8.0', 'Subject_Name': '90046928', 'Last_Sample_Time': '635664672000000000', 'Sample_Rate': '100',
'Device_Type': 'wGT3XBT', 'TimeZone': '02:00:00', 'Acceleration_Max': '8.0', 'Serial_Number': 'MOS2C02150348', 'Start_Date': '635658624000000000',
'Board_Revision': '3'}
Parameters
----------
info_txt : string
location of the info.txt file on disk
Returns
-------
info_data : dictionary
dictionary of key, value pairs as extracted from the info file
"""
# create empty dictionary
info_data = {}
try:
# open the file
with open(info_txt, 'r') as file:
# loop trough each of the lines
for line in file.readlines():
# strip away the new lines
line = line.strip('\r\n')
# split on semicolon + space (the timezone value has semicolons in it and not using the space would cause issues splitting it)
key, value = line.split(': ')
# add to dictionary and replace key values with space in key with underscore
info_data[key.replace(' ', '_')] = value
except Exception as msg:
logging.error('Error extracting data from info.txt file: %s', msg)
# return dictionary
return info_data
def _extract_log(log_bin, acceleration_scale, sample_rate, use_scaling=False):
"""
Extract acceleration data from log.bin file that was unzipped from the raw .gt3x file
One second of raw activity samples packed into 12-bit values in YXZ order.
Parameters
----------
log_bin : string
location of the log.bin file on disk
acceleration_scale : float
Scale the resultant by the scale factor (this gives us an acceleration value in g's).
Device serial numbers starting with NEO and CLE use a scale factor of 341 LSB/g (±6g).
MOS devices use a 256 LSB/g scale factor (±8g). If a LOG_PARAMETER record is preset,
then the ACCEL_SCALE value should be used.
sample_rate : int
sample rate, i.e. the number of Hz (how many values we obtain per second)
This value can also be inferred from the payload size when you know how many axis data you have.
So payload size of 450 bytes = 300 (12 bit sized ints). And when we have 3 axis, we have 300/3 = 100 values per 3-axis per second
use_scaling: Boolean (optional)
acceleration data is originally stored as a signed integer. To obtain the acceleration value we need to scale it with the acceleration_scale
this will give us the decimal numbers. However, this would also increase memory size of the array in which we store the values. We might not want to
scale if we only want to store the data in a smaller memory size format. Scaling can then be done at a later stage, for instance, when we want to pre-process the data
Returns
-------
log_data : numpy array (time steps * sample_rate, num axes)
log data contains the raw acceleration values in YXZ order
log_time : numpy array (time steps, 1)
log time contains the timestamps of measurements
"""
# define the size of the payload. This is necessary because we need to define the size of the numpy array before we populate it.
SIZE = _count_payload_size(log_bin)
# raw data values are stored in ints, to obtain values in G, we need to scale them by a factor found in the acceleration_scale parameter within the info.txt file. For example, 256.0
SCALING = 1. / acceleration_scale
# counter so we can keep track of how many acceleration values we have processed
COUNTER = 0
# number of axes, the GTX3 is tri-axial, so we hard code it here.
NUM_AXES = 3
# empty dictionary where we can store the 12bit to signed integer values; this saves calculating them all the time
bit12_to_int = {}
"""
create empty numpy array where we can store the acceleration data to: dimensions (sample_rate x SIZE,number of axis)
if we want to scale the values to floats, we need a bigger datatype, it would take more memory. If no scaling is necessary, because we can scale later during preprocessing or so, we can
proceed with just an int8 datatype array.
For example, without scaling, 7 days of data equals around 180MB, with scaling the numpy array equals around 1.5GB
"""
# use int16 as datatype to store the signed integer values, this saves memory when saving the array
acc_data_type = np.int16
if use_scaling:
# use float when we want to store the acceleration data in G, meaning that we the scaling factor to recalculate the signed int into decimal values
acc_data_type = float
# create empty array for the acceleration data
log_data = np.empty((sample_rate * SIZE, NUM_AXES), dtype=acc_data_type)
# empty numpy array to store the timestamps
time_data = np.empty((SIZE, 1), dtype=np.uint32)
# open the log.bin file in binary mode
with open(log_bin, mode='rb') as file:
try:
# keep reading byte by byte
while True:
"""
Log Record Format
Offset (bytes) Size (bytes) Name Description Part of Record
0 1 Seperator An ASCII record separator byte (1Eh) marks the beginning of each log record. Header
1 1 Type A type identifier is used to interpret the payload of the record. Header
2 4 Timestamp The date and time of the data contained in the record are marked to the nearest second in Unix time format. Header
6 2 Size The size of the payload is given in bytes as an little-endian unsigned integer. Header
"""
_, payload_type, timestamp, size = unpack("<cbLH", file.read(8))
# acceleration type 0 is the activity data, we skip all other data but can easily be read with an if statement
if payload_type == 0:
"""
This is the actual data that varies based on the record *Type* field. It's size is provided in the *Size* field. Please refer to the appropriate section for the record type for the indiviual payload formats.
read the payload in bits (the reason we are converting the bytes to bits is that 1 acceleration value is encoded in 12bit, so we can cut out 12 bits at a time from the full payload represented as individual bits)
basically the YXZ (3 axis) is 12 bit + 12 bit + 12 bit = 36 bits. When we have 100hz, we have a total of 36 * 100 = 3600 bits. When you look at the size of the payload in bytes, that is for instance 450 bytes, you can
see that this is also 450 * 8 = 3600 bits
"""
# read the bytes as bits as a large string
payload_bits = Bits(bytes=file.read(size)).bin
# extract 12 bits as 1 acceleration value and add them to a list
bits_list = []
for ii in range(0, len(payload_bits), 12):
# extract the 12 bit as a string
bitstring = payload_bits[ii:ii + 12]
# convert to 12bit two's complement to signed integer value: also store values in dictionary for faster reading if not already present (the Bits function is not as fast as reading it from a dictionary)
if bitstring not in bit12_to_int:
# convert 12bit to signed integer
acc_value = Bits(bin=bitstring).int
# add to dictionary so we can read it faster next time we have the same value
bit12_to_int[bitstring] = acc_value
else:
# bitstring previously already converted to signed int, so we can obtain it from the dictionary
acc_value = bit12_to_int[bitstring]
# add to list
bits_list.append(acc_value)
# convert list to numpy array and perform scaling if it was set to True: no scaling allows for a smaller numpy array because we can use int8 and not need the float
if use_scaling:
payload_bits_array = np.array(bits_list).reshape(sample_rate, NUM_AXES) * SCALING
else:
payload_bits_array = np.array(bits_list).reshape(sample_rate, NUM_AXES)
# add payload bits array to overall numpy array
np_start = COUNTER * sample_rate
np_end = np_start + sample_rate
log_data[np_start:np_end] = payload_bits_array
# add the time component
time_data[COUNTER] = timestamp
# increase the counter
COUNTER += 1
else:
# skip whatever is not acceleration data
# there are different payload types and can easily be read by adding a different payload_type in this section
file.seek(size, 1)
"""
A 1-byte checksum immediately follows the record payload. It is a 1's complement, exclusive-or (XOR) of the log header and payload with an initial value of zero.
TODO: calculate checksum from payload and header and see if it matches the checksum that we read from the last byte
"""
_ = unpack("B", file.read(1))
# stop when all records have been read
if COUNTER == SIZE:
logging.info('Finished processing activity data')
break
except Exception as msg:
logging.error('Unpacking GTX3 exception: %s', msg)
return None, None
# return acceleration data + time data
return log_data, time_data
def _count_payload_size(log_bin, count_payload=0):
"""
Count the payload size of the log.bin file. The size of the payload is necessary to know how large
the numpy array needs to be that will be populated while reading the log.bin file byte by byte. The reasons why
is that populating an array is much faster than appending new array values to it
Parameters
----------
log_bin : string
location of the log.bin file on disk
count_payload : int (optional)
the payload type that we want to count. default is 0, which is the acceleration data.
Returns
-------
SIZE : int
the size (as in count) of the payload
"""
# define size variable
SIZE = 0
# open the log.bin file in binary mode
with open(log_bin, mode='rb') as file:
try:
# keep reading byte by byte
while True:
# extract header information
_, payload_type, _, size = unpack("<cbLH", file.read(8))
# count payload
if payload_type == count_payload:
# skip the byte content, we don't need to process it here
file.seek(size, 1)
# increment counter
SIZE += 1
else:
# skip other payload types, we don't need to read it here
file.seek(size, 1)
# skip the 1-byte checksum value
file.seek(1, 1)
except Exception:
logging.info('Counted payload size: %s', SIZE)
# return the value
return SIZE
def _create_time_array(time_data, hz=100):
"""
Create a time array by adding the miliseconds range of the sampling frequency
the standard time array only accounts for full seconds. However, when the sampling
frequency is 100 hertz for instance, we need to add 100 microseconds to the data
Parameters
----------
time_data : np.array
numpy array containing unix timestamps (obtained by reading the raw .gt3x data)
hz : int (optional)
sampling frequency of the acceleration data (this is to know how many miliseconds we need to add to the time series)
Returns
-------
time_data : np.array
numpy array with correct number of time series (so now the sampling frequency is added within the original data which only contained seconds and not miliseconds)
Raises
------
NotImplementedError
If the sampling frequency does not fit on milisecond precision
"""
# check if the sampling frequenzy can fit into equal parts within a 1000ms window
if 1000 % hz != 0:
raise NotImplementedError("Creating time array does not support sampling frequencies other than 100hz yet. See https://github.com/Trybnetic/paat/issues/10")
# calculate the step size of hz in 1s (so 100hz means 100 measurements in 1sec, so if we need to fill 1000ms then we need use a step size of 10)
step_size = 1000 / hz
# convert time_data of unix timestamps to numpy array of 64 bit seconds
time_data = np.asarray(time_data, dtype='datetime64[s]')
# convert the array to 64 bit milliseconds and add a time delta of a range of ms within a 1000ms window
time_data = np.asarray(time_data, dtype='datetime64[ms]') + np.asarray(np.arange(0, 1000, step_size), dtype='timedelta64[ms]')
# flatten the array
time_data = time_data.flatten()
return time_data
def _format_time(tstamp):
dt = np.datetime64('0001-01-01T00:00:00') + np.timedelta64(int(tstamp) // 10000000, 's')
return str(dt)
def _format_meta_data(meta):
"""
Formats the meta data dict that is extracted from the gt3x file
Parameters
----------
meta : dict
the meta data dict from the gt3x file
Returns
-------
new_meta : dict
an updated meta data dict having the fields of meta in the correct format
"""
new_meta = {}
# standard meta data fields of ActiGraph and their conversion functions
fields_and_formats = {'Serial_Number': str,
'Device_Type': str,
'Firmware': str,
'Battery_Voltage': str,
'Sample_Rate': int,
'Start_Date': _format_time,
'Stop_Date': _format_time,
'Last_Sample_Time': _format_time,
'TimeZone': str,
'Download_Date': _format_time,
'Board_Revision': int,
'Unexpected_Resets': int,
'Acceleration_Scale': float,
'Acceleration_Min': float,
'Acceleration_Max': float,
'Subject_Name': str}
for field, dtype in fields_and_formats.items():
try:
new_meta[field] = dtype(meta[field])
except KeyError as msg:
logging.info(msg)
return new_meta
def _create_time_vector(start, n_samples, hz):
"""
Creates a time vector starting at point start until start + n_samples/hz
with a sampling frequency of hz
Parameters
----------
start : int or np.datetime64
start point of the time vector at second precision level.
n_samples : int
number of samples
hz : int
sampling frequency
Returns
-------
time_data : np.array
a numpy array of np.datetime64 at nanosecond precision
Raises
------
NotImplementedError
If the sampling frequency does not fit on milisecond precision
"""
n_sec = n_samples / hz
if not n_sec.is_integer():
logging.error('Actiwave time in seconds not a whole number: %s', n_sec)
sys.exit()
# declare how many nanoseconds in one second
ms_in_sec = 1e3
# check if the sampling frequenzy can fit into equal parts within a nanosecond window
if ms_in_sec % hz != 0:
raise NotImplementedError("Creating time vector does not support sampling frequencies other than 100hz yet. See https://github.com/Trybnetic/paat/issues/10")
step_size = ms_in_sec / hz
# convert the array to 64 bit nanoseconds (necessary to fit 128hz for instance) and add a time delta of a range nanoseconds in seconds * lenght_sec, with corresponding stepsize
time_data = np.asarray(start, dtype='datetime64[ms]') + np.asarray(np.arange(0, ms_in_sec * n_sec, step_size), dtype='timedelta64[ms]')
return time_data.flatten()
[docs]
def read_gt3x(file, rescale=True, pandas=True, metadata=False, use_pygt3x=False):
"""
Reads a .gt3x file and returns the tri-axial acceleration values together
with the corresponding time stamps and all meta data.
Parameters
----------
file : string
file location of the .gt3x file
rescale : boolean (optional)
boolean indicating whether raw acceleration data should be rescaled to g values
pandas : boolean (optional)
boolean indicating whether the data should be returned as a pandas DataFrame
metadata : boolean (optional)
boolean indicating whether the full metadata should be returned
use_pygt3x : boolean (optional)
boolean indicating whether to use ActiGraph's Pygt3x library to read the file.
Returns
-------
data : DataFrame
a DataFrame containg the raw acceleration data
sample_freq : int
the sampling frequency in which the data was recorded
time : np.array (n_samples x 1)
a numpy array with time stamps for the observations in values
values : np.array (n_samples x 3)
a numpy array with the tri-axial acceleration values. If rescale is true, data
is rescaled to units of g. Note, that this function returns the values in
the default order of ActiGraph which is ['Y','X','Z']. Depending on further
use, you might want to adjust that order.
meta : dict
a dict containing all meta data produced by ActiGraph
"""
if use_pygt3x:
with FileReader(file) as reader:
values = reader.to_pandas()
time = pd.to_datetime(values.index, unit="s").values
values = values[["Y", "X", "Z"]].values
meta = read_metadata(file)
else:
with tempfile.TemporaryDirectory() as tmpdirname:
# unzip .gt3x file and get the file location of the binary log.bin (which contains the raw data) and the info.txt which contains the meta-data
log_bin, info_txt = _unzip_gt3x_file(file=file, save_location=tmpdirname)
# get meta data from info.txt file
meta = _extract_info(info_txt)
meta = _format_meta_data(meta)
# extract acceleration data from the log file
values, time_data = _extract_log(log_bin, meta['Acceleration_Scale'], meta['Sample_Rate'], use_scaling=rescale)
# create time array
time = _create_time_array(time_data, hz=meta['Sample_Rate'])
# Add additional keys to meta (Note: they are important to later reconstruct the time vector)
meta["Number_Of_Samples"] = values.shape[0]
meta["Start_Time"] = time[0].astype(int)
if pandas:
data = pd.DataFrame(values, columns=["Y", "X", "Z"], index=time)
data = data[["X", "Y", "Z"]]
if metadata:
return data, meta['Sample_Rate'], meta
else:
return data, meta['Sample_Rate']
else:
return time, values, meta