diff --git a/internal/common/metrics/constants.go b/internal/common/metrics/constants.go
index 33081e059..bbb171df8 100644
--- a/internal/common/metrics/constants.go
+++ b/internal/common/metrics/constants.go
@@ -121,4 +121,6 @@ const (
 	ServerSideHistorySize    = CadenceMetricsPrefix + "server-side-history-size"
 	ConcurrentTaskQuota      = CadenceMetricsPrefix + "concurrent-task-quota"
 	PollerRequestBufferUsage = CadenceMetricsPrefix + "poller-request-buffer-usage"
+
+	WorkerUsageCollectorPanic = CadenceMetricsPrefix + "worker-metrics-collector-panic"
 )
diff --git a/internal/internal_worker.go b/internal/internal_worker.go
index 40f825b56..f270327a6 100644
--- a/internal/internal_worker.go
+++ b/internal/internal_worker.go
@@ -281,7 +281,8 @@ func newWorkflowTaskWorkerInternal(
 			taskWorker:      poller,
 			identity:        params.Identity,
 			workerType:      "DecisionWorker",
-			shutdownTimeout: params.WorkerStopTimeout},
+			shutdownTimeout: params.WorkerStopTimeout,
+			sync:            &params.Sync},
 		params.Logger,
 		params.MetricsScope,
 		nil,
@@ -304,7 +305,8 @@ func newWorkflowTaskWorkerInternal(
 			taskWorker:      localActivityTaskPoller,
 			identity:        params.Identity,
 			workerType:      "LocalActivityWorker",
-			shutdownTimeout: params.WorkerStopTimeout},
+			shutdownTimeout: params.WorkerStopTimeout,
+			sync:            &params.Sync},
 		params.Logger,
 		params.MetricsScope,
 		nil,
@@ -482,7 +484,8 @@ func newActivityTaskWorker(
 			identity:          workerParams.Identity,
 			workerType:        workerType,
 			shutdownTimeout:   workerParams.WorkerStopTimeout,
-			userContextCancel: workerParams.UserContextCancel},
+			userContextCancel: workerParams.UserContextCancel,
+			sync:              &workerParams.Sync},
 		workerParams.Logger,
 		workerParams.MetricsScope,
 		sessionTokenBucket,
diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go
index b51048094..98fa552b6 100644
--- a/internal/internal_worker_base.go
+++ b/internal/internal_worker_base.go
@@ -28,12 +28,10 @@ import (
 	"errors"
 	"fmt"
 	"os"
-	"runtime"
 	"sync"
 	"syscall"
 	"time"
 
-	"github.com/shirou/gopsutil/cpu"
 	"github.com/uber-go/tally"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
@@ -57,7 +55,7 @@ var (
 
 var errShutdown = errors.New("worker shutting down")
 
-var collectHardwareUsageOnce sync.Once
+var emitOnce sync.Once
 
 type (
 	// resultHandler that returns result
@@ -127,6 +125,7 @@ type (
 		shutdownTimeout   time.Duration
 		userContextCancel context.CancelFunc
 		host              string
+		sync              *oncePerHost
 	}
 
 	// baseWorker that wraps worker activities.
@@ -143,15 +142,20 @@ type (
 		logger       *zap.Logger
 		metricsScope tally.Scope
 
-		pollerRequestCh    chan struct{}
-		pollerAutoScaler   *pollerAutoScaler
-		taskQueueCh        chan interface{}
-		sessionTokenBucket *sessionTokenBucket
+		pollerRequestCh      chan struct{}
+		pollerAutoScaler     *pollerAutoScaler
+		workerUsageCollector *workerUsageCollector
+		taskQueueCh          chan interface{}
+		sessionTokenBucket   *sessionTokenBucket
 	}
 
 	polledTask struct {
 		task interface{}
 	}
+
+	oncePerHost interface {
+		Do(func())
+	}
 )
 
 func createPollRetryPolicy() backoff.RetryPolicy {
@@ -177,16 +181,36 @@ func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope t
 		)
 	}
 
+	// Fall back to the package-level sync.Once when no per-host implementation
+	// is provided; note that &params.Sync is non-nil even when Sync is unset.
+	var once oncePerHost = &emitOnce
+	if options.sync != nil && *options.sync != nil {
+		once = *options.sync
+	}
+
+	// usage collection is enabled by default for now
+	workerUC := newWorkerUsageCollector(
		workerUsageCollectorOptions{
+			Enabled:      true,
+			Cooldown:     30 * time.Second,
+			MetricsScope: metricsScope,
+			WorkerType:   options.workerType,
+			EmitOnce:     once,
+		},
+		logger,
+	)
+
 	bw := &baseWorker{
-		options:         options,
-		shutdownCh:      make(chan struct{}),
-		taskLimiter:     rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1),
-		retrier:         backoff.NewConcurrentRetrier(pollOperationRetryPolicy),
-		logger:          logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}),
-		metricsScope:    tagScope(metricsScope, tagWorkerType, options.workerType),
-		pollerRequestCh: make(chan struct{}, options.maxConcurrentTask),
-		pollerAutoScaler: pollerAS,
-		taskQueueCh:     make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched.
+		options:              options,
+		shutdownCh:           make(chan struct{}),
+		taskLimiter:          rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1),
+		retrier:              backoff.NewConcurrentRetrier(pollOperationRetryPolicy),
+		logger:               logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}),
+		metricsScope:         tagScope(metricsScope, tagWorkerType, options.workerType),
+		pollerRequestCh:      make(chan struct{}, options.maxConcurrentTask),
+		pollerAutoScaler:     pollerAS,
+		workerUsageCollector: workerUC,
+		taskQueueCh:          make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched.
 
 		limiterContext:       ctx,
 		limiterContextCancel: cancel,
@@ -210,6 +234,10 @@ func (bw *baseWorker) Start() {
 		bw.pollerAutoScaler.Start()
 	}
 
+	if bw.workerUsageCollector != nil {
+		bw.workerUsageCollector.Start()
+	}
+
 	for i := 0; i < bw.options.pollerCount; i++ {
 		bw.shutdownWG.Add(1)
 		go bw.runPoller()
@@ -218,11 +246,6 @@ func (bw *baseWorker) Start() {
 	bw.shutdownWG.Add(1)
 	go bw.runTaskDispatcher()
 
-	// We want the emit function run once per host instead of run once per worker
-	// since the emit function is host level metric.
-	bw.shutdownWG.Add(1)
-	go bw.emitHardwareUsage()
-
 	bw.isWorkerStarted = true
 	traceLog(func() {
 		bw.logger.Info("Started Worker",
@@ -406,6 +429,9 @@ func (bw *baseWorker) Stop() {
 	if bw.pollerAutoScaler != nil {
 		bw.pollerAutoScaler.Stop()
 	}
+	if bw.workerUsageCollector != nil {
+		bw.workerUsageCollector.Stop()
+	}
 
 	if success := util.AwaitWaitGroup(&bw.shutdownWG, bw.options.shutdownTimeout); !success {
 		traceLog(func() {
@@ -419,53 +445,3 @@
 	}
 	return
 }
-
-func (bw *baseWorker) emitHardwareUsage() {
-	defer func() {
-		if p := recover(); p != nil {
-			bw.metricsScope.Counter(metrics.WorkerPanicCounter).Inc(1)
-			topLine := fmt.Sprintf("base worker for %s [panic]:", bw.options.workerType)
-			st := getStackTraceRaw(topLine, 7, 0)
-			bw.logger.Error("Unhandled panic in hardware emitting.",
-				zap.String(tagPanicError, fmt.Sprintf("%v", p)),
-				zap.String(tagPanicStack, st))
-		}
-	}()
-	defer bw.shutdownWG.Done()
-	collectHardwareUsageOnce.Do(
-		func() {
-			ticker := time.NewTicker(hardwareMetricsCollectInterval)
-			for {
-				select {
-				case <-bw.shutdownCh:
-					ticker.Stop()
-					return
-				case <-ticker.C:
-					host := bw.options.host
-					scope := bw.metricsScope.Tagged(map[string]string{clientHostTag: host})
-
-					cpuPercent, err := cpu.Percent(0, false)
-					if err != nil {
-						bw.logger.Warn("Failed to get cpu percent", zap.Error(err))
-						return
-					}
-					cpuCores, err := cpu.Counts(false)
-					if err != nil {
-						bw.logger.Warn("Failed to get number of cpu cores", zap.Error(err))
-						return
-					}
-					scope.Gauge(metrics.NumCPUCores).Update(float64(cpuCores))
-					scope.Gauge(metrics.CPUPercentage).Update(cpuPercent[0])
-
-					var memStats runtime.MemStats
-					runtime.ReadMemStats(&memStats)
-
-					scope.Gauge(metrics.NumGoRoutines).Update(float64(runtime.NumGoroutine()))
-					scope.Gauge(metrics.TotalMemory).Update(float64(memStats.Sys))
-					scope.Gauge(metrics.MemoryUsedHeap).Update(float64(memStats.HeapInuse))
-					scope.Gauge(metrics.MemoryUsedStack).Update(float64(memStats.StackInuse))
-				}
-			}
-		})
-
-}
diff --git a/internal/internal_worker_interfaces_test.go b/internal/internal_worker_interfaces_test.go
index 827abe0e0..52258e024 100644
--- a/internal/internal_worker_interfaces_test.go
+++ b/internal/internal_worker_interfaces_test.go
@@ -56,8 +56,19 @@ type (
 		mockCtrl *gomock.Controller
 		service  *workflowservicetest.MockClient
 	}
+
+	// fakeSyncOnce is a fake implementation of the oncePerHost interface
+	// that does NOT ensure the wrapped function runs only once per host.
+	fakeSyncOnce struct {
+	}
 )
 
+var fakeSyncOnceValue fakeSyncOnce
+
+func (m *fakeSyncOnce) Do(f func()) {
+	f()
+}
+
 func helloWorldWorkflowFunc(ctx Context, input []byte) error {
 	queryResult := startingQueryValue
 	SetQueryHandler(ctx, queryType, func() (string, error) {
@@ -179,12 +190,12 @@ func (s *InterfacesTestSuite) TestInterface() {
 	domain := "testDomain"
 
 	// Workflow execution parameters.
 	workflowExecutionParameters := workerExecutionParameters{
-		TaskList: "testTaskList",
 		WorkerOptions: WorkerOptions{
 			MaxConcurrentActivityTaskPollers: 4,
 			MaxConcurrentDecisionTaskPollers: 4,
 			Logger:                           zaptest.NewLogger(s.T()),
-			Tracer:                           opentracing.NoopTracer{}},
+			Tracer:                           opentracing.NoopTracer{},
+			Sync:                             &fakeSyncOnce{}},
 	}
 
 	domainStatus := m.DomainStatusRegistered
@@ -216,7 +227,8 @@ func (s *InterfacesTestSuite) TestInterface() {
 			MaxConcurrentActivityTaskPollers: 10,
 			MaxConcurrentDecisionTaskPollers: 10,
 			Logger:                           zaptest.NewLogger(s.T()),
-			Tracer:                           opentracing.NoopTracer{}},
+			Tracer:                           opentracing.NoopTracer{},
+			Sync:                             &fakeSyncOnce{}},
 	}
 
 	// Register activity instances and launch the worker.
diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go
index fde69d7e1..611405314 100644
--- a/internal/internal_worker_test.go
+++ b/internal/internal_worker_test.go
@@ -367,6 +367,7 @@ func createShadowWorker(
 	return createWorkerWithThrottle(t, service, 0, WorkerOptions{
 		EnableShadowWorker: true,
 		ShadowOptions:      *shadowOptions,
+		Sync:               &fakeSyncOnce{},
 	})
 }
 
@@ -409,6 +410,7 @@ func createWorkerWithThrottle(
 	workerOptions.TaskListActivitiesPerSecond = activitiesPerSecond
 	workerOptions.Logger = zaptest.NewLogger(t)
 	workerOptions.EnableSessionWorker = true
+	workerOptions.Sync = &fakeSyncOnce{}
 
 	// Start Worker.
 	worker := NewWorker(
@@ -423,14 +425,14 @@ func createWorkerWithDataConverter(
 	t *testing.T,
 	service *workflowservicetest.MockClient,
 ) *aggregatedWorker {
-	return createWorkerWithThrottle(t, service, 0, WorkerOptions{DataConverter: newTestDataConverter()})
+	return createWorkerWithThrottle(t, service, 0, WorkerOptions{DataConverter: newTestDataConverter(), Sync: &fakeSyncOnce{}})
 }
 
 func createWorkerWithAutoscaler(
 	t *testing.T,
 	service *workflowservicetest.MockClient,
 ) *aggregatedWorker {
-	return createWorkerWithThrottle(t, service, 0, WorkerOptions{FeatureFlags: FeatureFlags{PollerAutoScalerEnabled: true}})
+	return createWorkerWithThrottle(t, service, 0, WorkerOptions{FeatureFlags: FeatureFlags{PollerAutoScalerEnabled: true}, Sync: &fakeSyncOnce{}})
 }
 
 func createWorkerWithStrictNonDeterminismDisabled(
@@ -444,7 +446,7 @@ func createWorkerWithHost(
 	t *testing.T,
 	service *workflowservicetest.MockClient,
 ) *aggregatedWorker {
-	return createWorkerWithThrottle(t, service, 0, WorkerOptions{Host: "test_host"})
+	return createWorkerWithThrottle(t, service, 0, WorkerOptions{Host: "test_host", Sync: &fakeSyncOnce{}})
 }
 
 func (s *internalWorkerTestSuite) testCompleteActivityHelper(opt *ClientOptions) {
@@ -1031,7 +1033,7 @@ func TestActivityNilArgs(t *testing.T) {
 func TestWorkerOptionDefaults(t *testing.T) {
 	domain := "worker-options-test"
 	taskList := "worker-options-tl"
-	aggWorker := newAggregatedWorker(nil, domain, taskList, WorkerOptions{})
+	aggWorker := newAggregatedWorker(nil, domain, taskList, WorkerOptions{Sync: &fakeSyncOnce{}})
 	decisionWorker := aggWorker.workflowWorker
 	require.True(t, decisionWorker.executionParameters.Identity != "")
 	require.NotNil(t, decisionWorker.executionParameters.Logger)
diff --git a/internal/internal_worker_usage_collector.go b/internal/internal_worker_usage_collector.go
new file mode 100644
index 000000000..357f4b5bc
--- /dev/null
+++ b/internal/internal_worker_usage_collector.go
@@ -0,0 +1,141 @@
+package internal
+
+import (
+	"context"
+	"runtime"
+	"sync"
+	"time"
+
+	"github.com/shirou/gopsutil/cpu"
+	"github.com/uber-go/tally"
+	"go.uber.org/zap"
+
+	"go.uber.org/cadence/internal/common/metrics"
+)
+
+type (
+	workerUsageCollector struct {
+		workerType      string
+		cooldownTime    time.Duration
+		logger          *zap.Logger
+		ctx             context.Context
+		shutdownCh      chan struct{}
+		wg              *sync.WaitGroup // graceful stop
+		cancel          context.CancelFunc
+		metricsScope    tally.Scope
+		emitOncePerHost oncePerHost
+	}
+
+	workerUsageCollectorOptions struct {
+		Enabled      bool
+		Cooldown     time.Duration
+		MetricsScope tally.Scope
+		WorkerType   string
+		EmitOnce     oncePerHost
+	}
+
+	hardwareUsage struct {
+		NumCPUCores     int
+		CPUPercent      float64
+		NumGoRoutines   int
+		TotalMemory     float64
+		MemoryUsedHeap  float64
+		MemoryUsedStack float64
+	}
+)
+
+func newWorkerUsageCollector(
+	options workerUsageCollectorOptions,
+	logger *zap.Logger,
+) *workerUsageCollector {
+	if !options.Enabled {
+		return nil
+	}
+	ctx, cancel := context.WithCancel(context.Background())
+	return &workerUsageCollector{
+		workerType:      options.WorkerType,
+		cooldownTime:    options.Cooldown,
+		metricsScope:    options.MetricsScope,
+		logger:          logger,
+		ctx:             ctx,
+		cancel:          cancel,
+		wg:              &sync.WaitGroup{},
+		emitOncePerHost: options.EmitOnce,
+		shutdownCh:      make(chan struct{}),
+	}
+}
+
+func (w *workerUsageCollector) Start() {
+	w.logger.Info("Starting worker usage collector", zap.String("workerType", w.workerType))
+	// Hardware usage is a host-level signal, so only the first worker on the
+	// host actually launches the collector goroutine.
+	w.emitOncePerHost.Do(
+		func() {
+			w.wg.Add(1)
+			w.logger.Info("Starting hardware usage collector", zap.String("workerType", w.workerType))
+			go w.runHardwareCollector()
+		})
+}
+
+func (w *workerUsageCollector) Stop() {
+	close(w.shutdownCh)
+	w.wg.Wait()
+	w.cancel()
+}
+
+func (w *workerUsageCollector) runHardwareCollector() {
+	defer w.wg.Done()
+	ticker := time.NewTicker(w.cooldownTime)
+	defer ticker.Stop()
+	w.logger.Info("Started hardware usage collector", zap.String("workerType", w.workerType))
+	for {
+		select {
+		case <-w.shutdownCh:
+			return
+		case <-ticker.C:
+			hardwareUsageData := w.collectHardwareUsage()
+			if w.metricsScope != nil {
+				w.emitHardwareUsage(hardwareUsageData)
+			}
+		}
+	}
+}
+
+func (w *workerUsageCollector) collectHardwareUsage() hardwareUsage {
+	// cpu.Percent can return an error (or an empty slice) on unsupported
+	// platforms, so guard the index instead of letting the goroutine panic.
+	var cpuUsage float64
+	cpuPercent, err := cpu.Percent(0, false)
+	if err != nil || len(cpuPercent) == 0 {
+		w.logger.Warn("Failed to get cpu percent", zap.Error(err))
+	} else {
+		cpuUsage = cpuPercent[0]
+	}
+	cpuCores, err := cpu.Counts(false)
+	if err != nil {
+		w.logger.Warn("Failed to get number of cpu cores", zap.Error(err))
+	}
+
+	var memStats runtime.MemStats
+	runtime.ReadMemStats(&memStats)
+	return hardwareUsage{
+		NumCPUCores:   cpuCores,
+		CPUPercent:    cpuUsage,
+		NumGoRoutines: runtime.NumGoroutine(),
+		TotalMemory:   float64(memStats.Sys),
+		// HeapInuse keeps the gauge consistent with the emitHardwareUsage
+		// implementation this collector replaces.
+		MemoryUsedHeap:  float64(memStats.HeapInuse),
+		MemoryUsedStack: float64(memStats.StackInuse),
+	}
+}
+
+// emitHardwareUsage emits collected hardware usage metrics to the metrics scope.
+func (w *workerUsageCollector) emitHardwareUsage(usage hardwareUsage) {
+	w.metricsScope.Gauge(metrics.NumCPUCores).Update(float64(usage.NumCPUCores))
+	w.metricsScope.Gauge(metrics.CPUPercentage).Update(usage.CPUPercent)
+	w.metricsScope.Gauge(metrics.NumGoRoutines).Update(float64(usage.NumGoRoutines))
+	w.metricsScope.Gauge(metrics.TotalMemory).Update(usage.TotalMemory)
+	w.metricsScope.Gauge(metrics.MemoryUsedHeap).Update(usage.MemoryUsedHeap)
+	w.metricsScope.Gauge(metrics.MemoryUsedStack).Update(usage.MemoryUsedStack)
+}
diff --git a/internal/internal_workers_test.go b/internal/internal_workers_test.go
index 49d4041b2..b9b973472 100644
--- a/internal/internal_workers_test.go
+++ b/internal/internal_workers_test.go
@@ -95,7 +95,8 @@ func (s *WorkersTestSuite) TestWorkflowWorker() {
 		TaskList: "testTaskList",
 		WorkerOptions: WorkerOptions{
 			MaxConcurrentDecisionTaskPollers: 5,
-			Logger:                           logger},
+			Logger:                           logger,
+			Sync:                             &fakeSyncOnce{}},
 		UserContext:       ctx,
 		UserContextCancel: cancel,
 	}
@@ -127,7 +128,8 @@ func (s *WorkersTestSuite) testActivityWorker(useLocallyDispatched bool) {
 		TaskList: "testTaskList",
 		WorkerOptions: WorkerOptions{
 			MaxConcurrentActivityTaskPollers: 5,
-			Logger:                           zaptest.NewLogger(s.T())},
+			Logger:                           zaptest.NewLogger(s.T()),
+			Sync:                             &fakeSyncOnce{}},
 	}
 	overrides := &workerOverrides{activityTaskHandler: newSampleActivityTaskHandler(), useLocallyDispatchedActivityPoller: useLocallyDispatched}
 	a := &greeterActivity{}
@@ -174,6 +176,7 @@ func (s *WorkersTestSuite) TestActivityWorkerStop() {
 			MaxConcurrentActivityTaskPollers:   5,
 			MaxConcurrentActivityExecutionSize: 2,
 			Logger:                             zaptest.NewLogger(s.T()),
+			Sync:                               &fakeSyncOnce{},
 		},
 		),
 		UserContext:       ctx,
@@ -212,7 +215,8 @@ func (s *WorkersTestSuite) TestPollForDecisionTask_InternalServiceError() {
 		TaskList: "testDecisionTaskList",
 		WorkerOptions: WorkerOptions{
 			MaxConcurrentDecisionTaskPollers: 5,
-			Logger:                           zaptest.NewLogger(s.T())},
+			Logger:                           zaptest.NewLogger(s.T()),
+			Sync:                             &fakeSyncOnce{}},
 	}
 	overrides := &workerOverrides{workflowTaskHandler: newSampleWorkflowTaskHandler()}
 	workflowWorker := newWorkflowWorkerInternal(
@@ -339,6 +343,7 @@ func (s *WorkersTestSuite) TestLongRunningDecisionTask() {
 		Logger:                zaptest.NewLogger(s.T()),
 		DisableActivityWorker: true,
 		Identity:              "test-worker-identity",
+		Sync:                  &fakeSyncOnce{},
 	}
 	worker := newAggregatedWorker(s.service, domain, taskList, options)
 	worker.RegisterWorkflowWithOptions(
@@ -514,6 +519,7 @@ func (s *WorkersTestSuite) TestQueryTask_WorkflowCacheEvicted() {
 		// and we can force clear the cache when polling the query task.
 		// See the mock function for the second PollForDecisionTask call above.
 		MaxConcurrentDecisionTaskExecutionSize: 1,
+		Sync:                                   &fakeSyncOnce{},
 	}
 	worker := newAggregatedWorker(s.service, domain, taskList, options)
 	worker.RegisterWorkflowWithOptions(
@@ -637,6 +643,7 @@ func (s *WorkersTestSuite) TestMultipleLocalActivities() {
 		Logger:                zaptest.NewLogger(s.T()),
 		DisableActivityWorker: true,
 		Identity:              "test-worker-identity",
+		Sync:                  &fakeSyncOnce{},
 	}
 	worker := newAggregatedWorker(s.service, domain, taskList, options)
 	worker.RegisterWorkflowWithOptions(
@@ -747,6 +754,7 @@ func (s *WorkersTestSuite) TestLocallyDispatchedActivity() {
 	options := WorkerOptions{
 		Logger:   zaptest.NewLogger(s.T()),
 		Identity: "test-worker-identity",
+		Sync:     &fakeSyncOnce{},
 	}
 	worker := newAggregatedWorker(s.service, domain, taskList, options)
 	worker.RegisterWorkflowWithOptions(
@@ -812,6 +820,7 @@ func (s *WorkersTestSuite) TestMultipleLocallyDispatchedActivity() {
 	options := WorkerOptions{
 		Logger:   zaptest.NewLogger(s.T()),
 		Identity: "test-worker-identity",
+		Sync:     &fakeSyncOnce{},
 	}
 
 	s.service.EXPECT().DescribeDomain(gomock.Any(), gomock.Any(), callOptions()...).Return(nil, nil).AnyTimes()
diff --git a/internal/worker.go b/internal/worker.go
index abef02a5b..15a3fe91a 100644
--- a/internal/worker.go
+++ b/internal/worker.go
@@ -271,6 +271,12 @@ type (
 		//
 		// Deprecated: All bugports are always deprecated and may be removed at any time.
 		WorkerBugPorts WorkerBugPorts
+
+		// Optional: Sync gates host-level work, such as hardware usage metrics emission,
+		// so it runs only once per host. Any value with a Do(func()) method can be supplied.
+		//
+		// default: nil, which falls back to a package-level sync.Once
+		Sync oncePerHost
 	}
 
 	// WorkerBugPorts allows opt-in enabling of older, possibly buggy behavior, primarily intended to allow temporarily
diff --git a/internal/workflow_shadower_worker_test.go b/internal/workflow_shadower_worker_test.go
index 29b38983c..30a89f4a2 100644
--- a/internal/workflow_shadower_worker_test.go
+++ b/internal/workflow_shadower_worker_test.go
@@ -70,7 +70,8 @@ func (s *shadowWorkerSuite) TestNewShadowWorker() {
 		workerExecutionParameters{
 			TaskList: testTaskList,
 			WorkerOptions: WorkerOptions{
-				Logger: zaptest.NewLogger(s.T())},
+				Logger: zaptest.NewLogger(s.T()),
+				Sync:   &fakeSyncOnce{}},
 		},
 		registry,
 	)
@@ -102,7 +103,8 @@ func (s *shadowWorkerSuite) TestStartShadowWorker_Failed_InvalidShadowOption() {
 		workerExecutionParameters{
 			TaskList: testTaskList,
 			WorkerOptions: WorkerOptions{
-				Logger: zaptest.NewLogger(s.T())},
+				Logger: zaptest.NewLogger(s.T()),
+				Sync:   &fakeSyncOnce{}},
 		},
 		newRegistry(),
 	)
@@ -122,7 +124,8 @@ func (s *shadowWorkerSuite) TestStartShadowWorker_Failed_DomainNotExist() {
 		workerExecutionParameters{
 			TaskList: testTaskList,
 			WorkerOptions: WorkerOptions{
-				Logger: zaptest.NewLogger(s.T())},
+				Logger: zaptest.NewLogger(s.T()),
+				Sync:   &fakeSyncOnce{}},
 		},
 		newRegistry(),
 	)
@@ -141,7 +144,8 @@ func (s *shadowWorkerSuite) TestStartShadowWorker_Failed_TaskListNotSpecified() {
 		ShadowOptions{},
 		workerExecutionParameters{
 			WorkerOptions: WorkerOptions{
-				Logger: zaptest.NewLogger(s.T())},
+				Logger: zaptest.NewLogger(s.T()),
+				Sync:   &fakeSyncOnce{}},
 		},
 		newRegistry(),
 	)
@@ -165,7 +169,8 @@ func (s *shadowWorkerSuite) TestStartShadowWorker_Failed_StartWorkflowError() {
 		workerExecutionParameters{
 			TaskList: testTaskList,
 			WorkerOptions: WorkerOptions{
-				Logger: zaptest.NewLogger(s.T())},
+				Logger: zaptest.NewLogger(s.T()),
+				Sync:   &fakeSyncOnce{}},
 		},
 		newRegistry(),
 	)
@@ -209,7 +214,8 @@ func (s *shadowWorkerSuite) TestStartShadowWorker_Succeed() {
 		workerExecutionParameters{
 			TaskList: testTaskList,
 			WorkerOptions: WorkerOptions{
-				Logger: zaptest.NewLogger(s.T())},
+				Logger: zaptest.NewLogger(s.T()),
+				Sync:   &fakeSyncOnce{}},
 		},
 		newRegistry(),
 	)
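For reference, a minimal sketch of how a caller could use the new `Sync` option once this lands. It assumes the public `worker.Options` alias picks up the new field and that the service client is built elsewhere; the domain and task list names are placeholders. Because Go interfaces are satisfied structurally, a `*sync.Once` already provides the `Do(func())` method that the unexported `oncePerHost` interface requires:

```go
package main

import (
	"sync"

	"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
	"go.uber.org/cadence/worker"
	"go.uber.org/zap"
)

// newWorkers builds two workers that share one *sync.Once, so the host-level
// hardware usage collector is started by whichever worker starts first and by
// no other worker in this process.
func newWorkers(service workflowserviceclient.Interface, logger *zap.Logger) []worker.Worker {
	perHost := &sync.Once{} // satisfies oncePerHost's Do(func()) structurally

	opts := worker.Options{
		Logger: logger,
		Sync:   perHost, // shared once-per-host gate for hardware metrics
	}
	return []worker.Worker{
		worker.New(service, "sample-domain", "task-list-a", opts),
		worker.New(service, "sample-domain", "task-list-b", opts),
	}
}
```

Leaving `Sync` unset exercises the fallback in `newBaseWorker`, which shares the package-level `emitOnce` across all workers in the process.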
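One subtlety worth keeping in mind when reviewing the `sync: &params.Sync` plumbing above: a pointer to a nil interface is itself non-nil, which is why the fallback in `newBaseWorker` has to test both the pointer and the interface it points to. A standalone illustration, not part of the diff:

```go
package main

import "fmt"

// oncePerHost mirrors the unexported interface added in this diff.
type oncePerHost interface{ Do(func()) }

func main() {
	var unset oncePerHost // WorkerOptions.Sync left at its zero value
	ptr := &unset         // what the worker constructors pass to baseWorkerOptions.sync

	fmt.Println(ptr == nil)   // false: the pointer itself is never nil
	fmt.Println(unset == nil) // true: the interface it points to still is
}
```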