Skip to content

Commit

Permalink
Update block poller
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-riley committed Oct 27, 2023
1 parent 377b29b commit ecd3a46
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 528 deletions.
286 changes: 137 additions & 149 deletions node/pkg/watchers/evm/connectors/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (

"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/supervisor"
ethEvent "github.com/ethereum/go-ethereum/event"

ethereum "github.com/ethereum/go-ethereum"
ethHexUtils "github.com/ethereum/go-ethereum/common/hexutil"
ethEvent "github.com/ethereum/go-ethereum/event"

"go.uber.org/zap"
)
Expand All @@ -24,27 +24,20 @@ type PollFinalizer interface {
// finalizer which will be used to only return finalized blocks on subscriptions.
type BlockPollConnector struct {
Connector
Delay time.Duration
useFinalized bool
publishSafeBlocks bool
finalizer PollFinalizer
blockFeed ethEvent.Feed
errFeed ethEvent.Feed
Delay time.Duration
finalizer PollFinalizer
blockFeed ethEvent.Feed
errFeed ethEvent.Feed
}

func NewBlockPollConnector(ctx context.Context, baseConnector Connector, finalizer PollFinalizer, delay time.Duration, useFinalized bool, publishSafeBlocks bool) (*BlockPollConnector, error) {
if publishSafeBlocks && !useFinalized {
return nil, fmt.Errorf("publishSafeBlocks may only be enabled if useFinalized is enabled")
}
func NewBlockPollConnector(ctx context.Context, baseConnector Connector, finalizer PollFinalizer, delay time.Duration) (*BlockPollConnector, error) {
if finalizer == nil {
return nil, fmt.Errorf("finalizer must not be nil; Use finalizers.NewDefaultFinalizer() if you want to have no finalizer.")
panic("finalizer must not be nil; Use finalizers.NewDefaultFinalizer() if you want to have no finalizer.")
}
connector := &BlockPollConnector{
Connector: baseConnector,
Delay: delay,
useFinalized: useFinalized,
publishSafeBlocks: publishSafeBlocks,
finalizer: finalizer,
Connector: baseConnector,
Delay: delay,
finalizer: finalizer,
}
err := supervisor.Run(ctx, "blockPoller", common.WrapWithScissors(connector.runFromSupervisor, "blockPoller"))
if err != nil {
Expand All @@ -53,190 +46,179 @@ func NewBlockPollConnector(ctx context.Context, baseConnector Connector, finaliz
return connector, nil
}

func (b *BlockPollConnector) SubscribeForBlocks(ctx context.Context, errC chan error, sink chan<- *NewBlock) (ethereum.Subscription, error) {
sub := NewPollSubscription()
blockSub := b.blockFeed.Subscribe(sink)

// The feed library does not support error forwarding, so we're emulating that using a custom subscription and
// an error feed. The feed library can't handle interfaces which is why we post strings.
innerErrSink := make(chan string, 10)
innerErrSub := b.errFeed.Subscribe(innerErrSink)

common.RunWithScissors(ctx, errC, "block_poll_subscribe_for_blocks", func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
blockSub.Unsubscribe()
innerErrSub.Unsubscribe()
return nil
case <-sub.quit:
blockSub.Unsubscribe()
innerErrSub.Unsubscribe()
sub.unsubDone <- struct{}{}
return nil
case v := <-innerErrSink:
sub.err <- fmt.Errorf(v)
}
}
})
return sub, nil
}

func (b *BlockPollConnector) runFromSupervisor(ctx context.Context) error {
logger := supervisor.Logger(ctx).With(zap.String("eth_network", b.Connector.NetworkName()))
supervisor.Signal(ctx, supervisor.SignalHealthy)
return b.run(ctx, logger)
}

func (b *BlockPollConnector) run(ctx context.Context, logger *zap.Logger) error {
lastBlock, err := b.getBlock(ctx, logger, nil, Finalized)
prevLatest, err := getBlockByTag(ctx, logger, b.Connector, "latest", Latest)
if err != nil {
return err
}

var lastSafeBlock *NewBlock
if b.publishSafeBlocks {
lastSafeBlock, err = b.getBlock(ctx, logger, nil, Safe)
if err != nil {
return err
}
prevFinalized := &NewBlock{
Number: prevLatest.Number,
Hash: prevLatest.Hash,
Finality: Finalized,
}

timer := time.NewTimer(time.Millisecond) // Start immediately.
timer := time.NewTimer(b.Delay)
defer timer.Stop()

errCount := 0
for {
select {
case <-ctx.Done():
timer.Stop()
return ctx.Err()
case <-timer.C:
for count := 0; count < 3; count++ {
lastBlock, err = b.pollBlocks(ctx, logger, lastBlock, Finalized)
if err == nil {
break
}
logger.Error("polling of block encountered an error", zap.Error(err))

// Wait an interval before trying again. We stay in this loop so that we
// try up to three times before causing the watcher to restart.
time.Sleep(b.Delay)
}

if err == nil && b.publishSafeBlocks {
for count := 0; count < 3; count++ {
lastSafeBlock, err = b.pollBlocks(ctx, logger, lastSafeBlock, Safe)
if err == nil {
break
}
logger.Error("polling of safe block encountered an error", zap.Error(err))

// Same wait as above.
time.Sleep(b.Delay)
prevLatest, prevFinalized, err = b.pollBlock(ctx, logger, prevLatest, prevFinalized)
if err != nil {
errCount++
if errCount > 3 {
logger.Error("polling encountered an error", zap.Int("errCount", errCount), zap.Error(err))
b.errFeed.Send(fmt.Sprint("polling encountered an error: ", err))
return err
}
} else {
errCount = 0
}

if err != nil {
b.errFeed.Send(fmt.Sprint("polling encountered an error: ", err))
}
timer.Reset(b.Delay)
}
}
}

func (b *BlockPollConnector) pollBlocks(ctx context.Context, logger *zap.Logger, lastBlock *NewBlock, desiredFinality FinalityLevel) (lastPublishedBlock *NewBlock, retErr error) {
// Some of the testnet providers (like the one we are using for Arbitrum) limit how many transactions we can do. When that happens, the call hangs.
// Use a timeout so that the call will fail and the runable will get restarted. This should not happen in mainnet, but if it does, we will need to
// investigate why the runable is dying and fix the underlying problem.

lastPublishedBlock = lastBlock

// Fetch the latest block on the chain
// We could do this on every iteration such that if a new block is created while this function is being executed,
// it would automatically fetch new blocks but in order to reduce API load this will be done on the next iteration.
latestBlock, err := b.getBlockWithTimeout(ctx, logger, nil, desiredFinality)
// pollBlock poll for the latest block, compares them to the last one, and publishes any new ones.
// In the case of an error, it returns the last block that were passed in, otherwise it returns the new block.
func (b *BlockPollConnector) pollBlock(ctx context.Context, logger *zap.Logger, prevLatest *NewBlock, prevFinalized *NewBlock) (newLatest *NewBlock, newFinalized *NewBlock, err error) {
newLatest, err = getBlockByTag(ctx, logger, b.Connector, "latest", Latest)
if err != nil {
logger.Error("failed to look up latest block",
zap.Uint64("lastSeenBlock", lastBlock.Number.Uint64()), zap.Error(err))
return lastPublishedBlock, fmt.Errorf("failed to look up latest block: %w", err)
err = fmt.Errorf("failed to get latest block: %w", err)
newLatest = prevLatest
newFinalized = prevFinalized
return
}
for {
if lastPublishedBlock.Number.Cmp(latestBlock.Number) >= 0 {
// We have to wait for a new block to become available
return
}

// Try to fetch the next block between lastBlock and latestBlock
nextBlockNumber := new(big.Int).Add(lastPublishedBlock.Number, big.NewInt(1))
block, err := b.getBlockWithTimeout(ctx, logger, nextBlockNumber, desiredFinality)
if err != nil {
logger.Error("failed to fetch next block",
zap.Uint64("block", nextBlockNumber.Uint64()), zap.Error(err))
return lastPublishedBlock, fmt.Errorf("failed to fetch next block (%d): %w", nextBlockNumber.Uint64(), err)
}

finalized, err := b.isBlockFinalizedWithTimeout(ctx, block)
if err != nil {
logger.Error("failed to check block finalization",
zap.Uint64("block", block.Number.Uint64()), zap.Error(err))
return lastPublishedBlock, fmt.Errorf("failed to check block finalization (%d): %w", block.Number.Uint64(), err)
}
// First see if there might be some newly finalized ones to publish
var block *NewBlock
if newLatest.Number.Cmp(prevLatest.Number) > 0 {
// If there is a gap between prev and new, we have to look up the transaction hashes for the missing ones. Do that in batches.
newBlockNum := newLatest.Number.Uint64()
for blockNum := prevLatest.Number.Uint64() + 1; blockNum < newBlockNum; blockNum++ {
block, err = getBlockByNumberUint64(ctx, logger, b.Connector, blockNum, Latest)
if err != nil {
err = fmt.Errorf("failed to get gap block: %w", err)
newLatest = prevLatest
newFinalized = prevFinalized
return
}

if !finalized {
break
b.blockFeed.Send(block)
}

b.blockFeed.Send(block)
lastPublishedBlock = block
b.blockFeed.Send(newLatest)
}

return
}
newFinalized = prevFinalized
if newLatest.Number.Cmp(prevFinalized.Number) > 0 {
var finalized bool
// If there is a gap between prev and new, we have to look up the transaction hashes for the missing ones. Do that in batches.
newBlockNum := newLatest.Number.Uint64()
for blockNum := prevFinalized.Number.Uint64() + 1; blockNum <= newBlockNum; blockNum++ {
block, err = getBlockByNumberUint64(ctx, logger, b.Connector, blockNum, Finalized)
if err != nil {
err = fmt.Errorf("failed to get gap block: %w", err)
newLatest = prevLatest
newFinalized = prevFinalized
return
}

func (b *BlockPollConnector) getBlockWithTimeout(ctx context.Context, logger *zap.Logger, blockNumber *big.Int, desiredFinality FinalityLevel) (*NewBlock, error) {
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
return b.getBlock(timeout, logger, blockNumber, desiredFinality)
}
finalized, err = b.isBlockFinalized(ctx, block)
if err != nil {
err = fmt.Errorf("failed to check finality on block: %w", err)
newLatest = prevLatest
newFinalized = prevFinalized
return
}

func (b *BlockPollConnector) isBlockFinalizedWithTimeout(ctx context.Context, block *NewBlock) (bool, error) {
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
return b.finalizer.IsBlockFinalized(timeout, block)
}
if !finalized {
break
}

func (b *BlockPollConnector) SubscribeForBlocks(ctx context.Context, errC chan error, sink chan<- *NewBlock) (ethereum.Subscription, error) {
sub := NewPollSubscription()
blockSub := b.blockFeed.Subscribe(sink)
b.blockFeed.Send(&NewBlock{
Number: block.Number,
Hash: block.Hash,
L1BlockNumber: block.L1BlockNumber,
Finality: Safe,
})

b.blockFeed.Send(&NewBlock{
Number: block.Number,
Hash: block.Hash,
L1BlockNumber: block.L1BlockNumber,
Finality: Finalized,
})

newFinalized = block
}
}

// The feed library does not support error forwarding, so we're emulating that using a custom subscription and
// an error feed. The feed library can't handle interfaces which is why we post strings.
innerErrSink := make(chan string, 10)
innerErrSub := b.errFeed.Subscribe(innerErrSink)
return
}

common.RunWithScissors(ctx, errC, "block_poll_subscribe_for_blocks", func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
blockSub.Unsubscribe()
innerErrSub.Unsubscribe()
return nil
case <-sub.quit:
blockSub.Unsubscribe()
innerErrSub.Unsubscribe()
sub.unsubDone <- struct{}{}
return nil
case v := <-innerErrSink:
sub.err <- fmt.Errorf(v)
}
}
})
return sub, nil
func getBlockByNumberUint64(ctx context.Context, logger *zap.Logger, conn Connector, blockNum uint64, desiredFinality FinalityLevel) (*NewBlock, error) {
return getBlockByTag(ctx, logger, conn, "0x"+fmt.Sprintf("%x", blockNum), desiredFinality)
}

func (b *BlockPollConnector) getBlock(ctx context.Context, logger *zap.Logger, number *big.Int, desiredFinality FinalityLevel) (*NewBlock, error) {
return getBlock(ctx, logger, b.Connector, number, b.useFinalized, desiredFinality)
func getBlockByNumberBigInt(ctx context.Context, logger *zap.Logger, conn Connector, blockNum *big.Int, desiredFinality FinalityLevel) (*NewBlock, error) {
return getBlockByTag(ctx, logger, conn, ethHexUtils.EncodeBig(blockNum), desiredFinality)
}

// getBlock is a free function that can be called from other connectors to get a single block.
func getBlock(ctx context.Context, logger *zap.Logger, conn Connector, number *big.Int, useFinalized bool, desiredFinality FinalityLevel) (*NewBlock, error) {
var numStr string
if number != nil {
numStr = ethHexUtils.EncodeBig(number)
} else if useFinalized {
if desiredFinality == Safe {
numStr = "safe"
} else if desiredFinality == Finalized {
numStr = "finalized"
} else if desiredFinality == Latest {
numStr = "latest"
} else {
panic("invalid finality level")
}
} else {
numStr = "latest"
}
func getBlockByTag(ctx context.Context, logger *zap.Logger, conn Connector, tag string, desiredFinality FinalityLevel) (*NewBlock, error) {
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()

var m BlockMarshaller
err := conn.RawCallContext(ctx, &m, "eth_getBlockByNumber", numStr, false)
err := conn.RawCallContext(timeout, &m, "eth_getBlockByNumber", tag, false)
if err != nil {
logger.Error("failed to get block",
zap.String("requested_block", numStr), zap.Error(err))
zap.String("requested_block", tag), zap.Error(err))
return nil, err
}
if m.Number == nil {
logger.Error("failed to unmarshal block",
zap.String("requested_block", numStr),
zap.String("requested_block", tag),
)
return nil, fmt.Errorf("failed to unmarshal block: Number is nil")
}
Expand All @@ -255,3 +237,9 @@ func getBlock(ctx context.Context, logger *zap.Logger, conn Connector, number *b
Finality: desiredFinality,
}, nil
}

func (b *BlockPollConnector) isBlockFinalized(ctx context.Context, block *NewBlock) (bool, error) {
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
return b.finalizer.IsBlockFinalized(timeout, block)
}
Loading

0 comments on commit ecd3a46

Please sign in to comment.