import os
from operator import or_
import numpy as np
import pandas as pd
from .ts_utils import (
diff_with_gap_awareness,
interpolate_series_to_new_index,
mask_corrections_as_nan,
resample_short_series_to_long_series,
smooth_lower_bound,
smooth_upper_bound,
spike_finder,
)
def rule_funcdict_to_nan(series, funcdict):
    """Detection rule, flag values with dictionary of functions.

    Use dictionary of functions to identify suspect values and set
    them to NaN.

    Parameters
    ----------
    series : pd.Series
        timeseries in which suspect values are identified
    funcdict : dict
        dictionary with function names as keys and functions/methods as
        values. Each function is applied to each value in the timeseries
        using `series.apply(func)`. Suspect values are those where
        the function evaluates to True.

    Returns
    -------
    corrections : pd.Series
        a series with same index as the input timeseries containing
        corrections. Suspect values (according to the provided functions)
        are set to np.nan.
    """
    # start from an all-False mask so an empty funcdict yields no
    # corrections instead of raising a NameError (mask was previously
    # only bound inside the loop)
    mask = pd.Series(index=series.index, data=False, dtype=bool)
    for func in funcdict.values():
        # OR-combine: a value is suspect if any function flags it
        mask |= series.apply(func)
    return mask_corrections_as_nan(series, mask)
def rule_max_gradient(series, max_step=0.5, max_timestep="1D"):
    """Detection rule, flag values when maximum gradient exceeded.

    Set values to NaN when maximum gradient between two
    observations is exceeded.

    Parameters
    ----------
    series : pd.Series
        timeseries in which suspect values are identified
    max_step : float, optional
        max jump between two observations within given timestep,
        by default 0.5
    max_timestep : str, optional
        maximum timestep to consider, by default "1D". The gradient is not
        calculated for values that lie farther apart.

    Returns
    -------
    corrections : pd.Series
        a series with same index as the input timeseries containing
        corrections. Suspect values are set to np.nan.
    """
    # factor converting a per-second gradient to a per-max_timestep
    # gradient; "1s" (lowercase) is the non-deprecated frequency alias
    conversion = pd.Timedelta(max_timestep) / pd.Timedelta("1s")
    grad = (
        series.diff()
        / series.index.to_series().diff().dt.total_seconds()
        * conversion
    )
    # flag observations where the absolute gradient is too steep
    mask = grad.abs() > max_step
    return mask_corrections_as_nan(series, mask)
def rule_ufunc_threshold(series, ufunc, threshold, offset=0.0):
    """Detection rule, flag values based on operator and threshold value.

    Set values to NaN based on operator function and threshold value.
    The argument ufunc is a tuple containing an operator function
    (i.e. '>', '<', '>=', '<='). These are passed using their named
    equivalents, e.g. in numpy: np.greater, np.less, np.greater_equal,
    np.less_equal. This function essentially does the following:
    ufunc(series, threshold).

    Parameters
    ----------
    series : pd.Series
        timeseries in which suspect values are identified
    ufunc : tuple
        tuple containing ufunc (i.e. (numpy.greater_equal,) ). The function
        must be callable according to `ufunc(series, threshold)`. The
        function is passed as a tuple to bypass RuleSet logic.
    threshold : float or pd.Series
        value or timeseries to compare series with
    offset : float, optional
        value that is added to the threshold, e.g. if some extra tolerance
        is allowable. Default value is 0.0.

    Returns
    -------
    corrections : pd.Series
        a series with same index as the input timeseries containing
        corrections. Suspect values are set to np.nan.
    """
    # unwrap the single function from the tuple (RuleSet bypass)
    func = ufunc[0]
    if isinstance(threshold, pd.Series):
        # threshold given as (possibly lower-frequency) timeseries:
        # bring it onto the observation index first
        threshold_series = resample_short_series_to_long_series(
            threshold, series
        )
        mask = func(series, threshold_series.add(offset))
    else:
        mask = func(series, threshold + offset)
    return mask_corrections_as_nan(series, mask)
