Skip to content

Commit

Permalink
better stats
Browse files Browse the repository at this point in the history
  • Loading branch information
billettc committed Feb 8, 2024
1 parent 96a7811 commit bca0310
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 6 deletions.
2 changes: 1 addition & 1 deletion cmd/substreams-sink-kv/inject.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var injectCmd = Command(injectRunE,
Flags(func(flags *pflag.FlagSet) {
sink.AddFlagsToSet(flags)

flags.Int("flush-interval", 1000, "When in catch up mode, flush every N blocks")
flags.Int("flush-interval", 100, "When in catch up mode, flush every N blocks")
flags.String("module", "", "An explicit module to sink, if not provided, expecting the Substreams manifest to defined 'sink' configuration")
flags.String("server-listen-addr", "", "Launch query server on this address")
flags.Bool("server-listen-ssl-self-signed", false, "Listen with an HTTPS server (with self-signed certificate)")
Expand Down
1 change: 1 addition & 0 deletions sinker/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ var metrics = dmetrics.NewSet()

var FlushedEntriesCount = metrics.NewCounter("substreams_sink_kv_flushed_entries_count", "The number of flushed entries")
var FlushCount = metrics.NewCounter("substreams_sink_kv_store_flush_count", "The amount of flush that happened so far")
var BlockCount = metrics.NewCounter("substreams_sink_kv_store_block_count", "The block processed so far")
var FlushDuration = metrics.NewCounter("substreams_sink_kv_store_flush_duration", "The amount of time spent flushing cache to db")
4 changes: 2 additions & 2 deletions sinker/sinker.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ func (s *KVSinker) handleBlockScopedData(ctx context.Context, data *pbsubstreams

FlushCount.Inc()
FlushedEntriesCount.AddInt(count)
FlushedEntriesCount.AddInt64(time.Since(flushStart).Nanoseconds())

FlushDuration.AddInt64(time.Since(flushStart).Nanoseconds())
s.stats.RecordFlushDuration(time.Since(flushStart))
s.stats.RecordBlock(blockRef)
}

Expand Down
16 changes: 13 additions & 3 deletions sinker/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,21 @@ type Stats struct {
*shutter.Shutter

dbFlushRate *dmetrics.AvgRatePromCounter
flusehdEntries *dmetrics.ValueFromMetric
flushedEntries *dmetrics.ValueFromMetric
lastBlock bstream.BlockRef
logger *zap.Logger
blockRate *dmetrics.AvgRatePromCounter
flushDuration *dmetrics.AvgDurationCounter
}

func NewStats(logger *zap.Logger) *Stats {
return &Stats{
Shutter: shutter.New(),

dbFlushRate: dmetrics.MustNewAvgRateFromPromCounter(FlushCount, 1*time.Second, 30*time.Second, "flush"),
flusehdEntries: dmetrics.NewValueFromMetric(FlushedEntriesCount, "entries"),
blockRate: dmetrics.MustNewAvgRateFromPromCounter(BlockCount, 1*time.Second, 30*time.Second, "block"),
flushedEntries: dmetrics.NewValueFromMetric(FlushedEntriesCount, "entries"),
flushDuration: dmetrics.NewAvgDurationCounter(30*time.Second, time.Second, "flush duration"),
lastBlock: unsetBlockRef{},
logger: logger,
}
Expand All @@ -34,6 +38,10 @@ func (s *Stats) RecordBlock(block bstream.BlockRef) {
s.lastBlock = block
}

func (s *Stats) RecordFlushDuration(since time.Duration) {

}

func (s *Stats) Start(each time.Duration, cursor *sink.Cursor) {
if !cursor.IsBlank() {
s.lastBlock = cursor.Block()
Expand Down Expand Up @@ -63,7 +71,9 @@ func (s *Stats) LogNow() {
// them so the development logs looks nicer.
s.logger.Info("substreams kv stats",
zap.Stringer("db_flush_rate", s.dbFlushRate),
zap.Uint64("flushed_entries", s.flusehdEntries.ValueUint()),
zap.String("flush_duration", s.flushDuration.String()),
zap.Stringer("block_rate", s.blockRate),
zap.Uint64("flushed_entries", s.flushedEntries.ValueUint()),
zap.Stringer("last_block", s.lastBlock),
)
}
Expand Down

0 comments on commit bca0310

Please sign in to comment.