import numpy as np
import pandas as pd
def mask_corrections_as_nan(series, mask):
    """Get corrections series with NaNs where mask == True.

    Parameters
    ----------
    series : pd.Series
        timeseries to provide corrections for
    mask : DateTimeIndex or boolean np.array
        DateTimeIndex containing timestamps where value should be set to NaN,
        or boolean array with same length as series set to True where
        value should be set to NaN. (Uses pandas .loc[mask] to set values.)

    Returns
    -------
    c : pd.Series
        corrections series on the index of `series`: 0.0 everywhere, except
        NaN at the entries selected by `mask`.
    """
    # The private ``fastpath`` constructor keyword is intentionally not used:
    # it is an internal pandas shortcut that is deprecated in recent releases.
    c = pd.Series(
        np.zeros(series.index.size),
        index=series.index,
        dtype=float,
    )
    c.loc[mask] = np.nan
    return c
def resample_short_series_to_long_series(short_series, long_series):
    """Resample a short timeseries to index from a longer timeseries.

    Every value of `short_series` is placed on the first timestamp of
    `long_series` that is equal to or later than its own timestamp. The
    remaining entries are filled using 'ffill' first and then 'bfill'.

    Parameters
    ----------
    short_series : pd.Series
        short timeseries
    long_series : pd.Series
        long timeseries

    Returns
    -------
    new_series : pd.Series
        series with index from long_series and data from short_series.
        Values of short_series that lie after the last timestamp of
        long_series are ignored.
    """
    long_index = long_series.index
    new_series = pd.Series(index=long_index, dtype=float)
    for idatetime, value in short_series.items():
        on_or_after = long_index >= idatetime
        if not on_or_after.any():
            # observation lies beyond the end of the long series
            continue
        first_date_after = long_index[on_or_after][0]
        new_series.loc[first_date_after] = value
    # ffill()/bfill() replace the deprecated fillna(method=...) spelling
    return new_series.ffill().bfill()
def diff_with_gap_awareness(series, max_gap="7D"):
    """Get diff of timeseries with a limit on gap between two values.

    Parameters
    ----------
    series : pd.Series
        timeseries to calculate diff for
    max_gap : str, optional
        maximum period between two observations for calculating diff, otherwise
        set value to NaN, by default "7D"

    Returns
    -------
    diff : pd.Series
        timeseries with diff, with NaNs whenever two values are farther apart
        than max_gap. An empty input yields an empty diff.
    """
    diff = series.diff()
    if series.empty:
        # nothing to compare; the mask construction below assumes >= 1 entry
        return diff

    # time step preceding every observation except the first one
    dt = series.index[1:] - series.index[:-1]
    # the first observation has no predecessor, so it can never follow a gap
    after_gap = np.r_[np.array([False]), dt > pd.Timedelta(max_gap)]

    # Blank the diff at every timestamp that directly follows a gap. Selection
    # is done by label (not by position) so that repeated timestamps are all
    # blanked, in one vectorised assignment.
    gap_labels = series.index[after_gap]
    diff.loc[series.index.isin(gap_labels)] = np.nan
    return diff
