Skip to content

Commit

Permalink
Add cortex_ruler_rule_groups_in_store metric (cortexproject#5869)
Browse files Browse the repository at this point in the history
  • Loading branch information
emanlodovice authored Aug 20, 2024
1 parent 90ad777 commit bdf677e
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## master / unreleased

* [ENHANCEMENT] Ruler: Add new ruler metric `cortex_ruler_rule_groups_in_store` that is the total rule groups per tenant in store, which can be used to compare with `cortex_prometheus_rule_group_rules` to count the number of rule groups that are not loaded by a ruler. #5869

## 1.18.0 in progress

Expand Down
3 changes: 3 additions & 0 deletions integration/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,9 @@ func TestRulerSharding(t *testing.T) {
// between the two rulers.
require.NoError(t, ruler1.WaitSumMetrics(e2e.Less(numRulesGroups), "cortex_prometheus_rule_group_rules"))
require.NoError(t, ruler2.WaitSumMetrics(e2e.Less(numRulesGroups), "cortex_prometheus_rule_group_rules"))
// Even with rules sharded, we expect rulers to have the same cortex_ruler_rule_groups_in_store metric values
require.NoError(t, ruler1.WaitSumMetrics(e2e.Equals(numRulesGroups), "cortex_ruler_rule_groups_in_store"))
require.NoError(t, ruler2.WaitSumMetrics(e2e.Equals(numRulesGroups), "cortex_ruler_rule_groups_in_store"))

// Fetch the rules and ensure they match the configured ones.
actualGroups, err := c.GetPrometheusRules(e2ecortex.DefaultFilter)
Expand Down
36 changes: 36 additions & 0 deletions pkg/ruler/manager_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,3 +271,39 @@ func (m *RuleEvalMetrics) deletePerUserMetrics(userID string) {
m.RulerQuerySeconds.DeleteLabelValues(userID)
}
}

// RuleGroupMetrics exposes the cortex_ruler_rule_groups_in_store gauge, which
// reports the number of rule groups each tenant has in the rule store.
type RuleGroupMetrics struct {
	RuleGroupsInStore *prometheus.GaugeVec
	tenants           map[string]struct{}  // tenants reported in the previous update; used to delete stale series
	allowedTenants    *util.AllowedTenants // tenants rejected by this filter are never reported
}

// NewRuleGroupMetrics creates a RuleGroupMetrics whose gauge is registered on reg.
// Tenants filtered out by allowedTenants are ignored when the metric is updated.
func NewRuleGroupMetrics(reg prometheus.Registerer, allowedTenants *util.AllowedTenants) *RuleGroupMetrics {
	gauge := promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
		Name: "cortex_ruler_rule_groups_in_store",
		Help: "The number of rule groups a tenant has in store.",
	}, []string{"user"})

	return &RuleGroupMetrics{
		RuleGroupsInStore: gauge,
		allowedTenants:    allowedTenants,
	}
}

// UpdateRuleGroupsInStore sets the cortex_ruler_rule_groups_in_store gauge for
// every tenant in ruleGroupsCount and removes the series of tenants that are no
// longer present. Tenants rejected by allowedTenants are skipped entirely.
func (r *RuleGroupMetrics) UpdateRuleGroupsInStore(ruleGroupsCount map[string]int) {
	seen := make(map[string]struct{}, len(ruleGroupsCount))
	for tenant, groups := range ruleGroupsCount {
		// Disabled tenants do not get a series at all.
		if !r.allowedTenants.IsAllowed(tenant) {
			continue
		}
		r.RuleGroupsInStore.WithLabelValues(tenant).Set(float64(groups))
		seen[tenant] = struct{}{}
	}
	// Drop series for tenants that disappeared since the previous update.
	for tenant := range r.tenants {
		if _, stillPresent := seen[tenant]; !stillPresent {
			r.RuleGroupsInStore.DeleteLabelValues(tenant)
		}
	}
	r.tenants = seen
}
38 changes: 38 additions & 0 deletions pkg/ruler/manager_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,3 +595,41 @@ func TestRuleEvalMetricsDeletePerUserMetrics(t *testing.T) {
require.Contains(t, mfm[name].String(), "value:\"fake2\"")
}
}

// TestRuleGroupMetrics verifies that UpdateRuleGroupsInStore exports one gauge
// series per allowed tenant, deletes series for tenants that disappear between
// updates, and never reports series for tenants disabled via the allow-list.
func TestRuleGroupMetrics(t *testing.T) {
	registry := prometheus.NewPedanticRegistry()
	// "fake3" is explicitly disabled, so it must never appear in the output.
	metrics := NewRuleGroupMetrics(registry, util.NewAllowedTenants(nil, []string{"fake3"}))
	const name = "cortex_ruler_rule_groups_in_store"

	// First update: two allowed tenants plus one disabled tenant.
	metrics.UpdateRuleGroupsInStore(map[string]int{"fake1": 10, "fake2": 20, "fake3": 30})
	families, err := registry.Gather()
	require.NoError(t, err)
	familyMap, err := util.NewMetricFamilyMap(families)
	require.NoError(t, err)
	require.Len(t, familyMap[name].Metric, 2)
	requireMetricEqual(t, familyMap[name].Metric[0], map[string]string{"user": "fake1"}, float64(10))
	requireMetricEqual(t, familyMap[name].Metric[1], map[string]string{"user": "fake2"}, float64(20))

	// Second update: "fake1" is gone, so its series must be deleted.
	metrics.UpdateRuleGroupsInStore(map[string]int{"fake2": 30})
	families, err = registry.Gather()
	require.NoError(t, err)
	familyMap, err = util.NewMetricFamilyMap(families)
	require.NoError(t, err)
	require.Len(t, familyMap[name].Metric, 1)
	requireMetricEqual(t, familyMap[name].Metric[0], map[string]string{"user": "fake2"}, float64(30))

	// Final update with no tenants at all: the metric family disappears entirely.
	metrics.UpdateRuleGroupsInStore(map[string]int{})
	families, err = registry.Gather()
	require.NoError(t, err)
	familyMap, err = util.NewMetricFamilyMap(families)
	require.NoError(t, err)
	require.Nil(t, familyMap[name])
}
15 changes: 15 additions & 0 deletions pkg/ruler/ruler.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ type Ruler struct {
ruleGroupStoreLoadDuration prometheus.Gauge
ruleGroupSyncDuration prometheus.Gauge
rulerGetRulesFailures *prometheus.CounterVec
ruleGroupMetrics *RuleGroupMetrics

allowedTenants *util.AllowedTenants

Expand Down Expand Up @@ -342,6 +343,7 @@ func newRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer,
Help: "The total number of failed rules request sent to rulers in getShardedRules.",
}, []string{"ruler"}),
}
ruler.ruleGroupMetrics = NewRuleGroupMetrics(reg, ruler.allowedTenants)