def rule_diff_ufunc_threshold(series, ufunc, threshold, max_gap="7D"):
    """Detection rule, flag values based on diff, operator and threshold.

    Calculate diff of series and identify suspect observations based on
    comparison with threshold value.

    The argument ufunc is a tuple containing a function, e.g. an operator
    function (i.e. '>', '<', '>=', '<='). These are passed using their
    named equivalents, e.g. in numpy: np.greater, np.less,
    np.greater_equal, np.less_equal. This function essentially does the
    following: ufunc(series, threshold_series). The argument is passed as
    a tuple to bypass RuleSet logic.

    Parameters
    ----------
    series : pd.Series
        timeseries in which suspect values are identified
    ufunc : tuple
        tuple containing ufunc (i.e. (numpy.greater_equal,) ). The function
        must be callable according to `ufunc(series, threshold)`. The
        function is passed as a tuple to bypass RuleSet logic.
    threshold : float
        value to compare diff of timeseries to
    max_gap : str, optional
        only considers observations within this maximum gap
        between measurements to calculate diff, by default "7D".

    Returns
    -------
    corrections : pd.Series
        a series with same index as the input timeseries containing
        corrections. Suspect values are set to np.nan.
    """
    # unwrap the single function from the tuple (RuleSet bypass)
    func = ufunc[0]
    # diff with the value directly after a large gap set to NaN
    gap_aware_diff = diff_with_gap_awareness(series, max_gap=max_gap)
    suspect = func(gap_aware_diff.abs(), threshold)
    return mask_corrections_as_nan(series, suspect)
def rule_other_ufunc_threshold(series, other, ufunc, threshold):
    """Detection rule, flag values based on other series and threshold.

    Set values to NaN based on comparison of another timeseries with a
    threshold value.

    The argument ufunc is a tuple containing an operator function (i.e.
    '>', '<', '>=', '<='). These are passed using their named equivalents,
    e.g. in numpy: np.greater, np.less, np.greater_equal, np.less_equal.
    This function essentially does the following:
    ufunc(series, threshold_series). The argument is passed as a tuple to
    bypass RuleSet logic.

    Parameters
    ----------
    series : pd.Series
        timeseries in which suspect values are identified, only used
        to test if index of other overlaps
    other : pd.Series
        other timeseries based on which suspect values are identified
    ufunc : tuple
        tuple containing ufunc (i.e. (numpy.greater_equal,) ). The function
        must be callable according to `ufunc(series, threshold)`. The
        function is passed as a tuple to bypass RuleSet logic.
    threshold : float
        value to compare timeseries to

    Returns
    -------
    corrections : pd.Series
        a series with same index as the input timeseries containing
        corrections. Suspect values are set to np.nan.
    """
    # unwrap the single function from the tuple (RuleSet bypass)
    func = ufunc[0]
    # suspect timestamps according to the other series ...
    flagged = other.loc[func(other, threshold)]
    # ... restricted to timestamps that also exist in series
    shared_idx = series.index.intersection(flagged.index)
    return mask_corrections_as_nan(series, shared_idx)
def rule_spike_detection(series, threshold=0.15, spike_tol=0.15, max_gap="7D"):
    """Detection rule, identify spikes in timeseries and set to NaN.

    Spikes are sudden jumps in the value of a timeseries that last
    1 timestep. They can be both negative or positive.

    Parameters
    ----------
    series : pd.Series
        timeseries in which suspect values are identified
    threshold : float, optional
        the minimum size of the jump to qualify as a spike, by default 0.15
    spike_tol : float, optional
        offset between value of timeseries before spike and after spike,
        by default 0.15. After a spike, the value of the timeseries is
        usually close to but not identical to the value that preceded the
        spike. Use this parameter to control how close the value has to be.
    max_gap : str, optional
        only considers observations within this maximum gap
        between measurements to calculate diff, by default "7D".

    Returns
    -------
    corrections : pd.Series
        a series with same index as the input timeseries containing
        corrections. Suspect values are set to np.nan.
    """
    up, down = spike_finder(
        series,
        threshold=threshold,
        spike_tol=spike_tol,
        max_gap=max_gap,
    )
    # both positive and negative spikes are flagged
    spike_idx = up.index.union(down.index)
    return mask_corrections_as_nan(series, spike_idx)
