from copy import deepcopy
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from .ts_comparison import SeriesComparison, SeriesComparisonRelative
from .ts_utils import (
corrections_as_float,
corrections_as_nan,
mask_corrections_modified_value,
mask_corrections_no_comparison_value,
unique_nans_in_series,
)
[docs]
class Detector:
    """Detector object for applying error detection algorithms to time series.

    The Detector is used to apply error detection algorithms to a time series
    and optionally contains a 'truth' series, to which the error detection
    result can be compared. An example of a 'truth' series is a manually
    validated time series. Custom error detection algorithms can be defined
    using the RuleSet object.

    Parameters
    ----------
    series : pd.Series or pd.DataFrame
        time series to check
    truth : pd.Series or pd.DataFrame, optional
        series that represents the 'truth', i.e. a benchmark to which
        the error detection result can be compared, by default None

    Attributes
    ----------
    name : str
        name obtained when validating the input series, or an empty string
    series : pd.Series or pd.DataFrame
        the time series passed to the constructor
    truth : pd.Series, pd.DataFrame or None
        the benchmark series, if any
    ruleset, results, corrections, comparisons
        set by `apply_ruleset` and removed again by `reset`

    Examples
    --------
    Given a time series 'series' and some ruleset 'rset':

    >>> d = Detector(series)
    >>> d.apply_ruleset(rset)
    >>> d.plot_overview()

    See Also
    --------
    traval.RuleSet : object for defining detection algorithms
    """
def __init__(self, series, truth=None):
    """Create a Detector for the given time series.

    Parameters
    ----------
    series : pd.Series or pd.DataFrame
        time series to check
    truth : pd.Series or pd.DataFrame, optional
        series that represents the 'truth', i.e. a benchmark to which
        the error detection result can be compared, by default None
    """
    # the validator's return value doubles as the detector name;
    # a missing name is stored as an empty string
    validated_name = self._validate_input_series(series)
    self.name = "" if validated_name is None else validated_name
    self.series = series

    # a supplied truth goes through set_truth so it is validated as well
    if truth is None:
        self.truth = None
    else:
        self.set_truth(truth)
def __repr__(self):
    """Return a short text representation showing the detector name."""
    return "Detector: <{}>".format(self.name)
[docs]
def reset(self):
    """Reset Detector object.

    Deletes the 'ruleset', 'results', 'corrections' and 'comparisons'
    attributes when they are present; all other attributes are kept.
    """
    derived_attributes = ("ruleset", "results", "corrections", "comparisons")
    for attr_name in derived_attributes:
        if not hasattr(self, attr_name):
            continue
        delattr(self, attr_name)
[docs]
def apply_ruleset(self, ruleset, compare=True):
    """Apply RuleSet to series.

    Stores the ruleset, its results and its corrections on the Detector
    and optionally builds comparison objects per step.

    Parameters
    ----------
    ruleset : traval.RuleSet
        RuleSet object containing detection rules
    compare : bool or list of int, optional
        if True, compare all results to original series and store in
        dictionary under comparisons attribute, default is True. If False,
        do not store comparisons (comparisons left over from an earlier
        call are removed). If list (or tuple) of int, store only those
        step numbers as comparisons. Note: value of -1 refers to last step
        for convenience.

    See Also
    --------
    traval.RuleSet : object for defining detection algorithms
    """
    self.ruleset = ruleset
    results, corrections = self.ruleset(self.series)

    # store corrections, results
    self.corrections = corrections
    self.results = results

    if not compare:
        # comparisons from a previous run would no longer correspond to the
        # results stored above, so do not leave them attached
        if hasattr(self, "comparisons"):
            del self.comparisons
        return

    # explicit selection of steps, otherwise all step numbers
    if isinstance(compare, (list, tuple)):
        steps = list(compare)
    else:
        steps = list(results.keys())

    nsteps = len(results)
    base = results[0].copy()
    base.name = "base series"

    self.comparisons = {}
    for k in steps:
        # negative step numbers count from the end
        if k < 0:
            k = nsteps + k
        # only do comparison for steps, not base series
        if k <= 0:
            continue
        s = results[k]
        s.name = self.ruleset.get_step_name(k)
        if self.truth is None:
            self.comparisons[k] = SeriesComparison(s, base)
        else:
            self.comparisons[k] = SeriesComparisonRelative(s, self.truth, base)
[docs]
def set_truth(self, truth):
    """Store the benchmark ('truth') series on the Detector.

    The truth is what detection results are compared against.

    Parameters
    ----------
    truth : pd.Series or pd.DataFrame
        Series or DataFrame containing the "truth", i.e. a benchmark
        to compare the detection result to.
    """
    # run the input validation first; its return value is not needed here
    self._validate_input_series(truth)
    self.truth = truth
