# Source code for traval.ts_utils

from enum import IntEnum

import numpy as np
import pandas as pd


class CorrectionCode(IntEnum):
    """Integer codes that label the outcome of an error detection step.

    Each code records why an observation was flagged. The codes are
    stored in the ``correction_code`` column of a corrections dataframe.
    """

    # observation was left untouched
    NO_CORRECTION = 0

    # flagged by a comparison against a threshold or a reference value
    BELOW_THRESHOLD = -2
    NOT_EQUAL_VALUE = -1
    EQUAL_VALUE = 1
    ABOVE_THRESHOLD = 2

    # observation was replaced by another value
    MODIFIED_VALUE = 4

    # flagged without a comparison value being available
    UNKNOWN_COMPARISON_VALUE = 99
def get_empty_corrections_df(series):
    """Build an empty corrections dataframe for a time series.

    Parameters
    ----------
    series : pd.Series
        time series to apply corrections to

    Returns
    -------
    pd.DataFrame
        dataframe with the index of ``series`` and the columns
        ``correction_code`` (filled with ``CorrectionCode.NO_CORRECTION``),
        ``series_values`` and ``comparison_values`` (both filled with NaN).
    """
    n_rows = series.size
    columns = {
        "correction_code": CorrectionCode.NO_CORRECTION,
        "series_values": np.full(n_rows, np.nan),
        "comparison_values": np.full(n_rows, np.nan),
    }
    return pd.DataFrame(data=columns, index=series.index)
def _mask_corrections(series, values, mask, correction_code):
    """Fill a corrections dataframe at the masked positions.

    Parameters
    ----------
    series : pd.Series
        time series to apply corrections to
    values : pd.Series, scalar or None
        values the series was compared with. A pd.Series is subset with
        ``.loc[mask]`` before assignment, anything else is assigned as is.
        When None the ``comparison_values`` column is left as NaN.
    mask : DateTimeIndex or boolean np.array
        selection of rows to mark, used as ``.loc[mask]``
    correction_code : CorrectionCode or int
        code written to the ``correction_code`` column for masked rows

    Returns
    -------
    pd.DataFrame
        corrections dataframe with the masked rows filled in
    """
    corrections = get_empty_corrections_df(series)
    corrections.loc[mask, "series_values"] = series

    if values is not None:
        comparison = values.loc[mask] if isinstance(values, pd.Series) else values
        corrections.loc[mask, "comparison_values"] = comparison

    corrections.loc[mask, "correction_code"] = correction_code
    return corrections
def mask_corrections_above_below(
    series,
    mask_above,
    threshold_above,
    mask_below,
    threshold_below,
):
    """Get corrections where above or below a threshold.

    The corrections for both thresholds are computed separately and then
    added together with ``fill_value=0``.

    Parameters
    ----------
    series : pd.Series
        time series to apply corrections to
    mask_above : DateTimeIndex or boolean np.array
        DateTimeIndex containing timestamps where value should be set to NaN,
        or boolean array with same length as series set to True where
        value should be set to NaN. (Uses pandas .loc[mask] to set values.)
    threshold_above : pd.Series
        time series with values to compare with
    mask_below : DateTimeIndex or boolean np.array
        DateTimeIndex containing timestamps where value should be set to NaN,
        or boolean array with same length as series set to True where
        value should be set to NaN. (Uses pandas .loc[mask] to set values.)
    threshold_below : pd.Series
        time series with values to compare with

    Returns
    -------
    pd.DataFrame
        sum of the above-threshold and below-threshold corrections
    """
    above = mask_corrections_above_threshold(series, threshold_above, mask_above)
    below = mask_corrections_below_threshold(series, threshold_below, mask_below)
    combined = above.add(below, fill_value=0)
    return combined
def mask_corrections_above_threshold(series, threshold, mask):
    """Get corrections where above threshold.

    Parameters
    ----------
    series : pd.Series
        time series to apply corrections to
    threshold : pd.Series
        time series with values to compare with
    mask : DateTimeIndex or boolean np.array
        DateTimeIndex containing timestamps where value should be set to NaN,
        or boolean array with same length as series set to True where
        value should be set to NaN. (Uses pandas .loc[mask] to set values.)

    Returns
    -------
    pd.DataFrame
        corrections labelled with ``CorrectionCode.ABOVE_THRESHOLD``
    """
    code = CorrectionCode.ABOVE_THRESHOLD
    return _mask_corrections(series, threshold, mask, code)
