Skip to content

Commit

Permalink
Revert back to checking on time: base impl
Browse files Browse the repository at this point in the history
  • Loading branch information
gabedos committed Dec 26, 2024
1 parent 7c4d85e commit eea7b33
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 85 deletions.
22 changes: 12 additions & 10 deletions pkg/clusteragent/clusterchecks/dangling_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,29 @@
package clusterchecks

import (
"time"

"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
)

// danglingConfigWrapper pairs a dangling check config with the bookkeeping
// used to detect that it has stayed unscheduled for too long.
type danglingConfigWrapper struct {
// config is the check configuration that is currently dangling.
config integration.Config
// rescheduleAttempts counts scan passes; scanExtendedDanglingConfigs
// increments it on every pass.
rescheduleAttempts int
// detectedExtendedDangling is set once the config has been reported as
// extended dangling, so that it is only reported once.
detectedExtendedDangling bool
config integration.Config
// timeCreated is stamped with time.Now() by createDanglingConfig.
timeCreated time.Time
// unscheduledCheck is set once the config has been reported as an
// unscheduled check, so that it is only reported once.
unscheduledCheck bool
}

// createDanglingConfig wraps config in a new danglingConfigWrapper, which is
// used to keep track of the lifecycle of a dangling config.
// The wrapper starts with its detection flag cleared; timeCreated is stamped
// with time.Now() so that the time spent dangling can be measured later.
func createDanglingConfig(config integration.Config) *danglingConfigWrapper {
return &danglingConfigWrapper{
config: config,
rescheduleAttempts: 0,
detectedExtendedDangling: false,
config: config,
timeCreated: time.Now(),
unscheduledCheck: false,
}
}