[docs]
def confusion_matrix(self, steps=None, truth=None):
    """Calculate confusion matrix stats for detection rules.

    Note: the calculated statistics per rule contain overlapping counts,
    i.e. multiple rules can mark the same observation as suspect.

    Parameters
    ----------
    steps : int, list of int or None, optional
        steps for which to calculate confusion matrix statistics, by
        default None which uses all steps. Negative numbers count from
        the last step.
    truth : pd.Series or pd.DataFrame, optional
        series representing the "truth", i.e. a benchmark to which the
        resulting series is compared. By default None, which uses the
        stored truth series. Argument is included so a different truth
        can be passed.

    Returns
    -------
    df : pd.DataFrame
        dataframe containing confusion matrix data, i.e. counts of true
        positives, false positives, true negatives and false negatives.
        Indexed by step number; the row of the base series (step 0), if
        requested, has no statistics.
    """
    # get list of step integers
    if isinstance(steps, int):
        steps = [steps]
    if not isinstance(steps, list):
        steps = list(self.results.keys())

    # resolve negative step numbers up front so the index, the rule names
    # and the statistics all refer to the same step; drop duplicates that
    # this can create (e.g. [-1, 2] for three steps) while keeping order
    nsteps = len(self.results)
    steps = list(dict.fromkeys(k + nsteps if k < 0 else k for k in steps))

    # use truth if provided, else use stored truth
    if truth is None:
        truth = self.truth

    # rule names keyed by step number (not by position in `steps`)
    rulenames = {k: self.ruleset.get_step_name(k) for k in steps}

    stat_columns = ["TP", "FP", "FN", "TN"]
    df = pd.DataFrame(index=steps, columns=["rule"] + stat_columns)
    df["rule"] = [rulenames[k] for k in steps]

    base = self.results[0]
    base.name = "base series"

    for k in steps:
        # only do comparison for steps, not base series
        if k <= 0:
            continue
        s = self.results[k]
        s.name = rulenames[k]
        cp = SeriesComparisonRelative(s, truth, base)
        # store stats
        df.loc[k, stat_columns] = (
            cp.bc.tp,
            cp.bc.fp,
            cp.bc.fn,
            cp.bc.tn,
        )
    return df
[docs]
def uniqueness(self, truth=None):
    """Calculate unique contribution per rule to stats.

    Note: the calculated statistics per rule are under counted,
    i.e. when multiple rules mark the same observation as suspect it is
    not contained in this result.

    Parameters
    ----------
    truth : pd.Series or pd.DataFrame, optional
        series representing the "truth", i.e. a benchmark to which the
        resulting series is compared. By default None, which uses the
        stored truth series. Argument is included so a different truth
        can be passed.

    Returns
    -------
    df : pd.DataFrame
        dataframe containing confusion matrix data, i.e. unique counts
        of true positives, false positives, true negatives and
        false negatives. Contains one row per step except the base
        series and the highest-numbered step; empty if no such steps
        exist.
    """
    stat_columns = ["TP", "FP", "FN", "TN"]

    # all steps except the base series
    steps = list(self.results.keys())[1:]

    # use truth if provided, else use stored truth
    if truth is None:
        truth = self.truth

    base = self.results[0]
    base.name = "base series"

    # last step, skip in comparison as this presumably contains all NaNs;
    # guard against there being no steps at all (max() of an empty list)
    last_step = max(steps) if steps else None
    if last_step is not None:
        steps.remove(last_step)

    # get rule names
    rulenames = [self.ruleset.get_step_name(i) for i in steps]

    df = pd.DataFrame(index=steps, columns=["rule"] + stat_columns)
    df["rule"] = rulenames

    for k, rulename in zip(steps, rulenames):
        # work on copies so the stored results are not modified below
        series_list = deepcopy(self.results)
        s = series_list.pop(k)
        series_list.pop(last_step)
        other_series = list(series_list.values())

        mask = unique_nans_in_series(s, *other_series)
        # NaNs that are not selected by the mask are replaced by a non-NaN
        # placeholder so they are not counted for this rule
        s.loc[~mask & s.isna()] = -9999.0  # some random non-NaN number
        s.name = rulename

        cp = SeriesComparisonRelative(s, truth, base)
        # store stats
        df.loc[k, stat_columns] = (
            cp.bc.tp,
            cp.bc.fp,
            cp.bc.fn,
            cp.bc.tn,
        )
    return df