def mask_corrections_below_threshold(series, threshold, mask):
    """Get corrections where below threshold.

    Parameters
    ----------
    series : pd.Series
        time series to apply corrections to
    threshold : pd.Series
        time series with values to compare with
    mask : DateTimeIndex or boolean np.array
        DateTimeIndex containing timestamps where value should be set to NaN,
        or boolean array with same length as series set to True where
        value should be set to NaN. (Uses pandas .loc[mask] to set values.)

    Returns
    -------
    pd.DataFrame
        corrections labelled with ``CorrectionCode.BELOW_THRESHOLD``
    """
    code = CorrectionCode.BELOW_THRESHOLD
    return _mask_corrections(series, threshold, mask, code)
def mask_corrections_equal_value(series, values, mask):
    """Get corrections where equal to value.

    Parameters
    ----------
    series : pd.Series
        time series to apply corrections to
    values : pd.Series
        time series with values to compare with
    mask : DateTimeIndex or boolean np.array
        DateTimeIndex containing timestamps where value should be set to NaN,
        or boolean array with same length as series set to True where
        value should be set to NaN. (Uses pandas .loc[mask] to set values.)

    Returns
    -------
    pd.DataFrame
        corrections labelled with ``CorrectionCode.EQUAL_VALUE``
    """
    code = CorrectionCode.EQUAL_VALUE
    return _mask_corrections(series, values, mask, code)
def mask_corrections_modified_value(series, values, mask):
    """Get corrections where value was modified.

    Parameters
    ----------
    series : pd.Series
        time series to apply corrections to
    values : pd.Series
        time series with values to compare with
    mask : DateTimeIndex or boolean np.array
        DateTimeIndex containing timestamps where value should be set to NaN,
        or boolean array with same length as series set to True where
        value should be set to NaN. (Uses pandas .loc[mask] to set values.)

    Returns
    -------
    pd.DataFrame
        corrections labelled with ``CorrectionCode.MODIFIED_VALUE``
    """
    code = CorrectionCode.MODIFIED_VALUE
    return _mask_corrections(series, values, mask, code)
def mask_corrections_not_equal_value(series, values, mask):
    """Get corrections where not equal to value.

    Parameters
    ----------
    series : pd.Series
        time series to apply corrections to
    values : pd.Series
        time series with values to compare with
    mask : DateTimeIndex or boolean np.array
        DateTimeIndex containing timestamps where value should be set to NaN,
        or boolean array with same length as series set to True where
        value should be set to NaN. (Uses pandas .loc[mask] to set values.)

    Returns
    -------
    pd.DataFrame
        corrections labelled with ``CorrectionCode.NOT_EQUAL_VALUE``
    """
    code = CorrectionCode.NOT_EQUAL_VALUE
    return _mask_corrections(series, values, mask, code)
def mask_corrections_no_comparison_value(series, mask):
    """Get corrections where no comparison value is available.

    Parameters
    ----------
    series : pd.Series
        time series to apply corrections to
    mask : DateTimeIndex or boolean np.array
        DateTimeIndex containing timestamps where value should be set to NaN,
        or boolean array with same length as series set to True where
        value should be set to NaN. (Uses pandas .loc[mask] to set values.)

    Returns
    -------
    pd.DataFrame
        corrections labelled with
        ``CorrectionCode.UNKNOWN_COMPARISON_VALUE``; the
        ``comparison_values`` column is left as NaN.
    """
    code = CorrectionCode.UNKNOWN_COMPARISON_VALUE
    return _mask_corrections(series, None, mask, code)
def corrections_as_nan(corrections):
    """Convert correction code series to NaNs.

    Excludes codes 0 and 4, which are used to indicate no correction and a
    modification of the value, respectively.

    Parameters
    ----------
    corrections : pd.Series or pd.DataFrame
        series or dataframe with correction code. For a dataframe the
        ``correction_code`` column is used.

    Returns
    -------
    c : pd.Series
        series with value 0.0 where the code is 0 or 4 and NaN where any
        other correction code is present.
    """
    if isinstance(corrections, pd.DataFrame):
        corrections = corrections["correction_code"]

    c = pd.Series(index=corrections.index, data=0.0)

    # A value counts as corrected only when its code is neither 0 nor 4.
    # Both conditions must hold: joining them with "or" is true for every
    # code and would turn the whole series into NaN.
    is_corrected = (corrections != 0) & (corrections != 4)
    c.loc[is_corrected] = np.nan
    return c
