diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 84fd438..dac1a9f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,7 +1,7 @@ name: CI on: [push, pull_request] env: - go-version: "1.19.x" + go-version: "1.21.x" jobs: test: name: Test diff --git a/Dockerfile b/Dockerfile index 3171e5a..9d8c95e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.20 +FROM golang:1.21 WORKDIR /usr/src/app diff --git a/cmd/rp-indexer/main.go b/cmd/rp-indexer/main.go index 429766d..33b0589 100644 --- a/cmd/rp-indexer/main.go +++ b/cmd/rp-indexer/main.go @@ -2,6 +2,8 @@ package main import ( "database/sql" + + "log/slog" "os" "os/signal" "syscall" @@ -12,7 +14,8 @@ import ( "github.com/nyaruka/ezconf" indexer "github.com/nyaruka/rp-indexer/v8" "github.com/nyaruka/rp-indexer/v8/indexers" - log "github.com/sirupsen/logrus" + "github.com/nyaruka/rp-indexer/v8/utils" + "github.com/sirupsen/logrus" ) var ( @@ -26,32 +29,38 @@ func main() { loader := ezconf.NewLoader(cfg, "indexer", "Indexes RapidPro contacts to ElasticSearch", []string{"indexer.toml"}) loader.MustLoad() - level, err := log.ParseLevel(cfg.LogLevel) + level, err := logrus.ParseLevel(cfg.LogLevel) if err != nil { - log.Fatalf("Invalid log level '%s'", level) + logrus.Fatalf("Invalid log level '%s'", level) } - log.SetLevel(level) - log.SetOutput(os.Stdout) - log.SetFormatter(&log.TextFormatter{}) - log.WithField("version", version).WithField("released", date).Info("starting indexer") + logrus.SetLevel(level) + logrus.SetOutput(os.Stdout) + logrus.SetFormatter(&logrus.TextFormatter{}) + logrus.WithField("version", version).WithField("released", date).Info("starting indexer") + + // configure golang std structured logging to route to logrus + slog.SetDefault(slog.New(utils.NewLogrusHandler(logrus.StandardLogger()))) + + logger := slog.With("comp", "main") + logger.Info("starting indexer", "version", version, "released", date) // if we have a DSN entry, try to initialize it if cfg.SentryDSN != "" { - hook, err := logrus_sentry.NewSentryHook(cfg.SentryDSN, []log.Level{log.PanicLevel, log.FatalLevel, log.ErrorLevel}) + hook, err := logrus_sentry.NewSentryHook(cfg.SentryDSN, []logrus.Level{logrus.PanicLevel, logrus.FatalLevel, logrus.ErrorLevel}) hook.Timeout = 0 hook.StacktraceConfiguration.Enable = true hook.StacktraceConfiguration.Skip = 4 hook.StacktraceConfiguration.Context = 5 if err != nil { - log.Fatalf("invalid sentry DSN: '%s': %s", cfg.SentryDSN, err) + logger.Error("invalid sentry DSN: '%s': %s", cfg.SentryDSN, err) } - log.StandardLogger().Hooks.Add(hook) + logrus.StandardLogger().Hooks.Add(hook) } db, err := sql.Open("postgres", cfg.DB) if err != nil { - log.Fatalf("unable to connect to database") + logger.Error("unable to connect to database") } idxrs := []indexers.Indexer{ @@ -63,7 +72,7 @@ func main() { // the rebuild argument can be become the name of the index to rebuild, e.g. --rebuild=contacts idxr := idxrs[0] if _, err := idxr.Index(db, true, cfg.Cleanup); err != nil { - log.WithField("indexer", idxr.Name()).WithError(err).Fatal("error during rebuilding") + logger.Error("error during rebuilding", "error", err, "indexer", idxr.Name()) } } else { d := indexer.NewDaemon(cfg, db, idxrs, time.Duration(cfg.Poll)*time.Second) @@ -82,7 +91,7 @@ func handleSignals(d *indexer.Daemon) { sig := <-sigs switch sig { case syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT: - log.WithField("signal", sig).Info("received exit signal, exiting") + slog.Info("received exit signal, exiting", "signal", sig) d.Stop() return } diff --git a/daemon.go b/daemon.go index 1ede36f..c3e7ee3 100644 --- a/daemon.go +++ b/daemon.go @@ -2,12 +2,12 @@ package indexer import ( "database/sql" + "log/slog" "sync" "time" "github.com/nyaruka/gocommon/analytics" "github.com/nyaruka/rp-indexer/v8/indexers" - "github.com/sirupsen/logrus" ) type Daemon struct { @@ -53,7 +53,7 @@ func (d *Daemon) Start() { func (d *Daemon) startIndexer(indexer indexers.Indexer) { d.wg.Add(1) // add ourselves to the wait group - log := logrus.WithField("indexer", indexer.Name()) + log := slog.With("indexer", indexer.Name()) go func() { defer func() { @@ -68,7 +68,7 @@ func (d *Daemon) startIndexer(indexer indexers.Indexer) { case <-time.After(d.poll): _, err := indexer.Index(d.db, d.cfg.Rebuild, d.cfg.Cleanup) if err != nil { - log.WithError(err).Error("error during indexing") + log.Error("error during indexing", "error", err) } } } @@ -80,7 +80,7 @@ func (d *Daemon) startStatsReporter(interval time.Duration) { go func() { defer func() { - logrus.Info("analytics exiting") + slog.Info("analytics exiting") d.wg.Done() }() @@ -117,11 +117,11 @@ func (d *Daemon) reportStats() { d.prevStats[ix] = stats } - log := logrus.NewEntry(logrus.StandardLogger()) + log := slog.New(slog.Default().Handler()) for k, v := range metrics { analytics.Gauge("indexer."+k, v) - log = log.WithField(k, v) + log = log.With(k, v) } log.Info("stats reported") @@ -129,7 +129,7 @@ func (d *Daemon) reportStats() { // Stop stops this daemon func (d *Daemon) Stop() { - logrus.Info("daemon stopping") + slog.Info("daemon stopping") analytics.Stop() close(d.quit) diff --git a/go.mod b/go.mod index 6e7d170..5fc9dd3 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/nyaruka/rp-indexer/v8 -go 1.20 +go 1.21 require ( github.com/evalphobia/logrus_sentry v0.8.2 diff --git a/go.sum b/go.sum index bb97cb2..6153a1d 100644 --- a/go.sum +++ b/go.sum @@ -9,6 +9,7 @@ github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU= github.com/gabriel-vasile/mimetype v1.4.2/go.mod h1:zApsH/mKG4w07erKIaJPFiX0Tsq9BFQgN3qGY5GnNgA= github.com/getsentry/raven-go v0.2.0 h1:no+xWJRb5ZI7eE8TWgIq1jLulQiIoLG0IfYxv5JYMGs= @@ -16,7 +17,9 @@ github.com/getsentry/raven-go v0.2.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49P github.com/go-chi/chi v4.1.2+incompatible h1:fGFk2Gmi/YKXk0OmGfBh0WgmN3XB8lVnEyNz34tQRec= github.com/go-chi/chi v4.1.2+incompatible/go.mod h1:eB3wogJHnLi3x/kFX2A+IbTBlXxmMeXJVKy9tTv1XzQ= github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o= +github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g= +github.com/jmoiron/sqlx v1.3.5/go.mod h1:nRVWtLre0KfCLJvgxzCsLVMogSvQ1zNJtpYr2Ccp0mQ= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= diff --git a/indexers/base.go b/indexers/base.go index 5c85a68..f2f4d0f 100644 --- a/indexers/base.go +++ b/indexers/base.go @@ -4,6 +4,7 @@ import ( "database/sql" "encoding/json" "fmt" + "log/slog" "net/http" "sort" "strings" @@ -11,7 +12,6 @@ import ( "github.com/nyaruka/gocommon/jsonx" "github.com/nyaruka/rp-indexer/v8/utils" - "github.com/sirupsen/logrus" ) // indexes a document @@ -76,8 +76,8 @@ func (i *baseIndexer) Stats() Stats { return i.stats } -func (i *baseIndexer) log() *logrus.Entry { - return logrus.WithField("indexer", i.name) +func (i *baseIndexer) log() *slog.Logger { + return slog.With("indexer", i.name) } // records a complete index and updates statistics @@ -86,7 +86,7 @@ func (i *baseIndexer) recordComplete(indexed, deleted int, elapsed time.Duration i.stats.Deleted += int64(deleted) i.stats.Elapsed += elapsed - i.log().WithField("indexed", indexed).WithField("deleted", deleted).WithField("elapsed", elapsed).Info("completed indexing") + i.log().Info("completed indexing", "indexed", indexed, "deleted", deleted, "elapsed", elapsed) } // our response for figuring out the physical index for an alias @@ -111,7 +111,7 @@ func (i *baseIndexer) FindIndexes() []string { // reverse sort order should put our newest index first sort.Sort(sort.Reverse(sort.StringSlice(indexes))) - i.log().WithField("indexes", indexes).Debug("found physical indexes") + i.log().Debug("found physical indexes", "indexes", indexes) return indexes } @@ -153,7 +153,7 @@ func (i *baseIndexer) createNewIndex(def *IndexDefinition) (string, error) { } // all went well, return our physical index name - i.log().WithField("index", index).Info("created new index") + i.log().Info("created new index", "index", index) return index, nil } @@ -191,7 +191,7 @@ func (i *baseIndexer) updateAlias(newIndex string) error { remove.Remove.Index = idx commands = append(commands, remove) - logrus.WithField("indexer", i.name).WithField("index", idx).Debug("removing old alias") + slog.Debug("removing old alias", "indexer", i.name, "index", idx) } // add our new index @@ -204,7 +204,7 @@ func (i *baseIndexer) updateAlias(newIndex string) error { _, err := utils.MakeJSONRequest(http.MethodPost, fmt.Sprintf("%s/_aliases", i.elasticURL), aliasJSON, nil) - i.log().WithField("index", newIndex).Info("updated alias") + i.log().Info("updated alias", "index", newIndex) return err } @@ -236,7 +236,7 @@ func (i *baseIndexer) cleanupIndexes() error { // for each active index, if it starts with our alias but is before our current index, remove it for key := range healthResponse.Indices { if strings.HasPrefix(key, i.name) && strings.Compare(key, currents[0]) < 0 { - logrus.WithField("index", key).Info("removing old index") + slog.Info("removing old index", "index", key) _, err = utils.MakeJSONRequest(http.MethodDelete, fmt.Sprintf("%s/%s", i.elasticURL, key), nil, nil) if err != nil { return err @@ -275,27 +275,26 @@ func (i *baseIndexer) indexBatch(index string, batch []byte) (int, int, error) { createdCount, deletedCount, conflictedCount := 0, 0, 0 for _, item := range response.Items { if item.Index.ID != "" { - logrus.WithField("id", item.Index.ID).WithField("status", item.Index.Status).Trace("index response") + slog.Debug("index response", "id", item.Index.ID, "status", item.Index.Status) if item.Index.Status == 200 || item.Index.Status == 201 { createdCount++ } else if item.Index.Status == 409 { conflictedCount++ } else { - logrus.WithField("id", item.Index.ID).WithField("status", item.Index.Status).WithField("result", item.Index.Result).Error("error indexing document") + slog.Error("error indexing document", "id", item.Index.ID, "status", item.Index.Status, "result", item.Index.Result) } } else if item.Delete.ID != "" { - logrus.WithField("id", item.Index.ID).WithField("status", item.Index.Status).Trace("delete response") + slog.Debug("delete response", "id", item.Index.ID, "status", item.Index.Status) if item.Delete.Status == 200 { deletedCount++ } else if item.Delete.Status == 409 { conflictedCount++ } } else { - logrus.Error("unparsed item in response") + slog.Error("unparsed item in response") } } - logrus.WithField("created", createdCount).WithField("deleted", deletedCount).WithField("conflicted", conflictedCount).Debug("indexed batch") - + slog.Debug("indexed batch", "created", createdCount, "deleted", deletedCount, "conflicted", conflictedCount) return createdCount, deletedCount, nil } diff --git a/indexers/base_test.go b/indexers/base_test.go index 9def23f..882171c 100644 --- a/indexers/base_test.go +++ b/indexers/base_test.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "log" + "log/slog" "os" "sort" "strconv" @@ -12,7 +13,6 @@ import ( "github.com/nyaruka/rp-indexer/v8/indexers" "github.com/olivere/elastic/v7" - "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -44,7 +44,7 @@ func setup(t *testing.T) (*sql.DB, *elastic.Client) { } } - logrus.SetLevel(logrus.DebugLevel) + slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))) return db, es } diff --git a/indexers/contacts.go b/indexers/contacts.go index 93ed213..cf7c88b 100644 --- a/indexers/contacts.go +++ b/indexers/contacts.go @@ -6,10 +6,10 @@ import ( "database/sql" _ "embed" "fmt" + "log/slog" "time" "github.com/pkg/errors" - "github.com/sirupsen/logrus" ) //go:embed contacts.index.json @@ -54,7 +54,7 @@ func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error if err != nil { return "", errors.Wrap(err, "error creating new index") } - i.log().WithField("index", physicalIndex).Info("created new physical index") + i.log().Info("created new physical index", "index", physicalIndex) remapAlias = true } @@ -63,7 +63,7 @@ func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error return "", errors.Wrap(err, "error finding last modified") } - i.log().WithField("index", physicalIndex).WithField("last_modified", lastModified).Debug("indexing newer than last modified") + i.log().Debug("indexing newer than last modified", "index", physicalIndex, "last_modified", lastModified) // now index our docs start := time.Now() @@ -211,14 +211,14 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st lastModified = modifiedOn if isActive { - logrus.WithField("id", id).WithField("modifiedOn", modifiedOn).WithField("contact", contactJSON).Trace("modified contact") + slog.Debug("modified contact", "id", id, "modifiedOn", modifiedOn, "contact", contactJSON) subBatch.WriteString(fmt.Sprintf(indexCommand, id, modifiedOn.UnixNano(), orgID)) subBatch.WriteString("\n") subBatch.WriteString(contactJSON) subBatch.WriteString("\n") } else { - logrus.WithField("id", id).WithField("modifiedOn", modifiedOn).Trace("deleted contact") + slog.Debug("deleted contact", "id", id, "modifiedOn", modifiedOn) subBatch.WriteString(fmt.Sprintf(deleteCommand, id, modifiedOn.UnixNano(), orgID)) subBatch.WriteString("\n") @@ -248,16 +248,16 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st batchTime := time.Since(batchStart) batchRate := int(float32(batchFetched) / (float32(batchTime) / float32(time.Second))) - log := i.log().WithField("index", index).WithFields(logrus.Fields{ - "rate": batchRate, - "batch_fetched": batchFetched, - "batch_created": batchCreated, - "batch_elapsed": batchTime, - "batch_elapsed_es": batchESTime, - "total_fetched": totalFetched, - "total_created": totalCreated, - "total_elapsed": totalTime, - }) + log := i.log().With("index", index, + "rate", batchRate, + "batch_fetched", batchFetched, + "batch_created", batchCreated, + "batch_elapsed", batchTime, + "batch_elapsed_es", batchESTime, + "total_fetched", totalFetched, + "total_created", totalCreated, + "total_elapsed", totalTime, + ) // if we're rebuilding, always log batch progress if rebuild { diff --git a/utils/http.go b/utils/http.go index 9f25684..8fbb327 100644 --- a/utils/http.go +++ b/utils/http.go @@ -5,11 +5,11 @@ import ( "encoding/json" "fmt" "io" + "log/slog" "net/http" "time" "github.com/nyaruka/gocommon/httpx" - log "github.com/sirupsen/logrus" ) var retryConfig *httpx.RetryConfig @@ -41,7 +41,7 @@ func shouldRetry(request *http.Request, response *http.Response, withDelay time. bodyBytes, err := io.ReadAll(response.Body) response.Body.Close() if err != nil { - log.WithError(err).Error("error reading ES response, retrying") + slog.Error("error reading ES response, retrying", "error", err) return true } @@ -51,12 +51,12 @@ func shouldRetry(request *http.Request, response *http.Response, withDelay time. // MakeJSONRequest is a utility function to make a JSON request, optionally decoding the response into the passed in struct func MakeJSONRequest(method string, url string, body []byte, dest any) (*http.Response, error) { - l := log.WithField("url", url).WithField("method", method) + l := slog.With("url", url, "method", method) req, _ := httpx.NewRequest(method, url, bytes.NewReader(body), map[string]string{"Content-Type": "application/json"}) resp, err := httpx.Do(http.DefaultClient, req, retryConfig, nil) if err != nil { - l.WithError(err).Error("error making request") + l.Error("error making request", "error", err) return resp, err } defer resp.Body.Close() @@ -64,22 +64,22 @@ func MakeJSONRequest(method string, url string, body []byte, dest any) (*http.Re // if we have a body, try to decode it respBody, err := io.ReadAll(resp.Body) if err != nil { - l.WithError(err).Error("error reading response") + l.Error("error reading response", "error", err) return resp, err } - l = l.WithField("response", string(respBody)).WithField("status", resp.StatusCode) + l = l.With("response", string(respBody), "status", resp.StatusCode) // error if we got a non-200 if resp.StatusCode != http.StatusOK { - l.WithError(err).Error("error reaching ES") + l.Error("error reaching ES", "error", err) return resp, fmt.Errorf("received non-200 response %d: %s", resp.StatusCode, respBody) } if dest != nil { err = json.Unmarshal(respBody, dest) if err != nil { - l.WithError(err).Error("error unmarshalling response") + l.Error("error unmarshalling response", "error", err) return resp, err } } diff --git a/utils/logrus.go b/utils/logrus.go new file mode 100644 index 0000000..2650bf9 --- /dev/null +++ b/utils/logrus.go @@ -0,0 +1,92 @@ +// Structured logging handler for logrus so we can rewrite code to use slog package incrementally. Once all logging is +// happening via slog, we just need to hook up Sentry directly to that, and then we can get rid of this file. +package utils + +import ( + "context" + "log/slog" + "slices" + "strings" + + "github.com/sirupsen/logrus" +) + +var levels = map[slog.Level]logrus.Level{ + slog.LevelError: logrus.ErrorLevel, + slog.LevelWarn: logrus.WarnLevel, + slog.LevelInfo: logrus.InfoLevel, + slog.LevelDebug: logrus.DebugLevel, +} + +type LogrusHandler struct { + logger *logrus.Logger + groups []string + attrs []slog.Attr +} + +func NewLogrusHandler(logger *logrus.Logger) *LogrusHandler { + return &LogrusHandler{logger: logger} +} + +func (l *LogrusHandler) clone() *LogrusHandler { + return &LogrusHandler{ + logger: l.logger, + groups: slices.Clip(l.groups), + attrs: slices.Clip(l.attrs), + } +} + +func (l *LogrusHandler) Enabled(ctx context.Context, level slog.Level) bool { + return levels[level] <= l.logger.GetLevel() +} + +func (l *LogrusHandler) Handle(ctx context.Context, r slog.Record) error { + log := logrus.NewEntry(l.logger) + if r.Time.IsZero() { + log = log.WithTime(r.Time) + } + + f := logrus.Fields{} + for _, a := range l.attrs { + if a.Key != "" { + f[a.Key] = a.Value + } + } + log = log.WithFields(f) + + r.Attrs(func(attr slog.Attr) bool { + if attr.Key == "" { + return true + } + log = log.WithField(attr.Key, attr.Value) + return true + }) + log.Logf(levels[r.Level], r.Message) + return nil +} + +func (l *LogrusHandler) groupPrefix() string { + if len(l.groups) > 0 { + return strings.Join(l.groups, ":") + ":" + } + return "" +} + +func (l *LogrusHandler) WithAttrs(attrs []slog.Attr) slog.Handler { + newHandler := l.clone() + for _, a := range attrs { + newHandler.attrs = append(newHandler.attrs, slog.Attr{ + Key: l.groupPrefix() + a.Key, + Value: a.Value, + }) + } + return newHandler +} + +func (l *LogrusHandler) WithGroup(name string) slog.Handler { + newHandler := l.clone() + newHandler.groups = append(newHandler.groups, name) + return newHandler +} + +var _ slog.Handler = &LogrusHandler{}