Usage
Preparing Data for Ingestion
The data has to be provided as a pandas DataFrame with a DatetimeIndex. The following example shows how such a DataFrame should look:
import pandas as pd

faros_df = pd.read_csv(
    "./test-data/faros-plus-physilog/faros.csv.gz",
    index_col=[0],
    parse_dates=True,
)
print(faros_df.head())
The output of faros_df.head() shows that the index is a DatetimeIndex. The NaN values, which result from the different sampling frequencies, are ignored during synchronization.
                         Accel X  Accel Y  Accel Z   ECG
1970-01-01 00:00:01.000    -88.0    771.0   -531.5 -21.0
1970-01-01 00:00:01.008      NaN      NaN      NaN -10.0
1970-01-01 00:00:01.010    -86.0    779.0   -539.5   NaN
1970-01-01 00:00:01.016      NaN      NaN      NaN  -2.0
1970-01-01 00:00:01.020    -82.5    781.0   -543.0   NaN
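The NaN pattern in this output is what one gets when signals with different sampling rates share a single index. The following sketch reproduces it with stand-in data: the values and the 10 ms and 8 ms spacings are read off the output above, and the use of pd.concat is only an assumption about how such a table could be assembled, not a description of how the Faros file was produced.

```python
import pandas as pd

start = pd.Timestamp("1970-01-01 00:00:01")

# Acceleration sampled every 10 ms (stand-in values from the output above)
accel = pd.DataFrame(
    {"Accel X": [-88.0, -86.0, -82.5]},
    index=pd.date_range(start, periods=3, freq=pd.Timedelta(milliseconds=10)),
)

# ECG sampled every 8 ms
ecg = pd.DataFrame(
    {"ECG": [-21.0, -10.0, -2.0]},
    index=pd.date_range(start, periods=3, freq=pd.Timedelta(milliseconds=8)),
)

# Joining on the union of both indexes leaves NaN wherever a signal
# has no sample at a given timestamp.
combined = pd.concat([accel, ecg], axis=1).sort_index()
print(combined)
```

Only the first timestamp is shared by both signals, so every other row holds a value for just one of them.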
Each signal source, i.e., each sensor, is described by a dictionary entry that holds its data together with the name of the column containing the events to synchronize on, e.g., the shake that shows up in the acceleration magnitude of every sensor. The name of that column and its sampling frequency can differ between sensors.
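The sample output shown earlier has no acceleration magnitude column, so one has to be derived before it can serve as a reference column. A minimal sketch follows, assuming the magnitude is the Euclidean norm of the three axes and that the column is named "Accel Mag"; both are assumptions for illustration, and the small DataFrame is a stand-in for the real data.

```python
import numpy as np
import pandas as pd

# Stand-in for faros_df, using the first rows of the printed output
index = pd.to_datetime(
    [
        "1970-01-01 00:00:01.000",
        "1970-01-01 00:00:01.008",
        "1970-01-01 00:00:01.010",
    ]
)
faros_df = pd.DataFrame(
    {
        "Accel X": [-88.0, np.nan, -86.0],
        "Accel Y": [771.0, np.nan, 779.0],
        "Accel Z": [-531.5, np.nan, -539.5],
        "ECG": [-21.0, -10.0, np.nan],
    },
    index=index,
)

# Assumption: magnitude = sqrt(x^2 + y^2 + z^2).
# skipna=False keeps NaN for rows that carry no acceleration sample.
axes = ["Accel X", "Accel Y", "Accel Z"]
faros_df["Accel Mag"] = np.sqrt((faros_df[axes] ** 2).sum(axis=1, skipna=False))

print(faros_df["Accel Mag"])
```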
Finally, given the source dictionary, the synchronizer instance can be created.
import jointly

sources = {
    "Faros": {
        "data": faros_df,
        "ref_column": "Accel Mag",
    },
    "Physilog": {
        "data": physilog_df,
        "ref_column": "Accel Mag",
    },
    # Any number of sensors can be added
    # 'Everion': {
    #     'data': everion_dataframe,
    #     'ref_column': 'ACCELERATION_MAGNITUDE',
    # }
}
# keep a reference to the instance; later examples call it `synchronizer`
synchronizer = jointly.Synchronizer(sources, reference_source_name="Faros")
Tuning Shake Detection
If the shake detection doesn’t find all shakes on the first try, the following parameters will help:
import pandas as pd
import jointly
extractor = jointly.ShakeExtractor()
# The start window should be long enough to contain
# only the start shake in every data stream
extractor.start_window_length = pd.Timedelta(seconds=15)
# The end window (measured from the end of data)
# should be exactly long enough to contain
# only the end shake in every data stream
extractor.end_window_length = pd.Timedelta(seconds=3)
# Set to at most the number of shakes you did
extractor.min_length = 3
# Shakes are only accepted if they are higher than the
# threshold (with all data normalized).
extractor.threshold = 0.5
Debugging
To find issues with the shake detection, it often helps to plot the data. plot_reference_columns() is available to plot the reference column of each source.
Problems during synchronization raise exceptions, such as a BadWindowException:
jointly.synchronization_errors.BadWindowException:
Start (0 days 00:10:00) or end (0 days 00:10:00) window lengths greater than length of signal Faros (0 days 00:00:36.992000). Make it so each window only covers start or end, not both.
Thus, the following code catches the problem and prints/shows helpful information:
import traceback

# if the extractor parameters are wrong, print the problem and show the data
try:
    # get_synced_data returns a dictionary of sensor names to synced DataFrames
    synchronizer.get_synced_data()
except Exception:
    traceback.print_exc()
    jointly.plot_reference_columns(sources)
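A BadWindowException like the one above can also be anticipated before synchronization by comparing the window lengths with the duration of each signal. The helper below is hypothetical and not part of jointly; it only mirrors the condition stated in the error message, and the actual check inside the library may differ.

```python
import pandas as pd


def find_too_short_sources(sources, start_window, end_window):
    """Return the names of sources whose signal is shorter than a window.

    Hypothetical helper for illustration; not part of jointly.
    """
    too_short = []
    for name, source in sources.items():
        index = source["data"].index
        duration = index.max() - index.min()
        if start_window > duration or end_window > duration:
            too_short.append(name)
    return too_short


# Stand-in source: a 37 second signal sampled once per second
index = pd.date_range(
    "1970-01-01 00:00:01", periods=38, freq=pd.Timedelta(seconds=1)
)
sources = {
    "Faros": {
        "data": pd.DataFrame({"Accel Mag": [0.0] * len(index)}, index=index),
        "ref_column": "Accel Mag",
    },
}

# Ten minute windows do not fit into a 37 second signal
print(find_too_short_sources(
    sources, pd.Timedelta(minutes=10), pd.Timedelta(minutes=10)
))
# Windows of 15 and 3 seconds do fit
print(find_too_short_sources(
    sources, pd.Timedelta(seconds=15), pd.Timedelta(seconds=3)
))
```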
Saving Data
There are two approaches to saving the data. save_data() can be used to create an export file for each data category, while save_pickles() dumps the synchronized DataFrame of each individual sensor into its own .pickle file.
To run the following examples, you should already have a Synchronizer instance called synchronizer with an extractor configured such that no exceptions are thrown. Check the readme file for an example.
save_pickles()
To save an individual DataFrame for each input source, call synchronizer.save_pickles():
synchronizer.save_pickles(sync_dir_path)
save_data()
To use save_data(), create a dictionary as follows: every key at the root level defines the name of a corresponding file. In each entry, select the source columns by creating a key (for example, add Faros to select data from the Faros source) that points to the columns to be extracted from that source, e.g., ['Accel X', 'Accel Y', 'Accel Z'].
import tempfile
import traceback

# define output format for two files, one containing all acceleration
# data, the other the ECG data
tables = {
    'ACC': {
        'Faros': ['Accel X', 'Accel Y', 'Accel Z'],
        'Physilog': ['Accel X', 'Accel Y', 'Accel Z'],
    },
    'ECG': {
        'Faros': ['ECG'],
    },
}

# if the extractor parameters are wrong, print the problem and show the data
try:
    # the temporary directory is deleted when the with-block exits;
    # pass a persistent path instead to keep the exported files
    with tempfile.TemporaryDirectory() as tmp_dir:
        synchronizer.save_data(tmp_dir, tables=tables, save_total_table=False)
except Exception:
    traceback.print_exc()
    jointly.plot_reference_columns(sources)
In the resulting CSV file, each combination of source and column gets its own column, such as Faros_Accel X or Physilog_Accel Z:
                               Faros_Accel X  Faros_Accel Y  Faros_Accel Z  Physilog_Accel X  Physilog_Accel Y  Physilog_Accel Z
1970-01-01 00:00:01.000000000            -88            771         -531.5
1970-01-01 00:00:01.010000000            -86            779         -539.5
1970-01-01 00:00:01.020000000          -82.5            781           -543
1970-01-01 00:00:01.020907696                                                       -0.80457           0.02234           0.61023
1970-01-01 00:00:01.030000000            -98            787         -521.5
1970-01-01 00:00:01.040000000          -80.5            777           -557
1970-01-01 00:00:01.050000000            -94          761.5         -539.5
1970-01-01 00:00:01.052150462                                                       -0.81104           0.01721           0.59253
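Because every row of such a table belongs to only one source, it can be handy to split an exported table back into per-source frames. The sketch below assumes a comma-separated file whose first column is the timestamp index; the exact file layout written by save_data() is not confirmed here, and columns_of is a hypothetical helper, not part of jointly.

```python
import io

import pandas as pd

# Stand-in for an exported file, built from the first rows shown above
csv_text = """,Faros_Accel X,Faros_Accel Y,Faros_Accel Z,Physilog_Accel X,Physilog_Accel Y,Physilog_Accel Z
1970-01-01 00:00:01.000000000,-88,771,-531.5,,,
1970-01-01 00:00:01.010000000,-86,779,-539.5,,,
1970-01-01 00:00:01.020000000,-82.5,781,-543,,,
1970-01-01 00:00:01.020907696,,,,-0.80457,0.02234,0.61023
"""
acc = pd.read_csv(io.StringIO(csv_text), index_col=0, parse_dates=True)


def columns_of(table, source_name):
    """Select one source's columns, drop its empty rows, strip the prefix."""
    prefix = source_name + "_"
    selected = [col for col in table.columns if col.startswith(prefix)]
    part = table[selected].dropna(how="all")
    return part.rename(columns=lambda col: col[len(prefix):])


faros_acc = columns_of(acc, "Faros")
physilog_acc = columns_of(acc, "Physilog")
print(physilog_acc)
```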
Logging
To activate logging, simply add the following lines to your code:
import logging
from jointly.log import logger
logger.setLevel(logging.DEBUG)
This will give you insight into the shake detection and the calculation of the timeshifts and stretching factor, and it will output plots of the segments.
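The debug output can be silenced again by raising the level. The sketch below uses a stand-in logger from the standard library, with the hypothetical name "jointly_stand_in", to show the effect of setLevel; it assumes the logger exported by jointly.log is a regular logging.Logger, so with jointly installed the same calls would apply to that object.

```python
import io
import logging

# Stand-in logger (hypothetical name); replace with
# `from jointly.log import logger` when jointly is installed.
logger = logging.getLogger("jointly_stand_in")

# Collect the messages in memory so the effect of the level is visible
buffer = io.StringIO()
handler = logging.StreamHandler(buffer)
handler.setFormatter(logging.Formatter("%(levelname)s: %(message)s"))
logger.addHandler(handler)

logger.setLevel(logging.DEBUG)
logger.debug("visible at DEBUG level")

logger.setLevel(logging.WARNING)
logger.debug("suppressed at WARNING level")

print(buffer.getvalue())
```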