def rule_offset_detection(
    series, threshold=0.15, updown_diff=0.1, max_gap="7D", return_df=False
):
    """Detection rule, detect periods with an offset error.

    This rule looks for jumps in both positive and negative direction that
    are larger than a particular threshold. It then tries to match jumps
    in upward direction to one in downward direction of a similar size. If
    this is possible, all observations between two matching but opposite
    jumps are set to NaN.

    Parameters
    ----------
    series : pd.Series
        timeseries in which to look for offset errors
    threshold : float, optional
        minimum jump to consider as offset error, by default 0.15
    updown_diff : float, optional
        the maximum difference between two opposite jumps to consider them
        matching, by default 0.1
    max_gap : str, optional
        only considers observations within this maximum gap
        between measurements to calculate diff, by default "7D".
    return_df : bool, optional
        return the dataframe containing the potential offsets,
        by default False

    Returns
    -------
    corrections : pd.Series
        a series with same index as the input timeseries containing
        corrections. Suspect values are set to np.nan.
    """
    # identify gaps and set diff value after gap to nan
    diff = diff_with_gap_awareness(series, max_gap=max_gap)

    # split diff into upward and downward jumps
    diff_up = diff.copy()
    diff_up.loc[diff < 0.0] = np.nan
    diff_down = diff.copy()
    diff_down.loc[diff > 0.0] = np.nan

    # keep only jumps exceeding the threshold
    sd_up = diff_up.loc[diff_up >= threshold]
    sd_down = diff_down.loc[diff_down <= -threshold]

    jump_df = pd.concat([sd_up, sd_down], sort=True, axis=0)
    jump_df.sort_index(inplace=True)
    if (len(jump_df.index) % 2) != 0:
        print("Warning! Uneven no. of down and up jumps found.")

    periods = []
    df = pd.DataFrame(columns=["start", "end", "dh1", "dh2", "diff"])
    j = 0
    for i in jump_df.index:
        if i not in periods:
            dh = jump_df.loc[i]
            # find the jump that best cancels this one (sum closest to 0)
            idiff = jump_df.loc[jump_df.index.difference([i])] + dh
            index_best_match = idiff.abs().idxmin()
            if idiff.loc[index_best_match] <= updown_diff:
                periods += [i, index_best_match]
                df.loc[j] = [
                    i,
                    index_best_match,
                    dh,
                    jump_df.loc[index_best_match],
                    idiff.loc[index_best_match],
                ]
                j += 1

    # the `fastpath` keyword was deprecated and removed from the
    # pd.Series constructor (pandas >= 2.x); plain constructor instead
    corrections = pd.Series(index=series.index, data=np.zeros(series.index.size))
    for j in range(0, len(periods), 2):
        # NaN everything between matched jumps; shift the end back a
        # little so the observation at the closing jump itself is kept
        corrections.loc[
            periods[j] : periods[j + 1] - pd.Timedelta(seconds=30)
        ] = np.nan
    if return_df:
        return corrections, df
    else:
        return corrections
def rule_outside_n_sigma(series, n=2.0):
    """Detection rule, set values outside of n * standard deviation to NaN.

    Parameters
    ----------
    series : pd.Series
        timeseries in which suspect values are identified
    n : float, optional
        number of standard deviations to use, by default 2

    Returns
    -------
    corrections : pd.Series
        a series with same index as the input timeseries containing
        corrections. Suspect values are set to np.nan.
    """
    # compute band once instead of inline in both comparisons
    center = series.mean()
    halfwidth = n * series.std()
    outside = (series > center + halfwidth) | (series < center - halfwidth)
    return mask_corrections_as_nan(series, outside)
def rule_diff_outside_of_n_sigma(series, n, max_gap="7D"):
    """Detection rule, calculate diff of series and identify suspect
    observations based on values outside of n * standard deviation of the
    difference.

    Parameters
    ----------
    series : pd.Series
        timeseries in which suspect values are identified
    n : float, optional
        number of standard deviations to use, by default 2
    max_gap : str, optional
        only considers observations within this maximum gap
        between measurements to calculate diff, by default "7D".

    Returns
    -------
    corrections : pd.Series
        a series with same index as the input timeseries containing
        corrections. Suspect values are set to np.nan.
    """
    # diff with the value directly after a large gap set to NaN
    diff = diff_with_gap_awareness(series, max_gap=max_gap)
    nsigma = n * diff.std()
    # NOTE(review): this flags where |diff| - mean(diff) > n*std(diff);
    # (diff - mean).abs() may have been intended — confirm before changing
    suspect = (diff.abs() - diff.mean()) > nsigma
    return mask_corrections_as_nan(series, suspect)
