diff --git a/alertobserver/alertobserver.go b/alertobserver/alertobserver.go
new file mode 100644
index 0000000000..cf05d9210c
--- /dev/null
+++ b/alertobserver/alertobserver.go
@@ -0,0 +1,39 @@
+// Copyright 2023 Prometheus Team
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package alertobserver
+
+import (
+	"github.com/prometheus/alertmanager/types"
+)
+
+// Events that can be observed during an alert's life cycle.
+const (
+	EventAlertReceived             string = "received"
+	EventAlertRejected             string = "rejected"
+	EventAlertAddedToAggrGroup     string = "addedAggrGroup"
+	EventAlertFailedAddToAggrGroup string = "failedAddAggrGroup"
+	EventAlertPipelineStart        string = "pipelineStart"
+	EventAlertPipelinePassStage    string = "pipelinePassStage"
+	EventAlertMuted                string = "muted"
+	EventAlertSent                 string = "sent"
+	EventAlertSendFailed           string = "sendFailed"
+)
+
+// AlertEventMeta carries event-specific context, for example the group key or an error message.
+type AlertEventMeta map[string]interface{}
+
+// LifeCycleObserver is notified about the life cycle events of a set of alerts.
+type LifeCycleObserver interface {
+	Observe(event string, alerts []*types.Alert, meta AlertEventMeta)
+}
diff --git a/alertobserver/testing.go b/alertobserver/testing.go
new file mode 100644
index 0000000000..66f774fbb7
--- /dev/null
+++ b/alertobserver/testing.go
@@ -0,0 +1,47 @@
+// Copyright 2023 Prometheus Team
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package alertobserver
+
+import (
+	"sync"
+
+	"github.com/prometheus/alertmanager/types"
+)
+
+// FakeLifeCycleObserver records the alerts and metadata it observes, keyed by event, for use in tests.
+type FakeLifeCycleObserver struct {
+	AlertsPerEvent      map[string][]*types.Alert
+	PipelineStageAlerts map[string][]*types.Alert
+	MetaPerEvent        map[string][]AlertEventMeta
+	Mtx                 sync.RWMutex
+}
+
+func (o *FakeLifeCycleObserver) Observe(event string, alerts []*types.Alert, meta AlertEventMeta) {
+	o.Mtx.Lock()
+	defer o.Mtx.Unlock()
+	if event == EventAlertPipelinePassStage {
+		o.PipelineStageAlerts[meta["stageName"].(string)] = append(o.PipelineStageAlerts[meta["stageName"].(string)], alerts...)
+	} else {
+		o.AlertsPerEvent[event] = append(o.AlertsPerEvent[event], alerts...)
+	}
+	o.MetaPerEvent[event] = append(o.MetaPerEvent[event], meta)
+}
+
+func NewFakeLifeCycleObserver() *FakeLifeCycleObserver {
+	return &FakeLifeCycleObserver{
+		PipelineStageAlerts: map[string][]*types.Alert{},
+		AlertsPerEvent:      map[string][]*types.Alert{},
+		MetaPerEvent:        map[string][]AlertEventMeta{},
+	}
+}
diff --git a/api/api.go b/api/api.go
index b48a38c324..e7f4e6e95f 100644
--- a/api/api.go
+++ b/api/api.go
@@ -27,6 +27,7 @@ import (
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/common/route"
 
+	"github.com/prometheus/alertmanager/alertobserver"
 	apiv2 "github.com/prometheus/alertmanager/api/v2"
 	"github.com/prometheus/alertmanager/cluster"
 	"github.com/prometheus/alertmanager/config"
@@ -81,6 +82,9 @@ type Options struct {
 	GroupInfoFunc func(func(*dispatch.Route) bool) dispatch.AlertGroupInfos
 	// APICallback define the callback function that each api call will perform before returned.
 	APICallback callback.Callback
+	// AlertLCObserver is used to add hooks to the different alert life cycle events.
+	// If nil, no observer methods are invoked during the alert life cycle.
+	AlertLCObserver alertobserver.LifeCycleObserver
 }
 
 func (o Options) validate() error {
@@ -127,6 +131,7 @@ func New(opts Options) (*API, error) {
 		opts.Peer,
 		log.With(l, "version", "v2"),
 		opts.Registry,
+		opts.AlertLCObserver,
 	)
 	if err != nil {
 		return nil, err
diff --git a/api/v2/api.go b/api/v2/api.go
index 3deda8acec..811402c34d 100644
--- a/api/v2/api.go
+++ b/api/v2/api.go
@@ -37,6 +37,7 @@ import (
 	alertgroupinfolist_ops "github.com/prometheus/alertmanager/api/v2/restapi/operations/alertgroupinfolist"
 	"github.com/prometheus/alertmanager/util/callback"
 
+	"github.com/prometheus/alertmanager/alertobserver"
 	"github.com/prometheus/alertmanager/api/metrics"
 	open_api_models "github.com/prometheus/alertmanager/api/v2/models"
 	"github.com/prometheus/alertmanager/api/v2/restapi"
@@ -77,8 +78,9 @@ type API struct {
 	route          *dispatch.Route
 	setAlertStatus setAlertStatusFn
 
-	logger log.Logger
-	m      *metrics.Alerts
+	logger          log.Logger
+	m               *metrics.Alerts
+	alertLCObserver alertobserver.LifeCycleObserver
 
 	Handler http.Handler
 }
@@ -101,6 +103,7 @@ func NewAPI(
 	peer cluster.ClusterPeer,
 	l log.Logger,
 	r prometheus.Registerer,
+	o alertobserver.LifeCycleObserver,
 ) (*API, error) {
 	if apiCallback == nil {
 		apiCallback = callback.NoopAPICallback{}
@@ -116,6 +119,7 @@ func NewAPI(
 		logger:         l,
 		m:              metrics.NewAlerts(r),
 		uptime:         time.Now(),
+		alertLCObserver: o,
 	}
 
 	// Load embedded swagger file.
@@ -403,12 +407,20 @@ func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware.
 		if err := a.Validate(); err != nil {
 			validationErrs.Add(err)
 			api.m.Invalid().Inc()
+			if api.alertLCObserver != nil {
+				m := alertobserver.AlertEventMeta{"msg": err.Error()}
+				api.alertLCObserver.Observe(alertobserver.EventAlertRejected, []*types.Alert{a}, m)
+			}
 			continue
 		}
 		validAlerts = append(validAlerts, a)
 	}
 
 	if err := api.alerts.Put(validAlerts...); err != nil {
 		level.Error(logger).Log("msg", "Failed to create alerts", "err", err)
+		if api.alertLCObserver != nil {
+			m := alertobserver.AlertEventMeta{"msg": err.Error()}
+			api.alertLCObserver.Observe(alertobserver.EventAlertRejected, validAlerts, m)
+		}
 		return alert_ops.NewPostAlertsInternalServerError().WithPayload(err.Error())
 	}
 
@@ -416,6 +428,9 @@ func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware.
level.Error(logger).Log("msg", "Failed to validate alerts", "err", validationErrs.Error()) return alert_ops.NewPostAlertsBadRequest().WithPayload(validationErrs.Error()) } + if api.alertLCObserver != nil { + api.alertLCObserver.Observe(alertobserver.EventAlertReceived, validAlerts, alertobserver.AlertEventMeta{}) + } return alert_ops.NewPostAlertsOK() } diff --git a/api/v2/api_test.go b/api/v2/api_test.go index 794a1219c1..904f3b943a 100644 --- a/api/v2/api_test.go +++ b/api/v2/api_test.go @@ -17,6 +17,8 @@ import ( "bytes" "encoding/json" "fmt" + "github.com/prometheus/client_golang/prometheus" + "io" "net/http" "net/http/httptest" @@ -29,6 +31,8 @@ import ( "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + "github.com/prometheus/alertmanager/alertobserver" + "github.com/prometheus/alertmanager/api/metrics" alert_ops "github.com/prometheus/alertmanager/api/v2/restapi/operations/alert" alertgroup_ops "github.com/prometheus/alertmanager/api/v2/restapi/operations/alertgroup" alertgroupinfolist_ops "github.com/prometheus/alertmanager/api/v2/restapi/operations/alertgroupinfolist" @@ -1123,6 +1127,67 @@ func TestListAlertInfosHandler(t *testing.T) { } } +func TestPostAlertHandler(t *testing.T) { + now := time.Now() + for i, tc := range []struct { + start, end time.Time + err bool + code int + }{ + {time.Time{}, time.Time{}, false, 200}, + {now, time.Time{}, false, 200}, + {time.Time{}, now.Add(time.Duration(-1) * time.Second), false, 200}, + {time.Time{}, now, false, 200}, + {time.Time{}, now.Add(time.Duration(1) * time.Second), false, 200}, + {now.Add(time.Duration(-2) * time.Second), now.Add(time.Duration(-1) * time.Second), false, 200}, + {now.Add(time.Duration(1) * time.Second), now.Add(time.Duration(2) * time.Second), false, 200}, + {now.Add(time.Duration(1) * time.Second), now, false, 400}, + } { + alerts, alertsBytes := createAlert(t, tc.start, tc.end) + api := API{ + uptime: time.Now(), + alerts: newFakeAlerts([]*types.Alert{}), + logger: log.NewNopLogger(), + m: metrics.NewAlerts(prometheus.NewRegistry()), + } + api.Update(&config.Config{ + Global: &config.GlobalConfig{ + ResolveTimeout: model.Duration(5), + }, + Route: &config.Route{}, + }, nil) + + r, err := http.NewRequest("POST", "/api/v2/alerts", bytes.NewReader(alertsBytes)) + require.NoError(t, err) + + w := httptest.NewRecorder() + p := runtime.TextProducer() + responder := api.postAlertsHandler(alert_ops.PostAlertsParams{ + HTTPRequest: r, + Alerts: alerts, + }) + responder.WriteResponse(w, p) + body, _ := io.ReadAll(w.Result().Body) + + require.Equal(t, tc.code, w.Code, fmt.Sprintf("test case: %d, response: %s", i, string(body))) + + observer := alertobserver.NewFakeLifeCycleObserver() + api.alertLCObserver = observer + r, err = http.NewRequest("POST", "/api/v2/alerts", bytes.NewReader(alertsBytes)) + require.NoError(t, err) + api.postAlertsHandler(alert_ops.PostAlertsParams{ + HTTPRequest: r, + Alerts: alerts, + }) + amAlert := OpenAPIAlertsToAlerts(alerts) + if tc.code == 200 { + require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertReceived][0].Fingerprint(), amAlert[0].Fingerprint()) + } else { + require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertRejected][0].Fingerprint(), amAlert[0].Fingerprint()) + } + } +} + type limitNumberOfAlertsReturnedCallback struct { limit int } diff --git a/api/v2/testing.go b/api/v2/testing.go index fbb38a9d53..0f3f74a554 100644 --- a/api/v2/testing.go +++ b/api/v2/testing.go @@ -137,3 +137,23 @@ func newGetAlertStatus(f *fakeAlerts) 
func(model.Fingerprint) types.AlertStatus return status } } + +func createAlert(t *testing.T, start, ends time.Time) (open_api_models.PostableAlerts, []byte) { + startsAt := strfmt.DateTime(start) + endsAt := strfmt.DateTime(ends) + + alert := open_api_models.PostableAlert{ + StartsAt: startsAt, + EndsAt: endsAt, + Annotations: open_api_models.LabelSet{"annotation1": "some text"}, + Alert: open_api_models.Alert{ + Labels: open_api_models.LabelSet{"label1": "test1"}, + GeneratorURL: "http://localhost:3000", + }, + } + alerts := open_api_models.PostableAlerts{} + alerts = append(alerts, &alert) + b, err := json.Marshal(alerts) + require.NoError(t, err) + return alerts, b +} diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go index 531708acdf..139b8aa4c2 100644 --- a/cmd/alertmanager/main.go +++ b/cmd/alertmanager/main.go @@ -437,6 +437,7 @@ func run() int { intervener, notificationLog, pipelinePeer, + nil, ) configuredReceivers.Set(float64(len(activeReceivers))) @@ -448,7 +449,7 @@ func run() int { silencer.Mutes(labels) }) - disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, nil, logger, dispMetrics) + disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, nil, logger, dispMetrics, nil) routes.Walk(func(r *dispatch.Route) { if r.RouteOpts.RepeatInterval > *retention { level.Warn(configLogger).Log( diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index eca9221527..a0acb494fc 100644 --- a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -29,6 +29,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + "github.com/prometheus/alertmanager/alertobserver" "github.com/prometheus/alertmanager/notify" "github.com/prometheus/alertmanager/provider" "github.com/prometheus/alertmanager/store" @@ -94,7 +95,8 @@ type Dispatcher struct { ctx context.Context cancel func() - logger log.Logger + logger log.Logger + alertLCObserver alertobserver.LifeCycleObserver } // Limits describes limits used by Dispatcher. @@ -115,19 +117,21 @@ func NewDispatcher( lim Limits, l log.Logger, m *DispatcherMetrics, + o alertobserver.LifeCycleObserver, ) *Dispatcher { if lim == nil { lim = nilLimits{} } disp := &Dispatcher{ - alerts: ap, - stage: s, - route: r, - timeout: to, - logger: log.With(l, "component", "dispatcher"), - metrics: m, - limits: lim, + alerts: ap, + stage: s, + route: r, + timeout: to, + logger: log.With(l, "component", "dispatcher"), + metrics: m, + limits: lim, + alertLCObserver: o, } return disp } @@ -368,13 +372,25 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) { ag, ok := routeGroups[fp] if ok { ag.insert(alert) + if d.alertLCObserver != nil { + m := alertobserver.AlertEventMeta{ + "groupKey": ag.GroupKey(), + "routeId": ag.routeID, + "groupId": ag.GroupID(), + } + d.alertLCObserver.Observe(alertobserver.EventAlertAddedToAggrGroup, []*types.Alert{alert}, m) + } return } // If the group does not exist, create it. But check the limit first. 
if limit := d.limits.MaxNumberOfAggregationGroups(); limit > 0 && d.aggrGroupsNum >= limit { d.metrics.aggrGroupLimitReached.Inc() - level.Error(d.logger).Log("msg", "Too many aggregation groups, cannot create new group for alert", "groups", d.aggrGroupsNum, "limit", limit, "alert", alert.Name()) + errMsg := "Too many aggregation groups, cannot create new group for alert" + level.Error(d.logger).Log("msg", errMsg, "groups", d.aggrGroupsNum, "limit", limit, "alert", alert.Name()) + if d.alertLCObserver != nil { + d.alertLCObserver.Observe(alertobserver.EventAlertFailedAddToAggrGroup, []*types.Alert{alert}, alertobserver.AlertEventMeta{"msg": errMsg}) + } return } @@ -382,6 +398,14 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) { routeGroups[fp] = ag d.aggrGroupsNum++ d.metrics.aggrGroups.Inc() + if d.alertLCObserver != nil { + m := alertobserver.AlertEventMeta{ + "groupKey": ag.GroupKey(), + "routeId": ag.routeID, + "groupId": ag.GroupID(), + } + d.alertLCObserver.Observe(alertobserver.EventAlertAddedToAggrGroup, []*types.Alert{alert}, m) + } // Insert the 1st alert in the group before starting the group's run() // function, to make sure that when the run() will be executed the 1st @@ -498,6 +522,7 @@ func (ag *aggrGroup) run(nf notifyFunc) { // Populate context with information needed along the pipeline. ctx = notify.WithGroupKey(ctx, ag.GroupKey()) + ctx = notify.WithGroupId(ctx, ag.GroupID()) ctx = notify.WithGroupLabels(ctx, ag.labels) ctx = notify.WithReceiverName(ctx, ag.opts.Receiver) ctx = notify.WithRepeatInterval(ctx, ag.opts.RepeatInterval) diff --git a/dispatch/dispatch_test.go b/dispatch/dispatch_test.go index 546405f797..8e5c641f3e 100644 --- a/dispatch/dispatch_test.go +++ b/dispatch/dispatch_test.go @@ -28,6 +28,7 @@ import ( "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + "github.com/prometheus/alertmanager/alertobserver" "github.com/prometheus/alertmanager/config" "github.com/prometheus/alertmanager/notify" "github.com/prometheus/alertmanager/provider/mem" @@ -107,6 +108,9 @@ func TestAggrGroup(t *testing.T) { if _, ok := notify.GroupKey(ctx); !ok { t.Errorf("group key missing") } + if _, ok := notify.GroupId(ctx); !ok { + t.Errorf("group id missing") + } if lbls, ok := notify.GroupLabels(ctx); !ok || !reflect.DeepEqual(lbls, lset) { t.Errorf("wrong group labels: %q", lbls) } @@ -374,7 +378,7 @@ route: timeout := func(d time.Duration) time.Duration { return time.Duration(0) } recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)} - dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry())) + dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry()), nil) go dispatcher.Run() defer dispatcher.Stop() @@ -516,7 +520,7 @@ route: recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)} lim := limits{groups: 6} m := NewDispatcherMetrics(true, prometheus.NewRegistry()) - dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, lim, logger, m) + dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, lim, logger, m, nil) go dispatcher.Run() defer dispatcher.Stop() @@ -612,7 +616,7 @@ route: timeout := func(d time.Duration) time.Duration { return time.Duration(0) } recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)} - dispatcher := NewDispatcher(alerts, 
route, recorder, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry())) + dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry()), nil) go dispatcher.Run() defer dispatcher.Stop() @@ -726,6 +730,74 @@ route: }, alertGroupInfos) } +func TestGroupsAlertLCObserver(t *testing.T) { + confData := `receivers: +- name: 'testing' + +route: + group_by: ['alertname'] + group_wait: 10ms + group_interval: 10ms + receiver: 'testing'` + conf, err := config.Load(confData) + if err != nil { + t.Fatal(err) + } + + logger := log.NewNopLogger() + route := NewRoute(conf.Route, nil) + marker := types.NewMarker(prometheus.NewRegistry()) + alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, nil) + if err != nil { + t.Fatal(err) + } + defer alerts.Close() + + timeout := func(d time.Duration) time.Duration { return time.Duration(0) } + recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)} + m := NewDispatcherMetrics(true, prometheus.NewRegistry()) + observer := alertobserver.NewFakeLifeCycleObserver() + lim := limits{groups: 1} + dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, lim, logger, m, observer) + go dispatcher.Run() + defer dispatcher.Stop() + + // Create alerts. The dispatcher will automatically create the groups. + alert1 := newAlert(model.LabelSet{"alertname": "OtherAlert", "cluster": "cc", "service": "dd"}) + alert2 := newAlert(model.LabelSet{"alertname": "YetAnotherAlert", "cluster": "cc", "service": "db"}) + err = alerts.Put(alert1) + if err != nil { + t.Fatal(err) + } + // Let the first alert get processed. + for i := 0; len(recorder.Alerts()) != 1 && i < 10; i++ { + time.Sleep(200 * time.Millisecond) + } + err = alerts.Put(alert2) + if err != nil { + t.Fatal(err) + } + // Wait until the second alert hits the aggregation group limit.
+ for i := 0; testutil.ToFloat64(m.aggrGroupLimitReached) == 0 && i < 10; i++ { + time.Sleep(200 * time.Millisecond) + } + observer.Mtx.RLock() + defer observer.Mtx.RUnlock() + require.Equal(t, 1, len(recorder.Alerts())) + require.Equal(t, alert1.Fingerprint(), observer.AlertsPerEvent[alertobserver.EventAlertAddedToAggrGroup][0].Fingerprint()) + groupFp := getGroupLabels(alert1, route).Fingerprint() + group := dispatcher.aggrGroupsPerRoute[route][groupFp] + groupKey := group.GroupKey() + groupId := group.GroupID() + routeId := group.routeID + require.Equal(t, groupKey, observer.MetaPerEvent[alertobserver.EventAlertAddedToAggrGroup][0]["groupKey"].(string)) + require.Equal(t, groupId, observer.MetaPerEvent[alertobserver.EventAlertAddedToAggrGroup][0]["groupId"].(string)) + require.Equal(t, routeId, observer.MetaPerEvent[alertobserver.EventAlertAddedToAggrGroup][0]["routeId"].(string)) + + require.Equal(t, 1, len(observer.AlertsPerEvent[alertobserver.EventAlertFailedAddToAggrGroup])) + require.Equal(t, alert2.Fingerprint(), observer.AlertsPerEvent[alertobserver.EventAlertFailedAddToAggrGroup][0].Fingerprint()) +} + type recordStage struct { mtx sync.RWMutex alerts map[string]map[model.Fingerprint]*types.Alert @@ -790,7 +862,7 @@ func TestDispatcherRace(t *testing.T) { defer alerts.Close() timeout := func(d time.Duration) time.Duration { return time.Duration(0) } - dispatcher := NewDispatcher(alerts, nil, nil, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry())) + dispatcher := NewDispatcher(alerts, nil, nil, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry()), nil) go dispatcher.Run() dispatcher.Stop() } @@ -818,7 +890,7 @@ func TestDispatcherRaceOnFirstAlertNotDeliveredWhenGroupWaitIsZero(t *testing.T) timeout := func(d time.Duration) time.Duration { return d } recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)} - dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry())) + dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry()), nil) go dispatcher.Run() defer dispatcher.Stop() diff --git a/notify/notify.go b/notify/notify.go index 30861a3027..043f7fe8ff 100644 --- a/notify/notify.go +++ b/notify/notify.go @@ -18,6 +18,7 @@ import ( "errors" "fmt" "sort" + "strings" "sync" "time" @@ -28,6 +29,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + "github.com/prometheus/alertmanager/alertobserver" "github.com/prometheus/alertmanager/featurecontrol" "github.com/prometheus/alertmanager/inhibit" "github.com/prometheus/alertmanager/nflog" @@ -119,6 +121,7 @@ const ( keyNow keyMuteTimeIntervals keyActiveTimeIntervals + keyGroupId ) // WithReceiverName populates a context with a receiver name. @@ -131,6 +134,11 @@ func WithGroupKey(ctx context.Context, s string) context.Context { return context.WithValue(ctx, keyGroupKey, s) } +// WithGroupId populates a context with a group id. +func WithGroupId(ctx context.Context, s string) context.Context { + return context.WithValue(ctx, keyGroupId, s) +} + // WithFiringAlerts populates a context with a slice of firing alerts. 
func WithFiringAlerts(ctx context.Context, alerts []uint64) context.Context { return context.WithValue(ctx, keyFiringAlerts, alerts) @@ -186,6 +194,13 @@ func GroupKey(ctx context.Context) (string, bool) { return v, ok } +// GroupId extracts a group id from the context. Iff none exists, the +// second argument is false. +func GroupId(ctx context.Context) (string, bool) { + v, ok := ctx.Value(keyGroupId).(string) + return v, ok +} + // GroupLabels extracts grouping label set from the context. Iff none exists, the // second argument is false. func GroupLabels(ctx context.Context) (model.LabelSet, bool) { @@ -383,18 +398,26 @@ func (pb *PipelineBuilder) New( intervener *timeinterval.Intervener, notificationLog NotificationLog, peer Peer, + o alertobserver.LifeCycleObserver, ) RoutingStage { - rs := make(RoutingStage, len(receivers)) + rs := RoutingStage{ + stages: make(map[string]Stage, len(receivers)), + alertLCObserver: o, + } ms := NewGossipSettleStage(peer) - is := NewMuteStage(inhibitor, pb.metrics) + + is := NewMuteStage(inhibitor, pb.metrics, o) tas := NewTimeActiveStage(intervener, pb.metrics) tms := NewTimeMuteStage(intervener, pb.metrics) - ss := NewMuteStage(silencer, pb.metrics) - + ss := NewMuteStage(silencer, pb.metrics, o) + for name := range receivers { - st := createReceiverStage(name, receivers[name], wait, notificationLog, pb.metrics) - rs[name] = MultiStage{ms, is, tas, tms, ss, st} + st := createReceiverStage(name, receivers[name], wait, notificationLog, pb.metrics, o) + rs.stages[name] = MultiStage{ + alertLCObserver: o, + stages: []Stage{ms, is, tas, tms, ss, st}, + } } pb.metrics.InitializeFor(receivers) @@ -409,6 +432,7 @@ func createReceiverStage( wait func() time.Duration, notificationLog NotificationLog, metrics *Metrics, + o alertobserver.LifeCycleObserver, ) Stage { var fs FanoutStage for i := range integrations { @@ -417,20 +441,23 @@ func createReceiverStage( Integration: integrations[i].Name(), Idx: uint32(integrations[i].Index()), } - var s MultiStage + var s []Stage s = append(s, NewWaitStage(wait)) s = append(s, NewDedupStage(&integrations[i], notificationLog, recv)) - s = append(s, NewRetryStage(integrations[i], name, metrics)) + s = append(s, NewRetryStage(integrations[i], name, metrics, o)) s = append(s, NewSetNotifiesStage(notificationLog, recv)) - fs = append(fs, s) + fs = append(fs, MultiStage{stages: s, alertLCObserver: o}) } return fs } // RoutingStage executes the inner stages based on the receiver specified in // the context. -type RoutingStage map[string]Stage +type RoutingStage struct { + stages map[string]Stage + alertLCObserver alertobserver.LifeCycleObserver +} // Exec implements the Stage interface. func (rs RoutingStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { @@ -439,21 +466,28 @@ func (rs RoutingStage) Exec(ctx context.Context, l log.Logger, alerts ...*types. return ctx, nil, errors.New("receiver missing") } - s, ok := rs[receiver] + s, ok := rs.stages[receiver] if !ok { return ctx, nil, errors.New("stage for receiver missing") } + if rs.alertLCObserver != nil { + rs.alertLCObserver.Observe(alertobserver.EventAlertPipelineStart, alerts, alertobserver.AlertEventMeta{"ctx": ctx}) + } + return s.Exec(ctx, l, alerts...) } // A MultiStage executes a series of stages sequentially. -type MultiStage []Stage +type MultiStage struct { + stages []Stage + alertLCObserver alertobserver.LifeCycleObserver +} // Exec implements the Stage interface. 
func (ms MultiStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { var err error - for _, s := range ms { + for _, s := range ms.stages { if len(alerts) == 0 { return ctx, nil, nil } @@ -462,6 +496,10 @@ func (ms MultiStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Al if err != nil { return ctx, nil, err } + if ms.alertLCObserver != nil { + p := strings.Split(fmt.Sprintf("%T", s), ".") + ms.alertLCObserver.Observe(alertobserver.EventAlertPipelinePassStage, alerts, alertobserver.AlertEventMeta{"ctx": ctx, "stageName": p[len(p)-1]}) + } } return ctx, alerts, nil } @@ -522,13 +560,14 @@ const ( // MuteStage filters alerts through a Muter. type MuteStage struct { - muter types.Muter - metrics *Metrics + muter types.Muter + metrics *Metrics + alertLCObserver alertobserver.LifeCycleObserver } // NewMuteStage return a new MuteStage. -func NewMuteStage(m types.Muter, metrics *Metrics) *MuteStage { - return &MuteStage{muter: m, metrics: metrics} +func NewMuteStage(m types.Muter, metrics *Metrics, o alertobserver.LifeCycleObserver) *MuteStage { + return &MuteStage{muter: m, metrics: metrics, alertLCObserver: o} } // Exec implements the Stage interface. @@ -547,6 +586,7 @@ func (n *MuteStage) Exec(ctx context.Context, logger log.Logger, alerts ...*type } // TODO(fabxc): increment muted alerts counter if muted. } + if len(muted) > 0 { level.Debug(logger).Log("msg", "Notifications will not be sent for muted alerts", "alerts", fmt.Sprintf("%v", muted)) @@ -561,6 +601,9 @@ func (n *MuteStage) Exec(ctx context.Context, logger log.Logger, alerts ...*type n.metrics.numNotificationSuppressedTotal.WithLabelValues(reason).Add(float64(len(muted))) } + if n.alertLCObserver != nil { + n.alertLCObserver.Observe(alertobserver.EventAlertMuted, muted, alertobserver.AlertEventMeta{"ctx": ctx}) + } return ctx, filtered, nil } @@ -733,14 +776,15 @@ func (n *DedupStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Al // RetryStage notifies via passed integration with exponential backoff until it // succeeds. It aborts if the context is canceled or timed out. type RetryStage struct { - integration Integration - groupName string - metrics *Metrics - labelValues []string + integration Integration + groupName string + metrics *Metrics + labelValues []string + alertLCObserver alertobserver.LifeCycleObserver } // NewRetryStage returns a new instance of a RetryStage. -func NewRetryStage(i Integration, groupName string, metrics *Metrics) *RetryStage { +func NewRetryStage(i Integration, groupName string, metrics *Metrics, o alertobserver.LifeCycleObserver) *RetryStage { labelValues := []string{i.Name()} if metrics.ff.EnableReceiverNamesInMetrics() { @@ -748,16 +792,17 @@ func NewRetryStage(i Integration, groupName string, metrics *Metrics) *RetryStag } return &RetryStage{ - integration: i, - groupName: groupName, - metrics: metrics, - labelValues: labelValues, + integration: i, + groupName: groupName, + metrics: metrics, + labelValues: labelValues, + alertLCObserver: o, } } func (r RetryStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { r.metrics.numNotifications.WithLabelValues(r.labelValues...).Inc() - ctx, alerts, err := r.exec(ctx, l, alerts...) + ctx, alerts, sent, err := r.exec(ctx, l, alerts...) 
failureReason := DefaultReason.String() if err != nil { @@ -766,11 +811,26 @@ func (r RetryStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Ale failureReason = e.Reason.String() } r.metrics.numTotalFailedNotifications.WithLabelValues(append(r.labelValues, failureReason)...).Inc() + if r.alertLCObserver != nil { + m := alertobserver.AlertEventMeta{ + "ctx": ctx, + "integration": r.integration.Name(), + "stageName": "RetryStage", + } + r.alertLCObserver.Observe(alertobserver.EventAlertSendFailed, sent, m) + } + } else if r.alertLCObserver != nil { + m := alertobserver.AlertEventMeta{ + "ctx": ctx, + "integration": r.integration.Name(), + "stageName": "RetryStage", + } + r.alertLCObserver.Observe(alertobserver.EventAlertSent, sent, m) } return ctx, alerts, err } -func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { +func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, []*types.Alert, error) { var sent []*types.Alert // If we shouldn't send notifications for resolved alerts, but there are only @@ -779,10 +839,10 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Ale if !r.integration.SendResolved() { firing, ok := FiringAlerts(ctx) if !ok { - return ctx, nil, errors.New("firing alerts missing") + return ctx, nil, nil, errors.New("firing alerts missing") } if len(firing) == 0 { - return ctx, alerts, nil + return ctx, alerts, sent, nil } for _, a := range alerts { if a.Status() != model.AlertResolved { @@ -824,9 +884,9 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Ale } if iErr != nil { - return ctx, nil, fmt.Errorf("%s/%s: notify retry canceled after %d attempts: %w", r.groupName, r.integration.String(), i, iErr) + return ctx, nil, sent, fmt.Errorf("%s/%s: notify retry canceled after %d attempts: %w", r.groupName, r.integration.String(), i, iErr) } - return ctx, nil, nil + return ctx, nil, sent, nil default: } @@ -840,7 +900,9 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Ale if err != nil { r.metrics.numNotificationRequestsFailedTotal.WithLabelValues(r.labelValues...).Inc() if !retry { - return ctx, alerts, fmt.Errorf("%s/%s: notify retry canceled due to unrecoverable error after %d attempts: %w", r.groupName, r.integration.String(), i, err) + + return ctx, alerts, sent, fmt.Errorf("%s/%s: notify retry canceled due to unrecoverable error after %d attempts: %w", r.groupName, r.integration.String(), i, err) + } if ctx.Err() == nil { if iErr == nil || err.Error() != iErr.Error() { @@ -858,7 +920,7 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Ale } lvl.Log("msg", "Notify success", "attempts", i, "duration", dur) - return ctx, alerts, nil + return ctx, alerts, sent, nil } case <-ctx.Done(): if iErr == nil { @@ -870,9 +932,9 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Ale } } if iErr != nil { - return ctx, nil, fmt.Errorf("%s/%s: notify retry canceled after %d attempts: %w", r.groupName, r.integration.String(), i, iErr) + return ctx, nil, sent, fmt.Errorf("%s/%s: notify retry canceled after %d attempts: %w", r.groupName, r.integration.String(), i, iErr) } - return ctx, nil, nil + return ctx, nil, sent, nil } } } diff --git a/notify/notify_test.go b/notify/notify_test.go index d5ce9612e3..fb3ad74787 100644 --- a/notify/notify_test.go +++ b/notify/notify_test.go @@ -17,6 +17,7 @@ 
import ( "context" "errors" "fmt" + "github.com/prometheus/alertmanager/alertobserver" "io" "reflect" "testing" @@ -306,7 +307,7 @@ func TestMultiStage(t *testing.T) { alerts3 = []*types.Alert{{}, {}, {}} ) - stage := MultiStage{ + stages := []Stage{ StageFunc(func(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { if !reflect.DeepEqual(alerts, alerts1) { t.Fatal("Input not equal to input of MultiStage") @@ -326,7 +327,9 @@ func TestMultiStage(t *testing.T) { return ctx, alerts3, nil }), } - + stage := MultiStage{ + stages: stages, + } _, alerts, err := stage.Exec(context.Background(), log.NewNopLogger(), alerts1...) if err != nil { t.Fatalf("Exec failed: %s", err) @@ -335,13 +338,28 @@ func TestMultiStage(t *testing.T) { if !reflect.DeepEqual(alerts, alerts3) { t.Fatal("Output of MultiStage is not equal to the output of the last stage") } + + // Rerun multistage but with alert life cycle observer + observer := alertobserver.NewFakeLifeCycleObserver() + ctx := WithGroupKey(context.Background(), "test") + stage.alertLCObserver = observer + _, _, err = stage.Exec(ctx, log.NewNopLogger(), alerts1...) + if err != nil { + t.Fatalf("Exec failed: %s", err) + } + + require.Equal(t, 1, len(observer.PipelineStageAlerts)) + require.Equal(t, 5, len(observer.PipelineStageAlerts["StageFunc"])) + metaCtx := observer.MetaPerEvent[alertobserver.EventAlertPipelinePassStage][0]["ctx"].(context.Context) + _, ok := GroupKey(metaCtx) + require.True(t, ok) } func TestMultiStageFailure(t *testing.T) { var ( ctx = context.Background() s1 = failStage{} - stage = MultiStage{s1} + stage = MultiStage{stages: []Stage{s1}} ) _, _, err := stage.Exec(ctx, log.NewNopLogger(), nil) @@ -356,7 +374,7 @@ func TestRoutingStage(t *testing.T) { alerts2 = []*types.Alert{{}, {}} ) - stage := RoutingStage{ + s := map[string]Stage{ "name": StageFunc(func(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { if !reflect.DeepEqual(alerts, alerts1) { t.Fatal("Input not equal to input of RoutingStage") @@ -365,6 +383,9 @@ func TestRoutingStage(t *testing.T) { }), "not": failStage{}, } + stage := RoutingStage{ + stages: s, + } ctx := WithReceiverName(context.Background(), "name") @@ -376,6 +397,20 @@ func TestRoutingStage(t *testing.T) { if !reflect.DeepEqual(alerts, alerts2) { t.Fatal("Output of RoutingStage is not equal to the output of the inner stage") } + + // Rerun RoutingStage but with alert life cycle observer + observer := alertobserver.NewFakeLifeCycleObserver() + stage.alertLCObserver = observer + _, _, err = stage.Exec(ctx, log.NewNopLogger(), alerts1...) 
+ if err != nil { + t.Fatalf("Exec failed: %s", err) + } + require.Equal(t, len(alerts1), len(observer.AlertsPerEvent[alertobserver.EventAlertPipelineStart])) + metaCtx := observer.MetaPerEvent[alertobserver.EventAlertPipelineStart][0]["ctx"].(context.Context) + + _, ok := ReceiverName(metaCtx) + require.True(t, ok) + } func TestRetryStageWithError(t *testing.T) { @@ -392,7 +427,7 @@ func TestRetryStageWithError(t *testing.T) { }), rs: sendResolved(false), } - r := NewRetryStage(i, "", NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{})) + r := NewRetryStage(i, "", NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{}), nil) alerts := []*types.Alert{ { @@ -404,6 +439,7 @@ func TestRetryStageWithError(t *testing.T) { ctx := context.Background() ctx = WithFiringAlerts(ctx, []uint64{0}) + ctx = WithGroupKey(ctx, "test") // Notify with a recoverable error should retry and succeed. resctx, res, err := r.Exec(ctx, log.NewNopLogger(), alerts...) @@ -412,13 +448,40 @@ func TestRetryStageWithError(t *testing.T) { require.Equal(t, alerts, sent) require.NotNil(t, resctx) + // Rerun recoverable error but with alert life cycle observer + observer := alertobserver.NewFakeLifeCycleObserver() + r.alertLCObserver = observer + _, _, err = r.Exec(ctx, log.NewNopLogger(), alerts...) + require.Nil(t, err) + require.Equal(t, len(alerts), len(observer.AlertsPerEvent[alertobserver.EventAlertSent])) + meta := observer.MetaPerEvent[alertobserver.EventAlertSent][0] + require.Equal(t, "RetryStage", meta["stageName"].(string)) + require.Equal(t, i.Name(), meta["integration"].(string)) + metaCtx := meta["ctx"].(context.Context) + _, ok := GroupKey(metaCtx) + require.True(t, ok) + // Notify with an unrecoverable error should fail. sent = sent[:0] fail = true retry = false + r.alertLCObserver = nil resctx, _, err = r.Exec(ctx, log.NewNopLogger(), alerts...) require.Error(t, err) require.NotNil(t, resctx) + + // Rerun the unrecoverable error but with alert life cycle observer + fail = true + r.alertLCObserver = observer + _, _, err = r.Exec(ctx, log.NewNopLogger(), alerts...) 
+ require.NotNil(t, err) + require.Equal(t, len(alerts), len(observer.AlertsPerEvent[alertobserver.EventAlertSendFailed])) + meta = observer.MetaPerEvent[alertobserver.EventAlertSendFailed][0] + require.Equal(t, "RetryStage", meta["stageName"].(string)) + require.Equal(t, i.Name(), meta["integration"].(string)) + metaCtx = meta["ctx"].(context.Context) + _, ok = GroupKey(metaCtx) + require.True(t, ok) } func TestRetryStageWithErrorCode(t *testing.T) { @@ -445,7 +508,7 @@ func TestRetryStageWithErrorCode(t *testing.T) { }), rs: sendResolved(false), } - r := NewRetryStage(i, "", NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{})) + r := NewRetryStage(i, "", NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{}), nil) alerts := []*types.Alert{ { @@ -480,7 +543,7 @@ func TestRetryStageWithContextCanceled(t *testing.T) { }), rs: sendResolved(false), } - r := NewRetryStage(i, "", NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{})) + r := NewRetryStage(i, "", NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{}), nil) alerts := []*types.Alert{ { @@ -511,7 +574,7 @@ func TestRetryStageNoResolved(t *testing.T) { }), rs: sendResolved(false), } - r := NewRetryStage(i, "", NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{})) + r := NewRetryStage(i, "", NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{}), nil) alerts := []*types.Alert{ { @@ -562,7 +625,7 @@ func TestRetryStageSendResolved(t *testing.T) { }), rs: sendResolved(true), } - r := NewRetryStage(i, "", NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{})) + r := NewRetryStage(i, "", NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{}), nil) alerts := []*types.Alert{ { @@ -667,7 +730,7 @@ func TestMuteStage(t *testing.T) { }) metrics := NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{}) - stage := NewMuteStage(muter, metrics) + stage := NewMuteStage(muter, metrics, nil) in := []model.LabelSet{ {}, @@ -729,7 +792,7 @@ func TestMuteStageWithSilences(t *testing.T) { marker := types.NewMarker(reg) silencer := silence.NewSilencer(silences, marker, log.NewNopLogger()) metrics := NewMetrics(reg, featurecontrol.NoopFlags{}) - stage := NewMuteStage(silencer, metrics) + stage := NewMuteStage(silencer, metrics, nil) in := []model.LabelSet{ {}, @@ -820,6 +883,46 @@ func TestMuteStageWithSilences(t *testing.T) { } } +func TestMuteStageWithAlertObserver(t *testing.T) { + silences, err := silence.New(silence.Options{Retention: time.Hour}) + if err != nil { + t.Fatal(err) + } + _, err = silences.Set(&silencepb.Silence{ + EndsAt: utcNow().Add(time.Hour), + Matchers: []*silencepb.Matcher{{Name: "mute", Pattern: "me"}}, + }) + if err != nil { + t.Fatal(err) + } + + marker := types.NewMarker(prometheus.NewRegistry()) + silencer := silence.NewSilencer(silences, marker, log.NewNopLogger()) + observer := alertobserver.NewFakeLifeCycleObserver() + metrics := NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{}) + stage := NewMuteStage(silencer, metrics, observer) + + in := []model.LabelSet{ + {"test": "set"}, + {"mute": "me"}, + {"foo": "bar", "test": "set"}, + } + + var inAlerts []*types.Alert + for _, lset := range in { + inAlerts = append(inAlerts, &types.Alert{ + Alert: model.Alert{Labels: lset}, + }) + } + + _, _, err = stage.Exec(context.Background(), log.NewNopLogger(), inAlerts...) 
+ if err != nil { + t.Fatalf("Exec failed: %s", err) + } + require.Equal(t, 1, len(observer.AlertsPerEvent[alertobserver.EventAlertMuted])) + require.Equal(t, inAlerts[1], observer.AlertsPerEvent[alertobserver.EventAlertMuted][0]) +} + func TestTimeMuteStage(t *testing.T) { // Route mutes alerts outside business hours in November, using the +1100 timezone. muteIn := `