[docs]
def get_series(self, step, category=None):
    """Combine base series, a step result and the truth in one DataFrame.

    Parameters
    ----------
    step : int
        key of the result in the results dictionary
    category : str, optional
        if given, only rows whose index is returned by `get_indices` for
        this category and step are kept, by default None (all rows)

    Returns
    -------
    df : pandas.DataFrame
        columns are the base series (named "base series"), the result of
        the requested step and, when a truth is stored, the truth.
    """
    base_series = self.results[0]
    base_series.name = "base series"

    parts = [base_series, self.results[step]]
    if self.truth is not None:
        parts.append(self.truth)
    combined = pd.concat(parts, axis=1)

    if category is None:
        return combined

    selection = self.get_indices(category=category, step=step)
    return combined.loc[selection]
[docs]
def get_indices(self, category, step, truth=None):
    """Get the index of observations in a confusion matrix category.

    Parameters
    ----------
    category : str
        one of 'tp'/'true_positives', 'fp'/'false_positives',
        'fn'/'false_negatives' or 'tn'/'true_negatives'
        (case-insensitive)
    step : int
        key of the result in the results dictionary
    truth : pd.Series or pd.DataFrame, optional
        benchmark to compare to, by default None which uses the stored
        truth series

    Returns
    -------
    idx
        the attribute of the SeriesComparisonRelative object that
        corresponds to the requested category

    Raises
    ------
    ValueError
        if the category is not recognized
    """
    s = self.results[step]
    base = self.results[0]
    base.name = "base series"
    if truth is None:
        truth = self.truth

    # category name -> attribute on the comparison object
    attribute_by_category = {
        "tp": "idx_r_flagged_in_both",
        "true_positives": "idx_r_flagged_in_both",
        "fp": "idx_r_flagged_in_s1",
        "false_positives": "idx_r_flagged_in_s1",
        "fn": "idx_r_flagged_in_s2",
        "false_negatives": "idx_r_flagged_in_s2",
        "tn": "idx_r_kept_in_both",
        "true_negatives": "idx_r_kept_in_both",
    }

    # reject an unknown category before building the comparison object
    try:
        attribute = attribute_by_category[category.lower()]
    except KeyError:
        raise ValueError(
            f"Category '{category}' not recognized, must "
            "be one of ('tp', 'fp', 'fn', 'tn')"
        ) from None

    cp = SeriesComparisonRelative(s, truth, base)
    return getattr(cp, attribute)
[docs]
def get_results_dataframe(self):
    """Get results as DataFrame.

    Returns
    -------
    df : pandas.DataFrame
        results with flagged values set to NaN per applied rule. The
        first column is labelled "base series", the remaining columns
        carry the keys of the ruleset's rules.
    """
    frame = pd.concat(self.results.values(), axis=1)
    column_labels = ["base series"]
    column_labels.extend(self.ruleset.rules.keys())
    frame.columns = column_labels
    return frame
[docs]
def get_final_result(self):
    """Get final time series with flagged values set to NaN.

    Returns
    -------
    series : pandas.Series
        copy of the time series produced by final step in RuleSet with
        flagged values set to NaN, named after the Detector.
    """
    # step numbers are assumed to run from 0 (base series) to len - 1
    key = len(self.results.keys()) - 1
    # return a copy: renaming the stored object would overwrite the step
    # name in self.results and hand out internal state to the caller
    s = self.results[key].copy()
    s.name = self.name
    return s
[docs]
def get_corrections_dataframe(self, as_correction_codes=False, as_addable_df=False):
    """Get DataFrame containing corrections.

    Parameters
    ----------
    as_correction_codes : bool, optional
        return DataFrame with correction codes, by default False
    as_addable_df : bool, optional
        return DataFrame with corrections dataframe that you can add to the original
        time series to obtain the final result. Corrections are NaN when errors are
        detected, and nonzero where observations are shifted, and zero everywhere
        else.

    Returns
    -------
    df : pandas.DataFrame
        DataFrame containing corrections, one column per rule.

    Raises
    ------
    ValueError
        if both 'as_correction_codes' and 'as_addable_df' are True
    """
    if as_correction_codes and as_addable_df:
        raise ValueError(
            "Only one of 'as_correction_codes' and 'as_addable_df' can be True!"
        )

    clist = []
    for s in self.corrections.values():
        if isinstance(s, np.ndarray):
            # an array entry is replaced by an empty placeholder column
            # (presumably it marks a step without corrections); give it an
            # explicit float dtype so the resulting column is numeric
            if as_addable_df:
                s = pd.Series(dtype=float)
            else:
                s = pd.Series(name="correction_code", dtype=float)
        elif isinstance(s, pd.DataFrame) and "correction_code" in s.columns:
            if as_addable_df:
                s = corrections_as_nan(s) + corrections_as_float(s)
            else:
                s = s["correction_code"]
        elif isinstance(s, pd.Series) and as_correction_codes:
            # parentheses are required: `&` binds tighter than `!=`
            modified = s.notnull() & (s != 0.0)
            s = mask_corrections_no_comparison_value(s, s.isna()).add(
                mask_corrections_modified_value(s, s, modified),
                fill_value=0,
            )
            s = s["correction_code"]
        clist.append(s)

    # corrections, 0 means nothing is changed, nan means value is missing
    df = pd.concat(clist, axis=1)
    if as_correction_codes:
        df = df.infer_objects(copy=False).fillna(0).astype(int)
    df.columns = list(self.ruleset.rules.keys())
    return df
