-
Notifications
You must be signed in to change notification settings - Fork 33
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Protect misconfiguring catchupModeBlockGap < catchupModePageSize and fix catchup #199
Conversation
…eSize Signed-off-by: Peter Broadhurst <[email protected]>
Codecov Report
@@ Coverage Diff @@
## main #199 +/- ##
==========================================
+ Coverage 97.21% 97.23% +0.01%
==========================================
Files 58 58
Lines 6897 6900 +3
==========================================
+ Hits 6705 6709 +4
+ Misses 147 146 -1
Partials 45 45
Continue to review full report at Codecov.
|
I found we still were not writing checkpoint HWM updates while replaying in catchup mode. There was a check here that architecturally was meant to address this when #130 was first implemented, but it has a special case of failing when
So I've just made |
5ef1b78
to
e4fe13e
Compare
Signed-off-by: Peter Broadhurst <[email protected]>
e4fe13e
to
55d5515
Compare
@@ -428,14 +427,12 @@ func (a *eventStream) eventPoller() { | |||
blockHeight, exists := checkpoint[sub.info.ID] | |||
if !exists || blockHeight.Cmp(big.NewInt(0)) <= 0 { | |||
blockHeight, err = sub.setInitialBlockHeight(ctx) | |||
} else { | |||
} else if !sub.inCatchupMode() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We were constantly restoring back to the checkpoint (and logging that fact) in catchup mode. No need
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, this took me a bit of time to understand, so adding a note here for my future self. The checkpoint is updated in catchup mode as part of batchComplete()
call: https://github.com/kaleido-io/firefly-ethconnect/blob/511d55a563aa7311c310ecadd4aa3040760abead/internal/events/logprocessor.go#L75-L76
changed = changed || blockUpdatedFilterStale || i1 == nil || i1.Cmp(&i2) != 0 | ||
subChanged := i1 == nil || i1.Cmp(&i2) != 0 | ||
if subChanged { | ||
log.Debugf("%s: New checkpoint HWM: %s", a.spec.ID, i2.String()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replaced other less helpful logging, to just show the one subscription that updated its checkpoint
subID: subID, | ||
event: event, | ||
stream: stream, | ||
} | ||
lp.highestDispatched.SetInt64(-1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Key fix - start with highestDispatched
at -1
so compare to 0
works
lp.blockHWM.Set(blockNumber) | ||
log.Debugf("%s: HWM: %s", lp.subID, lp.blockHWM.String()) | ||
// Nothing in-flight, its safe to update the HWM - to one after the block we're up to | ||
lp.blockHWM.Set(new(big.Int).Add(blockNumber, big.NewInt(1))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Without this, each restart we were going back one block.
Note very important we do new(big.Int)
here, so we are not adding to the blockNumber
passed in - which is a pointer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wow, definitely missed this the last time I worked on fixing the restart!
@@ -122,6 +122,10 @@ func NewSubscriptionManager(conf *SubscriptionManagerConf, rpc eth.RPCClient, cr | |||
if conf.CatchupModePageSize <= 0 { | |||
conf.CatchupModePageSize = defaultCatchupModePageSize | |||
} | |||
if conf.CatchupModeBlockGap < conf.CatchupModePageSize { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Key fix - stop users from setting invalid combination
@@ -410,7 +414,7 @@ func (s *subscriptionMGR) loadCheckpoint(streamID string) (map[string]*big.Int, | |||
func (s *subscriptionMGR) storeCheckpoint(streamID string, checkpoint map[string]*big.Int) error { | |||
cpID := checkpointIDPrefix + streamID | |||
b, _ := json.MarshalIndent(&checkpoint, "", " ") | |||
log.Debugf("Storing checkpoint %s: %s", cpID, string(b)) | |||
log.Tracef("Storing checkpoint %s: %s", cpID, string(b)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move this to trace as we now log at Debug the individual subs above, only when they change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree with reverting the previous iteration of attempted fix, @peterbroadhurst . Changes LGTM
See #198 for the 🤕 behavior you can get if we don't protect against this.
The logic here has an implicit assumption of this rule, so that when we read a page it's never going to extend past the
blockNumber
of the header of the chain at the point we query.Otherwise, we think:
firefly-ethconnect/internal/events/subscription.go
Lines 223 to 248 in f6c7fe5