def corrections_as_float(corrections):
    """Convert modified-value corrections to the size of the modification.

    Only rows with correction code 4 (value was modified) yield a
    non-zero result; all other rows are 0.0.

    Parameters
    ----------
    corrections : pd.DataFrame
        dataframe with the columns ``correction_code``, ``series_values``
        and ``comparison_values``

    Returns
    -------
    c : pd.Series
        series with ``comparison_values - series_values`` where the
        correction code is 4, and 0.0 elsewhere
    """
    is_modified = corrections["correction_code"] == 4
    modified_rows = corrections.loc[is_modified]
    delta = modified_rows["comparison_values"] - modified_rows["series_values"]

    result = pd.Series(index=corrections.index, data=0.0)
    result.loc[is_modified] = delta
    return result
def resample_short_series_to_long_series(short_series, long_series):
    """Resample a short time series to the index of a longer time series.

    Every value of the short series is placed on the first timestamp of
    the long series that is equal to or later than its own timestamp.
    The remaining gaps are filled with 'ffill' followed by 'bfill'.

    Parameters
    ----------
    short_series : pd.Series
        short time series
    long_series : pd.Series
        long time series

    Returns
    -------
    new_series : pd.Series
        series with index from long_series and data from short_series
    """
    target_index = long_series.index
    resampled = pd.Series(index=target_index, dtype=float)

    for position, stamp in enumerate(short_series.index):
        on_or_after = target_index >= stamp
        if not on_or_after.any():
            # no timestamp in the long series to place this value on
            continue
        landing_stamp = target_index[on_or_after][0]
        resampled.loc[landing_stamp] = short_series.iloc[position]

    return resampled.ffill().bfill()
def diff_with_gap_awareness(series, max_gap="7D"):
    """Get diff of time series with a limit on gap between two values.

    Parameters
    ----------
    series : pd.Series
        time series to calculate diff for
    max_gap : str, optional
        maximum period between two observations for calculating diff,
        otherwise set value to NaN, by default "7D"

    Returns
    -------
    diff : pd.Series
        time series with diff, with NaNs whenever two values are farther
        apart than max_gap. For a series with fewer than two observations
        the plain diff (empty or a single NaN) is returned.
    """
    diff = series.diff()

    # With fewer than two observations there is no gap to inspect. An empty
    # series would otherwise get a length-1 mask that does not fit its index.
    if len(series) < 2:
        return diff

    # time between consecutive observations; the first observation has no
    # predecessor and therefore never counts as "after a gap"
    dt = series.index[1:] - series.index[:-1]
    after_gap = np.r_[np.array([False]), dt > pd.Timedelta(max_gap)]

    # discard the diff value directly following each gap
    diff.loc[after_gap] = np.nan
    return diff
def spike_finder(series, threshold=0.15, spike_tol=0.15, max_gap="7D"):
    """Find spikes in time series.

    Spikes are sudden jumps in the value of a time series that last 1
    timestep. They can be both negative or positive.

    Parameters
    ----------
    series : pd.Series
        time series to find spikes in
    threshold : float, optional
        the minimum size of the jump to qualify as a spike, by default 0.15
    spike_tol : float, optional
        offset between value of time series before spike and after spike,
        by default 0.15. After a spike, the value of the time series is
        usually close to but not identical to the value that preceded the
        spike. Use this parameter to control how close the value has to be.
    max_gap : str, optional
        only considers observations within this maximum gap between
        measurements to calculate diff, by default "7D".

    Returns
    -------
    upspikes, downspikes : pd.Series
        series holding the jump (diff) at each detected upward and
        downward spike, indexed by the timestamp of the spike.
    """
    # step between consecutive observations, NaN directly after a gap
    step = diff_with_gap_awareness(series, max_gap=max_gap)

    # keep only the rising steps / only the falling steps
    rises = step.mask(step < 0.0)
    falls = step.mask(step > 0.0)

    # Pair every step with the step that follows it. A spike is a step in
    # one direction followed by a step in the other direction, where the
    # two nearly cancel (remaining offset no larger than spike_tol).
    offset_after_up = (rises.iloc[1:-1] + falls.values[2:]).abs()
    offset_after_up = offset_after_up.mask(offset_after_up > spike_tol)

    offset_after_down = (falls.iloc[1:-1] + rises.values[2:]).abs()
    offset_after_down = offset_after_down.mask(offset_after_down > spike_tol)

    # of those candidates keep only jumps larger than the threshold
    up_candidates = step.loc[offset_after_up.dropna().index]
    upspikes = up_candidates.where(up_candidates > threshold).dropna()

    down_candidates = step.loc[offset_after_down.dropna().index]
    downspikes = down_candidates.where(down_candidates < -threshold).dropna()

    return upspikes, downspikes
