Skip to content

Commit

Permalink
Add anomaly detector based on standard deviation range from mean
Browse files Browse the repository at this point in the history
  • Loading branch information
mazzma12 committed Sep 8, 2023
1 parent 6041f5b commit 7621454
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/adtk/detector/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
PersistAD,
QuantileAD,
SeasonalAD,
VolatilityRangeAD,
ThresholdAD,
VolatilityShiftAD,
)
Expand Down Expand Up @@ -56,6 +57,7 @@ def print_all_models() -> None:
"ThresholdAD",
"QuantileAD",
"InterQuartileRangeAD",
"VolatilityRangeAD",
"GeneralizedESDTestAD",
"PersistAD",
"LevelShiftAD",
Expand Down
59 changes: 59 additions & 0 deletions src/adtk/detector/_detector_1d.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,65 @@ def _predict_core(self, s: pd.Series) -> pd.Series:
return predicted


class VolatilityRangeAD(_TrainableUnivariateDetector):
"""Anomaly detector based on standard deviation range.
This detector flags anomalies for values that are outside the range of [mean - c * std, mean + c * std].
Parameters
----------
c : float, optional (default=1.0)
The multiplier for the standard deviation to set the range for anomaly detection.
"""

def __init__(
self,
c: Union[
Optional[float], Tuple[Optional[float], Optional[float]]
] = 3.0,
) -> None:
super().__init__()
self.c = c

@property
def _param_names(self) -> Tuple[str, ...]:
return ("c",)

def _fit_core(self, s: pd.Series) -> None:
if s.count() == 0:
raise RuntimeError("Valid values are not enough for training.")
mean_val = s.mean()
std_val = s.std()

self.abs_low_ = (
(
mean_val
- std_val
* (self.c if (not isinstance(self.c, tuple)) else self.c[0])
)
if (
(self.c if (not isinstance(self.c, tuple)) else self.c[0])
is not None
)
else -float("inf")
)
self.abs_high_ = (
mean_val
+ std_val
* (self.c if (not isinstance(self.c, tuple)) else self.c[1])
if (
(self.c if (not isinstance(self.c, tuple)) else self.c[1])
is not None
)
else float("inf")
)

def _predict_core(self, s: pd.Series) -> pd.Series:
predicted = (s > self.abs_high_) | (s < self.abs_low_)
predicted[s.isna()] = np.nan
return predicted


class GeneralizedESDTestAD(_TrainableUnivariateDetector):
"""Detector that detects anomaly based on generalized ESD test.
Expand Down

0 comments on commit 7621454

Please sign in to comment.