[CONTP-60] Improved telemetry on cluster check configs dangling #32508

Merged: 10 commits, Dec 30, 2024
37 changes: 37 additions & 0 deletions pkg/clusteragent/clusterchecks/dangling_config.go
@@ -0,0 +1,37 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build clusterchecks

package clusterchecks

import (
"time"

"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
)

type danglingConfigWrapper struct {
config integration.Config
timeCreated time.Time
unscheduledCheck bool
}

// createDanglingConfig creates a new danglingConfigWrapper
// This is used to keep track of the lifecycle of a dangling config
func createDanglingConfig(config integration.Config) *danglingConfigWrapper {
return &danglingConfigWrapper{
config: config,
timeCreated: time.Now(),
unscheduledCheck: false,
}
}

// isStuckScheduling returns true if the config has been in the store
// for longer than the unscheduledCheckThresholdSeconds
func (c *danglingConfigWrapper) isStuckScheduling(unscheduledCheckThresholdSeconds int64) bool {
expectCheckIsScheduledTime := c.timeCreated.Add(time.Duration(unscheduledCheckThresholdSeconds) * time.Second)
return time.Now().After(expectCheckIsScheduledTime)
}
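
For orientation, here is a minimal sketch (not part of the PR) of how the dispatcher is expected to consume this wrapper: the exampleScan name is made up, the 60-second threshold mirrors the default of cluster_checks.unscheduled_check_threshold, and log refers to the agent's pkg/util/log package.

// Illustrative sketch only, not code from this PR.
func exampleScan(wrappers map[string]*danglingConfigWrapper) {
	const thresholdSeconds int64 = 60
	for digest, w := range wrappers {
		if !w.unscheduledCheck && w.isStuckScheduling(thresholdSeconds) {
			// Flag the config once so telemetry is only incremented a
			// single time per dangling config.
			w.unscheduledCheck = true
			log.Warnf("config %s has been dangling for more than %d seconds", digest, thresholdSeconds)
		}
	}
}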
43 changes: 30 additions & 13 deletions pkg/clusteragent/clusterchecks/dispatcher_configs.go
@@ -28,7 +28,7 @@ func (d *dispatcher) getState() (types.StateResponse, error) {

response := types.StateResponse{
Warmup: !d.store.active,
Dangling: makeConfigArray(d.store.danglingConfigs),
Dangling: makeConfigArrayFromDangling(d.store.danglingConfigs),
}
for _, node := range d.store.nodes {
n := types.StateNodeResponse{
@@ -41,7 +41,7 @@ func (d *dispatcher) getState() (types.StateResponse, error) {
return response, nil
}

func (d *dispatcher) addConfig(config integration.Config, targetNodeName string) {
func (d *dispatcher) addConfig(config integration.Config, targetNodeName string) bool {
d.store.Lock()
defer d.store.Unlock()

@@ -59,9 +59,12 @@ func (d *dispatcher) addConfig(config integration.Config, targetNodeName string)

// No target node specified: store in danglingConfigs
if targetNodeName == "" {
danglingConfigs.Inc(le.JoinLeaderValue)
d.store.danglingConfigs[digest] = config
return
// Only update if it's a new dangling config
if _, found := d.store.danglingConfigs[digest]; !found {
danglingConfigs.Inc(le.JoinLeaderValue)
d.store.danglingConfigs[digest] = createDanglingConfig(config)
}
return false
}

currentNode, foundCurrent := d.store.getNodeStore(d.store.digestToNode[digest])
@@ -82,6 +85,8 @@ func (d *dispatcher) addConfig(config integration.Config, targetNodeName string)
currentNode.removeConfig(digest)
currentNode.Unlock()
}

return true
}

func (d *dispatcher) removeConfig(digest string) {
@@ -94,7 +99,7 @@ func (d *dispatcher) removeConfig(digest string) {

delete(d.store.digestToNode, digest)
delete(d.store.digestToConfig, digest)
delete(d.store.danglingConfigs, digest)
d.deleteDangling([]string{digest})

// This is a list because each instance in a config has its own check ID and
// all of them need to be deleted.
@@ -131,16 +136,28 @@ func (d *dispatcher) shouldDispatchDangling() bool {
return len(d.store.danglingConfigs) > 0 && len(d.store.nodes) > 0
}

// retrieveAndClearDangling extracts dangling configs from the store
func (d *dispatcher) retrieveAndClearDangling() []integration.Config {
d.store.Lock()
defer d.store.Unlock()
configs := makeConfigArray(d.store.danglingConfigs)
d.store.clearDangling()
danglingConfigs.Set(0, le.JoinLeaderValue)
// retrieveDangling extracts dangling configs from the store
func (d *dispatcher) retrieveDangling() []integration.Config {
d.store.RLock()
defer d.store.RUnlock()

configs := makeConfigArrayFromDangling(d.store.danglingConfigs)
return configs
}

// deleteDangling removes the given config digests from the dangling store
// and updates the related telemetry counters
func (d *dispatcher) deleteDangling(ids []string) {
for _, id := range ids {
if c, found := d.store.danglingConfigs[id]; found {
delete(d.store.danglingConfigs, id)
danglingConfigs.Dec(le.JoinLeaderValue)
if c.unscheduledCheck {
unscheduledCheck.Dec(le.JoinLeaderValue, c.config.Name, c.config.Source)
}
}
}
}

// patchConfiguration transforms the configuration from AD into a config
// ready to use by node agents. It does the following changes:
// - empty the ADIdentifiers array, to avoid node-agents detecting them as templates
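Two caller-side details are worth spelling out here (my reading of the diff, not text from the PR): addConfig now reports whether the config was actually handed to a node, and deleteDangling does not take the store lock itself, so callers other than removeConfig must hold it. Roughly, with d a *dispatcher, targetNode a node name (possibly empty) and scheduledIDs the digests that were successfully re-dispatched:

// Illustrative sketch only, not code from this PR.
if d.addConfig(cfg, targetNode) {
	// true: the config was dispatched to targetNode.
} else {
	// false: targetNode was empty, so the config was stored (or kept) in
	// danglingConfigs; the gauge is only bumped for a brand-new digest.
}

// deleteDangling assumes the store lock is already held by the caller:
d.store.Lock()
d.deleteDangling(scheduledIDs)
d.store.Unlock()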
65 changes: 51 additions & 14 deletions pkg/clusteragent/clusterchecks/dispatcher_main.go
@@ -20,27 +20,35 @@ import (
"github.com/DataDog/datadog-agent/pkg/util/clusteragent"
"github.com/DataDog/datadog-agent/pkg/util/containers"
"github.com/DataDog/datadog-agent/pkg/util/hostname"
le "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection/metrics"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/clustername"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

// dispatcher holds the management logic for cluster-checks
type dispatcher struct {
store *clusterStore
nodeExpirationSeconds int64
extraTags []string
clcRunnersClient clusteragent.CLCRunnerClientInterface
advancedDispatching bool
excludedChecks map[string]struct{}
excludedChecksFromDispatching map[string]struct{}
rebalancingPeriod time.Duration
store *clusterStore
nodeExpirationSeconds int64
unscheduledCheckThresholdSeconds int64
extraTags []string
clcRunnersClient clusteragent.CLCRunnerClientInterface
advancedDispatching bool
excludedChecks map[string]struct{}
excludedChecksFromDispatching map[string]struct{}
rebalancingPeriod time.Duration
}

func newDispatcher(tagger tagger.Component) *dispatcher {
d := &dispatcher{
store: newClusterStore(),
}
d.nodeExpirationSeconds = pkgconfigsetup.Datadog().GetInt64("cluster_checks.node_expiration_timeout")
d.unscheduledCheckThresholdSeconds = pkgconfigsetup.Datadog().GetInt64("cluster_checks.unscheduled_check_threshold")

if d.unscheduledCheckThresholdSeconds < d.nodeExpirationSeconds {
log.Warnf("The unscheduled_check_threshold value should be larger than node_expiration_timeout, setting it to the same value")
d.unscheduledCheckThresholdSeconds = d.nodeExpirationSeconds
}

// Attach the cluster agent's global tags to all dispatched checks
// as defined in the tagger's workloadmeta collector
@@ -162,15 +170,19 @@ func (d *dispatcher) Unschedule(configs []integration.Config) {
}

// reschedule sends configurations to dispatching without checking or patching them as Schedule does.
func (d *dispatcher) reschedule(configs []integration.Config) {
func (d *dispatcher) reschedule(configs []integration.Config) []string {
addedConfigIDs := make([]string, 0, len(configs))
for _, c := range configs {
log.Debugf("Rescheduling the check %s:%s", c.Name, c.Digest())
d.add(c)
if d.add(c) {
addedConfigIDs = append(addedConfigIDs, c.Digest())
}
}
return addedConfigIDs
}

// add stores and delegates a given configuration
func (d *dispatcher) add(config integration.Config) {
func (d *dispatcher) add(config integration.Config) bool {
target := d.getNodeToScheduleCheck()
if target == "" {
// If no node is found, store it in the danglingConfigs map for retrying later.
@@ -179,7 +191,7 @@ func (d *dispatcher) add(config integration.Config) {
log.Infof("Dispatching configuration %s:%s to node %s", config.Name, config.Digest(), target)
}

d.addConfig(config, target)
return d.addConfig(config, target)
}

// remove deletes a given configuration
@@ -196,6 +208,22 @@ func (d *dispatcher) reset() {
d.store.reset()
}

// scanUnscheduledChecks scans the dangling configs in the store and flags
// every config that has been waiting to be scheduled for longer than the
// configured unscheduled_check_threshold, emitting telemetry for each one.
func (d *dispatcher) scanUnscheduledChecks() {
d.store.Lock()
defer d.store.Unlock()

for _, c := range d.store.danglingConfigs {
if !c.unscheduledCheck && c.isStuckScheduling(d.unscheduledCheckThresholdSeconds) {
log.Warnf("Detected unscheduled check config. Name:%s, Source:%s", c.config.Name, c.config.Source)
c.unscheduledCheck = true
unscheduledCheck.Inc(le.JoinLeaderValue, c.config.Name, c.config.Source)
}
}
}

// run is the main management goroutine for the dispatcher
func (d *dispatcher) run(ctx context.Context) {
d.store.Lock()
@@ -211,6 +239,9 @@ func (d *dispatcher) run(ctx context.Context) {
rebalanceTicker := time.NewTicker(d.rebalancingPeriod)
defer rebalanceTicker.Stop()

unscheduledCheckTicker := time.NewTicker(time.Duration(d.unscheduledCheckThresholdSeconds) * time.Second)
defer unscheduledCheckTicker.Stop()

for {
select {
case <-ctx.Done():
@@ -223,9 +254,15 @@ func (d *dispatcher) run(ctx context.Context) {

// Re-dispatch dangling configs
if d.shouldDispatchDangling() {
danglingConfs := d.retrieveAndClearDangling()
d.reschedule(danglingConfs)
danglingConfigs := d.retrieveDangling()
scheduledConfigIDs := d.reschedule(danglingConfigs)
d.store.Lock()
d.deleteDangling(scheduledConfigIDs)
d.store.Unlock()
}
case <-unscheduledCheckTicker.C:
// Check for configs that have been dangling longer than expected
d.scanUnscheduledChecks()
case <-rebalanceTicker.C:
if d.advancedDispatching {
d.rebalance(false)
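Two timing consequences of the wiring above, as I read it (not stated in the PR): the unscheduledCheckTicker fires at the same period as the threshold itself, so a config that goes dangling just after a tick is flagged somewhere between one and two threshold intervals later (60 to 120 seconds with the default of 60); and clamping the threshold to at least node_expiration_timeout in newDispatcher presumably gives configs orphaned by an expiring node at least one re-dispatch window before they are reported as unscheduled.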
2 changes: 1 addition & 1 deletion pkg/clusteragent/clusterchecks/dispatcher_nodes.go
@@ -150,7 +150,7 @@ func (d *dispatcher) expireNodes() {
for digest, config := range node.digestToConfig {
delete(d.store.digestToNode, digest)
log.Debugf("Adding %s:%s as a dangling Cluster Check config", config.Name, digest)
d.store.danglingConfigs[digest] = config
d.store.danglingConfigs[digest] = createDanglingConfig(config)
danglingConfigs.Inc(le.JoinLeaderValue)

// TODO: Use partial label matching when it becomes available:
8 changes: 5 additions & 3 deletions pkg/clusteragent/clusterchecks/dispatcher_test.go
@@ -348,9 +348,10 @@ func TestRescheduleDanglingFromExpiredNodes(t *testing.T) {

// Ensure we have 1 dangling to schedule, as new available node is registered
assert.True(t, dispatcher.shouldDispatchDangling())
configs := dispatcher.retrieveAndClearDangling()
configs := dispatcher.retrieveDangling()
// Assert the check is scheduled
dispatcher.reschedule(configs)
scheduledIDs := dispatcher.reschedule(configs)
dispatcher.deleteDangling(scheduledIDs)
danglingConfig, err := dispatcher.getAllConfigs()
assert.NoError(t, err)
assert.Equal(t, 1, len(danglingConfig))
@@ -423,7 +424,8 @@ func TestDanglingConfig(t *testing.T) {
assert.True(t, dispatcher.shouldDispatchDangling())

// get the danglings and make sure they are removed from the store
configs := dispatcher.retrieveAndClearDangling()
configs := dispatcher.retrieveDangling()
dispatcher.deleteDangling([]string{config.Digest()})
assert.Len(t, configs, 1)
assert.Equal(t, 0, len(dispatcher.store.danglingConfigs))
}
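
A direct unit test for the threshold logic could look roughly like this (not part of the PR; it assumes the test file's usual imports, i.e. testing, time, assert and the integration package, and backdates timeCreated by hand):

func TestIsStuckScheduling(t *testing.T) {
	c := createDanglingConfig(integration.Config{Name: "http_check"})
	// Freshly created: well under a 600-second threshold.
	assert.False(t, c.isStuckScheduling(600))

	// Pretend the config has been dangling for ten minutes.
	c.timeCreated = time.Now().Add(-10 * time.Minute)
	assert.True(t, c.isStuckScheduling(600))
}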
11 changes: 11 additions & 0 deletions pkg/clusteragent/clusterchecks/helpers.go
@@ -33,6 +33,17 @@ func makeConfigArray(configMap map[string]integration.Config) []integration.Conf
return configSlice
}

// makeConfigArrayFromDangling flattens a map of dangling config wrappers into a
// slice of configs. Creating a new slice allows for thread-safe usage by external
// callers, as long as the field values in the config objects are not modified.
func makeConfigArrayFromDangling(configMap map[string]*danglingConfigWrapper) []integration.Config {
configSlice := make([]integration.Config, 0, len(configMap))
for _, c := range configMap {
configSlice = append(configSlice, c.config)
}
return configSlice
}

func timestampNowNano() int64 {
return time.Now().UnixNano()
}
3 changes: 3 additions & 0 deletions pkg/clusteragent/clusterchecks/metrics.go
@@ -19,6 +19,9 @@ var (
danglingConfigs = telemetry.NewGaugeWithOpts("cluster_checks", "configs_dangling",
[]string{le.JoinLeaderLabel}, "Number of check configurations not dispatched.",
telemetry.Options{NoDoubleUnderscoreSep: true})
unscheduledCheck = telemetry.NewGaugeWithOpts("cluster_checks", "unscheduled_check",
[]string{le.JoinLeaderLabel, "config_name", "config_source"}, "Number of check configurations not scheduled.",
telemetry.Options{NoDoubleUnderscoreSep: true})
dispatchedConfigs = telemetry.NewGaugeWithOpts("cluster_checks", "configs_dispatched",
[]string{"node", le.JoinLeaderLabel}, "Number of check configurations dispatched, by node.",
telemetry.Options{NoDoubleUnderscoreSep: true})
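One observation about the new gauge (mine, not from the PR): unlike configs_dangling, unscheduled_check is labelled with config_name and config_source, so every stuck check produces its own time series; the matching Dec call in deleteDangling keeps those series from lingering once the config is finally scheduled or removed.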
19 changes: 13 additions & 6 deletions pkg/clusteragent/clusterchecks/stats.go
@@ -60,12 +60,19 @@ func (d *dispatcher) getStats() *types.Stats {
for _, m := range d.store.digestToConfig {
checkNames[m.Name] = struct{}{}
}
unscheduledChecks := 0
for _, c := range d.store.danglingConfigs {
if c.unscheduledCheck {
unscheduledChecks++
}
}
return &types.Stats{
Active: d.store.active,
NodeCount: len(d.store.nodes),
ActiveConfigs: len(d.store.digestToNode),
DanglingConfigs: len(d.store.danglingConfigs),
TotalConfigs: len(d.store.digestToConfig),
CheckNames: checkNames,
Active: d.store.active,
NodeCount: len(d.store.nodes),
ActiveConfigs: len(d.store.digestToNode),
DanglingConfigs: len(d.store.danglingConfigs),
UnscheduledChecks: unscheduledChecks,
TotalConfigs: len(d.store.digestToConfig),
CheckNames: checkNames,
}
}
6 changes: 3 additions & 3 deletions pkg/clusteragent/clusterchecks/stores.go
@@ -27,7 +27,7 @@ type clusterStore struct {
digestToConfig map[string]integration.Config // All configurations to dispatch
digestToNode map[string]string // Node running a config
nodes map[string]*nodeStore // All nodes known to the cluster-agent
danglingConfigs map[string]integration.Config // Configs we could not dispatch to any node
danglingConfigs map[string]*danglingConfigWrapper // Configs we could not dispatch to any node
endpointsConfigs map[string]map[string]integration.Config // Endpoints configs to be consumed by node agents
idToDigest map[checkid.ID]string // link check IDs to check configs
}
@@ -44,7 +44,7 @@ func (s *clusterStore) reset() {
s.digestToConfig = make(map[string]integration.Config)
s.digestToNode = make(map[string]string)
s.nodes = make(map[string]*nodeStore)
s.danglingConfigs = make(map[string]integration.Config)
s.danglingConfigs = make(map[string]*danglingConfigWrapper)
s.endpointsConfigs = make(map[string]map[string]integration.Config)
s.idToDigest = make(map[checkid.ID]string)
}
@@ -74,7 +74,7 @@ func (s *clusterStore) getOrCreateNodeStore(nodeName, clientIP string) *nodeStor

// clearDangling resets the danglingConfigs map to a new empty one
func (s *clusterStore) clearDangling() {
s.danglingConfigs = make(map[string]integration.Config)
s.danglingConfigs = make(map[string]*danglingConfigWrapper)
}

// nodeStore holds the state store for one node.
15 changes: 8 additions & 7 deletions pkg/clusteragent/clusterchecks/types/types.go
@@ -74,13 +74,14 @@ type Stats struct {
LeaderIP string

// Leading
Leader bool
Active bool
NodeCount int
ActiveConfigs int
DanglingConfigs int
TotalConfigs int
CheckNames map[string]struct{}
Leader bool
Active bool
NodeCount int
ActiveConfigs int
DanglingConfigs int
UnscheduledChecks int
TotalConfigs int
CheckNames map[string]struct{}
}

// LeaderIPCallback describes the leader-election method we
5 changes: 3 additions & 2 deletions pkg/config/setup/config.go
@@ -708,8 +708,9 @@ func InitConfig(config pkgconfigmodel.Setup) {
// Cluster check Autodiscovery
config.BindEnvAndSetDefault("cluster_checks.support_hybrid_ignore_ad_tags", false) // TODO(CINT)(Agent 7.53+) Remove this flag when hybrid ignore_ad_tags is fully deprecated
config.BindEnvAndSetDefault("cluster_checks.enabled", false)
config.BindEnvAndSetDefault("cluster_checks.node_expiration_timeout", 30) // value in seconds
config.BindEnvAndSetDefault("cluster_checks.warmup_duration", 30) // value in seconds
config.BindEnvAndSetDefault("cluster_checks.node_expiration_timeout", 30) // value in seconds
config.BindEnvAndSetDefault("cluster_checks.warmup_duration", 30) // value in seconds
config.BindEnvAndSetDefault("cluster_checks.unscheduled_check_threshold", 60) // value in seconds
config.BindEnvAndSetDefault("cluster_checks.cluster_tag_name", "cluster_name")
config.BindEnvAndSetDefault("cluster_checks.extra_tags", []string{})
config.BindEnvAndSetDefault("cluster_checks.advanced_dispatching_enabled", false)
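As with the neighbouring settings, the new key should also be settable from the environment, presumably as DD_CLUSTER_CHECKS_UNSCHEDULED_CHECK_THRESHOLD given BindEnvAndSetDefault's usual naming, with the value expressed in seconds.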