def bandwidth_moving_avg_n_sigma(series, window, n):
    """Calculate bandwidth around time series: moving average +/- n * std.

    The moving average is computed over ``window`` observations. The
    standard deviation is that of the whole series, not a rolling one.

    Parameters
    ----------
    series : pd.Series
        series to calculate bandwidth for
    window : int
        number of observations to consider for moving average
    n : float
        number of standard deviations from moving average for bandwidth

    Returns
    -------
    bandwidth : pd.DataFrame
        dataframe with 2 columns, ``lower_<n>_sigma`` and
        ``upper_<n>_sigma``, holding the lower and upper bandwidth
    """
    moving_avg = series.rolling(window).mean()
    half_width = series.std() * n

    lower_name = f"lower_{n}_sigma"
    upper_name = f"upper_{n}_sigma"

    bandwidth = pd.DataFrame(index=series.index)
    bandwidth[lower_name] = moving_avg - half_width
    bandwidth[upper_name] = moving_avg + half_width
    return bandwidth
def interpolate_series_to_new_index(series, new_index):
    """Interpolate time series to new DateTimeIndex.

    Values are interpolated linearly in time. Timestamps in ``new_index``
    that fall outside the range of ``series`` get NaN.

    Parameters
    ----------
    series : pd.Series
        original series with a DateTimeIndex
    new_index : DateTimeIndex
        new index to interpolate series to

    Returns
    -------
    si : pd.Series
        new series with new index, with interpolated values

    Notes
    -----
    Both indexes are compared through their integer ``asi8``
    representation, so they are expected to share the same resolution.
    """
    # np.interp needs plain numbers for both the sample points and the
    # query points. Passing the DateTimeIndex itself fails because
    # datetime64 cannot be converted to float.
    s_interp = np.interp(
        new_index.asi8,
        series.index.asi8,
        series.values,
        left=np.nan,
        right=np.nan,
    )
    si = pd.Series(index=new_index, data=s_interp, dtype=float)
    return si
def unique_nans_in_series(series, *args):
    """Get mask where NaNs in series are unique compared to other series.

    A NaN is unique when ``series`` is NaN at that position while none of
    the other series is NaN there.

    Parameters
    ----------
    series : pd.Series
        identify unique NaNs in series
    *args
        any number of pandas.Series

    Returns
    -------
    mask : pd.Series
        mask with value True where NaN is unique to series

    Raises
    ------
    ValueError
        if any of the additional arguments is not a pandas Series
    """
    unique = series.isna()
    for other in args:
        if not isinstance(other, pd.Series):
            raise ValueError("Only supports pandas Series")
        # drop positions where the other series is missing as well
        unique = unique & other.notna()
    return unique
