Skip to content

Commit

Permalink
Merge branch 'fix/strange-cursor' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
sduchesneau committed Sep 16, 2024
2 parents 8700a4c + 53abbec commit 1ebe8eb
Show file tree
Hide file tree
Showing 7 changed files with 20 additions and 4 deletions.
4 changes: 4 additions & 0 deletions docs/release-notes/change-log.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## v1.10.4 (Unreleased)

### Server

* Fix bug where some invalid cursors may be sent (with 'LIB' being above the block being sent) and add safeguard/loggin if the bug appears again

### CLI

* Add `-o cursor` output type to `substreams run` for debugging purposes
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/jhump/protoreflect v1.14.0
github.com/spf13/cobra v1.7.0
github.com/spf13/pflag v1.0.5 // indirect
github.com/streamingfast/bstream v0.0.2-0.20240906151250-c7bc58efc760
github.com/streamingfast/bstream v0.0.2-0.20240916154503-c9c5c8bbeca0
github.com/streamingfast/cli v0.0.4-0.20230825151644-8cc84512cd80
github.com/streamingfast/dauth v0.0.0-20240219205130-bfe428489338
github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -521,8 +521,8 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.15.0 h1:js3yy885G8xwJa6iOISGFwd+qlUo5AvyXb7CiihdtiU=
github.com/spf13/viper v1.15.0/go.mod h1:fFcTBJxvhhzSJiZy8n+PeW6t8l+KeT/uTARa0jHOQLA=
github.com/streamingfast/bstream v0.0.2-0.20240906151250-c7bc58efc760 h1:m6yFZwq1t45QjPK7B1UEoRvM99YYD8U2OZIa3oLtgbM=
github.com/streamingfast/bstream v0.0.2-0.20240906151250-c7bc58efc760/go.mod h1:n5wy+Vmwp4xbjXO7B81MAkAgjnf1vJ/lI2y6hWWyFbg=
github.com/streamingfast/bstream v0.0.2-0.20240916154503-c9c5c8bbeca0 h1:7qWlxoUY8r/RUOYEH48ZJC1lwIRLiXyOIp2Xwp2TKoE=
github.com/streamingfast/bstream v0.0.2-0.20240916154503-c9c5c8bbeca0/go.mod h1:n5wy+Vmwp4xbjXO7B81MAkAgjnf1vJ/lI2y6hWWyFbg=
github.com/streamingfast/cli v0.0.4-0.20230825151644-8cc84512cd80 h1:UxJUTcEVkdZy8N77E3exz0iNlgQuxl4m220GPvzdZ2s=
github.com/streamingfast/cli v0.0.4-0.20230825151644-8cc84512cd80/go.mod h1:QxjVH73Lkqk+mP8bndvhMuQDUINfkgsYhdCH/5TJFKI=
github.com/streamingfast/dauth v0.0.0-20240219205130-bfe428489338 h1:o3Imquu+RhIdF62OSr/ZxVPsn6jpKHwBV/Upl6P28o0=
Expand Down
6 changes: 6 additions & 0 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,8 +611,14 @@ func returnModuleDataOutputs(
extraMapModuleOutputs []*pbsubstreamsrpc.MapModuleOutput,
extraStoreModuleOutputs []*pbsubstreamsrpc.StoreModuleOutput,
respFunc substreams.ResponseFunc,
logger *zap.Logger,
) error {

if cursor.Block.Num() < cursor.LIB.Num() {
// safeguard for a bug that "may" have been fixed in bstream library
logger.Warn("cursor is invalid", zap.Uint64("clock_num", clock.Number), zap.String("cursor", cursor.String()))
return fmt.Errorf("internal error 1203")
}
out := &pbsubstreamsrpc.BlockScopedData{
Clock: clock,
Output: mapModuleOutput,
Expand Down
2 changes: 1 addition & 1 deletion pipeline/process_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func (p *Pipeline) handleStepNew(ctx context.Context, clock *pbsubstreams.Clock,
MapOutput: &anypb.Any{},
}
}
if err = returnModuleDataOutputs(clock, cursor, mapModuleOutput, p.extraMapModuleOutputs, p.extraStoreModuleOutputs, p.respFunc); err != nil {
if err = returnModuleDataOutputs(clock, cursor, mapModuleOutput, p.extraMapModuleOutputs, p.extraStoreModuleOutputs, p.respFunc, logger); err != nil {
return fmt.Errorf("failed to return module data output: %w", err)
}
}
Expand Down
5 changes: 5 additions & 0 deletions pipeline/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,11 @@ func resolveStartBlockNum(ctx context.Context, req *pbsubstreamsrpc.Request, res
return nextBlock, "", nil, nil
}

if cursor.LIB.Num() > cursor.Block.Num() {
reqctx.Logger(ctx).Warn("Received invalid start cursor", zap.String("cursor", cursor.String()))
return 0, "", nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("StartCursor is invalid: LIB %d greater than Block %d", cursor.LIB.Num(), cursor.Block.Num()))
}

reorgJunctionBlock, head, err := resolveCursor(ctx, cursor)
if err != nil {
return 0, "", nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("cannot resolve StartCursor %q: %s", cursor, err.Error()))
Expand Down
1 change: 1 addition & 0 deletions service/tier1.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ func (s *Tier1Service) Blocks(
zap.String("output_module", request.OutputModule),
zap.String("output_module_hash", outputModuleHash),
zap.Bool("compressed", compressed),
zap.Bool("final_blocks_only", request.FinalBlocksOnly),
}
fields = append(fields, zap.Bool("production_mode", request.ProductionMode))
fields = append(fields, zap.Bool("noop_mode", request.NoopMode))
Expand Down

0 comments on commit 1ebe8eb

Please sign in to comment.