def rule_outside_bandwidth(series, lowerbound, upperbound):
    """Detection rule, set suspect values to NaN if outside bandwidth.

    Parameters
    ----------
    series : pd.Series
        timeseries in which suspect values are identified
    lowerbound : pd.Series
        timeseries containing the lower bound, if bound values are less
        frequent than series, bound is interpolated to series.index
    upperbound : pd.Series
        timeseries containing the upper bound, if bound values are less
        frequent than series, bound is interpolated to series.index

    Returns
    -------
    corrections : pd.Series
        a series with same index as the input timeseries containing
        corrections. Suspect values are set to np.nan.
    """
    # bring each bound onto the observation index when indices differ
    if lowerbound.index.symmetric_difference(series.index).size > 0:
        lowerbound = interpolate_series_to_new_index(lowerbound, series.index)
    if upperbound.index.symmetric_difference(series.index).size > 0:
        upperbound = interpolate_series_to_new_index(upperbound, series.index)
    out_of_bounds = (series < lowerbound) | (series > upperbound)
    return mask_corrections_as_nan(series, out_of_bounds)
def rule_pastas_outside_pi(
    series,
    ml,
    ci=0.95,
    min_ci=None,
    smoothfreq=None,
    tmin=None,
    tmax=None,
    savedir=None,
    verbose=False,
):
    """Detection rule, flag values based on pastas model prediction interval.

    Flag suspect values outside the prediction interval calculated by a
    pastas timeseries model. Uses a pastas.Model and a confidence interval
    as input.

    Parameters
    ----------
    series : pd.Series
        timeseries to identify suspect observations in
    ml : pastas.Model
        timeseries model for series
    ci : float, optional
        confidence interval for calculating bandwidth, by default 0.95.
        Higher confidence interval means that bandwidth is wider and more
        observations will fall within the bounds.
    min_ci : float, optional
        value indicating minimum distance between upper and lower
        bounds, if ci does not meet this requirement, this value is added
        to the bounds. This can be used to prevent extremely narrow
        prediction intervals. Default is None.
    smoothfreq : str, optional
        str indicating no. of periods and frequency str (i.e. "1D") for
        smoothing upper and lower bounds, default is None.
    tmin : str or pd.Timestamp, optional
        set tmin for model simulation
    tmax : str or pd.Timestamp, optional
        set tmax for model simulation
    savedir : str, optional
        save calculated prediction interval to folder as pickle file.
    verbose : bool, optional
        print warnings when no prediction interval can be computed,
        by default False

    Returns
    -------
    corrections : pd.Series
        a series with same index as the input timeseries containing
        corrections. Suspect values are set to np.nan.
    """

    def _no_corrections(warning):
        # helper: no usable model/interval -> flag nothing; the three
        # fallback branches previously repeated this block verbatim
        if verbose:
            print(warning)
        corrections = mask_corrections_as_nan(
            series, pd.Series(index=series.index, data=False)
        )
        corrections.name = "sim"
        return corrections

    # no model
    if ml is None:
        return _no_corrections("Warning: No Pastas model found!")
    # no fit
    if ml.fit is None:
        return _no_corrections("Warning: Pastas model fit attribute is None!")

    if tmin is not None:
        ml.settings["tmin"] = tmin
    if tmax is not None:
        ml.settings["tmax"] = tmax

    # calculate prediction interval
    pi = ml.fit.prediction_interval(alpha=(1 - ci))
    if pi.empty:
        return _no_corrections(
            "Warning: calculated prediction interval with "
            "Pastas model is empty!"
        )

    lower = pi.iloc[:, 0]
    upper = pi.iloc[:, 1]
    # apply time-shift smoothing to the bounds
    if smoothfreq is not None:
        upper = smooth_upper_bound(upper, smoothfreq=smoothfreq)
        lower = smooth_lower_bound(lower, smoothfreq=smoothfreq)
    # widen interval symmetrically if narrower on average than min_ci
    if min_ci is not None:
        if (upper - lower).mean() < min_ci:
            upper = upper + min_ci / 2.0
            lower = lower - min_ci / 2.0
    corrections = rule_outside_bandwidth(series, lower, upper)
    corrections.name = "sim (r^2={0:.3f})".format(ml.stats.rsq())
    if savedir:
        pi.to_pickle(os.path.join(savedir, f"pi_{ml.name}.pkl"))
    return corrections
