From 1b1fbc8c82b43be07b66c885a5b16e0633258678 Mon Sep 17 00:00:00 2001
From: Yuchen Wang
Date: Tue, 17 Dec 2024 16:22:52 -0800
Subject: [PATCH] dump goroutine profile in receive

---
 cmd/thanos/receive.go | 102 +++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 102 insertions(+)

diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go
index a29deea086..1b7c7d5a5f 100644
--- a/cmd/thanos/receive.go
+++ b/cmd/thanos/receive.go
@@ -11,6 +11,9 @@ import (
 	"net/http"
 	"os"
 	"path"
+	"path/filepath"
+	"runtime"
+	"runtime/pprof"
 	"strconv"
 	"strings"
 	"time"
@@ -546,6 +549,100 @@ func runReceive(
 		})
 	}
 
+	// Ingestors periodically check their goroutine count and dump a profile to
+	// disk when it is unusually high. A non-positive interval disables the
+	// check; it must not reach runutil.Repeat, whose ticker would panic on it.
+	if receiveMode == receive.IngestorOnly && conf.profilingInterval > 0 {
+		level.Debug(logger).Log("msg", "setting up periodic profiling")
+		ctx, cancel := context.WithCancel(context.Background())
+
+		// maxProfileFiles caps how many dumps are kept in the profiles
+		// directory so a long-lasting goroutine spike cannot fill the disk.
+		const maxProfileFiles = 10
+
+		// checkProfileLimit reports whether profileDir already holds at least
+		// maxFiles non-directory entries with the ".prof" extension. A missing
+		// directory counts as empty.
+		checkProfileLimit := func(profileDir string, maxFiles int) (bool, error) {
+			files, err := os.ReadDir(profileDir)
+			if err != nil {
+				if os.IsNotExist(err) {
+					return false, nil // Directory doesn't exist yet, safe to proceed
+				}
+				return false, err // Real error
+			}
+			count := 0
+			for _, file := range files {
+				if !file.IsDir() && filepath.Ext(file.Name()) == ".prof" {
+					count++
+				}
+			}
+			return count >= maxFiles, nil
+		}
+
+		// dumpGoroutineProfile writes a debug=1 goroutine profile into
+		// <dataDir>/profiles once the live goroutine count reaches
+		// conf.goroutineProfileThreshold. It returns nil when no dump is needed
+		// or the file limit is reached, and a wrapped error when the dump
+		// fails; logging that error is left to the caller.
+		dumpGoroutineProfile := func() error {
+			numGoroutines := runtime.NumGoroutine()
+			if numGoroutines < conf.goroutineProfileThreshold {
+				return nil
+			}
+
+			profileDir := filepath.Join(conf.dataDir, "profiles")
+			limitReached, err := checkProfileLimit(profileDir, maxProfileFiles)
+			if err != nil {
+				return fmt.Errorf("check profile directory: %w", err)
+			}
+			if limitReached {
+				level.Warn(logger).Log("msg", "profile limit reached, skipping profiling")
+				return nil
+			}
+
+			if err := os.MkdirAll(profileDir, os.ModePerm); err != nil {
+				return fmt.Errorf("create profiles directory: %w", err)
+			}
+
+			timestamp := time.Now().UTC().Format("20060102_150405")
+			fileName := filepath.Join(profileDir, fmt.Sprintf("goroutine_UTC%s.prof", timestamp))
+
+			file, err := os.Create(fileName)
+			if err != nil {
+				return fmt.Errorf("create profile file: %w", err)
+			}
+
+			if err := pprof.Lookup("goroutine").WriteTo(file, 1); err != nil {
+				// The write error is the one worth reporting; this close is best-effort.
+				_ = file.Close()
+				return fmt.Errorf("write goroutine profile: %w", err)
+			}
+			// Close explicitly rather than via defer: a failed close can mean the
+			// profile never fully reached disk.
+			if err := file.Close(); err != nil {
+				return fmt.Errorf("close profile file: %w", err)
+			}
+
+			level.Info(logger).Log("msg", "Goroutine profile dumped", "num_goroutines", numGoroutines)
+			return nil
+		}
+
+		g.Add(func() error {
+			return runutil.Repeat(conf.profilingInterval, ctx.Done(), func() error {
+				// runutil.Repeat stops at the first error, and a returning actor
+				// shuts down the whole run group, so a failed diagnostic dump is
+				// logged and swallowed instead of taking the receiver down.
+				if err := dumpGoroutineProfile(); err != nil {
+					level.Error(logger).Log("msg", "failed to dump goroutine profile", "err", err)
+				}
+				return nil
+			})
+		}, func(err error) {
+			cancel()
+		})
+	}
+
 	{
 		if limiter.CanReload() {
 			ctx, cancel := context.WithCancel(context.Background())
@@ -988,6 +1085,8 @@ type receiveConfig struct {
 	topMetricsUpdateInterval      time.Duration
 	matcherConverterCacheCapacity int
 	maxPendingGrpcWriteRequests   int
+	profilingInterval             time.Duration
+	goroutineProfileThreshold     int
 
 	featureList *[]string
 }
@@ -1154,6 +1253,9 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
 		Default("0").IntVar(&rc.matcherConverterCacheCapacity)
 	cmd.Flag("receive.max-pending-grcp-write-requests", "Reject right away gRPC write requests when this number of requests are pending. Value 0 disables this feature.").
 		Default("0").IntVar(&rc.maxPendingGrpcWriteRequests)
+	cmd.Flag("receive.profiling-interval", "The interval at which profiling data is collected. Value 0 disables profiling.").
+		Default("1m").DurationVar(&rc.profilingInterval)
+	cmd.Flag("receive.goroutine-profile-threshold", "Dump a goroutine profile when the number of goroutines reaches this threshold.").Default("100000").IntVar(&rc.goroutineProfileThreshold)
 
 	rc.featureList = cmd.Flag("enable-feature", "Comma separated experimental feature names to enable. The current list of features is "+metricNamesFilter+".").Default("").Strings()
 }