def create_synthetic_raw_time_series(raw_series, truth_series, comments):
    """Create synthetic raw time series.

    Starts from 'truth_series' and, wherever a value is labelled with a
    comment, replaces it with the value from 'raw_series'. Used for
    removing unlabeled changes between a raw and validated time series.

    Parameters
    ----------
    raw_series : pd.Series
        time series with raw data
    truth_series : pd.Series
        time series with validated data
    comments : pd.Series
        time series with comments. Index must be same as 'truth_series'.
        When value does not have a comment it must be an empty string: ''.

    Returns
    -------
    s : pd.Series
        synthetic raw time series, same as truth_series but updated with
        raw_series where value has been commented. Only timestamps that
        are in truth_series and have a non-NaN raw value are included.

    Raises
    ------
    ValueError
        if 'truth_series' and 'comments' do not have the same index
    """
    index_mismatch = truth_series.index.symmetric_difference(comments.index)
    if index_mismatch.size > 0:
        raise ValueError("'truth_series' and 'comments' must have same index!")

    # timestamps with a raw observation that also exist in the truth series
    shared_index = raw_series.dropna().index.intersection(truth_series.index)

    # observations that carry a comment
    has_comment = comments.loc[shared_index] != ""

    # start from the validated values, put raw values back where commented
    synthetic = truth_series.loc[shared_index].copy()
    raw_shared = raw_series.loc[shared_index]
    synthetic.loc[has_comment] = raw_shared.loc[has_comment]
    return synthetic
def shift_series_forward_backward(s, freqstr="1D"):
    """Combine a series with copies shifted backward and forward in time.

    Parameters
    ----------
    s : pd.Series
        time series to shift
    freqstr : str, optional
        shift as a number followed by a single-letter frequency unit,
        e.g. "1D" or "12h", by default "1D". When the number is missing
        1 is used; when the last character is not a letter "D" is used.

    Returns
    -------
    pd.DataFrame
        dataframe with 3 columns, in this order: the series shifted
        backward, the original series, the series shifted forward.
    """
    # split into "<count><unit>"; slices keep this safe for short strings
    count, unit = freqstr[:-1], freqstr[-1:]
    n = int(count) if count.isnumeric() else 1
    # the unit is the last character, so that is the part to test
    freq = unit if unit.isalpha() else "D"

    shift_forward = s.shift(periods=n, freq=freq)
    shift_backward = s.shift(periods=-n, freq=freq)
    return pd.concat([shift_backward, s, shift_forward], axis=1)


def _shifted_and_interpolated(b, smoothfreq):
    """Shift ``b`` backward/forward and interpolate both shifted columns."""
    smoother = shift_series_forward_backward(b, freqstr=smoothfreq)
    # the shifted columns have holes on the combined index; fill them
    # linearly so they can be compared with the original at every timestamp
    smoother.iloc[:, 0] = smoother.iloc[:, 0].interpolate(method="linear")
    smoother.iloc[:, 2] = smoother.iloc[:, 2].interpolate(method="linear")
    return smoother


def smooth_upper_bound(b, smoothfreq="1D"):
    """Smooth an upper bound by taking the maximum over shifted copies.

    Parameters
    ----------
    b : pd.Series
        time series with the upper bound
    smoothfreq : str, optional
        shift used for smoothing, see ``shift_series_forward_backward``,
        by default "1D"

    Returns
    -------
    pd.Series
        row-wise maximum of the bound and its backward/forward shifted
        copies, at the timestamps where the original bound is not NaN
    """
    smoother = _shifted_and_interpolated(b, smoothfreq)
    original_stamps = smoother.iloc[:, 1].dropna().index
    return smoother.max(axis=1).loc[original_stamps]


def smooth_lower_bound(b, smoothfreq="1D"):
    """Smooth a lower bound by taking the minimum over shifted copies.

    Parameters
    ----------
    b : pd.Series
        time series with the lower bound
    smoothfreq : str, optional
        shift used for smoothing, see ``shift_series_forward_backward``,
        by default "1D"

    Returns
    -------
    pd.Series
        row-wise minimum of the bound and its backward/forward shifted
        copies, at the timestamps where the original bound is not NaN
    """
    smoother = _shifted_and_interpolated(b, smoothfreq)
    original_stamps = smoother.iloc[:, 1].dropna().index
    return smoother.min(axis=1).loc[original_stamps]
def get_correction_status_name(corrections):
    """Get correction status name from correction codes.

    Missing codes are treated as 0 (no correction) before lookup.

    Parameters
    ----------
    corrections : pd.DataFrame or pd.Series
        dataframe or series containing corrections codes

    Returns
    -------
    pd.DataFrame or pd.Series
        dataframe or series filled with correction status name
    """

    def _code_to_name(code):
        # look up the enum member by value and return its label
        return CorrectionCode(code).name

    filled = corrections.fillna(0)
    return filled.map(_code_to_name)