def rule_pastas_percentile_pi(
    series, ml, alpha=0.1, tmin=None, tmax=None, verbose=False
):
    """Detection rule (work in progress), flag values using pastas model.

    NOTE(review): this function is incomplete — the visible body only
    handles the fallback branches (no model / no fit) and contains no
    return statement, so it currently always returns None. The `alpha`,
    `tmin` and `tmax` parameters are not used yet; presumably they are
    intended for the realization-based prediction interval marked TODO
    below — confirm against the project roadmap before relying on this.

    Parameters
    ----------
    series : pd.Series
        timeseries to identify suspect observations in
    ml : pastas.Model
        timeseries model for series
    alpha : float, optional
        significance level, by default 0.1 (currently unused)
    tmin : str or pd.Timestamp, optional
        set tmin for model simulation (currently unused)
    tmax : str or pd.Timestamp, optional
        set tmax for model simulation (currently unused)
    verbose : bool, optional
        print warnings, by default False
    """
    # no model: build an (unreturned) empty corrections series
    if ml is None:
        if verbose:
            print("Warning: No Pastas model found!")
        corrections = mask_corrections_as_nan(
            series, pd.Series(index=series.index, data=False)
        )
        corrections.name = "sim"
    # no fit: same fallback as above
    elif ml.fit is None:
        if verbose:
            print("Warning: Pastas model fit attribute is None!")
        corrections = mask_corrections_as_nan(
            series, pd.Series(index=series.index, data=False)
        )
        corrections.name = "sim"
    # calculate realizations
    # TODO: work in progress
def rule_shift_to_manual_obs(
    series, hseries, method="linear", max_dt="1D", reset_dates=None
):
    """Adjustment rule, for shifting timeseries onto manual observations.

    Used for shifting timeseries based on sensor observations onto manual
    verification measurements. By default uses linear interpolation between
    two manual verification observations.

    Parameters
    ----------
    series : pd.Series
        timeseries to adjust
    hseries : pd.Series
        timeseries containing manual observations
    method : str, optional
        method to use for interpolating between two manual observations,
        by default "linear". Other options are those that are accepted by
        series.reindex(): 'bfill', 'ffill', 'nearest'.
    max_dt : str, optional
        maximum amount of time between manual observation and value in
        series, by default "1D"
    reset_dates : list, optional
        list of dates (as str or pd.Timestamp) on which to reset the
        adjustments to 0.0, by default None. Useful for resetting the
        adjustments when the sensor is replaced, for example.

    Returns
    -------
    adjusted_series : pd.Series
        timeseries containing adjustments to shift series onto manual
        observations.
    """
    # check if time between manual obs and sensor obs are further apart
    # than max_dt. Index.get_loc(key, method="nearest") was removed in
    # pandas 2.0; get_indexer is the vectorized replacement.
    nearest = series.index.get_indexer(hseries.index, method="nearest")
    mask = np.abs((series.index[nearest] - hseries.index).total_seconds()) <= (
        pd.Timedelta(max_dt) / pd.Timedelta("1s")
    )
    # interpolate raw obs to manual obs times
    s_obs = (
        series.reindex(series.index.join(hseries.index, how="outer"))
        .interpolate(method="time")
        .loc[hseries.index]
    )
    # offset between sensor and manual measurement
    diff = s_obs - hseries
    # use only diff where mask is True (= time between obs < max_dt)
    diff = diff.loc[mask]
    if reset_dates is not None:
        # force the adjustment back to 0.0 on each reset date
        for d in reset_dates:
            diff.loc[d] = 0.0
    # spread the offsets over the full index of the series
    if method == "linear":
        diff_full_index = (
            diff.reindex(
                series.index.join(diff.index, how="outer"), method=None
            )
            .interpolate(method="linear")
            .fillna(0.0)
        )
    else:
        diff_full_index = diff.reindex(series.index, method=method).fillna(0.0)
    adjusted_series = series - diff_full_index
    return adjusted_series