if len(cfg.EnabledTenants) > 0 {
level.Info(ruler.logger).Log("msg", "ruler using enabled users", "enabled", strings.Join(cfg.EnabledTenants, ", "))
Expand Down Expand Up @@ -667,7 +669,9 @@ func (r *Ruler) listRulesNoSharding(ctx context.Context) (map[string]rulespb.Rul
if err != nil {
return nil, nil, err
}
ruleGroupCounts := make(map[string]int, len(allRuleGroups))
for userID, groups := range allRuleGroups {
ruleGroupCounts[userID] = len(groups)
disabledRuleGroupsForUser := r.limits.DisabledRuleGroups(userID)
if len(disabledRuleGroupsForUser) == 0 {
continue
Expand All @@ -682,6 +686,7 @@ func (r *Ruler) listRulesNoSharding(ctx context.Context) (map[string]rulespb.Rul
}
allRuleGroups[userID] = filteredGroupsForUser
}
r.ruleGroupMetrics.UpdateRuleGroupsInStore(ruleGroupCounts)
return allRuleGroups, nil, nil
}

Expand All @@ -691,9 +696,11 @@ func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulesp
return nil, nil, err
}

ruleGroupCounts := make(map[string]int, len(configs))
ownedConfigs := make(map[string]rulespb.RuleGroupList)
backedUpConfigs := make(map[string]rulespb.RuleGroupList)
for userID, groups := range configs {
ruleGroupCounts[userID] = len(groups)
owned := filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), r.ring, r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
if len(owned) > 0 {
ownedConfigs[userID] = owned
Expand All @@ -705,6 +712,7 @@ func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulesp
}
}
}
r.ruleGroupMetrics.UpdateRuleGroupsInStore(ruleGroupCounts)
return ownedConfigs, backedUpConfigs, nil
}

Expand Down Expand Up @@ -732,6 +740,7 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
}

if len(userRings) == 0 {
r.ruleGroupMetrics.UpdateRuleGroupsInStore(make(map[string]int))
return nil, nil, nil
}

Expand All @@ -744,6 +753,8 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
mu := sync.Mutex{}
owned := map[string]rulespb.RuleGroupList{}
backedUp := map[string]rulespb.RuleGroupList{}
gLock := sync.Mutex{}
ruleGroupCounts := make(map[string]int, len(userRings))

concurrency := loadRulesConcurrency
if len(userRings) < concurrency {
Expand All @@ -758,6 +769,9 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
if err != nil {
return errors.Wrapf(err, "failed to fetch rule groups for user %s", userID)
}
gLock.Lock()
ruleGroupCounts[userID] = len(groups)
gLock.Unlock()

filterOwned := filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), userRings[userID], r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
var filterBackup []*rulespb.RuleGroupDesc
Expand All @@ -781,6 +795,7 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
}

err = g.Wait()
r.ruleGroupMetrics.UpdateRuleGroupsInStore(ruleGroupCounts)
return owned, backedUp, err
}

Expand Down
19 changes: 16 additions & 3 deletions pkg/ruler/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,23 @@ func buildRuler(t *testing.T, rulerConfig Config, querierTestConfig *querier.Tes
// newTestRuler builds and starts a Ruler, triggers an initial rule sync, and
// then blocks (up to a short deadline) until every tenant present in the store
// has at least one rule group loaded into the manager, so callers can query
// rules immediately. On timeout it returns anyway and lets the caller's own
// assertions surface the failure.
func newTestRuler(t *testing.T, rulerConfig Config, store rulestore.RuleStore, querierTestConfig *querier.TestConfig) *Ruler {
	ruler, _ := buildRuler(t, rulerConfig, querierTestConfig, store, nil)
	require.NoError(t, services.StartAndAwaitRunning(context.Background(), ruler))

	rgs, err := store.ListAllRuleGroups(context.Background())
	require.NoError(t, err)

	// Ensure all rules start loading before usage.
	ruler.syncRules(context.Background(), rulerSyncReasonInitial)

	// Poll until the manager reports rules for every tenant in the store,
	// or give up once the deadline passes.
	deadline := time.Now().Add(3 * time.Second)
	for {
		loaded := true
		for tenantID := range rgs {
			if len(ruler.manager.GetRules(tenantID)) == 0 {
				loaded = false
				break // one unloaded tenant is enough to keep waiting
			}
		}
		if loaded || time.Now().After(deadline) {
			break
		}
		time.Sleep(50 * time.Millisecond)
	}
	return ruler
}

Expand Down

0 comments on commit bdf677e

Please sign in to comment.