Allow AD to predict only based on past values #102

Open
wants to merge 11 commits into develop
1 change: 1 addition & 0 deletions .gitignore
@@ -122,3 +122,4 @@ dmypy.json

# Pyre type checker
.pyre/
.idea
2 changes: 1 addition & 1 deletion setup.cfg
@@ -1,6 +1,6 @@
[metadata]
name = adtk
version = 0.6.0
version = 0.6.1
author = Arundo Analytics, Inc.
maintainer = Tailai Wen
maintainer_email = [email protected]
49 changes: 38 additions & 11 deletions src/adtk/detector/_detector_1d.py
@@ -406,6 +406,11 @@ class PersistAD(_TrainableUnivariateDetector):
Aggregation operation of the time window, either "mean" or "median".
Default: "median".

center: bool, optional
If True, the current point is the right edge of the right window;
otherwise, it is the right edge of the left window.
Default: True.

Attributes
----------
pipe_: adtk.pipe.Pipenet
@@ -420,14 +425,15 @@ def __init__(
side: str = "both",
min_periods: Optional[int] = None,
agg: str = "median",
center: bool = True
) -> None:
self.pipe_ = Pipenet(
{
"diff_abs": {
"model": DoubleRollingAggregate(
agg=agg,
window=(window, 1),
center=True,
center=center,
min_periods=(min_periods, 1),
diff="l1",
),
@@ -441,7 +447,7 @@ def __init__(
"model": DoubleRollingAggregate(
agg=agg,
window=(window, 1),
center=True,
center=center,
min_periods=(min_periods, 1),
diff="diff",
),
@@ -482,11 +488,12 @@ def __init__(
self.window = window
self.min_periods = min_periods
self.agg = agg
self.center = center
self._sync_params()

@property
def _param_names(self) -> Tuple[str, ...]:
return ("window", "c", "side", "min_periods", "agg")
return ("window", "c", "side", "min_periods", "agg", "center")

def _sync_params(self) -> None:
if self.agg not in ["median", "mean"]:
@@ -501,12 +508,14 @@ def _sync_params(self) -> None:
agg=self.agg,
window=(self.window, 1),
min_periods=(self.min_periods, 1),
center=self.center,
)
self.pipe_.steps["iqr_ad"]["model"].set_params(c=(None, self.c))
self.pipe_.steps["diff"]["model"].set_params(
agg=self.agg,
window=(self.window, 1),
min_periods=(self.min_periods, 1),
center=self.center,
)
self.pipe_.steps["sign_check"]["model"].set_params(
high=(
@@ -570,6 +579,11 @@ class LevelShiftAD(_TrainableUnivariateDetector):
for that window. If 2-tuple, it defines the left and right window
respectively. Default: None, i.e. all observations must have values.

center: bool, optional
If True, the current point is the right edge of the right window;
otherwise, it is the right edge of the left window.
Default: True.

Attributes
----------
pipe_: adtk.pipe.Pipenet
@@ -587,14 +601,15 @@ def __init__(
min_periods: Union[
Optional[int], Tuple[Optional[int], Optional[int]]
] = None,
center: bool = True
) -> None:
self.pipe_ = Pipenet(
{
"diff_abs": {
"model": DoubleRollingAggregate(
agg="median",
window=window,
center=True,
center=center,
min_periods=min_periods,
diff="l1",
),
@@ -608,7 +623,7 @@ def __init__(
"model": DoubleRollingAggregate(
agg="median",
window=window,
center=True,
center=center,
min_periods=min_periods,
diff="diff",
),
@@ -648,23 +663,24 @@ def __init__(
self.side = side
self.window = window
self.min_periods = min_periods
self.center = center
self._sync_params()

@property
def _param_names(self) -> Tuple[str, ...]:
return ("window", "c", "side", "min_periods")
return ("window", "c", "side", "min_periods", "center")

def _sync_params(self) -> None:
if self.side not in ["both", "positive", "negative"]:
raise ValueError(
"Parameter `side` must be 'both', 'positive' or 'negative'."
)
self.pipe_.steps["diff_abs"]["model"].set_params(
window=self.window, min_periods=self.min_periods
window=self.window, min_periods=self.min_periods, center=self.center
)
self.pipe_.steps["iqr_ad"]["model"].set_params(c=(None, self.c))
self.pipe_.steps["diff"]["model"].set_params(
window=self.window, min_periods=self.min_periods
window=self.window, min_periods=self.min_periods, center=self.center
)
self.pipe_.steps["sign_check"]["model"].set_params(
high=(
@@ -1051,6 +1067,11 @@ class SeasonalAD(_TrainableUnivariateDetector):
trend: bool, optional
Whether to extract trend during decomposition. Default: False.

two_sided: bool, optional
The moving average method used to filter out the trend.
If True (default), a centered moving average is computed.
If False, the filter coefficients are for past values only.

Attributes
----------
freq_: int
@@ -1072,12 +1093,17 @@ def __init__(
side: str = "both",
c: float = 3.0,
trend: bool = False,
two_sided: bool = True
) -> None:
self.pipe_ = Pipenet(
{
"deseasonal_residual": {
"model": (
ClassicSeasonalDecomposition(freq=freq, trend=trend)
ClassicSeasonalDecomposition(
freq=freq,
trend=trend,
two_sided=two_sided
)
),
"input": "original",
},
@@ -1123,15 +1149,16 @@ def __init__(
self.side = side
self.c = c
self.trend = trend
self.two_sided = two_sided
self._sync_params()

@property
def _param_names(self) -> Tuple[str, ...]:
return ("freq", "side", "c", "trend")
return ("freq", "side", "c", "trend", "two_sided")

def _sync_params(self) -> None:
self.pipe_.steps["deseasonal_residual"]["model"].set_params(
freq=self.freq, trend=self.trend
freq=self.freq, trend=self.trend, two_sided=self.two_sided
)
self.pipe_.steps["iqr_ad"]["model"].set_params(c=(None, self.c))
self.pipe_.steps["sign_check"]["model"].set_params(
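The hunks above thread a new `center` flag through PersistAD and LevelShiftAD and a `two_sided` flag through SeasonalAD, so each detector can score a point using only the observations up to it. A minimal usage sketch against this branch; the series, window sizes, and `c` values are illustrative, and the `center`/`two_sided` keywords only exist on this PR's branch:

```python
import numpy as np
import pandas as pd

from adtk.data import validate_series
from adtk.detector import LevelShiftAD, PersistAD, SeasonalAD

# Illustrative series: weekly seasonality plus a level shift halfway through.
index = pd.date_range("2021-01-01", periods=200, freq="D")
values = np.sin(np.arange(200) * 2 * np.pi / 7)
values[100:] += 5.0
s = validate_series(pd.Series(values, index=index))

# center=False keeps both rolling windows on past observations, so a point
# is scored without looking ahead of it.
persist_ad = PersistAD(window=3, c=3.0, side="both", center=False)
level_shift_ad = LevelShiftAD(window=5, c=6.0, side="both", center=False)

# For SeasonalAD, two_sided only matters when a trend is extracted; with
# two_sided=False the trend filter uses past values only.
seasonal_ad = SeasonalAD(freq=7, c=3.0, trend=True, two_sided=False)

persist_anomalies = persist_ad.fit_detect(s)
shift_anomalies = level_shift_ad.fit_detect(s)
seasonal_anomalies = seasonal_ad.fit_detect(s)
```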
2 changes: 1 addition & 1 deletion src/adtk/detector/_detector_hd.py
@@ -126,7 +126,7 @@ def _fit_core(self, df: pd.DataFrame) -> None:
if df.dropna().empty:
raise RuntimeError("Valid values are not enough for training.")
clustering_result = self.model.fit_predict(df.dropna())
cluster_count = Counter(clustering_result)
cluster_count = Counter(clustering_result) # type: Counter
self._anomalous_cluster_id = cluster_count.most_common()[-1][0]

def _predict_core(self, df: pd.DataFrame) -> pd.Series:
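The `# type: Counter` comment only helps mypy; the surrounding logic is unchanged: the cluster with the fewest members is treated as anomalous. A small standalone illustration of that selection, with made-up cluster labels:

```python
from collections import Counter

# Labels as a clustering model's fit_predict might return them (made up here).
clustering_result = [0, 0, 0, 1, 1, 0, 2, 0, 1, 0]

cluster_count = Counter(clustering_result)  # type: Counter
# most_common() sorts clusters by size, so the last entry is the smallest
# cluster, which the detector flags as the anomalous one.
anomalous_cluster_id = cluster_count.most_common()[-1][0]
print(anomalous_cluster_id)  # 2
```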
21 changes: 15 additions & 6 deletions src/adtk/transformer/_transformer_1d.py
@@ -657,6 +657,11 @@ class ClassicSeasonalDecomposition(_TrainableUnivariateTransformer):
If False, the time series will be assumed the sum of seasonal pattern
and residual. Default: False.

two_sided: bool, optional
The moving average method used to filter out the trend.
If True (default), a centered moving average is computed.
If False, the filter coefficients are for past values only.

Attributes
----------
freq_: int
Expand All @@ -669,15 +674,19 @@ class ClassicSeasonalDecomposition(_TrainableUnivariateTransformer):
"""

def __init__(
self, freq: Optional[int] = None, trend: bool = False
self,
freq: Optional[int] = None,
trend: bool = False,
two_sided: bool = True
) -> None:
super().__init__()
self.freq = freq
self.trend = trend
self.two_sided = two_sided

@property
def _param_names(self) -> Tuple[str, ...]:
return ("freq", "trend")
return ("freq", "trend", "two_sided")

def _fit_core(self, s: pd.Series) -> None:
if not (
@@ -718,9 +727,9 @@ def _fit_core(self, s: pd.Series) -> None:
# get seasonal pattern
if self.trend:
seasonal_decompose_results = (
seasonal_decompose(s, period=self.freq_)
seasonal_decompose(s, period=self.freq_, two_sided=self.two_sided)
if parse(statsmodels.__version__) >= parse("0.11")
else seasonal_decompose(s, freq=self.freq_)
else seasonal_decompose(s, freq=self.freq_, two_sided=self.two_sided)
)
self.seasonal_ = getattr(seasonal_decompose_results, "seasonal")[
: self.freq_
@@ -801,9 +810,9 @@ def _predict_core(self, s: pd.Series) -> pd.Series:
# remove trend
if self.trend:
seasonal_decompose_results = (
seasonal_decompose(s, period=self.freq_)
seasonal_decompose(s, period=self.freq_, two_sided=self.two_sided)
if parse(statsmodels.__version__) >= parse("0.11")
else seasonal_decompose(s, freq=self.freq_)
else seasonal_decompose(s, freq=self.freq_, two_sided=self.two_sided)
)
s_trend = getattr(seasonal_decompose_results, "trend")
s_detrended = s - s_trend
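ClassicSeasonalDecomposition now forwards `two_sided` to statsmodels' `seasonal_decompose` in both `_fit_core` and `_predict_core`. A hedged sketch of using the transformer with the one-sided filter; the data and parameter values below are illustrative, not taken from the PR:

```python
import numpy as np
import pandas as pd

from adtk.data import validate_series
from adtk.transformer import ClassicSeasonalDecomposition

# Illustrative weekly-seasonal series with a linear trend on top.
index = pd.date_range("2021-01-01", periods=140, freq="D")
values = np.sin(np.arange(140) * 2 * np.pi / 7) + 0.05 * np.arange(140)
s = validate_series(pd.Series(values, index=index))

# trend=True routes the series through statsmodels' seasonal_decompose;
# two_sided=False makes the trend filter use past values only, which moves
# the NaN padding of the trend estimate to the start of the series instead
# of splitting it between both ends.
transformer = ClassicSeasonalDecomposition(freq=7, trend=True, two_sided=False)
residual = transformer.fit_transform(s)
```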
2 changes: 1 addition & 1 deletion tests/test_pipe.py
@@ -641,7 +641,7 @@ def test_pipeline():
)

assert my_pipe.get_params() == {
"deseasonal_residual": {"freq": 6, "trend": False},
"deseasonal_residual": {"freq": 6, "trend": False, "two_sided": True},
"abs_residual": {
"fit_func": None,
"fit_func_params": None,
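The updated assertion expects the new key to round-trip through `get_params()`. The same property could be checked directly on detectors from this branch; a quick sketch, not part of the PR's test suite:

```python
from adtk.detector import LevelShiftAD, SeasonalAD

# On this branch the new flags are listed in _param_names, so they should
# round-trip through get_params() just like the existing parameters.
assert LevelShiftAD(window=5, center=False).get_params()["center"] is False
assert SeasonalAD(freq=6, two_sided=True).get_params()["two_sided"] is True
```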