Skip to content

Commit

Permalink
Convert to counting number of reschedule attempts
Browse files Browse the repository at this point in the history
  • Loading branch information
gabedos committed Dec 24, 2024
1 parent ed19ca8 commit e7b7ccb
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 56 deletions.
20 changes: 9 additions & 11 deletions pkg/clusteragent/clusterchecks/dangling_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,27 @@
package clusterchecks

import (
"time"

"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
)

// danglingConfigWrapper wraps a cluster-check config that could not be
// dispatched to any node, together with the scheduling metadata the
// dispatcher uses to track its lifecycle.
type danglingConfigWrapper struct {
	config integration.Config
	// rescheduleAttempts counts how many times the dispatcher has tried
	// to reschedule this config since it became dangling.
	rescheduleAttempts int
	// detectedExtendedDangling is set once the config has been reported
	// as stuck, so the warning log and telemetry are emitted only once.
	detectedExtendedDangling bool
}

// createConfigEntry creates a new integrationConfigEntry
// This is used to keep track of the time a config was added to the store
func createConfigEntry(config integration.Config) *danglingConfigWrapper {
// createDanglingConfig creates a new danglingConfigWrapper
// This is used to keep track of the lifecycle of a dangling config
func createDanglingConfig(config integration.Config) *danglingConfigWrapper {
return &danglingConfigWrapper{
config: config,
time: time.Now(),
rescheduleAttempts: 0,
detectedExtendedDangling: false,
}
}

// isStuckScheduling returns true if the config has been in the
// store for longer than expectedScheduleTime
func (e *danglingConfigWrapper) isStuckScheduling(expectedScheduleTime time.Duration) bool {
return time.Since(e.time) > expectedScheduleTime
// isStuckScheduling returns true if the config has been attempted
// rescheduling more than attemptLimit times
func (c *danglingConfigWrapper) isStuckScheduling(attemptLimit int) bool {
return c.rescheduleAttempts > attemptLimit
}
6 changes: 3 additions & 3 deletions pkg/clusteragent/clusterchecks/dispatcher_configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (d *dispatcher) getState() (types.StateResponse, error) {

response := types.StateResponse{
Warmup: !d.store.active,
Dangling: makeConfigArrayFromEntry(d.store.danglingConfigs),
Dangling: makeConfigArrayFromDangling(d.store.danglingConfigs),
}
for _, node := range d.store.nodes {
n := types.StateNodeResponse{
Expand Down Expand Up @@ -62,7 +62,7 @@ func (d *dispatcher) addConfig(config integration.Config, targetNodeName string)
// Only update if it's a new dangling config
if _, found := d.store.danglingConfigs[digest]; !found {
danglingConfigs.Inc(le.JoinLeaderValue)
d.store.danglingConfigs[digest] = createConfigEntry(config)
d.store.danglingConfigs[digest] = createDanglingConfig(config)
}
return false
}
Expand Down Expand Up @@ -141,7 +141,7 @@ func (d *dispatcher) retrieveDangling() []integration.Config {
d.store.RLock()
defer d.store.RUnlock()

configs := makeConfigArrayFromEntry(d.store.danglingConfigs)
configs := makeConfigArrayFromDangling(d.store.danglingConfigs)
return configs
}

Expand Down
33 changes: 12 additions & 21 deletions pkg/clusteragent/clusterchecks/dispatcher_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,23 @@ import (

// dispatcher holds the management logic for cluster-checks.
type dispatcher struct {
	store                 *clusterStore
	nodeExpirationSeconds int64
	// extendedDanglingAttemptThreshold is the number of reschedule
	// attempts after which a dangling config is flagged as stuck
	// (see scanExtendedDanglingConfigs).
	extendedDanglingAttemptThreshold int
	extraTags                        []string
	clcRunnersClient                 clusteragent.CLCRunnerClientInterface
	advancedDispatching              bool
	excludedChecks                   map[string]struct{}
	excludedChecksFromDispatching    map[string]struct{}
	rebalancingPeriod                time.Duration
}

func newDispatcher(tagger tagger.Component) *dispatcher {
d := &dispatcher{
store: newClusterStore(),
}
d.nodeExpirationSeconds = pkgconfigsetup.Datadog().GetInt64("cluster_checks.node_expiration_timeout")
d.extendedDanglingAttemptThreshold = pkgconfigsetup.Datadog().GetInt("cluster_checks.extended_dangling_attempt_threshold")

// Attach the cluster agent's global tags to all dispatched checks
// as defined in the tagger's workloadmeta collector
Expand Down Expand Up @@ -209,8 +211,7 @@ func (d *dispatcher) run(ctx context.Context) {
healthProbe := health.RegisterLiveness("clusterchecks-dispatch")
defer health.Deregister(healthProbe) //nolint:errcheck

cleanUpTimeout := time.Duration(d.nodeExpirationSeconds/2) * time.Second
cleanupTicker := time.NewTicker(cleanUpTimeout)
cleanupTicker := time.NewTicker(time.Duration(d.nodeExpirationSeconds/2) * time.Second)
defer cleanupTicker.Stop()

rebalanceTicker := time.NewTicker(d.rebalancingPeriod)
Expand All @@ -226,27 +227,17 @@ func (d *dispatcher) run(ctx context.Context) {
// Expire old nodes, orphaned configs are moved to dangling
d.expireNodes()

log.Error("Clean up ticker signal received")

// Re-dispatch dangling configs
if d.shouldDispatchDangling() {
danglingConfigs := d.retrieveDangling()

log.Errorf("Dangling configs to be dispatched: %d", len(danglingConfigs))

scheduledConfigIDs := d.reschedule(danglingConfigs)

log.Errorf("Dangling configs successfully rescheduled: %d", len(scheduledConfigIDs))

d.store.Lock()
d.deleteDangling(scheduledConfigIDs)
d.store.Unlock()
}

// pkg/clusteragent/clusterchecks/dispatcher_main.go:239

// Check for configs that have been dangling longer than expected
scanExtendedDanglingConfigs(d.store, cleanUpTimeout*2)
scanExtendedDanglingConfigs(d.store, d.extendedDanglingAttemptThreshold)
case <-rebalanceTicker.C:
if d.advancedDispatching {
d.rebalance(false)
Expand Down
2 changes: 1 addition & 1 deletion pkg/clusteragent/clusterchecks/dispatcher_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (d *dispatcher) expireNodes() {
for digest, config := range node.digestToConfig {
delete(d.store.digestToNode, digest)
log.Debugf("Adding %s:%s as a dangling Cluster Check config", config.Name, digest)
d.store.danglingConfigs[digest] = createConfigEntry(config)
d.store.danglingConfigs[digest] = createDanglingConfig(config)
danglingConfigs.Inc(le.JoinLeaderValue)

// TODO: Use partial label matching when it becomes available:
Expand Down
23 changes: 21 additions & 2 deletions pkg/clusteragent/clusterchecks/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (

"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
"github.com/DataDog/datadog-agent/pkg/clusteragent/clusterchecks/types"
le "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection/metrics"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

var (
Expand All @@ -33,10 +35,10 @@ func makeConfigArray(configMap map[string]integration.Config) []integration.Conf
return configSlice
}

// makeConfigArrayFromEntry flattens a map of configs into a slice. Creating a new slice
// makeConfigArrayFromDangling flattens a map of configs into a slice. Creating a new slice
// allows for thread-safe usage by other external, as long as the field values in
// the config objects are not modified.
func makeConfigArrayFromEntry(configMap map[string]*danglingConfigWrapper) []integration.Config {
func makeConfigArrayFromDangling(configMap map[string]*danglingConfigWrapper) []integration.Config {
configSlice := make([]integration.Config, 0, len(configMap))
for _, c := range configMap {
configSlice = append(configSlice, c.config)
Expand Down Expand Up @@ -83,3 +85,20 @@ func orderedKeys(m map[string]int) []string {
sort.Strings(keys)
return keys
}

// scanExtendedDanglingConfigs scans the store for extended dangling configs.
// Each call counts as one reschedule attempt for every dangling config; once
// a config has been attempted more than attemptLimit times it is reported
// (warning log + telemetry counter) exactly once as extended dangling.
func scanExtendedDanglingConfigs(store *clusterStore, attemptLimit int) {
	store.Lock()
	defer store.Unlock()

	for _, c := range store.danglingConfigs {
		// Every scan pass corresponds to one reschedule attempt.
		c.rescheduleAttempts++
		if !c.detectedExtendedDangling && c.isStuckScheduling(attemptLimit) {
			log.Warnf("Detected extended dangling config. Name:%s, Source:%s", c.config.Name, c.config.Source)
			c.detectedExtendedDangling = true
			extendedDanglingConfigs.Inc(le.JoinLeaderValue, c.config.Name, c.config.Source)
		}
	}
}
27 changes: 27 additions & 0 deletions pkg/clusteragent/clusterchecks/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ package clusterchecks
import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
"github.com/DataDog/datadog-agent/pkg/clusteragent/clusterchecks/types"
)

Expand Down Expand Up @@ -151,3 +154,27 @@ func Test_calculateBusyness(t *testing.T) {
})
}
}

// Test_scanExtendedDanglingConfigs verifies that a dangling config is only
// flagged as extended dangling once it has been scanned strictly more than
// attemptLimit times, and that each scan increments the attempt counter.
func Test_scanExtendedDanglingConfigs(t *testing.T) {
	const attemptLimit = 3

	store := newClusterStore()
	cfg := createDanglingConfig(integration.Config{
		Name:   "config1",
		Source: "source1",
	})
	store.danglingConfigs[cfg.config.Digest()] = cfg

	// Scanning exactly attemptLimit times must not flag the config yet.
	for attempt := 0; attempt < attemptLimit; attempt++ {
		scanExtendedDanglingConfigs(store, attemptLimit)
	}
	assert.Equal(t, attemptLimit, cfg.rescheduleAttempts)
	assert.False(t, cfg.detectedExtendedDangling)
	assert.False(t, cfg.isStuckScheduling(attemptLimit))

	// One more scan pushes the config over the threshold.
	scanExtendedDanglingConfigs(store, attemptLimit)
	assert.Equal(t, attemptLimit+1, cfg.rescheduleAttempts)
	assert.True(t, cfg.detectedExtendedDangling)
	assert.True(t, cfg.isStuckScheduling(attemptLimit))
}
18 changes: 0 additions & 18 deletions pkg/clusteragent/clusterchecks/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,8 @@
package clusterchecks

import (
"time"

"github.com/DataDog/datadog-agent/pkg/telemetry"
le "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection/metrics"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

var (
Expand Down Expand Up @@ -56,18 +53,3 @@ var (
[]string{"node", le.JoinLeaderLabel}, "Utilization predicted by the rebalance algorithm",
telemetry.Options{NoDoubleUnderscoreSep: true})
)

// scanExtendedDanglingConfigs scans the store for configs that have been
// dangling longer than expectedScheduleTime and reports each such config
// (warning log + telemetry counter) exactly once.
func scanExtendedDanglingConfigs(store *clusterStore, expectedScheduleTime time.Duration) {
	store.Lock()
	defer store.Unlock()

	for _, c := range store.danglingConfigs {
		if !c.detectedExtendedDangling && c.isStuckScheduling(expectedScheduleTime) {
			// Report once per config; this is a degraded-state signal,
			// not an error, so log at warning level with context instead
			// of spamming Errorf on every scan pass.
			log.Warnf("Detected extended dangling config. Name:%s, Source:%s", c.config.Name, c.config.Source)
			extendedDanglingConfigs.Inc(le.JoinLeaderValue, c.config.Name, c.config.Source)
			c.detectedExtendedDangling = true
		}
	}
}
1 change: 1 addition & 0 deletions pkg/config/setup/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,7 @@ func InitConfig(config pkgconfigmodel.Setup) {
config.BindEnvAndSetDefault("cluster_checks.enabled", false)
config.BindEnvAndSetDefault("cluster_checks.node_expiration_timeout", 30) // value in seconds
config.BindEnvAndSetDefault("cluster_checks.warmup_duration", 30) // value in seconds
config.BindEnvAndSetDefault("cluster_checks.extended_dangling_attempt_threshold", 3)
config.BindEnvAndSetDefault("cluster_checks.cluster_tag_name", "cluster_name")
config.BindEnvAndSetDefault("cluster_checks.extra_tags", []string{})
config.BindEnvAndSetDefault("cluster_checks.advanced_dispatching_enabled", false)
Expand Down

0 comments on commit e7b7ccb

Please sign in to comment.