def spike_finder(series, threshold=0.15, spike_tol=0.15, max_gap="7D"):
    """Find spikes in timeseries.

    Spikes are sudden jumps in the value of a timeseries that last 1 timestep.
    They can be both negative or positive.

    Parameters
    ----------
    series : pd.Series
        timeseries to find spikes in
    threshold : float, optional
        the minimum size of the jump to qualify as a spike, by default 0.15
    spike_tol : float, optional
        offset between value of timeseries before spike and after spike,
        by default 0.15. After a spike, the value of the timeseries is usually
        close to but not identical to the value that preceded the spike. Use
        this parameter to control how close the value has to be.
    max_gap : str, optional
        only considers observations within this maximum gap
        between measurements to calculate diff, by default "7D".

    Returns
    -------
    upspikes, downspikes : pd.Series
        the diff values at the timestamps of the upward and downward spikes.
        The spike timestamps are available as the index of each series.
    """
    # gap-aware first difference (NaN directly after a gap longer than max_gap)
    diff = diff_with_gap_awareness(series, max_gap=max_gap)

    # split the changes by direction; the other direction becomes NaN
    rises = diff.mask(diff < 0.0)
    falls = diff.mask(diff > 0.0)

    # Pair every change (positions 1..n-2) with the opposite change one step
    # later. A small absolute sum means the series came back close to the
    # level it had before the jump.
    rebound_after_rise = (rises.iloc[1:-1] + falls.values[2:]).abs()
    rebound_after_fall = (falls.iloc[1:-1] + rises.values[2:]).abs()

    # candidate moments: the rebound is defined and within spike_tol
    up_moments = rebound_after_rise[rebound_after_rise <= spike_tol].index
    down_moments = rebound_after_fall[rebound_after_fall <= spike_tol].index

    # keep only candidates where the jump itself exceeds the threshold
    up_jumps = diff.loc[up_moments]
    down_jumps = diff.loc[down_moments]
    upspikes = up_jumps[up_jumps > threshold]
    downspikes = down_jumps[down_jumps < -threshold]
    return upspikes, downspikes
def bandwidth_moving_avg_n_sigma(series, window, n):
    """Calculate bandwidth around timeseries based moving average + n * std.

    Parameters
    ----------
    series : pd.Series
        series to calculate bandwidth for
    window : int
        number of observations to consider for moving average
    n : float
        number of standard deviations from moving average for bandwidth

    Returns
    -------
    bandwidth : pd.DataFrame
        dataframe with 2 columns, "lower_<n>_sigma" and "upper_<n>_sigma",
        holding the lower and upper bandwidth
    """
    rolling_mean = series.rolling(window).mean()
    # the spread uses the standard deviation of the complete series,
    # not a rolling standard deviation
    spread = series.std() * n

    bandwidth = pd.DataFrame(index=series.index)
    for template, band in (
        ("lower_{}_sigma", rolling_mean - spread),
        ("upper_{}_sigma", rolling_mean + spread),
    ):
        bandwidth[template.format(n)] = band
    return bandwidth
def _index_to_interp_axis(index):
    """Return `index` in a numeric form usable as coordinates by np.interp."""
    if isinstance(index, pd.DatetimeIndex):
        # Convert explicitly to int64 nanoseconds: np.interp cannot cast
        # datetime64 to float itself, and pandas >= 2 allows indexes with
        # different time resolutions whose raw integers are not comparable.
        return index.values.astype("datetime64[ns]").astype(np.int64)
    # other index types: use their integer view when available, else as-is
    return getattr(index, "asi8", index)


def interpolate_series_to_new_index(series, new_index):
    """Interpolate timeseries to new DateTimeIndex.

    Values are interpolated linearly in time. Timestamps of `new_index` that
    lie outside the time range covered by `series` get NaN.

    Parameters
    ----------
    series : pd.Series
        original series
    new_index : DateTimeIndex
        new index to interpolate series to

    Returns
    -------
    si : pd.Series
        new series with new index, with interpolated values
    """
    # both axes must be expressed in the same numeric unit before interpolating
    s_interp = np.interp(
        _index_to_interp_axis(new_index),
        _index_to_interp_axis(series.index),
        series.values,
        left=np.nan,
        right=np.nan,
    )
    si = pd.Series(s_interp, index=new_index, dtype=float)
    return si
def unique_nans_in_series(series, *args):
    """Get mask where NaNs in series are unique compared to other series.

    A NaN counts as unique when none of the other series is NaN at the
    same position.

    Parameters
    ----------
    series : pd.Series
        identify unique NaNs in series
    *args
        any number of pandas.Series

    Returns
    -------
    mask : pd.Series
        mask with value True where NaN is unique to series

    Raises
    ------
    ValueError
        if any of the additional arguments is not a pandas Series
    """
    only_here = series.isna()
    for other in args:
        if isinstance(other, pd.Series):
            # keep only the NaNs for which `other` does hold a value
            only_here = only_here & other.notna()
            continue
        raise ValueError("Only supports pandas Series")
    return only_here
