Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create separate worker usage data collection and move hardware emit there #1293

Open
wants to merge 22 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 33 additions & 76 deletions internal/internal_worker_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,10 @@ import (
"errors"
"fmt"
"os"
"runtime"
"sync"
"syscall"
"time"

"github.com/shirou/gopsutil/cpu"
"github.com/uber-go/tally"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand All @@ -57,8 +55,6 @@ var (

var errShutdown = errors.New("worker shutting down")

var collectHardwareUsageOnce sync.Once

type (
// resultHandler that returns result
resultHandler func(result []byte, err error)
Expand Down Expand Up @@ -140,10 +136,11 @@ type (
logger *zap.Logger
metricsScope tally.Scope

pollerRequestCh chan struct{}
pollerAutoScaler *pollerAutoScaler
taskQueueCh chan interface{}
sessionTokenBucket *sessionTokenBucket
pollerRequestCh chan struct{}
pollerAutoScaler *pollerAutoScaler
workerUsageCollector *workerUsageCollector
taskQueueCh chan interface{}
sessionTokenBucket *sessionTokenBucket
}

polledTask struct {
Expand Down Expand Up @@ -173,17 +170,29 @@ func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope t
logger,
)
}
// for now it is enabled by default
var workerUC *workerUsageCollector
timl3136 marked this conversation as resolved.
Show resolved Hide resolved
workerUC = newWorkerUsageCollector(
workerUsageCollectorOptions{
Enabled: true,
Cooldown: 30 * time.Second,
MetricsScope: metricsScope,
WorkerType: options.workerType,
},
logger,
)

bw := &baseWorker{
options: options,
shutdownCh: make(chan struct{}),
taskLimiter: rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1),
retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy),
logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}),
metricsScope: tagScope(metricsScope, tagWorkerType, options.workerType),
pollerRequestCh: make(chan struct{}, options.maxConcurrentTask),
pollerAutoScaler: pollerAS,
taskQueueCh: make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched.
options: options,
shutdownCh: make(chan struct{}),
taskLimiter: rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1),
retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy),
logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}),
metricsScope: tagScope(metricsScope, tagWorkerType, options.workerType),
pollerRequestCh: make(chan struct{}, options.maxConcurrentTask),
pollerAutoScaler: pollerAS,
workerUsageCollector: workerUC,
taskQueueCh: make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched.

limiterContext: ctx,
limiterContextCancel: cancel,
Expand All @@ -207,6 +216,10 @@ func (bw *baseWorker) Start() {
bw.pollerAutoScaler.Start()
}

if bw.workerUsageCollector != nil {
bw.workerUsageCollector.Start()
}