[docs]
def get_corrections_comparison(self, truth=None):
    """Get DataFrame comparing detection result and truth per observation.

    Only observations that carry a comment from `get_comment_series` or
    are NaN in the first column of the truth are included.

    Parameters
    ----------
    truth : pd.DataFrame, optional
        benchmark to compare to, by default None which uses the stored
        truth. The code reads the first column as the series and renames
        the columns to 'truth_series' and 'truth_comment', so a
        two-column DataFrame is required.

    Returns
    -------
    df : pandas.DataFrame
        original series, final result ('traval_series'), 'traval_comment',
        'truth_series', 'truth_comment' and 'comparison_label'.

    Raises
    ------
    ValueError
        if no truth is passed and no truth is stored
    """
    # fall back on the stored truth; only fail when neither is available
    if truth is None:
        truth = self.truth
    if truth is None:
        raise ValueError("Supply a time series for 'truth'!")

    # work on a copy so the caller's (or the stored) truth keeps its own
    # column names
    truth = truth.copy()
    truth.columns = ["truth_series", "truth_comment"]

    comments_traval = self.get_comment_series()
    comments_traval.name = "traval_comment"

    # rows where the truth series itself is NaN
    mask_truth_corrections = truth.iloc[:, 0].isna()
    comments_truth = truth.loc[mask_truth_corrections]

    # comparison of the last stored step, translated to readable labels
    k = list(self.comparisons.keys())[-1]
    comparison = self.comparisons[k].comparison_series()
    translate = {
        -1: "Value modified",
        0: "Flagged in both",
        1: "Only flagged in 'truth' series",
        2: "Only flagged in 'traval' series",
        -9999: "NaN in both",
    }
    comparison = comparison.apply(lambda v: translate[v])
    comparison.name = "comparison_label"

    raw_index = comments_traval.index.union(comments_truth.index)

    # copy before renaming so the stored final result keeps its name
    traval_series = self.get_final_result().copy()
    traval_series.name = "traval_series"

    df = pd.concat(
        [
            self.series.loc[raw_index.intersection(self.series.index)],
            traval_series.loc[raw_index.intersection(traval_series.index)],
            comments_traval,
            truth.loc[raw_index.intersection(truth.index)],
            comparison.loc[raw_index.intersection(comparison.index)],
        ],
        axis=1,
    )
    return df
[docs]
def plot_overview(self, mark_suspects=True, **kwargs):
    """Plot time series with flagged values per applied rule.

    One subplot is drawn for the base series and one for each entry in
    the corrections dictionary.

    Parameters
    ----------
    mark_suspects : bool, optional
        mark suspect values with red X, by default True
    **kwargs
        passed to `matplotlib.pyplot.subplots`; 'figsize' defaults to
        (12, 5)

    Returns
    -------
    ax : array of matplotlib.pyplot.Axes
        axes objects
    """
    figsize = kwargs.pop("figsize", (12, 5))

    fig, axes = plt.subplots(
        len(self.corrections) + 1,
        1,
        sharex=True,
        sharey=True,
        figsize=figsize,
        **kwargs,
    )
    # with a single row plt.subplots returns a bare Axes instead of an
    # array; always iterate over (and return) a flat array of Axes
    axes = np.atleast_1d(axes).ravel()

    for icol, iax in enumerate(axes):
        iresult = self.results[icol]
        iax.plot(iresult.index, iresult, label=self.ruleset.get_step_name(icol))

        # the base series (first subplot) has no corrections to mark
        if mark_suspects and icol != 0:
            icorr = self.corrections[icol]
            if isinstance(icorr, pd.DataFrame):
                # markers are drawn at the base series values of the
                # observations listed in the corrections
                iax.plot(
                    icorr.index,
                    self.results[0].loc[icorr.index],
                    marker="x",
                    c="C3",
                    ls="none",
                    label="flagged",
                )

        iax.legend(loc="upper left", ncol=2)
        iax.grid(True)

    fig.tight_layout()
    return axes