Skip to content

Commit

Permalink
feat(anomaly detection): Cron to cleanup disabled alerts (#1455)
Browse files Browse the repository at this point in the history
- Currently, cleanup only happens when detection is called, so stale
timestamps for disabled alerts are left to accumulate
- Created a cron job that runs weekly to clean up (delete) disabled alerts:
those last queued for detection more than 28 days ago, or, if last_queued_at
is null, those created more than 28 days ago
- Deleting alerts also deletes, as a byproduct, the timeseries that are
associated with them because of the parent-child relationship
  • Loading branch information
aayush-se authored Nov 21, 2024
1 parent 0fe1c90 commit f0f7c7f
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 4 deletions.
9 changes: 8 additions & 1 deletion src/celery_app/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import seer.app # noqa: F401
from celery_app.app import celery_app as celery # noqa: F401
from celery_app.config import CeleryQueues
from seer.anomaly_detection.tasks import cleanup_timeseries # noqa: F401
from seer.anomaly_detection.tasks import cleanup_disabled_alerts, cleanup_timeseries # noqa: F401
from seer.automation.autofix.tasks import check_and_mark_recent_autofix_runs
from seer.automation.tasks import delete_data_for_ttl
from seer.configuration import AppConfig
Expand Down Expand Up @@ -35,3 +35,10 @@ def setup_periodic_tasks(sender, config: AppConfig = injected, **kwargs):
try_grpc_client.signature(kwargs={}, queue=CeleryQueues.DEFAULT),
name="Try executing grpc request every minute.",
)

if config.ANOMALY_DETECTION_ENABLED:
sender.add_periodic_task(
crontab(minute="0", hour="0", day_of_week="0"), # Run once a week on Sunday
cleanup_disabled_alerts.signature(kwargs={}, queue=CeleryQueues.DEFAULT),
name="Clean up old disabled timeseries every week",
)
37 changes: 37 additions & 0 deletions src/seer/anomaly_detection/tasks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import logging
from datetime import datetime, timedelta
from operator import and_, or_

import numpy as np
import sentry_sdk
Expand Down Expand Up @@ -109,3 +111,38 @@ def toggle_data_purge_flag(alert_id: int):
)
alert.data_purge_flag = new_flag
session.commit()


@celery_app.task
@sentry_sdk.trace
def cleanup_disabled_alerts():
    """Delete dynamic alerts that have seen no detection activity for 28 days.

    An alert is selected for deletion when either:

    * its ``last_queued_at`` is older than the 28-day cutoff, or
    * its ``last_queued_at`` is NULL and its ``created_at`` is older than
      the cutoff.

    Every selected alert is deleted through the ORM session and the whole
    batch is committed at once. Child timeseries rows are expected to go
    away with their alert via the parent-child relationship (NOTE(review):
    the cascade is configured on the model, which is not visible here).
    """
    date_threshold = datetime.now() - timedelta(days=28)

    logger.info(
        f"Cleaning up timeseries data for alerts that have been inactive (detection has not been run) since {date_threshold}"
    )

    with Session() as session:
        # NOTE(review): `and_` / `or_` are the two-argument functions from the
        # stdlib `operator` module (see this file's imports), so they
        # presumably work here only through SQLAlchemy's `&` / `|` operator
        # overloads and cannot take more than two clauses.
        queued_too_long_ago = DbDynamicAlert.last_queued_at < date_threshold
        never_queued_and_old = and_(
            DbDynamicAlert.last_queued_at.is_(None),
            DbDynamicAlert.created_at < date_threshold,
        )

        # Not queued for detection within the window => treated as disabled
        # and therefore safe to remove.
        inactive_filter = or_(queued_too_long_ago, never_queued_and_old)
        alerts = session.query(DbDynamicAlert).filter(inactive_filter).all()

        # Delete one by one through the session (rather than a bulk DELETE).
        for stale_alert in alerts:
            session.delete(stale_alert)

        session.commit()
        logger.info(f"Deleted {len(alerts)} alerts")
63 changes: 61 additions & 2 deletions tests/seer/anomaly_detection/test_cleanup_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
from seer.anomaly_detection.models import MPTimeSeriesAnomalies
from seer.anomaly_detection.models.external import AnomalyDetectionConfig, TimeSeriesPoint
from seer.anomaly_detection.models.timeseries import TimeSeries
from seer.anomaly_detection.tasks import cleanup_timeseries
from seer.db import DbDynamicAlert, Session, TaskStatus
from seer.anomaly_detection.tasks import cleanup_disabled_alerts, cleanup_timeseries
from seer.db import DbDynamicAlert, DbDynamicAlertTimeSeries, Session, TaskStatus