def rule_combine_nan_or(*args):
    """Combination rule, combine NaN values for any number of timeseries.

    Used for combining intermediate results in branching algorithm trees to
    create one final result, i.e. (s1.isna() OR s2.isna())

    Returns
    -------
    corrections : pd.Series
        a series with same index as the input timeseries containing
        corrections. Contains NaNs where any of the input series
        values is NaN.
    """
    first, *rest = args
    combined = first.copy()
    # propagate NaNs from every remaining series into the result
    for other in rest:
        combined.loc[other.isna()] = np.nan
    return combined
def rule_combine_nan_and(*args):
    """Combination rule, combine NaN values for any number of timeseries.

    Used for combining intermediate results in branching algorithm trees to
    create one final result, i.e. (s1.isna() AND s2.isna())

    Returns
    -------
    corrections : pd.Series
        a series with same index as the input timeseries containing
        corrections. Contains NaNs only where all of the input series
        values are NaN.
    """
    # build a mask that is True only where every input series is NaN
    # (docstring previously said "any", which describes the OR variant)
    for i, series in enumerate(args):
        if i == 0:
            mask = series.isna()
        else:
            mask = mask & series.isna()
    result = args[0].copy()
    result.loc[mask] = np.nan
    return result
def rule_flat_signal(
    series,
    window,
    min_obs,
    std_threshold=7.5e-3,
    qbelow=None,
    qabove=None,
    hbelow=None,
    habove=None,
):
    """Detection rule, flag values based on dead signal in rolling window.

    Flag values when variation in signal within a window falls below a
    certain threshold value. Optionally provide quantiles below or above
    which to look for dead/flat signals.

    Parameters
    ----------
    series : pd.Series
        timeseries to analyse
    window : int
        number of days in window
    min_obs : int
        minimum number of observations in window to calculate
        standard deviation
    std_threshold : float, optional
        standard deviation threshold value, by default 7.5e-3
    qbelow : float, optional
        quantile value between 0 and 1, signifying an upper
        limit. Only search for flat signals below this limit.
        By default None.
    qabove : float, optional
        quantile value between 0 and 1, signifying a lower
        limit. Only search for flat signals above this limit.
        By default None.
    hbelow : float, optional
        absolute value in units of timeseries signifying an upper limit.
        Only search for flat signals below this limit. By default None.
    habove : float, optional
        absolute value in units of timeseries signifying a lower limit.
        Only search for flat signals above this limit. By default None.

    Returns
    -------
    corrections : pd.Series
        a series with same index as the input timeseries containing
        corrections. Contains NaNs where the signal is considered flat
        or dead.
    """
    s = series.dropna()
    # s is already NaN-free; the previous second .dropna() was redundant
    stdfilt = s.rolling(f"{int(window)}D", min_periods=min_obs).std()
    stdmask = stdfilt < std_threshold

    # optional quantile limits restricting where flat signals are sought
    if qabove is None and qbelow is not None:
        quantilemask = s < s.quantile(qbelow)
    elif qabove is not None and qbelow is None:
        quantilemask = s > s.quantile(qabove)
    elif qabove is not None and qbelow is not None:
        quantilemask = (s > s.quantile(qabove)) | (s < s.quantile(qbelow))
    else:
        # no quantile limits: consider the entire series
        quantilemask = pd.Series(index=s.index, data=True, dtype=bool)

    # optional absolute-level limits, analogous to the quantile limits
    if habove is None and hbelow is not None:
        levelmask = s < hbelow
    elif habove is not None and hbelow is None:
        levelmask = s > habove
    elif habove is not None and hbelow is not None:
        levelmask = (s > habove) | (s < hbelow)
    else:
        levelmask = pd.Series(index=s.index, data=True, dtype=bool)

    mask = stdmask & quantilemask & levelmask
    # map back onto the original (possibly NaN-containing) index
    mask = mask.reindex(series.index, fill_value=False)
    return mask_corrections_as_nan(series, mask)