for i := 0; i < bw.options.pollerCount; i++ {
bw.shutdownWG.Add(1)
go bw.runPoller()
Expand All @@ -215,15 +228,6 @@ func (bw *baseWorker) Start() {
bw.shutdownWG.Add(1)
go bw.runTaskDispatcher()

// We want the emit function run once per host instead of run once per worker
//collectHardwareUsageOnce.Do(func() {
// bw.shutdownWG.Add(1)
// go bw.emitHardwareUsage()
//})

bw.shutdownWG.Add(1)
go bw.emitHardwareUsage()

bw.isWorkerStarted = true
traceLog(func() {
bw.logger.Info("Started Worker",
Expand Down Expand Up @@ -407,6 +411,9 @@ func (bw *baseWorker) Stop() {
if bw.pollerAutoScaler != nil {
bw.pollerAutoScaler.Stop()
}
if bw.workerUsageCollector != nil {
bw.workerUsageCollector.Stop()
}

if success := util.AwaitWaitGroup(&bw.shutdownWG, bw.options.shutdownTimeout); !success {
traceLog(func() {
Expand All @@ -420,53 +427,3 @@ func (bw *baseWorker) Stop() {
}
return
}

func (bw *baseWorker) emitHardwareUsage() {
defer func() {
if p := recover(); p != nil {
bw.metricsScope.Counter(metrics.WorkerPanicCounter).Inc(1)
topLine := fmt.Sprintf("base worker for %s [panic]:", bw.options.workerType)
st := getStackTraceRaw(topLine, 7, 0)
bw.logger.Error("Unhandled panic in hardware emitting.",
zap.String(tagPanicError, fmt.Sprintf("%v", p)),
zap.String(tagPanicStack, st))
}
}()
defer bw.shutdownWG.Done()
collectHardwareUsageOnce.Do(
func() {
ticker := time.NewTicker(hardwareMetricsCollectInterval)
for {
select {
case <-bw.shutdownCh:
ticker.Stop()
return
case <-ticker.C:
host := bw.options.host
scope := bw.metricsScope.Tagged(map[string]string{clientHostTag: host})

cpuPercent, err := cpu.Percent(0, false)
if err != nil {
bw.logger.Warn("Failed to get cpu percent", zap.Error(err))
return
}
cpuCores, err := cpu.Counts(false)
if err != nil {
bw.logger.Warn("Failed to get number of cpu cores", zap.Error(err))
return
}
scope.Gauge(metrics.NumCPUCores).Update(float64(cpuCores))
scope.Gauge(metrics.CPUPercentage).Update(cpuPercent[0])

var memStats runtime.MemStats
runtime.ReadMemStats(&memStats)

scope.Gauge(metrics.NumGoRoutines).Update(float64(runtime.NumGoroutine()))
scope.Gauge(metrics.TotalMemory).Update(float64(memStats.Sys))
scope.Gauge(metrics.MemoryUsedHeap).Update(float64(memStats.HeapInuse))
scope.Gauge(metrics.MemoryUsedStack).Update(float64(memStats.StackInuse))
}
}
})

}
126 changes: 126 additions & 0 deletions internal/internal_worker_usage_collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package internal

import (
"context"
"github.com/shirou/gopsutil/cpu"
"github.com/uber-go/tally"
"go.uber.org/cadence/internal/common/metrics"
"go.uber.org/zap"
"runtime"
"sync"
"time"
)

type (
	// workerUsageCollector periodically gathers usage data for a worker
	// (currently host hardware usage) on a background goroutine and emits
	// it to the configured metrics scope. Create it with
	// newWorkerUsageCollector; drive it with Start/Stop.
	workerUsageCollector struct {
		workerType   string             // worker flavor (e.g. decision vs. activity); gates hardware collection
		cooldownTime time.Duration      // interval between successive collections
		logger       *zap.Logger
		ctx          context.Context    // cancelled by Stop to end the collection loop
		wg           *sync.WaitGroup // graceful stop
		cancel       context.CancelFunc // cancels ctx
		metricsScope tally.Scope        // destination for emitted gauges; may be nil
	}

	// workerUsageCollectorOptions configures newWorkerUsageCollector.
	workerUsageCollectorOptions struct {
		Enabled      bool          // when false, no collector is created
		Cooldown     time.Duration // collection interval
		MetricsScope tally.Scope
		WorkerType   string
	}

	// hardwareUsage is a point-in-time snapshot of host CPU usage and
	// process memory/goroutine statistics, as collected by
	// collectHardwareUsage.
	hardwareUsage struct {
		NumCPUCores     int
		CPUPercent      float64
		NumGoRoutines   int
		TotalMemory     float64
		MemoryUsedHeap  float64
		MemoryUsedStack float64
	}
)

// newWorkerUsageCollector constructs a collector from the given options.
// It returns nil when collection is disabled, so callers must nil-check
// before invoking Start/Stop.
func newWorkerUsageCollector(
	options workerUsageCollectorOptions,
	logger *zap.Logger,
) *workerUsageCollector {
	if !options.Enabled {
		return nil
	}
	loopCtx, stopLoop := context.WithCancel(context.Background())
	return &workerUsageCollector{
		workerType:   options.WorkerType,
		cooldownTime: options.Cooldown,
		metricsScope: options.MetricsScope,
		logger:       logger,
		ctx:          loopCtx,
		cancel:       stopLoop,
		wg:           &sync.WaitGroup{},
	}
}

func (w *workerUsageCollector) Start() {
w.wg.Add(1)
go func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to spawn a goroutine per worker? Why not ensure only 1 running?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only the hardware emitting is once per host, all other metrics will be worker-specific. (e.g activity poll response vs. decision poll response)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now I see only w.collectHardwareUsage() which will just spawn bunch of data into the same scope. I would suggest separating hardware emitter and worker specific metrics.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's current design, for each type of metrics based on their origin, I will create a separate gorountine for each of them. But they would be contained under a single workerusagecollector so that their result can be collected and sent in one place

defer func() {
if p := recover(); p != nil {
w.logger.Error("Unhandled panic in workerUsageCollector.")
w.logger.Error(p.(error).Error())
timl3136 marked this conversation as resolved.
Show resolved Hide resolved
}
}()
defer w.wg.Done()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are a few things problematic about this goroutine closure

  1. this wg.Done() will be called once goroutine for go w.runHardwareCollector() is started. It shouldn't be marked as done until runHardwareCollector() terminates so should be moved there
  2. no need for a panic recovery here
  3. no need for a goroutine to invoke runHardwareCollector.

ticker := time.NewTicker(w.cooldownTime)
timl3136 marked this conversation as resolved.
Show resolved Hide resolved
for {
select {
case <-w.ctx.Done():
return
case <-ticker.C:
// Given that decision worker and activity worker are running in the same host, we only need to collect
// hardware usage from one of them.
if w.workerType == "DecisionWorker" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this might not be future proof and also if customer is running separate processes for decision and activity workers then we will not have the hardware usage of those hosts that only runs activity workers. we should also not create no-op workerUsageCollectors if only one of them will do the work.
@Groxx what would be your recommendation for host level metric reporting on the client side? I would like to avoid global static variables but this use case probably requires one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We tried Sync.Once before, but that would cause issues with unit testing as it will just wait indefinitely for this routine to stop while blocking all other goroutine from closing

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can override it in the unit tests

type once interface {
   Do(func())
}

var collectHardwareUsageOnce once

in typical startup this would be set to sync.Once:

collectHardwareUsageOnce = sync.Once{}

in test code you can initialize this to a fake implementation

collectHardwareUsageOnce = myFakeOnce{}  // myFakeOnce implements Do(func())

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your suggestion. I have implemented that in the latest commit

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see EmitOnce being used in workerUsageCollector. We should only have one (singleton) instance of workerUsageCollector which would be lazily created by the first worker instance. Rest of the workers would create a noOpUsageCollector. This lazy initialization logic should be hidden from workers. Worker just calls newWorkerUsageCollector() and that function should determine whether it's first time or not. Let's discuss offline if more clarification needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In our usecase, only the hardware info are once per host collected. Other worker type (decision worker and activity worker) should have different workerUsageCollector as they track different task type behaviors.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what type of information are you planning to collect per worker basis in this workerUsageCollector?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tasklist backlog/poll response since decision and activity worker have their own pollers and that need to be scaled independently

hardwareUsageData := w.collectHardwareUsage()
if w.metricsScope != nil {
w.emitHardwareUsage(hardwareUsageData)
}
}
}
}
}()
return
}

// Stop cancels the collection loop started by Start and blocks until the
// background goroutine has exited.
func (w *workerUsageCollector) Stop() {
	w.cancel()
	w.wg.Wait()
}

// collectHardwareUsage takes a point-in-time snapshot of host CPU usage and
// process memory/goroutine statistics. Collection errors are logged and the
// affected fields are left at their zero values instead of aborting the
// whole snapshot.
func (w *workerUsageCollector) collectHardwareUsage() hardwareUsage {
	// cpu.Percent(0, false) returns a single aggregate value; guard both the
	// error and an empty result so indexing cannot panic the collector
	// goroutine when collection fails.
	var cpuPercentUsed float64
	cpuPercent, err := cpu.Percent(0, false)
	if err != nil {
		w.logger.Warn("Failed to get cpu percent", zap.Error(err))
	} else if len(cpuPercent) > 0 {
		cpuPercentUsed = cpuPercent[0]
	}
	cpuCores, err := cpu.Counts(false)
	if err != nil {
		w.logger.Warn("Failed to get number of cpu cores", zap.Error(err))
	}

	var memStats runtime.MemStats
	runtime.ReadMemStats(&memStats)
	return hardwareUsage{
		NumCPUCores:   cpuCores,
		CPUPercent:    cpuPercentUsed,
		NumGoRoutines: runtime.NumGoroutine(),
		TotalMemory:   float64(memStats.Sys),
		// NOTE(review): previous emitter reported HeapInuse; confirm
		// HeapAlloc is the intended heap metric here.
		MemoryUsedHeap:  float64(memStats.HeapAlloc),
		MemoryUsedStack: float64(memStats.StackInuse),
	}
}

// emitHardwareUsage reports a hardwareUsage snapshot as gauges on the
// collector's metrics scope.
func (w *workerUsageCollector) emitHardwareUsage(usage hardwareUsage) {
	// Table-driven emission; the slice preserves the reporting order.
	gauges := []struct {
		name  string
		value float64
	}{
		{metrics.NumCPUCores, float64(usage.NumCPUCores)},
		{metrics.CPUPercentage, usage.CPUPercent},
		{metrics.NumGoRoutines, float64(usage.NumGoRoutines)},
		{metrics.TotalMemory, usage.TotalMemory},
		{metrics.MemoryUsedHeap, usage.MemoryUsedHeap},
		{metrics.MemoryUsedStack, usage.MemoryUsedStack},
	}
	for _, g := range gauges {
		w.metricsScope.Gauge(g.name).Update(g.value)
	}
}