Skip to content

Commit

Permalink
Use dep injection for config and ensure fixed window is recalced for pruning
Browse files Browse the repository at this point in the history
  • Loading branch information
aayush-se committed Nov 22, 2024
1 parent a32f3bf commit f3839de
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 13 deletions.
21 changes: 14 additions & 7 deletions src/seer/anomaly_detection/accessors.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from seer.anomaly_detection.models.converters import convert_external_ts_to_internal
from seer.anomaly_detection.models.external import AnomalyDetectionConfig, TimeSeriesPoint
from seer.db import DbDynamicAlert, DbDynamicAlertTimeSeries, Session, TaskStatus
from seer.dependency_injection import inject, injected
from seer.exceptions import ClientError

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -84,8 +85,13 @@ def combine_anomalies(

class DbAlertDataAccessor(AlertDataAccessor):

@inject
@sentry_sdk.trace
def _hydrate_alert(self, db_alert: DbDynamicAlert) -> DynamicAlert:
def _hydrate_alert(
self,
db_alert: DbDynamicAlert,
mp_config: MPConfig = injected,
) -> DynamicAlert:
rand_offset = random.randint(0, 24)
timestamp_threshold = (datetime.now() - timedelta(days=28, hours=rand_offset)).timestamp()
num_old_points = 0
Expand All @@ -107,7 +113,7 @@ def _hydrate_alert(self, db_alert: DbDynamicAlert) -> DynamicAlert:
and "mp_suss" not in timeseries[-1].anomaly_algo_data
and "mp_fixed" not in timeseries[-1].anomaly_algo_data
):
timeseries = self._recalculate_batch_detection(db_alert)
timeseries = self._recalculate_batch_detection(db_alert, mp_config)

for point in timeseries:
ts.append(point.timestamp.timestamp())
Expand All @@ -118,7 +124,7 @@ def _hydrate_alert(self, db_alert: DbDynamicAlert) -> DynamicAlert:
if point.anomaly_algo_data is not None:
algo_data = MPTimeSeriesAnomalies.extract_algo_data(point.anomaly_algo_data)

if algo_data["mp_suss"]:
if "mp_suss" in algo_data:
mp_suss_data = [
algo_data["mp_suss"]["dist"],
algo_data["mp_suss"]["idx"],
Expand All @@ -135,9 +141,9 @@ def _hydrate_alert(self, db_alert: DbDynamicAlert) -> DynamicAlert:
algo_data["mp_fixed"]["r_idx"],
]
mp_fixed.append(mp_fixed_data)

original_flags.append(algo_data["original_flag"])
use_suss.append(algo_data["use_suss"])

if point.timestamp.timestamp() < timestamp_threshold:
num_old_points += 1
# Default value is "none" for original flags
Expand All @@ -160,7 +166,7 @@ def _hydrate_alert(self, db_alert: DbDynamicAlert) -> DynamicAlert:
matrix_profile_fixed=stumpy.mparray.mparray(
mp_fixed,
k=1,
m=MPConfig().fixed_window_size,
m=mp_config.fixed_window_size,
excl_zone_denom=stumpy.config.STUMPY_EXCL_ZONE_DENOM,
),
window_size=window_size,
Expand All @@ -185,9 +191,10 @@ def _hydrate_alert(self, db_alert: DbDynamicAlert) -> DynamicAlert:
),
)

