feat(anomaly detection): Cron to cleanup disabled alerts #1455

Merged (5 commits, Nov 21, 2024)
Changes from 4 commits
9 changes: 8 additions & 1 deletion src/celery_app/tasks.py
@@ -5,7 +5,7 @@
import seer.app # noqa: F401
from celery_app.app import celery_app as celery # noqa: F401
from celery_app.config import CeleryQueues
from seer.anomaly_detection.tasks import cleanup_timeseries # noqa: F401
from seer.anomaly_detection.tasks import cleanup_disabled_alerts, cleanup_timeseries # noqa: F401
from seer.automation.autofix.tasks import check_and_mark_recent_autofix_runs
from seer.automation.tasks import delete_data_for_ttl
from seer.configuration import AppConfig
@@ -35,3 +35,10 @@ def setup_periodic_tasks(sender, config: AppConfig = injected, **kwargs):
try_grpc_client.signature(kwargs={}, queue=CeleryQueues.DEFAULT),
name="Try executing grpc request every minute.",
)

if config.ANOMALY_DETECTION_ENABLED:
sender.add_periodic_task(
crontab(minute="0", hour="0", day_of_week="0"), # Run once a week on Sunday
cleanup_disabled_alerts.signature(kwargs={}, queue=CeleryQueues.DEFAULT),
name="Clean up old disabled timeseries every week",
)
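For readers unfamiliar with Celery's cron syntax, the schedule above fires at minute 0, hour 0 on day-of-week 0, which Celery treats as Sunday, i.e. midnight every Sunday in the beat scheduler's timezone (UTC by default). A small sanity-check sketch, not part of the diff, using only celery.schedules.crontab (which the code above already relies on):

from celery.schedules import crontab

# Same schedule as in setup_periodic_tasks above.
sched = crontab(minute="0", hour="0", day_of_week="0")

# Celery expands each cron field into the set of allowed integer values;
# for day_of_week, 0 means Sunday.
print(sched.minute, sched.hour, sched.day_of_week)  # {0} {0} {0}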
37 changes: 37 additions & 0 deletions src/seer/anomaly_detection/tasks.py
@@ -1,4 +1,6 @@
import logging
from datetime import datetime, timedelta
from operator import and_, or_

import numpy as np
import sentry_sdk
@@ -109,3 +111,38 @@ def toggle_data_purge_flag(alert_id: int):
)
alert.data_purge_flag = new_flag
session.commit()


@celery_app.task
@sentry_sdk.trace
def cleanup_disabled_alerts():

logger.info(
"Cleaning up timeseries data for alerts that have been inactive (detection has not been run) for over 28 days"
)
Review comment (Member): Minor nit - add date_threshold in this info message

date_threshold = datetime.now() - timedelta(days=28)

with Session() as session:
# Get and delete alerts that haven't been queued for detection in the last 28 days, indicating they are disabled and safe to clean up
alerts = (
session.query(DbDynamicAlert)
.filter(
or_(
DbDynamicAlert.last_queued_at < date_threshold,
and_(
DbDynamicAlert.last_queued_at.is_(None),
DbDynamicAlert.created_at < date_threshold,
),
)
)
.all()
)

deleted_count = len(alerts)

for alert in alerts:
session.delete(alert)

session.commit()
logger.info(f"Deleted {deleted_count} alerts")
63 changes: 61 additions & 2 deletions tests/seer/anomaly_detection/test_cleanup_tasks.py
@@ -9,8 +9,8 @@
from seer.anomaly_detection.models import MPTimeSeriesAnomalies
from seer.anomaly_detection.models.external import AnomalyDetectionConfig, TimeSeriesPoint
from seer.anomaly_detection.models.timeseries import TimeSeries
from seer.anomaly_detection.tasks import cleanup_timeseries
from seer.db import DbDynamicAlert, Session, TaskStatus
from seer.anomaly_detection.tasks import cleanup_disabled_alerts, cleanup_timeseries
from seer.db import DbDynamicAlert, DbDynamicAlertTimeSeries, Session, TaskStatus


class TestCleanupTasks(unittest.TestCase):
@@ -154,3 +154,62 @@ def test_cleanup_timeseries(self):
# Fails due to invalid alert_id
with self.assertRaises(Exception):
cleanup_timeseries(999, date_threshold)

def test_cleanup_disabled_alerts(self):
# Create and save alerts with old points
external_alert_id1, _, _, _ = self._save_alert(1000, 0)
external_alert_id2, _, _, _ = self._save_alert(500, 0)
external_alert_id3, _, _, _ = self._save_alert(0, 500)
external_alert_id4, _, _, _ = self._save_alert(0, 500)

# Set last_queued_at to be over 28 days ago for alerts 1 and 2
with Session() as session:
for alert_id in [external_alert_id1, external_alert_id2]:
alert = (
session.query(DbDynamicAlert)
.filter(DbDynamicAlert.external_alert_id == alert_id)
.one_or_none()
)
assert alert is not None
alert.last_queued_at = datetime.now() - timedelta(days=29)
Review comment (Member): We should add a unit test that includes one alert with last_queued_at as null

for alert_id in [external_alert_id3]:
alert = (
session.query(DbDynamicAlert)
.filter(DbDynamicAlert.external_alert_id == alert_id)
.one_or_none()
)
assert alert is not None
alert.created_at = datetime.now() - timedelta(days=29)
alert.last_queued_at = None

session.commit()

cleanup_disabled_alerts()

with Session() as session:
for alert_id in [external_alert_id1, external_alert_id2, external_alert_id3]:
alert = (
session.query(DbDynamicAlert)
.filter(DbDynamicAlert.external_alert_id == alert_id)
.one_or_none()
)
assert alert is None

timeseries = (
session.query(DbDynamicAlertTimeSeries)
.filter(DbDynamicAlertTimeSeries.dynamic_alert_id == alert_id)
.all()
)
assert len(timeseries) == 0

# Confirm that alert 4 and its respective timeseries are not deleted
with Session() as session:
alert = (
session.query(DbDynamicAlert)
.filter(DbDynamicAlert.external_alert_id == external_alert_id4)
.one_or_none()
)

assert alert is not None
assert len(alert.timeseries) == 500
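Building on the reviewer's comment above, the retention side of the null case could also be exercised: an alert whose last_queued_at is null but whose created_at falls inside the 28-day window matches neither branch of the OR filter and should survive. A sketch of an additional TestCleanupTasks method, not part of the diff; it assumes the imports already at the top of this test file, and the _save_alert arguments are illustrative:

def test_cleanup_disabled_alerts_keeps_recent_null_last_queued_at(self):
    # Sketch only: never queued (last_queued_at is null) but created recently,
    # so neither condition of the cleanup filter matches.
    external_alert_id, _, _, _ = self._save_alert(0, 100)

    with Session() as session:
        alert = (
            session.query(DbDynamicAlert)
            .filter(DbDynamicAlert.external_alert_id == external_alert_id)
            .one_or_none()
        )
        assert alert is not None
        alert.last_queued_at = None
        alert.created_at = datetime.now() - timedelta(days=1)
        session.commit()

    cleanup_disabled_alerts()

    with Session() as session:
        alert = (
            session.query(DbDynamicAlert)
            .filter(DbDynamicAlert.external_alert_id == external_alert_id)
            .one_or_none()
        )
        # The alert and its timeseries are retained.
        assert alert is not None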
8 changes: 7 additions & 1 deletion tests/test_celery.py
@@ -13,6 +13,7 @@ def test_detected_celery_jobs():
assert set(k for k in celery_app.tasks.keys() if not k.startswith("celery.")) == set(
[
"seer.anomaly_detection.tasks.cleanup_timeseries",
"seer.anomaly_detection.tasks.cleanup_disabled_alerts",
"seer.automation.autofix.steps.change_describer_step.autofix_change_describer_task",
"seer.automation.autofix.steps.coding_step.autofix_coding_task",
"seer.automation.autofix.steps.root_cause_step.root_cause_task",
@@ -30,6 +31,7 @@
[
"Check and mark recent autofix runs every hour",
"Delete old Automation runs for 90 day time-to-live",
"Clean up old disabled timeseries every week",
]
)

@@ -43,7 +45,11 @@ def test_anomaly_beat_jobs():
app_config.ANOMALY_DETECTION_ENABLED = True
app.finalize()

assert set(k for k in app.conf.beat_schedule.keys()) == set([])
assert set(k for k in app.conf.beat_schedule.keys()) == set(
[
"Clean up old disabled timeseries every week",
]
)


def test_autofix_beat_jobs():