Source code for jointly.shake_extractor

from typing import List, Tuple

import numpy as np
import pandas as pd
import scipy.signal
import scipy.interpolate
import pprint

from . import SyncPairs
from .abstract_extractor import AbstractExtractor
from .log import logger
from .synchronization_errors import (

pp = pprint.PrettyPrinter()

def _get_shake_weight(x: List[pd.DatetimeIndex]):
    """Returns a shake weight describing the importance of a shake sequence"""
    return np.median(x) + np.mean(x)

[docs]class ShakeExtractor(AbstractExtractor): def __init__(self): super().__init__() self.start_window_length = pd.Timedelta(seconds=600) self.end_window_length = pd.Timedelta(seconds=600) self.threshold = 0.6 @property def start_window_length(self) -> pd.Timedelta: """time window as pandas.Timedelta in which to look for peaks from start of signal""" return self._start_window_length @start_window_length.setter def start_window_length(self, value: pd.Timedelta): if isinstance(value, pd.Timedelta): self._start_window_length = value else: raise ValueError( "window lengths are given as e.g. pd.Timedelta(seconds=600)" ) @property def end_window_length(self) -> pd.Timedelta: """time window as pandas.Timedelta in which to look for peaks at end of signal""" return self._end_window_length @end_window_length.setter def end_window_length(self, value: pd.Timedelta): if isinstance(value, pd.Timedelta): self._end_window_length = value else: raise ValueError( "window lengths are given as e.g. pd.Timedelta(seconds=600)" ) @property def threshold(self) -> float: """min height for peak detection. In range [0, 1], as the data is normalized""" return self._threshold @threshold.setter def threshold(self, value: float): if 0 < value < 1: self._threshold = value else: raise ValueError(f"threshold must be given in (0, 1), but you gave {value}") distance = 1500 """distance in milliseconds in which the next peak must occur to be considered a sequence""" min_length = 6 """minimum number of peaks per sequence""" time_buffer = pd.Timedelta(seconds=1) """time in seconds will be padded to first and last peak for timestamps of segment""" def _merge_peak_sequences( self, peaks: List[pd.DatetimeIndex], signals: pd.DataFrame ) -> List[List[pd.DatetimeIndex]]: """ Merge the given peaks into peak sequences with inter-peak distances of less than ``self.distance``. :param peaks: list of peak indices :param signals: reference signals dataframe :return: list of lists, each inner list denotes a number of peaks by index """ sequences = [] for pos, index in enumerate(peaks): row = signals.iloc[[index]] if pos == 0: # start initial sequence sequences.append([row.index]) continue row_prev = signals.iloc[[peaks[pos - 1]]] time = pd.to_datetime(row.index) time_prev = pd.to_datetime(row_prev.index) if time_prev + pd.Timedelta(milliseconds=self.distance) < time: # add peak to sequence, since this peak lies within distance of previous one sequences.append([row.index]) else: # start new sequence sequences[len(sequences) - 1].append(row.index) return sequences def _get_peak_sequences( self, signals: pd.DataFrame, column: str, start_window: pd.Timestamp, end_window: pd.Timestamp, ) -> List[List[pd.DatetimeIndex]]: """ Returns index list of peak sequences from a normalized signal. Peaks that have no adjacent peaks within ``distance`` ms are ignored. Sequences with less than ``min_length`` peaks are ignored. """ logger.debug(f"Using peak threshold {self.threshold}") # find peaks in start window start_part = signals[column].truncate(after=start_window) peaks_start, _ = scipy.signal.find_peaks(start_part, height=self.threshold) # find peaks in end window end_part = signals[column].truncate(before=end_window) peaks_end, _ = scipy.signal.find_peaks(end_part, height=self.threshold) peaks_end += signals.index.get_loc(end_part.index[0]) peaks = [*peaks_start, *peaks_end] logger.debug("Found {} peaks for {}".format(len(peaks), column)) # merge peaks into peak sequences sequences = self._merge_peak_sequences(peaks, signals) logger.debug( f"Merged peaks within {self.distance} ms to " f"{len(sequences)} sequences for {column}" ) # filter sequences with less than min_length peaks sequences_filtered = [seq for seq in sequences if len(seq) >= self.min_length] logger.debug( f"{len(sequences_filtered)} sequences satisfy" f" minimum length of {self.min_length} for {column}" ) return sequences_filtered def _choose_sequence( self, signal: pd.Series, shake_list: List[List[pd.DatetimeIndex]] ) -> Tuple[pd.Timestamp, pd.Timestamp]: """ Choose the sequence with the highest shake weight :param signal: signal from which the shake is :param shake_list: list of peak sequence value lists :return: start and end index values """ best_shake = max(shake_list, key=_get_shake_weight) segment_start_time = best_shake[0].index[0] - self.time_buffer start_index = signal.index.get_loc(segment_start_time, method="nearest") segment_end_time = best_shake[-1].index[0] + self.time_buffer end_index = signal.index.get_loc(segment_end_time, method="nearest") return signal.index[start_index], signal.index[end_index] @staticmethod def _check_shakes_not_empty(shakes: List[List[int]], label: str): """Raise an exception if the given list of shakes is empty""" if len(shakes) <= 0: raise ShakeMissingException( f"No {label} shakes detected - " "check window lengths, " "detection threshold, " "minimum sequence length" )
[docs] def get_segments(self, signals: pd.DataFrame) -> SyncPairs: """ Returns dictionary with start and end for each sensor source, i.e., a ``SyncPairs`` instance :param signals: DataFrame containing the reference signals for each source :return: SyncPairs instance """ columns = list(signals.columns) self._init_segments(columns) # will be added to start and subtracted from end of sequence for column in columns: last_timestamp = signals[column].last_valid_index() first_timestamp = signals[column].first_valid_index() duration = last_timestamp - first_timestamp if duration < self.start_window_length or duration < self.end_window_length: raise BadWindowException( f"Start ({self.start_window_length}) or end ({self.end_window_length}) " f"window lengths greater than length of signal {column} ({duration}). " f"Make it so each window only covers start or end, not both." ) start_window = first_timestamp + self.start_window_length end_window = last_timestamp - self.end_window_length peak_sequences = self._get_peak_sequences( signals, column, start_window, end_window ) start_shakes, end_shakes, other_shakes = [], [], [] for peak_sequence in peak_sequences: sequence_values = [signals[column][index] for index in peak_sequence] if sequence_values[0].index[0] < start_window: start_shakes.append(sequence_values) elif sequence_values[-1].index[0] > end_window: end_shakes.append(sequence_values) else: other_shakes.append(sequence_values) # select sequences in start/end window logger.debug( f"{len(start_shakes)} shakes in start window ({start_window}), " f"{len(end_shakes)} shakes in end window ({end_window}), " f"{len(other_shakes)} shakes in between, for {column}." ) ShakeExtractor._check_shakes_not_empty(start_shakes, "start") ShakeExtractor._check_shakes_not_empty(end_shakes, "end") # choose sequence with highest weight start, end = self._choose_sequence(signals[column], start_shakes) self._set_first_segment(column, start, end) start, end = self._choose_sequence(signals[column], end_shakes) self._set_second_segment(column, start, end) f"Shake segments for {column}:\n{pp.pformat(self.segments[column])}" ) return self.segments