Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC dedupe-logs #4564

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions components/motor/fake/motor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"go.viam.com/rdk/logging"
"go.viam.com/rdk/operation"
"go.viam.com/rdk/resource"
"go.viam.com/utils"
)

var (
Expand Down Expand Up @@ -79,7 +80,6 @@ func init() {
// direction.
type Motor struct {
resource.Named
resource.TriviallyCloseable

mu sync.Mutex
powerPct float64
Expand All @@ -93,14 +93,29 @@ type Motor struct {

OpMgr *operation.SingleOperationManager
Logger logging.Logger

logWorkers *utils.StoppableWorkers
}

// NewMotor creates a new fake motor.
func NewMotor(ctx context.Context, deps resource.Dependencies, conf resource.Config, logger logging.Logger) (motor.Motor, error) {
logWorker := func(ctx context.Context) {
for {
if ctx.Err() != nil {
logger.Info("stopping log worker")
return
}

logger.Info("here is an annoying spammy message")
time.Sleep(10 * time.Millisecond)
}
}

m := &Motor{
Named: conf.ResourceName().AsNamed(),
Logger: logger,
OpMgr: operation.NewSingleOperationManager(),
Named: conf.ResourceName().AsNamed(),
Logger: logger,
OpMgr: operation.NewSingleOperationManager(),
logWorkers: utils.NewBackgroundStoppableWorkers(logWorker),
}
if err := m.Reconfigure(ctx, deps, conf); err != nil {
return nil, err
Expand Down Expand Up @@ -431,3 +446,8 @@ func (m *Motor) IsMoving(ctx context.Context) (bool, error) {
defer m.mu.Unlock()
return math.Abs(m.powerPct) >= 0.005, nil
}

func (m *Motor) Close(ctx context.Context) error {
m.logWorkers.Stop()
return nil
}
61 changes: 61 additions & 0 deletions logging/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ import (
"go.uber.org/zap/zaptest"
)

const (
// Window duration over which to consider log messages "noisy."
noisyMessageWindowDuration = 5 * time.Second
// Count threshold within `noisyMessageWindowDuration` after which to
// consider log messages "noisy."
noisyMessageCountThreshold = 3
)

type (
impl struct {
name string
Expand All @@ -26,6 +34,15 @@ type (
// avoid that. This function is a no-op for non-test loggers. See `NewTestAppender`
// documentation for more details.
testHelper func()

// Whether or not to de-duplicate noisy logs.
dedupNoisyLogs bool
// Map of messages to counts of that message being `Write`ten within window.
recentMessageCounts map[string]int
// Map of messages to last `LogEntry` with that message within window.
recentMessageEntries map[string]LogEntry
// Start of current window.
recentMessageWindowStart time.Time
}

// LogEntry embeds a zapcore Entry and slice of Fields.
Expand Down Expand Up @@ -84,6 +101,14 @@ func (imp *impl) Sublogger(subname string) Logger {
imp.appenders,
imp.registry,
imp.testHelper,
// Inherit _whether_ we should deduplicate noisy logs from parent. However,
// subloggers should handle their own de-duplication with their own maps
// and windows. This design avoids races and allows logs with identical
// messages from different loggers to be considered unique.
imp.dedupNoisyLogs,
make(map[string]int),
make(map[string]LogEntry),
time.Now(),
}

// If there are multiple callers racing to create the same logger name (e.g: `viam.networking`),
Expand Down Expand Up @@ -198,6 +223,42 @@ func (imp *impl) shouldLog(logLevel Level) bool {
}

func (imp *impl) Write(entry *LogEntry) {
if imp.dedupNoisyLogs {
// If we have have entered a new recentMessage window, output noisy logs from
// the last window.
if time.Since(imp.recentMessageWindowStart) > noisyMessageWindowDuration {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this is not a PR, but just noting that I think most of this code would need to be wrapped in a lock.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah totally correct, will want a more "careful" implementation eventually.

for message, count := range imp.recentMessageCounts {
if count > noisyMessageCountThreshold {
collapsedEntry := imp.recentMessageEntries[entry.Message]
collapsedEntry.Message = fmt.Sprintf("Message logged %d times in past %v: %s",
count, noisyMessageWindowDuration, message)

imp.testHelper()
for _, appender := range imp.appenders {
err := appender.Write(collapsedEntry.Entry, collapsedEntry.Fields)
if err != nil {
fmt.Fprint(os.Stderr, err)
}
}
}
}

// Clear maps and reset window.
clear(imp.recentMessageCounts)
clear(imp.recentMessageEntries)
imp.recentMessageWindowStart = time.Now()
}

// Track entry in recentMessage maps.
imp.recentMessageCounts[entry.Message]++
imp.recentMessageEntries[entry.Message] = *entry

if imp.recentMessageCounts[entry.Message] > noisyMessageCountThreshold {
// If entry's message is reportedly "noisy," return early.
return
}
}

imp.testHelper()
for _, appender := range imp.appenders {
err := appender.Write(entry.Entry, entry.Fields)
Expand Down
80 changes: 60 additions & 20 deletions logging/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,42 @@
package logging

import (
"os"
"sync"
"testing"
"time"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"
"go.viam.com/utils"
)

// Environment variable to control whether noisy logs are de-deduplicated. Set
// to "false" to turn off de-duplicating logic; de-duplication logic is enabled
// by default.
const dedupNoisyLogsEnvVar = "VIAM_DEDUP_LOGS"

var (
globalMu sync.RWMutex
globalLogger = NewDebugLogger("global")

// GlobalLogLevel should be used whenever a zap logger is created that wants to obey the debug
// flag from the CLI or robot config.
GlobalLogLevel = zap.NewAtomicLevelAt(zap.InfoLevel)

// Whether to de-duplicate noisy logs; obtained from value of
// `dedupNoisyLogsEnvVar` and defaults to true. Export env var to "false" to
// turn off de-duplicating logic.
dedupNoisyLogs = true
)

func init() {
if dedupNoisyLogEnvVal := os.Getenv(dedupNoisyLogsEnvVar); dedupNoisyLogEnvVal == "false" {
dedupNoisyLogs = false
}
}

// ReplaceGlobal replaces the global loggers.
func ReplaceGlobal(logger Logger) {
globalMu.Lock()
Expand Down Expand Up @@ -62,11 +80,15 @@ func NewZapLoggerConfig() zap.Config {
// NewLogger returns a new logger that outputs Info+ logs to stdout in UTC.
func NewLogger(name string) Logger {
logger := &impl{
name: name,
level: NewAtomicLevelAt(INFO),
appenders: []Appender{NewStdoutAppender()},
registry: newRegistry(),
testHelper: func() {},
name: name,
level: NewAtomicLevelAt(INFO),
appenders: []Appender{NewStdoutAppender()},
registry: newRegistry(),
testHelper: func() {},
dedupNoisyLogs: dedupNoisyLogs,
recentMessageCounts: make(map[string]int),
recentMessageEntries: make(map[string]LogEntry),
recentMessageWindowStart: time.Now(),
}

logger.registry.registerLogger(name, logger)
Expand All @@ -78,11 +100,15 @@ func NewLogger(name string) Logger {
func NewLoggerWithRegistry(name string) (Logger, *Registry) {
reg := newRegistry()
logger := &impl{
name: name,
level: NewAtomicLevelAt(INFO),
appenders: []Appender{NewStdoutAppender()},
registry: reg,
testHelper: func() {},
name: name,
level: NewAtomicLevelAt(INFO),
appenders: []Appender{NewStdoutAppender()},
registry: reg,
testHelper: func() {},
dedupNoisyLogs: dedupNoisyLogs,
recentMessageCounts: make(map[string]int),
recentMessageEntries: make(map[string]LogEntry),
recentMessageWindowStart: time.Now(),
}

logger.registry.registerLogger(name, logger)
Expand All @@ -92,11 +118,15 @@ func NewLoggerWithRegistry(name string) (Logger, *Registry) {
// NewDebugLogger returns a new logger that outputs Debug+ logs to stdout in UTC.
func NewDebugLogger(name string) Logger {
logger := &impl{
name: name,
level: NewAtomicLevelAt(DEBUG),
appenders: []Appender{NewStdoutAppender()},
registry: newRegistry(),
testHelper: func() {},
name: name,
level: NewAtomicLevelAt(DEBUG),
appenders: []Appender{NewStdoutAppender()},
registry: newRegistry(),
testHelper: func() {},
dedupNoisyLogs: dedupNoisyLogs,
recentMessageCounts: make(map[string]int),
recentMessageEntries: make(map[string]LogEntry),
recentMessageWindowStart: time.Now(),
}

logger.registry.registerLogger(name, logger)
Expand All @@ -107,11 +137,15 @@ func NewDebugLogger(name string) Logger {
// pre-existing appenders/outputs.
func NewBlankLogger(name string) Logger {
logger := &impl{
name: name,
level: NewAtomicLevelAt(DEBUG),
appenders: []Appender{},
registry: newRegistry(),
testHelper: func() {},
name: name,
level: NewAtomicLevelAt(DEBUG),
appenders: []Appender{},
registry: newRegistry(),
testHelper: func() {},
dedupNoisyLogs: dedupNoisyLogs,
recentMessageCounts: make(map[string]int),
recentMessageEntries: make(map[string]LogEntry),
recentMessageWindowStart: time.Now(),
}

logger.registry.registerLogger(name, logger)
Expand All @@ -136,6 +170,8 @@ func NewObservedTestLogger(tb testing.TB) (Logger, *observer.ObservedLogs) {
},
registry: newRegistry(),
testHelper: tb.Helper,
// Only prod loggers should de-duplicate noisy logs.
dedupNoisyLogs: false,
}

return logger, observedLogs
Expand All @@ -155,6 +191,8 @@ func NewObservedTestLoggerWithRegistry(tb testing.TB, name string) (Logger, *obs
},
registry: registry,
testHelper: tb.Helper,
// Only prod loggers should de-duplicate noisy logs.
dedupNoisyLogs: false,
}

return logger, observedLogs, registry
Expand Down Expand Up @@ -189,6 +227,8 @@ func NewInMemoryLogger(tb testing.TB) *MemLogger {
},
registry: newRegistry(),
testHelper: tb.Helper,
// Only prod loggers should de-duplicate noisy logs.
dedupNoisyLogs: false,
}

memLogger := &MemLogger{logger, tb, observedLogs}
Expand Down
Loading