[CONTP-60] Improved telemetry on cluster check configs dangling (#32508)
gabedos authored Dec 30, 2024
1 parent 9f2238e commit dc3b8fb
Showing 11 changed files with 175 additions and 49 deletions.
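
In short: dangling cluster-check configs (configs that could not be dispatched to any node) are now wrapped with a creation timestamp, and a new cluster_checks.unscheduled_check_threshold setting controls when a still-dangling config is reported as unscheduled via the new unscheduled_check gauge. A minimal sketch of how the new setting might be set in the Cluster Agent configuration, assuming the usual nested-YAML layout for dotted config keys (values are illustrative, not defaults):

    # Hypothetical datadog-cluster-agent configuration snippet (illustrative values)
    cluster_checks:
      enabled: true
      # nodes that stop reporting are expired after this many seconds
      node_expiration_timeout: 30
      # a dangling config is flagged as "unscheduled" after this many seconds;
      # newDispatcher clamps this to at least node_expiration_timeout
      unscheduled_check_threshold: 60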
37 changes: 37 additions & 0 deletions pkg/clusteragent/clusterchecks/dangling_config.go
@@ -0,0 +1,37 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build clusterchecks

package clusterchecks

import (
    "time"

    "github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
)

type danglingConfigWrapper struct {
    config           integration.Config
    timeCreated      time.Time
    unscheduledCheck bool
}

// createDanglingConfig creates a new danglingConfigWrapper
// This is used to keep track of the lifecycle of a dangling config
func createDanglingConfig(config integration.Config) *danglingConfigWrapper {
    return &danglingConfigWrapper{
        config:           config,
        timeCreated:      time.Now(),
        unscheduledCheck: false,
    }
}

// isStuckScheduling returns true if the config has been in the store
// for longer than the unscheduledCheckThresholdSeconds
func (c *danglingConfigWrapper) isStuckScheduling(unscheduledCheckThresholdSeconds int64) bool {
    expectCheckIsScheduledTime := c.timeCreated.Add(time.Duration(unscheduledCheckThresholdSeconds) * time.Second)
    return time.Now().After(expectCheckIsScheduledTime)
}
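
For orientation, a standalone sketch (not part of the diff) of how this wrapper is meant to be used: createDanglingConfig stamps the config with its creation time, and isStuckScheduling reports whether the configured threshold has elapsed since then. The threshold value below is an assumption for illustration.

    // Illustrative usage only; mirrors what scanUnscheduledChecks does in the dispatcher.
    cfg := integration.Config{Name: "http_check"}
    wrapper := createDanglingConfig(cfg) // timeCreated = now, unscheduledCheck = false
    // ... later, e.g. from a periodic scan:
    if !wrapper.unscheduledCheck && wrapper.isStuckScheduling(600) {
        wrapper.unscheduledCheck = true // flagged as stuck after 600s without being scheduled
    }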
43 changes: 30 additions & 13 deletions pkg/clusteragent/clusterchecks/dispatcher_configs.go
@@ -28,7 +28,7 @@ func (d *dispatcher) getState() (types.StateResponse, error) {

    response := types.StateResponse{
        Warmup:   !d.store.active,
        Dangling: makeConfigArray(d.store.danglingConfigs),
        Dangling: makeConfigArrayFromDangling(d.store.danglingConfigs),
    }
    for _, node := range d.store.nodes {
        n := types.StateNodeResponse{
@@ -41,7 +41,7 @@ func (d *dispatcher) getState() (types.StateResponse, error) {
    return response, nil
}

func (d *dispatcher) addConfig(config integration.Config, targetNodeName string) {
func (d *dispatcher) addConfig(config integration.Config, targetNodeName string) bool {
    d.store.Lock()
    defer d.store.Unlock()

@@ -59,9 +59,12 @@ func (d *dispatcher) addConfig(config integration.Config, targetNodeName string)

    // No target node specified: store in danglingConfigs
    if targetNodeName == "" {
        danglingConfigs.Inc(le.JoinLeaderValue)
        d.store.danglingConfigs[digest] = config
        return
        // Only update if it's a new dangling config
        if _, found := d.store.danglingConfigs[digest]; !found {
            danglingConfigs.Inc(le.JoinLeaderValue)
            d.store.danglingConfigs[digest] = createDanglingConfig(config)
        }
        return false
    }

    currentNode, foundCurrent := d.store.getNodeStore(d.store.digestToNode[digest])
@@ -82,6 +85,8 @@ func (d *dispatcher) addConfig(config integration.Config, targetNodeName string)
        currentNode.removeConfig(digest)
        currentNode.Unlock()
    }

    return true
}

func (d *dispatcher) removeConfig(digest string) {
@@ -94,7 +99,7 @@ func (d *dispatcher) removeConfig(digest string) {

    delete(d.store.digestToNode, digest)
    delete(d.store.digestToConfig, digest)
    delete(d.store.danglingConfigs, digest)
    d.deleteDangling([]string{digest})

    // This is a list because each instance in a config has its own check ID and
    // all of them need to be deleted.
@@ -131,16 +136,28 @@ func (d *dispatcher) shouldDispatchDangling() bool {
    return len(d.store.danglingConfigs) > 0 && len(d.store.nodes) > 0
}

// retrieveAndClearDangling extracts dangling configs from the store
func (d *dispatcher) retrieveAndClearDangling() []integration.Config {
    d.store.Lock()
    defer d.store.Unlock()
    configs := makeConfigArray(d.store.danglingConfigs)
    d.store.clearDangling()
    danglingConfigs.Set(0, le.JoinLeaderValue)
// retrieveDangling extracts dangling configs from the store
func (d *dispatcher) retrieveDangling() []integration.Config {
    d.store.RLock()
    defer d.store.RUnlock()

    configs := makeConfigArrayFromDangling(d.store.danglingConfigs)
    return configs
}

// deleteDangling clears the dangling configs from the store
func (d *dispatcher) deleteDangling(ids []string) {
    for _, id := range ids {
        if c, found := d.store.danglingConfigs[id]; found {
            delete(d.store.danglingConfigs, id)
            danglingConfigs.Dec(le.JoinLeaderValue)
            if c.unscheduledCheck {
                unscheduledCheck.Dec(le.JoinLeaderValue, c.config.Name, c.config.Source)
            }
        }
    }
}

// patchConfiguration transforms the configuration from AD into a config
// ready to use by node agents. It does the following changes:
// - empty the ADIdentifiers array, to avoid node-agents detecting them as templates
64 changes: 50 additions & 14 deletions pkg/clusteragent/clusterchecks/dispatcher_main.go
@@ -20,27 +20,35 @@ import (
    "github.com/DataDog/datadog-agent/pkg/util/clusteragent"
    "github.com/DataDog/datadog-agent/pkg/util/containers"
    "github.com/DataDog/datadog-agent/pkg/util/hostname"
    le "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection/metrics"
    "github.com/DataDog/datadog-agent/pkg/util/kubernetes/clustername"
    "github.com/DataDog/datadog-agent/pkg/util/log"
)

// dispatcher holds the management logic for cluster-checks
type dispatcher struct {
    store                         *clusterStore
    nodeExpirationSeconds         int64
    extraTags                     []string
    clcRunnersClient              clusteragent.CLCRunnerClientInterface
    advancedDispatching           bool
    excludedChecks                map[string]struct{}
    excludedChecksFromDispatching map[string]struct{}
    rebalancingPeriod             time.Duration
    store                            *clusterStore
    nodeExpirationSeconds            int64
    unscheduledCheckThresholdSeconds int64
    extraTags                        []string
    clcRunnersClient                 clusteragent.CLCRunnerClientInterface
    advancedDispatching              bool
    excludedChecks                   map[string]struct{}
    excludedChecksFromDispatching    map[string]struct{}
    rebalancingPeriod                time.Duration
}

func newDispatcher(tagger tagger.Component) *dispatcher {
    d := &dispatcher{
        store: newClusterStore(),
    }
    d.nodeExpirationSeconds = pkgconfigsetup.Datadog().GetInt64("cluster_checks.node_expiration_timeout")
    d.unscheduledCheckThresholdSeconds = pkgconfigsetup.Datadog().GetInt64("cluster_checks.unscheduled_check_threshold")

    if d.unscheduledCheckThresholdSeconds < d.nodeExpirationSeconds {
        log.Warnf("The unscheduled_check_threshold value should be larger than node_expiration_timeout, setting it to the same value")
        d.unscheduledCheckThresholdSeconds = d.nodeExpirationSeconds
    }

    // Attach the cluster agent's global tags to all dispatched checks
    // as defined in the tagger's workloadmeta collector
@@ -162,15 +170,19 @@ func (d *dispatcher) Unschedule(configs []integration.Config) {
}

// reschedule sends configurations to dispatching without checking or patching them as Schedule does.
func (d *dispatcher) reschedule(configs []integration.Config) {
func (d *dispatcher) reschedule(configs []integration.Config) []string {
    addedConfigIDs := make([]string, 0, len(configs))
    for _, c := range configs {
        log.Debugf("Rescheduling the check %s:%s", c.Name, c.Digest())
        d.add(c)
        if d.add(c) {
            addedConfigIDs = append(addedConfigIDs, c.Digest())
        }
    }
    return addedConfigIDs
}

// add stores and delegates a given configuration
func (d *dispatcher) add(config integration.Config) {
func (d *dispatcher) add(config integration.Config) bool {
    target := d.getNodeToScheduleCheck()
    if target == "" {
        // If no node is found, store it in the danglingConfigs map for retrying later.
@@ -179,7 +191,7 @@ func (d *dispatcher) add(config integration.Config) {
        log.Infof("Dispatching configuration %s:%s to node %s", config.Name, config.Digest(), target)
    }

    d.addConfig(config, target)
    return d.addConfig(config, target)
}

// remove deletes a given configuration
@@ -196,6 +208,21 @@ func (d *dispatcher) reset() {
    d.store.reset()
}

// scanUnscheduledChecks scans the store for configs that have been
// unscheduled for longer than the unscheduledCheckThresholdSeconds
func (d *dispatcher) scanUnscheduledChecks() {
    d.store.Lock()
    defer d.store.Unlock()

    for _, c := range d.store.danglingConfigs {
        if !c.unscheduledCheck && c.isStuckScheduling(d.unscheduledCheckThresholdSeconds) {
            log.Warnf("Detected unscheduled check config. Name:%s, Source:%s", c.config.Name, c.config.Source)
            c.unscheduledCheck = true
            unscheduledCheck.Inc(le.JoinLeaderValue, c.config.Name, c.config.Source)
        }
    }
}

// run is the main management goroutine for the dispatcher
func (d *dispatcher) run(ctx context.Context) {
    d.store.Lock()
@@ -211,6 +238,9 @@ func (d *dispatcher) run(ctx context.Context) {
    rebalanceTicker := time.NewTicker(d.rebalancingPeriod)
    defer rebalanceTicker.Stop()

    unscheduledCheckTicker := time.NewTicker(time.Duration(d.unscheduledCheckThresholdSeconds) * time.Second)
    defer unscheduledCheckTicker.Stop()

    for {
        select {
        case <-ctx.Done():
@@ -223,9 +253,15 @@

            // Re-dispatch dangling configs
            if d.shouldDispatchDangling() {
                danglingConfs := d.retrieveAndClearDangling()
                d.reschedule(danglingConfs)
                danglingConfigs := d.retrieveDangling()
                scheduledConfigIDs := d.reschedule(danglingConfigs)
                d.store.Lock()
                d.deleteDangling(scheduledConfigIDs)
                d.store.Unlock()
            }
        case <-unscheduledCheckTicker.C:
            // Check for configs that have been dangling longer than expected
            d.scanUnscheduledChecks()
        case <-rebalanceTicker.C:
            if d.advancedDispatching {
                d.rebalance(false)
2 changes: 1 addition & 1 deletion pkg/clusteragent/clusterchecks/dispatcher_nodes.go
@@ -150,7 +150,7 @@ func (d *dispatcher) expireNodes() {
        for digest, config := range node.digestToConfig {
            delete(d.store.digestToNode, digest)
            log.Debugf("Adding %s:%s as a dangling Cluster Check config", config.Name, digest)
            d.store.danglingConfigs[digest] = config
            d.store.danglingConfigs[digest] = createDanglingConfig(config)
            danglingConfigs.Inc(le.JoinLeaderValue)

            // TODO: Use partial label matching when it becomes available:
19 changes: 16 additions & 3 deletions pkg/clusteragent/clusterchecks/dispatcher_test.go
@@ -10,6 +10,7 @@ package clusterchecks
import (
    "sort"
    "testing"
    "time"

    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
@@ -348,9 +349,10 @@ func TestRescheduleDanglingFromExpiredNodes(t *testing.T) {

    // Ensure we have 1 dangling to schedule, as new available node is registered
    assert.True(t, dispatcher.shouldDispatchDangling())
    configs := dispatcher.retrieveAndClearDangling()
    configs := dispatcher.retrieveDangling()
    // Assert the check is scheduled
    dispatcher.reschedule(configs)
    scheduledIDs := dispatcher.reschedule(configs)
    dispatcher.deleteDangling(scheduledIDs)
    danglingConfig, err := dispatcher.getAllConfigs()
    assert.NoError(t, err)
    assert.Equal(t, 1, len(danglingConfig))
@@ -401,6 +403,9 @@ func TestDispatchFourConfigsTwoNodes(t *testing.T) {
}

func TestDanglingConfig(t *testing.T) {
    mockConfig := configmock.New(t)
    mockConfig.SetWithoutSource("cluster_checks.unscheduled_check_threshold", 1)
    mockConfig.SetWithoutSource("cluster_checks.node_expiration_timeout", 1)
    fakeTagger := mock.SetupFakeTagger(t)
    dispatcher := newDispatcher(fakeTagger)
    config := integration.Config{
@@ -418,12 +423,20 @@ func TestDanglingConfig(t *testing.T) {
    // shouldDispatchDangling is still false because no node is available
    assert.False(t, dispatcher.shouldDispatchDangling())

    // force config to dangle long enough to be classified as unscheduled check
    assert.False(t, dispatcher.store.danglingConfigs[config.Digest()].unscheduledCheck)
    require.Eventually(t, func() bool {
        dispatcher.scanUnscheduledChecks()
        return dispatcher.store.danglingConfigs[config.Digest()].unscheduledCheck
    }, 2*time.Second, 250*time.Millisecond)

    // register a node, shouldDispatchDangling will become true
    dispatcher.processNodeStatus("nodeA", "10.0.0.1", types.NodeStatus{})
    assert.True(t, dispatcher.shouldDispatchDangling())

    // get the danglings and make sure they are removed from the store
    configs := dispatcher.retrieveAndClearDangling()
    configs := dispatcher.retrieveDangling()
    dispatcher.deleteDangling([]string{config.Digest()})
    assert.Len(t, configs, 1)
    assert.Equal(t, 0, len(dispatcher.store.danglingConfigs))
}
11 changes: 11 additions & 0 deletions pkg/clusteragent/clusterchecks/helpers.go
@@ -33,6 +33,17 @@ func makeConfigArray(configMap map[string]integration.Config) []integration.Config {
    return configSlice
}

// makeConfigArrayFromDangling flattens a map of configs into a slice. Creating a new slice
// allows for thread-safe usage by other external callers, as long as the field values in
// the config objects are not modified.
func makeConfigArrayFromDangling(configMap map[string]*danglingConfigWrapper) []integration.Config {
    configSlice := make([]integration.Config, 0, len(configMap))
    for _, c := range configMap {
        configSlice = append(configSlice, c.config)
    }
    return configSlice
}

func timestampNowNano() int64 {
    return time.Now().UnixNano()
}
3 changes: 3 additions & 0 deletions pkg/clusteragent/clusterchecks/metrics.go
@@ -19,6 +19,9 @@ var (
    danglingConfigs = telemetry.NewGaugeWithOpts("cluster_checks", "configs_dangling",
        []string{le.JoinLeaderLabel}, "Number of check configurations not dispatched.",
        telemetry.Options{NoDoubleUnderscoreSep: true})
    unscheduledCheck = telemetry.NewGaugeWithOpts("cluster_checks", "unscheduled_check",
        []string{le.JoinLeaderLabel, "config_name", "config_source"}, "Number of check configurations not scheduled.",
        telemetry.Options{NoDoubleUnderscoreSep: true})
    dispatchedConfigs = telemetry.NewGaugeWithOpts("cluster_checks", "configs_dispatched",
        []string{"node", le.JoinLeaderLabel}, "Number of check configurations dispatched, by node.",
        telemetry.Options{NoDoubleUnderscoreSep: true})
19 changes: 13 additions & 6 deletions pkg/clusteragent/clusterchecks/stats.go
@@ -60,12 +60,19 @@ func (d *dispatcher) getStats() *types.Stats {
    for _, m := range d.store.digestToConfig {
        checkNames[m.Name] = struct{}{}
    }
    unscheduledChecks := 0
    for _, c := range d.store.danglingConfigs {
        if c.unscheduledCheck {
            unscheduledChecks++
        }
    }
    return &types.Stats{
        Active:          d.store.active,
        NodeCount:       len(d.store.nodes),
        ActiveConfigs:   len(d.store.digestToNode),
        DanglingConfigs: len(d.store.danglingConfigs),
        TotalConfigs:    len(d.store.digestToConfig),
        CheckNames:      checkNames,
        Active:            d.store.active,
        NodeCount:         len(d.store.nodes),
        ActiveConfigs:     len(d.store.digestToNode),
        DanglingConfigs:   len(d.store.danglingConfigs),
        UnscheduledChecks: unscheduledChecks,
        TotalConfigs:      len(d.store.digestToConfig),
        CheckNames:        checkNames,
    }
}
6 changes: 3 additions & 3 deletions pkg/clusteragent/clusterchecks/stores.go
@@ -27,7 +27,7 @@ type clusterStore struct {
    digestToConfig map[string]integration.Config // All configurations to dispatch
    digestToNode map[string]string // Node running a config
    nodes map[string]*nodeStore // All nodes known to the cluster-agent
    danglingConfigs map[string]integration.Config // Configs we could not dispatch to any node
    danglingConfigs map[string]*danglingConfigWrapper // Configs we could not dispatch to any node
    endpointsConfigs map[string]map[string]integration.Config // Endpoints configs to be consumed by node agents
    idToDigest map[checkid.ID]string // link check IDs to check configs
}
@@ -44,7 +44,7 @@ func (s *clusterStore) reset() {
    s.digestToConfig = make(map[string]integration.Config)
    s.digestToNode = make(map[string]string)
    s.nodes = make(map[string]*nodeStore)
    s.danglingConfigs = make(map[string]integration.Config)
    s.danglingConfigs = make(map[string]*danglingConfigWrapper)
    s.endpointsConfigs = make(map[string]map[string]integration.Config)
    s.idToDigest = make(map[checkid.ID]string)
}
@@ -74,7 +74,7 @@ func (s *clusterStore) getOrCreateNodeStore(nodeName, clientIP string) *nodeStore {

// clearDangling resets the danglingConfigs map to a new empty one
func (s *clusterStore) clearDangling() {
    s.danglingConfigs = make(map[string]integration.Config)
    s.danglingConfigs = make(map[string]*danglingConfigWrapper)
}

// nodeStore holds the state store for one node.