from copy import deepcopy
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from .ts_comparison import SeriesComparison, SeriesComparisonRelative
from .ts_utils import unique_nans_in_series
[docs]class Detector:
"""Detector object for applying error detection algorithms to timeseries.
The Detector is used to apply error detection algorithms to a timeseries
and optionally contains a 'truth' series, to which the error detection
result can be compared. An example of a 'truth' series is a manually
validated timeseries. Custom error detection algorithms can be defined
using the RuleSet object.
Parameters
----------
series : pd.Series or pd.DataFrame
timeseries to check
truth : pd.Series or pd.DataFrame, optional
series that represents the 'truth', i.e. a benchmark to which
the error detection result can be compared, by default None
Examples
--------
Given a timeseries 'series' and some ruleset 'rset':
>>> d = Detector(series)
>>> d.apply_ruleset(rset)
>>> d.plot_overview()
See also
--------
traval.RuleSet : object for defining detection algorithms
"""
def __init__(self, series, truth=None):
"""Initialize Detector object.
Parameters
----------
series : pd.Series or pd.DataFrame
timeseries to check
truth : pd.Series or pd.DataFrame, optional
series that represents the 'truth', i.e. a benchmark to which
the error detection result can be compared, by default None
"""
# validate input series
name = self._validate_input_series(series)
if name is None:
self.name = ""
else:
self.name = name
self.series = series
if truth is not None:
self.set_truth(truth)
else:
self.truth = None
def __repr__(self):
"""String representation of Detector object."""
return f"Detector: <{self.name}>"
[docs] def reset(self):
"""Reset Detector object."""
for attr in ["ruleset", "results",
"corrections", "comparisons"]:
if hasattr(self, attr):
delattr(self, attr)
[docs] def apply_ruleset(self, ruleset, compare=True):
"""Apply RuleSet to series.
Parameters
----------
ruleset : traval.RuleSet
RuleSet object containing detection rules
compare : bool or list of int, optional
if True, compare all results to original series and store in
dictionary under comparisons attribute, default is True. If False,
do not store comparisons. If list of int, store only those step
numbers as comparisons. Note: value of -1 refers to last step
for convenience.
See also
--------
traval.RuleSet : object for defining detection algorithms
"""
self.ruleset = ruleset
d, c = self.ruleset(self.series)
# store corrections, results
self.corrections = c
self.results = d
# if compare is not False do comparison
if compare:
self.comparisons = {}
base = d[0].copy()
# if compare is not list, get all step numbers
if not isinstance(compare, list):
compare = d.keys()
# do comparison
for k in compare:
# if k is negative, convert to step number from end
if k < 0:
k = len(d.keys()) + k
# only do comparison for steps, not base series
if k > 0:
s = d[k]
s.name = self.ruleset.get_step_name(k)
base.name = "base series"
if self.truth is None:
self.comparisons[k] = SeriesComparison(s, base)
else:
self.comparisons[k] = SeriesComparisonRelative(
s, self.truth, base)
[docs] def set_truth(self, truth):
"""Set 'truth' series.
Used for comparison with detection result.
Parameters
----------
truth : pd.Series or pd.DataFrame
Series or DataFrame containing the "truth", i.e. a benchmark
to compare the detection result to.
"""
self._validate_input_series(truth)
self.truth = truth
[docs] def confusion_matrix(self, steps=None, truth=None):
"""Calculate confusion matrix stats for detection rules.
Note: the calculated statistics per rule contain overlapping counts,
i.e. multiple rules can mark the same observatin as suspect.
Parameters
----------
steps : int, list of int or None, optional
steps for which to calculate confusion matrix statistics, by
default None which uses all steps.
truth : pd.Series or pd.DataFrame, optional
series representing the "truth", i.e. a benchmark to which the
resulting series is compared. By default None, which uses the
stored truth series. Argument is included so a different truth
can be passed.
Returns
-------
df : pd.DataFrame
dataframe containing confusion matrix data, i.e. counts of true
positives, false positives, true negatives and false negatives.
"""
# get list of step integers
if isinstance(steps, int):
steps = [steps]
if not isinstance(steps, list):
steps = self.results.keys()
# use truth if provided, else use stored truth
if truth is None:
truth = self.truth
# get rule names
rulenames = [self.ruleset.get_step_name(i) for i in steps]
df = pd.DataFrame(index=steps,
columns=["rule", "TP", "FP", "FN", "TN"])
df.loc[:, "rule"] = rulenames
base = self.results[0]
base.name = "base series"
# loop over steps
for k in steps:
# if k is negative, convert to step number from end
if k < 0:
k = len(self.results.keys()) + k
# only do comparison for steps, not base series
if k > 0:
s = self.results[k]
s.name = rulenames[k]
cp = SeriesComparisonRelative(s, truth, base)
# store stats
df.loc[k, ["TP", "FP", "FN", "TN"]] = (cp.bc.tp,
cp.bc.fp,
cp.bc.fn,
cp.bc.tn)
return df
[docs] def uniqueness(self, truth=None):
"""Calculate unique contribution per rule to stats.
Note: the calculated statistics per rule contain an undercount,
i.e. when multiple rules mark the same observatin as suspect it is
not contained in this result.
Parameters
----------
steps : int, list of int or None, optional
steps for which to calculate confusion matrix statistics, by
default None which uses all steps.
truth : pd.Series or pd.DataFrame, optional
series representing the "truth", i.e. a benchmark to which the
resulting series is compared. By default None, which uses the
stored truth series. Argument is included so a different truth
can be passed.
Returns
-------
df : pd.DataFrame
dataframe containing confusion matrix data, i.e. unique counts
of true positives, false positives, true negatives and
false negatives.
"""
steps = list(self.results.keys())[1:]
# use truth if provided, else use stored truth
if truth is None:
truth = self.truth
base = self.results[0]
base.name = "base series"
# last step, skip in comparison as this presumably contains all NaNs
last_step = max(steps)
steps.remove(last_step)
# get rule names
rulenames = [self.ruleset.get_step_name(i) for i in steps]
df = pd.DataFrame(index=steps,
columns=["rule", "TP", "FP", "FN", "TN"])
df.loc[:, "rule"] = rulenames
for j, k in enumerate(steps):
series_list = deepcopy(self.results)
s = series_list.pop(k)
series_list.pop(last_step)
other_series = list(series_list.values())
mask = unique_nans_in_series(s, *other_series)
s.loc[~mask & s.isna()] = -9999. # some random non-NaN number
s.name = rulenames[j]
cp = SeriesComparisonRelative(s, truth, base)
# store stats
df.loc[k, ["TP", "FP", "FN", "TN"]] = (cp.bc.tp,
cp.bc.fp,
cp.bc.fn,
cp.bc.tn)
return df
[docs] def get_series(self, step, category=None):
base = self.results[0]
base.name = "base series"
series = [base, self.results[step]]
if self.truth is not None:
truth = self.truth
series.append(truth)
df = pd.concat(series, axis=1)
if category is not None:
idx = self.get_indices(category=category, step=step)
df = df.loc[idx]
return df
[docs] def get_indices(self, category, step, truth=None):
s = self.results[step]
base = self.results[0]
base.name = "base series"
if truth is None:
truth = self.truth
cp = SeriesComparisonRelative(s, truth, base)
if category.lower() in ["tp", "true_positives"]:
idx = cp.idx_r_flagged_in_both
elif category.lower() in ["fp", "false_positives"]:
idx = cp.idx_r_flagged_in_s1
elif category.lower() in ["fn", "false_negatives"]:
idx = cp.idx_r_flagged_in_s2
elif category.lower() in ["tn", "true_negatives"]:
idx = cp.idx_r_kept_in_both
else:
raise ValueError(f"Category '{category}' not recognized, must "
"be one of ('tp', 'fp', 'fn', 'tn')")
return idx
[docs] def get_results_dataframe(self):
"""Get results as DataFrame.
Returns
-------
df : pandas.DataFrame
results with flagged values set to NaN per applied rule.
"""
df = pd.concat(self.results.values(), axis=1)
df.columns = ["base series"] + list(self.ruleset.rules.keys())
return df
[docs] def get_final_result(self):
"""Get final timeseries with flagged values set to NaN.
Returns
-------
series : pandas.Series
Timeseries produced by final step in RuleSet with flagged
values set to NaN.
"""
key = len(self.results.keys()) - 1
s = self.results[key]
s.name = self.name
return s
[docs] def get_corrections_dataframe(self):
"""Get DataFrame containing corrections.
Returns
-------
df : pandas.DataFrame
DataFrame containing corrections. NaN means value is flagged
as suspicious, 0.0 means no correction.
"""
clist = []
for s in self.corrections.values():
if isinstance(s, np.ndarray):
s = pd.Series(dtype=float)
clist.append(s.fillna(-9999))
# corrections are nan, 0.0 means nothing is changed
df = (pd.concat(clist, axis=1)
.isna()
.astype(float)
.replace(0.0, np.nan)
.replace(1.0, 0.0))
df.columns = list(self.ruleset.rules.keys())
return df
[docs] def get_corrections_comparison(self, truth=None):
if truth is None and self.truth is not None:
truth = self.truth
else:
raise ValueError("Supply a time series for 'truth'!")
comments_traval = self.get_comment_series()
comments_traval.name = "traval_comment"
mask_truth_corrections = truth.iloc[:, 0].isna()
comments_truth = truth.loc[mask_truth_corrections]
k = list(self.comparisons.keys())[-1]
comparison = self.comparisons[k].comparison_series()
translate = {
-1: "Value modified",
0: "Flagged in both",
1: "Only flagged in 'truth' series",
2: "Only flagged in 'traval' series",
-9999: "NaN in both"
}
comparison = comparison.apply(lambda v: translate[v])
comparison.name = "comparison_label"
raw_index = (comments_traval.index
.union(comments_truth.index))
truth.columns = ["truth_series", "truth_comment"]
traval_series = self.get_final_result()
traval_series.name = "traval_series"
df = pd.concat([
self.series.loc[raw_index.intersection(self.series.index)],
traval_series.loc[raw_index.intersection(traval_series.index)],
comments_traval,
truth.loc[raw_index.intersection(truth.index)],
comparison.loc[raw_index.intersection(comparison.index)]
], axis=1)
return df
[docs] def plot_overview(self, mark_suspects=True, **kwargs):
"""Plot timeseries with flagged values per applied rule.
Parameters
----------
mark_suspects : bool, optional
mark suspect values with red X, by default True
Returns
-------
ax : list of matplotlib.pyplot.Axes
axes objects
"""
resultsdf = self.get_results_dataframe()
if "figsize" in kwargs:
figsize = kwargs.pop("figsize")
else:
figsize = (12, 5)
fig, axes = plt.subplots(len(self.corrections) + 1, 1,
sharex=True, sharey=True, figsize=figsize,
**kwargs)
for iax, icol in zip(axes, resultsdf):
iax.plot(resultsdf.index, resultsdf[icol], label=icol)
if mark_suspects:
if icol != resultsdf.columns[0]:
corr = self.corrections[resultsdf.columns.get_loc(icol)]
if isinstance(corr, pd.Series):
iax.plot(corr.index,
resultsdf.loc[corr.index].iloc[:, 0],
marker="x", c="C3", ls="none",
label="flagged")
iax.legend(loc="upper left", ncol=2)
iax.grid(b=True)
fig.tight_layout()
return axes