// isStuckScheduling returns true once rescheduleAttempts is strictly greater
// than attemptLimit.
func (c *danglingConfigWrapper) isStuckScheduling(attemptLimit int) bool {
return c.rescheduleAttempts > attemptLimit
// isStuckScheduling returns true once strictly more than
// unscheduledCheckThresholdSeconds seconds have elapsed since timeCreated,
// i.e. since the wrapper was created.
func (c *danglingConfigWrapper) isStuckScheduling(unscheduledCheckThresholdSeconds int64) bool {
return time.Since(c.timeCreated).Seconds() > float64(unscheduledCheckThresholdSeconds)
}
4 changes: 2 additions & 2 deletions pkg/clusteragent/clusterchecks/dispatcher_configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ func (d *dispatcher) deleteDangling(ids []string) {
if c, found := d.store.danglingConfigs[id]; found {
delete(d.store.danglingConfigs, id)
danglingConfigs.Dec(le.JoinLeaderValue)
if c.detectedExtendedDangling {
extendedDanglingConfigs.Dec(le.JoinLeaderValue, c.config.Name, c.config.Source)
if c.unscheduledCheck {
unscheduledCheck.Dec(le.JoinLeaderValue, c.config.Name, c.config.Source)
}
}
}
Expand Down
33 changes: 29 additions & 4 deletions pkg/clusteragent/clusterchecks/dispatcher_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/util/clusteragent"
"github.com/DataDog/datadog-agent/pkg/util/containers"
"github.com/DataDog/datadog-agent/pkg/util/hostname"
le "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection/metrics"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/clustername"
"github.com/DataDog/datadog-agent/pkg/util/log"
)
Expand All @@ -28,7 +29,7 @@ import (
type dispatcher struct {
store *clusterStore
nodeExpirationSeconds int64
extendedDanglingAttemptThreshold int
unscheduledCheckThresholdSeconds int64
extraTags []string
clcRunnersClient clusteragent.CLCRunnerClientInterface
advancedDispatching bool
Expand All @@ -42,7 +43,12 @@ func newDispatcher(tagger tagger.Component) *dispatcher {
store: newClusterStore(),
}
d.nodeExpirationSeconds = pkgconfigsetup.Datadog().GetInt64("cluster_checks.node_expiration_timeout")
d.extendedDanglingAttemptThreshold = pkgconfigsetup.Datadog().GetInt("cluster_checks.extended_dangling_attempt_threshold")
d.unscheduledCheckThresholdSeconds = pkgconfigsetup.Datadog().GetInt64("cluster_checks.unscheduled_check_threshold")

if d.unscheduledCheckThresholdSeconds < d.nodeExpirationSeconds {
log.Warnf("The unscheduled_check_threshold value should be larger than node_expiration_timeout, setting it to the same value")
d.unscheduledCheckThresholdSeconds = d.nodeExpirationSeconds
}

// Attach the cluster agent's global tags to all dispatched checks
// as defined in the tagger's workloadmeta collector
Expand Down Expand Up @@ -202,6 +208,22 @@ func (d *dispatcher) reset() {
d.store.reset()
}

// scanUnscheduledChecks walks the dangling configs held in the store and
// reports every config that isStuckScheduling considers stuck for the
// dispatcher's unscheduledCheckThresholdSeconds.
// A config is reported (warning log + unscheduledCheck gauge increment) at
// most once: its unscheduledCheck flag is set when it is first reported and
// flagged configs are skipped on later scans.
// The store lock is held for the whole scan.
func (d *dispatcher) scanUnscheduledChecks() {
	d.store.Lock()
	defer d.store.Unlock()

	for _, dangling := range d.store.danglingConfigs {
		// Already reported on a previous scan.
		if dangling.unscheduledCheck {
			continue
		}
		// Not dangling for long enough yet.
		if !dangling.isStuckScheduling(d.unscheduledCheckThresholdSeconds) {
			continue
		}

		log.Warnf("Detected unscheduled check config. Name:%s, Source:%s", dangling.config.Name, dangling.config.Source)
		dangling.unscheduledCheck = true
		unscheduledCheck.Inc(le.JoinLeaderValue, dangling.config.Name, dangling.config.Source)
	}
}

// run is the main management goroutine for the dispatcher
func (d *dispatcher) run(ctx context.Context) {
d.store.Lock()
Expand All @@ -217,6 +239,9 @@ func (d *dispatcher) run(ctx context.Context) {
rebalanceTicker := time.NewTicker(d.rebalancingPeriod)
defer rebalanceTicker.Stop()

unscheduledCheckTicker := time.NewTicker(time.Duration(d.unscheduledCheckThresholdSeconds) * time.Second)
defer unscheduledCheckTicker.Stop()

for {
select {
case <-ctx.Done():
Expand All @@ -235,9 +260,9 @@ func (d *dispatcher) run(ctx context.Context) {
d.deleteDangling(scheduledConfigIDs)
d.store.Unlock()
}

case <-unscheduledCheckTicker.C:
// Check for configs that have been dangling longer than expected
scanExtendedDanglingConfigs(d.store, d.extendedDanglingAttemptThreshold)
d.scanUnscheduledChecks()
case <-rebalanceTicker.C:
if d.advancedDispatching {
d.rebalance(false)
Expand Down
19 changes: 0 additions & 19 deletions pkg/clusteragent/clusterchecks/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (

"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
"github.com/DataDog/datadog-agent/pkg/clusteragent/clusterchecks/types"
le "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection/metrics"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

var (
Expand Down Expand Up @@ -85,20 +83,3 @@ func orderedKeys(m map[string]int) []string {
sort.Strings(keys)
return keys
}

// scanExtendedDanglingConfigs records one more reschedule attempt on every
// dangling config in the store and reports the configs whose attempt count
// has gone past attemptLimit.
// A config is reported (warning log + extendedDanglingConfigs gauge
// increment) only the first time it crosses the limit.
// The store lock is held for the whole scan.
func scanExtendedDanglingConfigs(store *clusterStore, attemptLimit int) {
	store.Lock()
	defer store.Unlock()

	for _, dangling := range store.danglingConfigs {
		// Every scan counts as one more attempt, reported or not.
		dangling.rescheduleAttempts++

		if dangling.detectedExtendedDangling || !dangling.isStuckScheduling(attemptLimit) {
			continue
		}

		log.Warnf("Detected extended dangling config. Name:%s, Source:%s", dangling.config.Name, dangling.config.Source)
		dangling.detectedExtendedDangling = true
		extendedDanglingConfigs.Inc(le.JoinLeaderValue, dangling.config.Name, dangling.config.Source)
	}
}
27 changes: 0 additions & 27 deletions pkg/clusteragent/clusterchecks/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@ package clusterchecks
import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
"github.com/DataDog/datadog-agent/pkg/clusteragent/clusterchecks/types"
)

Expand Down Expand Up @@ -154,27 +151,3 @@ func Test_calculateBusyness(t *testing.T) {
})
}
}

// Test_scanExtendedDanglingConfigs checks that a dangling config is flagged as
// extended dangling only once the number of scans exceeds the attempt limit.
func Test_scanExtendedDanglingConfigs(t *testing.T) {
	attemptLimit := 3
	store := newClusterStore()

	wrapper := createDanglingConfig(integration.Config{
		Name:   "config1",
		Source: "source1",
	})
	digest := wrapper.config.Digest()
	store.danglingConfigs[digest] = wrapper

	// Scanning exactly attemptLimit times reaches the limit without exceeding it.
	for scans := 1; scans <= attemptLimit; scans++ {
		scanExtendedDanglingConfigs(store, attemptLimit)
	}

	assert.Equal(t, attemptLimit, wrapper.rescheduleAttempts)
	assert.False(t, wrapper.detectedExtendedDangling)
	assert.False(t, wrapper.isStuckScheduling(attemptLimit))

	// One more scan pushes the counter past the limit and flags the config.
	scanExtendedDanglingConfigs(store, attemptLimit)

	assert.Equal(t, attemptLimit+1, wrapper.rescheduleAttempts)
	assert.True(t, wrapper.detectedExtendedDangling)
	assert.True(t, wrapper.isStuckScheduling(attemptLimit))
}
4 changes: 2 additions & 2 deletions pkg/clusteragent/clusterchecks/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ var (
danglingConfigs = telemetry.NewGaugeWithOpts("cluster_checks", "configs_dangling",
[]string{le.JoinLeaderLabel}, "Number of check configurations not dispatched.",
telemetry.Options{NoDoubleUnderscoreSep: true})
extendedDanglingConfigs = telemetry.NewGaugeWithOpts("cluster_checks", "configs_extended_dangling",
[]string{le.JoinLeaderLabel, "config_name", "config_source"}, "Number of check configurations not dispatched, for extended number of scheduling attempts.",
unscheduledCheck = telemetry.NewGaugeWithOpts("cluster_checks", "unscheduled_check",
[]string{le.JoinLeaderLabel, "config_name", "config_source"}, "Number of check configurations not scheduled.",
telemetry.Options{NoDoubleUnderscoreSep: true})
dispatchedConfigs = telemetry.NewGaugeWithOpts("cluster_checks", "configs_dispatched",
[]string{"node", le.JoinLeaderLabel}, "Number of check configurations dispatched, by node.",
Expand Down
20 changes: 10 additions & 10 deletions pkg/clusteragent/clusterchecks/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,19 @@ func (d *dispatcher) getStats() *types.Stats {
for _, m := range d.store.digestToConfig {
checkNames[m.Name] = struct{}{}
}
extendedDanglingConfigs := 0
unscheduledChecks := 0
for _, c := range d.store.danglingConfigs {
if c.detectedExtendedDangling {
extendedDanglingConfigs++
if c.unscheduledCheck {
unscheduledChecks++
}
}
return &types.Stats{
Active: d.store.active,
NodeCount: len(d.store.nodes),
ActiveConfigs: len(d.store.digestToNode),
DanglingConfigs: len(d.store.danglingConfigs),
ExtendedDanglingConfigs: extendedDanglingConfigs,
TotalConfigs: len(d.store.digestToConfig),
CheckNames: checkNames,
Active: d.store.active,
NodeCount: len(d.store.nodes),
ActiveConfigs: len(d.store.digestToNode),
DanglingConfigs: len(d.store.danglingConfigs),
UnscheduledChecks: unscheduledChecks,
TotalConfigs: len(d.store.digestToConfig),
CheckNames: checkNames,
}
}
16 changes: 8 additions & 8 deletions pkg/clusteragent/clusterchecks/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,14 @@ type Stats struct {
LeaderIP string

// Leading
Leader bool
Active bool
NodeCount int
ActiveConfigs int
DanglingConfigs int
ExtendedDanglingConfigs int
TotalConfigs int
CheckNames map[string]struct{}
Leader bool
Active bool
NodeCount int
ActiveConfigs int
DanglingConfigs int
UnscheduledChecks int
TotalConfigs int
CheckNames map[string]struct{}
}

// LeaderIPCallback describes the leader-election method we
Expand Down
6 changes: 3 additions & 3 deletions pkg/config/setup/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,9 +708,9 @@ func InitConfig(config pkgconfigmodel.Setup) {
// Cluster check Autodiscovery
config.BindEnvAndSetDefault("cluster_checks.support_hybrid_ignore_ad_tags", false) // TODO(CINT)(Agent 7.53+) Remove this flag when hybrid ignore_ad_tags is fully deprecated
config.BindEnvAndSetDefault("cluster_checks.enabled", false)
config.BindEnvAndSetDefault("cluster_checks.node_expiration_timeout", 30) // value in seconds
config.BindEnvAndSetDefault("cluster_checks.warmup_duration", 30) // value in seconds
config.BindEnvAndSetDefault("cluster_checks.extended_dangling_attempt_threshold", 3)
config.BindEnvAndSetDefault("cluster_checks.node_expiration_timeout", 30) // value in seconds
config.BindEnvAndSetDefault("cluster_checks.warmup_duration", 30) // value in seconds
config.BindEnvAndSetDefault("cluster_checks.unscheduled_check_threshold", 60) // value in seconds
config.BindEnvAndSetDefault("cluster_checks.cluster_tag_name", "cluster_name")
config.BindEnvAndSetDefault("cluster_checks.extra_tags", []string{})
config.BindEnvAndSetDefault("cluster_checks.advanced_dispatching_enabled", false)
Expand Down

0 comments on commit eea7b33

Please sign in to comment.