def create_synthetic_raw_timeseries(raw_series, truth_series, comments):
    """Create synthetic raw timeseries.

    Takes 'truth_series' and, wherever a value is labelled with a comment,
    replaces it with the value from raw_series. Used for removing unlabeled
    changes between a raw and validated timeseries.

    Parameters
    ----------
    raw_series : pd.Series
        timeseries with raw data
    truth_series : pd.Series
        timeseries with validated data
    comments : pd.Series
        timeseries with comments. Index must be same as 'truth_series'.
        When value does not have a comment it must be an empty string: ''.

    Returns
    -------
    s : pd.Series
        synthetic raw timeseries, same as truth_series but updated with
        raw_series where value has been commented. Only contains timestamps
        where raw_series has a non-NaN value and truth_series has an entry.

    Raises
    ------
    ValueError
        if 'truth_series' and 'comments' do not share the same index
    """
    index_mismatch = truth_series.index.symmetric_difference(comments.index)
    if index_mismatch.size > 0:
        raise ValueError("'truth_series' and 'comments' must have same index!")

    # timestamps with a raw observation that also occur in the truth series
    shared_index = raw_series.dropna().index.intersection(truth_series.index)

    # observations carrying a (non-empty) comment
    has_comment = comments.loc[shared_index] != ""

    # start from the validated values and restore raw values where commented
    raw_shared = raw_series.loc[shared_index]
    synth_raw = truth_series.loc[shared_index].copy()
    synth_raw.loc[has_comment] = raw_shared.loc[has_comment]
    return synth_raw
def shift_series_forward_backward(s, freqstr="1D"):
    """Combine a series with copies shifted backward and forward in time.

    Parameters
    ----------
    s : pd.Series
        timeseries to shift, the index is shifted (values are kept)
    freqstr : str, optional
        pandas frequency string giving the size of the shift, e.g. "1D",
        "2h" or "30min", by default "1D". A missing multiplier means 1, a
        missing unit means days.

    Returns
    -------
    pd.DataFrame
        dataframe with 3 columns on the union of the three indexes: the
        series shifted backward, the original series and the series shifted
        forward (in that order).

    Raises
    ------
    ValueError
        if `freqstr` cannot be interpreted as a pandas frequency
    """
    if not freqstr or freqstr.isdigit():
        # no unit given: interpret as a number of days
        freqstr = (freqstr or "1") + "D"
    # Let pandas parse multiplier and unit. Slicing the string by hand
    # previously made every unit fall back to days, e.g. "2H" became 2 days.
    offset = pd.tseries.frequencies.to_offset(freqstr)
    shift_backward = s.shift(periods=-1, freq=offset)
    shift_forward = s.shift(periods=1, freq=offset)
    return pd.concat([shift_backward, s, shift_forward], axis=1)
def smooth_upper_bound(b, smoothfreq="1D"):
    """Smooth an upper bound using the maximum of time-shifted copies.

    Parameters
    ----------
    b : pd.Series
        timeseries with the upper bound
    smoothfreq : str, optional
        frequency string passed to shift_series_forward_backward as the size
        of the shift, by default "1D"

    Returns
    -------
    pd.Series
        per timestamp the maximum of the bound shifted backward, the bound
        itself and the bound shifted forward
    """
    return shift_series_forward_backward(b, freqstr=smoothfreq).max(axis=1)
def smooth_lower_bound(b, smoothfreq="1D"):
    """Smooth a lower bound using the minimum of time-shifted copies.

    Parameters
    ----------
    b : pd.Series
        timeseries with the lower bound
    smoothfreq : str, optional
        frequency string passed to shift_series_forward_backward as the size
        of the shift, by default "1D"

    Returns
    -------
    pd.Series
        per timestamp the minimum of the bound shifted backward, the bound
        itself and the bound shifted forward
    """
    return shift_series_forward_backward(b, freqstr=smoothfreq).min(axis=1)