
Commit

Feature: Allow configuration of a rule evaluation delay (prometheus#14061)

* [PATCH] Allow having evaluation delay for rule groups

Signed-off-by: Ganesh Vernekar <[email protected]>

* [PATCH] Fix lint

Signed-off-by: Ganesh Vernekar <[email protected]>

* [PATCH] Move the option to ManagerOptions

Signed-off-by: Ganesh Vernekar <[email protected]>

* [PATCH] Include evaluation_delay in the group config

Signed-off-by: Ganesh Vernekar <[email protected]>

* Fix comments

Signed-off-by: gotjosh <[email protected]>

* Add a server configuration option.

Signed-off-by: gotjosh <[email protected]>

* Appease the linter prometheus#1

Signed-off-by: gotjosh <[email protected]>

* Add the new server flag documentation

Signed-off-by: gotjosh <[email protected]>

* Improve documentation of the new flag and configuration

Signed-off-by: gotjosh <[email protected]>

* Use named parameters for clarity on the `Rule` interface

Signed-off-by: gotjosh <[email protected]>

* Add `initial` to the flag help

Signed-off-by: gotjosh <[email protected]>

* Change the CHANGELOG area from `ruler` to `rules`

Signed-off-by: gotjosh <[email protected]>

* Rename evaluation_delay to `rule_query_offset`/`query_offset` and make it a global configuration option.

Signed-off-by: gotjosh <[email protected]>

* more docs

Signed-off-by: gotjosh <[email protected]>

* Improve wording on CHANGELOG

Signed-off-by: gotjosh <[email protected]>

* Add `RuleQueryOffset` to the default config in tests in case it changes

Signed-off-by: gotjosh <[email protected]>

* Update docs/configuration/recording_rules.md

Co-authored-by: Julius Volz <[email protected]>
Signed-off-by: gotjosh <[email protected]>

* Rename `RuleQueryOffset` to `QueryOffset` when in the group context.

Signed-off-by: gotjosh <[email protected]>

* Improve docstring and documentation on the `rule_query_offset`

Signed-off-by: gotjosh <[email protected]>

---------

Signed-off-by: Ganesh Vernekar <[email protected]>
Signed-off-by: gotjosh <[email protected]>
Co-authored-by: Ganesh Vernekar <[email protected]>
Co-authored-by: Julius Volz <[email protected]>
3 people committed May 30, 2024
1 parent 6683895 commit 37b408c
Showing 15 changed files with 458 additions and 367 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -3,6 +3,7 @@
## unreleased

* [CHANGE] Rules: Execute 1 query instead of N (where N is the number of alerts within alert rule) when restoring alerts. #13980
* [FEATURE] Rules: Add a new per-rule-group option `query_offset` (set in the rule group configuration file) and a global `rule_query_offset` option, to provide more resilience against remote write delays. #14061
* [ENHANCEMENT] Rules: Add `rule_group_last_restore_duration_seconds` to measure the time it takes to restore a rule group. #13974
* [ENHANCEMENT] OTLP: Improve remote write format translation performance by using label set hashes for metric identifiers instead of string based ones. #14006 #13991
* [ENHANCEMENT] TSDB: Optimize querying with regexp matchers. #13620
3 changes: 3 additions & 0 deletions cmd/prometheus/main.go
@@ -785,6 +785,9 @@ func main() {
ResendDelay: time.Duration(cfg.resendDelay),
MaxConcurrentEvals: cfg.maxConcurrentEvals,
ConcurrentEvalsEnabled: cfg.enableConcurrentRuleEval,
DefaultRuleQueryOffset: func() time.Duration {
return time.Duration(cfgFile.GlobalConfig.RuleQueryOffset)
},
})
}

4 changes: 4 additions & 0 deletions config/config.go
@@ -145,6 +145,7 @@ var (
ScrapeInterval: model.Duration(1 * time.Minute),
ScrapeTimeout: model.Duration(10 * time.Second),
EvaluationInterval: model.Duration(1 * time.Minute),
RuleQueryOffset: model.Duration(0 * time.Minute),
// When native histogram feature flag is enabled, ScrapeProtocols default
// changes to DefaultNativeHistogramScrapeProtocols.
ScrapeProtocols: DefaultScrapeProtocols,
@@ -397,6 +398,8 @@ type GlobalConfig struct {
ScrapeProtocols []ScrapeProtocol `yaml:"scrape_protocols,omitempty"`
// How frequently to evaluate rules by default.
EvaluationInterval model.Duration `yaml:"evaluation_interval,omitempty"`
// How long to offset rule evaluation timestamps into the past by default, to ensure the underlying metrics have been received.
RuleQueryOffset model.Duration `yaml:"rule_query_offset"`
// File to which PromQL queries are logged.
QueryLogFile string `yaml:"query_log_file,omitempty"`
// The labels to add to any timeseries that this Prometheus instance scrapes.
@@ -556,6 +559,7 @@ func (c *GlobalConfig) isZero() bool {
c.ScrapeInterval == 0 &&
c.ScrapeTimeout == 0 &&
c.EvaluationInterval == 0 &&
c.RuleQueryOffset == 0 &&
c.QueryLogFile == "" &&
c.ScrapeProtocols == nil
}
4 changes: 4 additions & 0 deletions docs/configuration/configuration.md
@@ -70,6 +70,10 @@

# How frequently to evaluate rules.
[ evaluation_interval: <duration> | default = 1m ]

# Offset rule evaluation timestamps by the specified duration into the past to ensure the underlying metrics have been received. Rule groups can override this default with their own query_offset.
# Metric availability delays are more likely to occur when Prometheus is running as a remote write target, but can also occur when there are anomalies with scraping.
[ rule_query_offset: <duration> | default = 0s ]

# The labels to add to any time series or alerts when communicating with
# external systems (federation, remote storage, Alertmanager).
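
To make the global option concrete, here is a minimal sketch of a prometheus.yml global block using the new setting; the 1m offset and the interval values are illustrative, not recommendations.

```yaml
global:
  scrape_interval: 1m
  evaluation_interval: 1m
  # Evaluate all rule groups 1 minute in the past by default; individual
  # groups can override this with their own query_offset.
  rule_query_offset: 1m
```
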
6 changes: 6 additions & 0 deletions docs/configuration/recording_rules.md
@@ -86,6 +86,9 @@ name: <string>
# rule can produce. 0 is no limit.
[ limit: <int> | default = 0 ]

# Offset the rule evaluation timestamp of this particular group by the specified duration into the past.
[ query_offset: <duration> | default = global.rule_query_offset ]

rules:
[ - <rule> ... ]
```
@@ -148,6 +151,9 @@ the rule, active, pending, or inactive, are cleared as well. The event will be
recorded as an error in the evaluation, and as such no stale markers are
written.
# Rule query offset
Setting a rule query offset is useful to ensure the underlying metrics have been received and stored in Prometheus before the rules that depend on them are evaluated. Metric availability delays are more likely to occur when Prometheus is running as a remote write target due to the nature of distributed systems, but can also occur when there are anomalies with scraping and/or short evaluation intervals.
# Failed rule evaluations due to slow evaluation
If a rule group hasn't finished evaluating before its next evaluation is supposed to start (as defined by the `evaluation_interval`), the next evaluation will be skipped. Subsequent evaluations of the rule group will continue to be skipped until the initial evaluation either completes or times out. When this happens, there will be a gap in the metric produced by the recording rule. The `rule_group_iterations_missed_total` metric will be incremented for each missed iteration of the rule group.
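
As a sketch of how the group-level option described above interacts with the global default, a rules file might look like the following; the group names, recording rule names, and expressions are hypothetical.

```yaml
groups:
  - name: remote_write_delayed_rules
    interval: 1m
    # Query 2 minutes in the past for this group only, overriding the
    # global rule_query_offset.
    query_offset: 2m
    rules:
      - record: job:http_requests:rate5m
        expr: sum by (job) (rate(http_requests_total[5m]))
  - name: default_offset_rules
    # No query_offset set: this group falls back to global.rule_query_offset.
    rules:
      - record: job:up:sum
        expr: sum by (job) (up)
```

The second group omits query_offset, so it uses the global rule_query_offset (or 0s if that is unset).
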
9 changes: 5 additions & 4 deletions model/rulefmt/rulefmt.go
@@ -136,10 +136,11 @@ func (g *RuleGroups) Validate(node ruleGroups) (errs []error) {

// RuleGroup is a list of sequentially evaluated recording and alerting rules.
type RuleGroup struct {
Name string `yaml:"name"`
Interval model.Duration `yaml:"interval,omitempty"`
Limit int `yaml:"limit,omitempty"`
Rules []RuleNode `yaml:"rules"`
Name string `yaml:"name"`
Interval model.Duration `yaml:"interval,omitempty"`
QueryOffset *model.Duration `yaml:"query_offset,omitempty"`
Limit int `yaml:"limit,omitempty"`
Rules []RuleNode `yaml:"rules"`
}

// Rule describes an alerting or recording rule.
9 changes: 4 additions & 5 deletions rules/alerting.go
@@ -338,10 +338,9 @@ const resolvedRetention = 15 * time.Minute

// Eval evaluates the rule expression and then creates pending alerts and fires
// or removes previously pending alerts accordingly.
func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL, limit int) (promql.Vector, error) {
func (r *AlertingRule) Eval(ctx context.Context, queryOffset time.Duration, ts time.Time, query QueryFunc, externalURL *url.URL, limit int) (promql.Vector, error) {
ctx = NewOriginContext(ctx, NewRuleDetail(r))

res, err := query(ctx, r.vector.String(), ts)
res, err := query(ctx, r.vector.String(), ts.Add(-queryOffset))
if err != nil {
return nil, err
}
@@ -484,8 +483,8 @@ func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc,
}

if r.restored.Load() {
vec = append(vec, r.sample(a, ts))
vec = append(vec, r.forStateSample(a, ts, float64(a.ActiveAt.Unix())))
vec = append(vec, r.sample(a, ts.Add(-queryOffset)))
vec = append(vec, r.forStateSample(a, ts.Add(-queryOffset), float64(a.ActiveAt.Unix())))
}
}

32 changes: 16 additions & 16 deletions rules/alerting_test.go
@@ -123,7 +123,7 @@ func TestAlertingRuleTemplateWithHistogram(t *testing.T) {
)

evalTime := time.Now()
res, err := rule.Eval(context.TODO(), evalTime, q, nil, 0)
res, err := rule.Eval(context.TODO(), 0, evalTime, q, nil, 0)
require.NoError(t, err)

require.Len(t, res, 2)
@@ -230,7 +230,7 @@ func TestAlertingRuleLabelsUpdate(t *testing.T) {
t.Logf("case %d", i)
evalTime := baseTime.Add(time.Duration(i) * time.Minute)
result[0].T = timestamp.FromTime(evalTime)
res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
require.NoError(t, err)

var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
@@ -247,7 +247,7 @@ func TestAlertingRuleLabelsUpdate(t *testing.T) {
testutil.RequireEqual(t, result, filteredRes)
}
evalTime := baseTime.Add(time.Duration(len(results)) * time.Minute)
res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
require.NoError(t, err)
require.Empty(t, res)
}
@@ -315,7 +315,7 @@ func TestAlertingRuleExternalLabelsInTemplate(t *testing.T) {

var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
res, err := ruleWithoutExternalLabels.Eval(
context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
)
require.NoError(t, err)
for _, smpl := range res {
@@ -329,7 +329,7 @@ func TestAlertingRuleExternalLabelsInTemplate(t *testing.T) {
}

res, err = ruleWithExternalLabels.Eval(
context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
)
require.NoError(t, err)
for _, smpl := range res {
@@ -408,7 +408,7 @@ func TestAlertingRuleExternalURLInTemplate(t *testing.T) {

var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
res, err := ruleWithoutExternalURL.Eval(
context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
)
require.NoError(t, err)
for _, smpl := range res {
@@ -422,7 +422,7 @@ func TestAlertingRuleExternalURLInTemplate(t *testing.T) {
}

res, err = ruleWithExternalURL.Eval(
context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
)
require.NoError(t, err)
for _, smpl := range res {
@@ -477,7 +477,7 @@ func TestAlertingRuleEmptyLabelFromTemplate(t *testing.T) {

var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
res, err := rule.Eval(
context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
)
require.NoError(t, err)
for _, smpl := range res {
@@ -544,7 +544,7 @@ instance: {{ $v.Labels.instance }}, value: {{ printf "%.0f" $v.Value }};
close(getDoneCh)
}()
_, err = ruleWithQueryInTemplate.Eval(
context.TODO(), evalTime, slowQueryFunc, nil, 0,
context.TODO(), 0, evalTime, slowQueryFunc, nil, 0,
)
require.NoError(t, err)
}
@@ -596,7 +596,7 @@ func TestAlertingRuleDuplicate(t *testing.T) {
"",
true, log.NewNopLogger(),
)
_, err := rule.Eval(ctx, now, EngineQueryFunc(engine, storage), nil, 0)
_, err := rule.Eval(ctx, 0, now, EngineQueryFunc(engine, storage), nil, 0)
require.Error(t, err)
require.EqualError(t, err, "vector contains metrics with the same labelset after applying alert labels")
}
@@ -644,7 +644,7 @@ func TestAlertingRuleLimit(t *testing.T) {
evalTime := time.Unix(0, 0)

for _, test := range tests {
switch _, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, test.limit); {
switch _, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, test.limit); {
case err != nil:
require.EqualError(t, err, test.err)
case test.err != "":
@@ -871,7 +871,7 @@ func TestKeepFiringFor(t *testing.T) {
t.Logf("case %d", i)
evalTime := baseTime.Add(time.Duration(i) * time.Minute)
result[0].T = timestamp.FromTime(evalTime)
res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
require.NoError(t, err)

var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
@@ -888,7 +888,7 @@ func TestKeepFiringFor(t *testing.T) {
testutil.RequireEqual(t, result, filteredRes)
}
evalTime := baseTime.Add(time.Duration(len(results)) * time.Minute)
res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
require.NoError(t, err)
require.Empty(t, res)
}
@@ -925,7 +925,7 @@ func TestPendingAndKeepFiringFor(t *testing.T) {

baseTime := time.Unix(0, 0)
result.T = timestamp.FromTime(baseTime)
res, err := rule.Eval(context.TODO(), baseTime, EngineQueryFunc(testEngine, storage), nil, 0)
res, err := rule.Eval(context.TODO(), 0, baseTime, EngineQueryFunc(testEngine, storage), nil, 0)
require.NoError(t, err)

require.Len(t, res, 2)
Expand All @@ -940,7 +940,7 @@ func TestPendingAndKeepFiringFor(t *testing.T) {
}

evalTime := baseTime.Add(time.Minute)
res, err = rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
res, err = rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
require.NoError(t, err)
require.Empty(t, res)
}
@@ -974,7 +974,7 @@ func TestAlertingEvalWithOrigin(t *testing.T) {
true, log.NewNopLogger(),
)

_, err = rule.Eval(ctx, now, func(ctx context.Context, qs string, _ time.Time) (promql.Vector, error) {
_, err = rule.Eval(ctx, 0, now, func(ctx context.Context, qs string, _ time.Time) (promql.Vector, error) {
detail = FromOriginContext(ctx)
return nil, nil
}, nil, 0)
24 changes: 21 additions & 3 deletions rules/group.go
@@ -47,6 +47,7 @@ type Group struct {
name string
file string
interval time.Duration
queryOffset *time.Duration
limit int
rules []Rule
seriesInPreviousEval []map[string]labels.Labels // One per Rule.
@@ -90,6 +91,7 @@ type GroupOptions struct {
Rules []Rule
ShouldRestore bool
Opts *ManagerOptions
QueryOffset *time.Duration
done chan struct{}
EvalIterationFunc GroupEvalIterationFunc
}
@@ -126,6 +128,7 @@ func NewGroup(o GroupOptions) *Group {
name: o.Name,
file: o.File,
interval: o.Interval,
queryOffset: o.QueryOffset,
limit: o.Limit,
rules: o.Rules,
shouldRestore: o.ShouldRestore,
@@ -443,6 +446,8 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
wg sync.WaitGroup
)

ruleQueryOffset := g.QueryOffset()

for i, rule := range g.rules {
select {
case <-g.done:
@@ -473,7 +478,7 @@

g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()

vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL, g.Limit())
vector, err := rule.Eval(ctx, ruleQueryOffset, ts, g.opts.QueryFunc, g.opts.ExternalURL, g.Limit())
if err != nil {
rule.SetHealth(HealthBad)
rule.SetLastError(err)
@@ -562,7 +567,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
for metric, lset := range g.seriesInPreviousEval[i] {
if _, ok := seriesReturned[metric]; !ok {
// Series no longer exposed, mark it stale.
_, err = app.Append(0, lset, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN))
_, err = app.Append(0, lset, timestamp.FromTime(ts.Add(-ruleQueryOffset)), math.Float64frombits(value.StaleNaN))
unwrappedErr := errors.Unwrap(err)
if unwrappedErr == nil {
unwrappedErr = err
@@ -601,14 +606,27 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
g.cleanupStaleSeries(ctx, ts)
}

func (g *Group) QueryOffset() time.Duration {
if g.queryOffset != nil {
return *g.queryOffset
}

if g.opts.DefaultRuleQueryOffset != nil {
return g.opts.DefaultRuleQueryOffset()
}

return time.Duration(0)
}

func (g *Group) cleanupStaleSeries(ctx context.Context, ts time.Time) {
if len(g.staleSeries) == 0 {
return
}
app := g.opts.Appendable.Appender(ctx)
queryOffset := g.QueryOffset()
for _, s := range g.staleSeries {
// Rule that produced series no longer configured, mark it stale.
_, err := app.Append(0, s, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN))
_, err := app.Append(0, s, timestamp.FromTime(ts.Add(-queryOffset)), math.Float64frombits(value.StaleNaN))
unwrappedErr := errors.Unwrap(err)
if unwrappedErr == nil {
unwrappedErr = err
2 changes: 2 additions & 0 deletions rules/manager.go
@@ -116,6 +116,7 @@ type ManagerOptions struct {
ForGracePeriod time.Duration
ResendDelay time.Duration
GroupLoader GroupLoader
DefaultRuleQueryOffset func() time.Duration
MaxConcurrentEvals int64
ConcurrentEvalsEnabled bool
RuleConcurrencyController RuleConcurrencyController
@@ -336,6 +337,7 @@ func (m *Manager) LoadGroups(
Rules: rules,
ShouldRestore: shouldRestore,
Opts: m.opts,
QueryOffset: (*time.Duration)(rg.QueryOffset),
done: m.done,
EvalIterationFunc: groupEvalIterationFunc,
})