@inject
@sentry_sdk.trace
def _recalculate_batch_detection(
self, db_alert: DbDynamicAlert
self, db_alert: DbDynamicAlert, mp_config: MPConfig = injected
) -> list[DbDynamicAlertTimeSeries]:
"""
Recalculates the matrix profiles for SuSS and Fixed windows for an alert then returns the updated timeseries
Expand All @@ -214,7 +221,7 @@ def _recalculate_batch_detection(
anomalies_fixed = batch_detector.detect(
convert_external_ts_to_internal(timeseries),
ad_config,
window_size=MPConfig().fixed_window_size,
window_size=mp_config.fixed_window_size,
)
recalculated_anomalies = self.combine_anomalies(
anomalies_suss, anomalies_fixed, [True] * len(timeseries)
Expand Down
4 changes: 3 additions & 1 deletion src/seer/anomaly_detection/anomaly_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,14 @@ def __init__(self):
)
stream.update(6.0)

@inject
@sentry_sdk.trace
def _batch_detect(
self,
timeseries: List[TimeSeriesPoint],
config: AnomalyDetectionConfig,
window_size: int | None = None,
mp_config: MPConfig = injected,
) -> Tuple[List[TimeSeriesPoint], MPTimeSeriesAnomalies]:
"""
Stateless batch anomaly detection on entire timeseries as provided. In batch mode, analysis of a
Expand All @@ -87,7 +89,7 @@ def _batch_detect(
anomalies_fixed = batch_detector.detect(
convert_external_ts_to_internal(timeseries),
config,
window_size=MPConfig().fixed_window_size,
window_size=mp_config.fixed_window_size,
)
anomalies = DbAlertDataAccessor().combine_anomalies(
anomalies_suss, anomalies_fixed, [True] * len(timeseries)
Expand Down
2 changes: 1 addition & 1 deletion src/seer/anomaly_detection/anomaly_detection_di.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def mp_scorer_provider() -> MPScorer:
@anomaly_detection_module.provider
def mpconfig_provider() -> MPConfig:
return MPConfig(
ignore_trivial=True, normalize_mp=False
ignore_trivial=True, normalize_mp=False, fixed_window_size=10
) # Avoiding complexities around normalizing matrix profile across stream computation for now.


Expand Down
4 changes: 2 additions & 2 deletions src/seer/anomaly_detection/detectors/mp_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ class MPConfig(BaseModel):
"""

ignore_trivial: bool = Field(
True,
...,
description="Flag that tells the stumpy library to ignore trivial matches to speed up MP computation",
)
normalize_mp: bool = Field(
False,
...,
description="Flag to control if the matrix profile is normalized first",
)
fixed_window_size: int = Field(
Expand Down
21 changes: 19 additions & 2 deletions src/seer/anomaly_detection/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
import sentry_sdk

from celery_app.app import celery_app
from seer.anomaly_detection.accessors import DbAlertDataAccessor
from seer.anomaly_detection.detectors import MPConfig
from seer.anomaly_detection.detectors.anomaly_detectors import MPBatchAnomalyDetector
from seer.anomaly_detection.models.external import AnomalyDetectionConfig
from seer.anomaly_detection.models.timeseries import TimeSeries
from seer.db import DbDynamicAlert, Session, TaskStatus
from seer.dependency_injection import inject, injected

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -70,16 +73,30 @@ def delete_old_timeseries_points(alert: DbDynamicAlert, date_threshold: float):
return deleted_count


def update_matrix_profiles(alert: DbDynamicAlert, anomaly_detection_config: AnomalyDetectionConfig):
@inject
def update_matrix_profiles(
alert: DbDynamicAlert,
anomaly_detection_config: AnomalyDetectionConfig,
mp_config: MPConfig = injected,
):

timeseries = TimeSeries(
timestamps=np.array([timestep.timestamp.timestamp() for timestep in alert.timeseries]),
values=np.array([timestep.value for timestep in alert.timeseries]),
)

anomalies = MPBatchAnomalyDetector()._compute_matrix_profile(
anomalies_suss = MPBatchAnomalyDetector()._compute_matrix_profile(
timeseries=timeseries, config=anomaly_detection_config
)
anomalies_fixed = MPBatchAnomalyDetector()._compute_matrix_profile(
timeseries=timeseries,
config=anomaly_detection_config,
window_size=mp_config.fixed_window_size,
)
anomalies = DbAlertDataAccessor().combine_anomalies(
anomalies_suss, anomalies_fixed, [True] * len(timeseries.timestamps)
)

algo_data_map = dict(
zip(timeseries.timestamps, anomalies.get_anomaly_algo_data(len(timeseries.timestamps)))
)
Expand Down

0 comments on commit f3839de

Please sign in to comment.