class TestCleanupTasks(unittest.TestCase):
Expand Down Expand Up @@ -154,3 +154,62 @@ def test_cleanup_timeseries(self):
# Fails due to invalid alert_id
with self.assertRaises(Exception):
cleanup_timeseries(999, date_threshold)

def test_cleanup_disabled_alerts(self):
    """cleanup_disabled_alerts removes stale alerts and leaves fresh ones alone.

    Setup:
      * alerts 1 and 2: ``last_queued_at`` set to 29 days ago -> deleted
      * alert 3: ``last_queued_at`` is None and ``created_at`` is 29 days
        ago -> deleted
      * alert 4: left exactly as saved -> must survive with its timeseries
    """

    def find_alert(session, external_alert_id):
        # Look an alert up by the external id returned from _save_alert.
        return (
            session.query(DbDynamicAlert)
            .filter(DbDynamicAlert.external_alert_id == external_alert_id)
            .one_or_none()
        )

    # Create and save alerts with old points
    external_alert_id1, _, _, _ = self._save_alert(1000, 0)
    external_alert_id2, _, _, _ = self._save_alert(500, 0)
    external_alert_id3, _, _, _ = self._save_alert(0, 500)
    external_alert_id4, _, _, _ = self._save_alert(0, 500)

    stale_external_ids = [external_alert_id1, external_alert_id2, external_alert_id3]
    over_28_days_ago = datetime.now() - timedelta(days=29)

    # Internal ids of the alerts expected to be deleted. The timeseries
    # check below needs these, not the external ids.
    # NOTE(review): assumes DbDynamicAlert.id is the primary key that
    # DbDynamicAlertTimeSeries.dynamic_alert_id references -- confirm
    # against the model definitions.
    stale_internal_ids = []

    with Session() as session:
        # Alerts 1 and 2: queued for detection more than 28 days ago.
        for external_id in (external_alert_id1, external_alert_id2):
            alert = find_alert(session, external_id)
            assert alert is not None
            alert.last_queued_at = over_28_days_ago
            stale_internal_ids.append(alert.id)

        # Alert 3: never queued, but created more than 28 days ago.
        alert = find_alert(session, external_alert_id3)
        assert alert is not None
        alert.created_at = over_28_days_ago
        alert.last_queued_at = None
        stale_internal_ids.append(alert.id)

        session.commit()

    cleanup_disabled_alerts()

    with Session() as session:
        for external_id in stale_external_ids:
            assert find_alert(session, external_id) is None

        # Filter on the internal id: filtering dynamic_alert_id by the
        # external id would most likely match nothing and pass vacuously.
        for internal_id in stale_internal_ids:
            timeseries = (
                session.query(DbDynamicAlertTimeSeries)
                .filter(DbDynamicAlertTimeSeries.dynamic_alert_id == internal_id)
                .all()
            )
            assert len(timeseries) == 0

    # Confirm that alert 4 and its respective timeseries are not deleted
    with Session() as session:
        alert = find_alert(session, external_alert_id4)

        assert alert is not None
        assert len(alert.timeseries) == 500
8 changes: 7 additions & 1 deletion tests/test_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ def test_detected_celery_jobs():
assert set(k for k in celery_app.tasks.keys() if not k.startswith("celery.")) == set(
[
"seer.anomaly_detection.tasks.cleanup_timeseries",
"seer.anomaly_detection.tasks.cleanup_disabled_alerts",
"seer.automation.autofix.steps.change_describer_step.autofix_change_describer_task",
"seer.automation.autofix.steps.coding_step.autofix_coding_task",
"seer.automation.autofix.steps.root_cause_step.root_cause_task",
Expand All @@ -30,6 +31,7 @@ def test_detected_celery_jobs():
[
"Check and mark recent autofix runs every hour",
"Delete old Automation runs for 90 day time-to-live",
"Clean up old disabled timeseries every week",
]
)

Expand All @@ -43,7 +45,11 @@ def test_anomaly_beat_jobs():
app_config.ANOMALY_DETECTION_ENABLED = True
app.finalize()

assert set(k for k in app.conf.beat_schedule.keys()) == set([])
assert set(k for k in app.conf.beat_schedule.keys()) == set(
[
"Clean up old disabled timeseries every week",
]
)


def test_autofix_beat_jobs():
Expand Down

0 comments on commit f0f7c7f

Please sign in to comment.