diff --git a/node/pkg/watchers/evm/connectors/poller.go b/node/pkg/watchers/evm/connectors/poller.go index e31c75a151..19b37788d6 100644 --- a/node/pkg/watchers/evm/connectors/poller.go +++ b/node/pkg/watchers/evm/connectors/poller.go @@ -8,10 +8,10 @@ import ( "github.com/certusone/wormhole/node/pkg/common" "github.com/certusone/wormhole/node/pkg/supervisor" - ethEvent "github.com/ethereum/go-ethereum/event" ethereum "github.com/ethereum/go-ethereum" ethHexUtils "github.com/ethereum/go-ethereum/common/hexutil" + ethEvent "github.com/ethereum/go-ethereum/event" "go.uber.org/zap" ) @@ -24,27 +24,20 @@ type PollFinalizer interface { // finalizer which will be used to only return finalized blocks on subscriptions. type BlockPollConnector struct { Connector - Delay time.Duration - useFinalized bool - publishSafeBlocks bool - finalizer PollFinalizer - blockFeed ethEvent.Feed - errFeed ethEvent.Feed + Delay time.Duration + finalizer PollFinalizer + blockFeed ethEvent.Feed + errFeed ethEvent.Feed } -func NewBlockPollConnector(ctx context.Context, baseConnector Connector, finalizer PollFinalizer, delay time.Duration, useFinalized bool, publishSafeBlocks bool) (*BlockPollConnector, error) { - if publishSafeBlocks && !useFinalized { - return nil, fmt.Errorf("publishSafeBlocks may only be enabled if useFinalized is enabled") - } +func NewBlockPollConnector(ctx context.Context, baseConnector Connector, finalizer PollFinalizer, delay time.Duration) (*BlockPollConnector, error) { if finalizer == nil { - return nil, fmt.Errorf("finalizer must not be nil; Use finalizers.NewDefaultFinalizer() if you want to have no finalizer.") + panic("finalizer must not be nil; Use finalizers.NewDefaultFinalizer() if you want to have no finalizer.") } connector := &BlockPollConnector{ - Connector: baseConnector, - Delay: delay, - useFinalized: useFinalized, - publishSafeBlocks: publishSafeBlocks, - finalizer: finalizer, + Connector: baseConnector, + Delay: delay, + finalizer: finalizer, } err := supervisor.Run(ctx, "blockPoller", common.WrapWithScissors(connector.runFromSupervisor, "blockPoller")) if err != nil { @@ -53,6 +46,35 @@ func NewBlockPollConnector(ctx context.Context, baseConnector Connector, finaliz return connector, nil } +func (b *BlockPollConnector) SubscribeForBlocks(ctx context.Context, errC chan error, sink chan<- *NewBlock) (ethereum.Subscription, error) { + sub := NewPollSubscription() + blockSub := b.blockFeed.Subscribe(sink) + + // The feed library does not support error forwarding, so we're emulating that using a custom subscription and + // an error feed. The feed library can't handle interfaces which is why we post strings. 
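+	// The poll loop posts a string to errFeed just before it exits after repeated polling failures; that
+	// string is converted back into an error below and forwarded to the subscriber through sub.err. The
+	// sink is buffered so that a burst of errors does not block the feed while this goroutine is busy.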
+ innerErrSink := make(chan string, 10) + innerErrSub := b.errFeed.Subscribe(innerErrSink) + + common.RunWithScissors(ctx, errC, "block_poll_subscribe_for_blocks", func(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + blockSub.Unsubscribe() + innerErrSub.Unsubscribe() + return nil + case <-sub.quit: + blockSub.Unsubscribe() + innerErrSub.Unsubscribe() + sub.unsubDone <- struct{}{} + return nil + case v := <-innerErrSink: + sub.err <- fmt.Errorf(v) + } + } + }) + return sub, nil +} + func (b *BlockPollConnector) runFromSupervisor(ctx context.Context) error { logger := supervisor.Logger(ctx).With(zap.String("eth_network", b.Connector.NetworkName())) supervisor.Signal(ctx, supervisor.SignalHealthy) @@ -60,183 +82,143 @@ func (b *BlockPollConnector) runFromSupervisor(ctx context.Context) error { } func (b *BlockPollConnector) run(ctx context.Context, logger *zap.Logger) error { - lastBlock, err := b.getBlock(ctx, logger, nil, Finalized) + prevLatest, err := getBlockByTag(ctx, logger, b.Connector, "latest", Latest) if err != nil { return err } - var lastSafeBlock *NewBlock - if b.publishSafeBlocks { - lastSafeBlock, err = b.getBlock(ctx, logger, nil, Safe) - if err != nil { - return err - } + prevFinalized := &NewBlock{ + Number: prevLatest.Number, + Hash: prevLatest.Hash, + Finality: Finalized, } - timer := time.NewTimer(time.Millisecond) // Start immediately. + timer := time.NewTimer(b.Delay) + defer timer.Stop() + errCount := 0 for { select { case <-ctx.Done(): - timer.Stop() return ctx.Err() case <-timer.C: - for count := 0; count < 3; count++ { - lastBlock, err = b.pollBlocks(ctx, logger, lastBlock, Finalized) - if err == nil { - break - } - logger.Error("polling of block encountered an error", zap.Error(err)) - - // Wait an interval before trying again. We stay in this loop so that we - // try up to three times before causing the watcher to restart. - time.Sleep(b.Delay) - } - - if err == nil && b.publishSafeBlocks { - for count := 0; count < 3; count++ { - lastSafeBlock, err = b.pollBlocks(ctx, logger, lastSafeBlock, Safe) - if err == nil { - break - } - logger.Error("polling of safe block encountered an error", zap.Error(err)) - - // Same wait as above. - time.Sleep(b.Delay) + prevLatest, prevFinalized, err = b.pollBlock(ctx, logger, prevLatest, prevFinalized) + if err != nil { + errCount++ + if errCount > 3 { + logger.Error("polling encountered an error", zap.Int("errCount", errCount), zap.Error(err)) + b.errFeed.Send(fmt.Sprint("polling encountered an error: ", err)) + return err } + } else { + errCount = 0 } - if err != nil { - b.errFeed.Send(fmt.Sprint("polling encountered an error: ", err)) - } timer.Reset(b.Delay) } } } -func (b *BlockPollConnector) pollBlocks(ctx context.Context, logger *zap.Logger, lastBlock *NewBlock, desiredFinality FinalityLevel) (lastPublishedBlock *NewBlock, retErr error) { - // Some of the testnet providers (like the one we are using for Arbitrum) limit how many transactions we can do. When that happens, the call hangs. - // Use a timeout so that the call will fail and the runable will get restarted. This should not happen in mainnet, but if it does, we will need to - // investigate why the runable is dying and fix the underlying problem. 
- - lastPublishedBlock = lastBlock - - // Fetch the latest block on the chain - // We could do this on every iteration such that if a new block is created while this function is being executed, - // it would automatically fetch new blocks but in order to reduce API load this will be done on the next iteration. - latestBlock, err := b.getBlockWithTimeout(ctx, logger, nil, desiredFinality) +// pollBlock poll for the latest block, compares them to the last one, and publishes any new ones. +// In the case of an error, it returns the last block that were passed in, otherwise it returns the new block. +func (b *BlockPollConnector) pollBlock(ctx context.Context, logger *zap.Logger, prevLatest *NewBlock, prevFinalized *NewBlock) (newLatest *NewBlock, newFinalized *NewBlock, err error) { + newLatest, err = getBlockByTag(ctx, logger, b.Connector, "latest", Latest) if err != nil { - logger.Error("failed to look up latest block", - zap.Uint64("lastSeenBlock", lastBlock.Number.Uint64()), zap.Error(err)) - return lastPublishedBlock, fmt.Errorf("failed to look up latest block: %w", err) + err = fmt.Errorf("failed to get latest block: %w", err) + newLatest = prevLatest + newFinalized = prevFinalized + return } - for { - if lastPublishedBlock.Number.Cmp(latestBlock.Number) >= 0 { - // We have to wait for a new block to become available - return - } - // Try to fetch the next block between lastBlock and latestBlock - nextBlockNumber := new(big.Int).Add(lastPublishedBlock.Number, big.NewInt(1)) - block, err := b.getBlockWithTimeout(ctx, logger, nextBlockNumber, desiredFinality) - if err != nil { - logger.Error("failed to fetch next block", - zap.Uint64("block", nextBlockNumber.Uint64()), zap.Error(err)) - return lastPublishedBlock, fmt.Errorf("failed to fetch next block (%d): %w", nextBlockNumber.Uint64(), err) - } - - finalized, err := b.isBlockFinalizedWithTimeout(ctx, block) - if err != nil { - logger.Error("failed to check block finalization", - zap.Uint64("block", block.Number.Uint64()), zap.Error(err)) - return lastPublishedBlock, fmt.Errorf("failed to check block finalization (%d): %w", block.Number.Uint64(), err) - } + // First see if there might be some newly finalized ones to publish + var block *NewBlock + if newLatest.Number.Cmp(prevLatest.Number) > 0 { + // If there is a gap between prev and new, we have to look up the transaction hashes for the missing ones. Do that in batches. + newBlockNum := newLatest.Number.Uint64() + for blockNum := prevLatest.Number.Uint64() + 1; blockNum < newBlockNum; blockNum++ { + block, err = getBlockByNumberUint64(ctx, logger, b.Connector, blockNum, Latest) + if err != nil { + err = fmt.Errorf("failed to get gap block: %w", err) + newLatest = prevLatest + newFinalized = prevFinalized + return + } - if !finalized { - break + b.blockFeed.Send(block) } - b.blockFeed.Send(block) - lastPublishedBlock = block + b.blockFeed.Send(newLatest) } - return -} + newFinalized = prevFinalized + if newLatest.Number.Cmp(prevFinalized.Number) > 0 { + var finalized bool + // If there is a gap between prev and new, we have to look up the transaction hashes for the missing ones. Do that in batches. 
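+		// Walk forward from the block after the previous finalized block up to and including the new
+		// latest block, asking the finalizer about each one. Every block the finalizer accepts is
+		// published at both Safe and Finalized level and becomes the new finalized marker; the first
+		// block that is not yet finalized stops the walk and will be retried on the next poll.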
+ newBlockNum := newLatest.Number.Uint64() + for blockNum := prevFinalized.Number.Uint64() + 1; blockNum <= newBlockNum; blockNum++ { + block, err = getBlockByNumberUint64(ctx, logger, b.Connector, blockNum, Finalized) + if err != nil { + err = fmt.Errorf("failed to get gap block: %w", err) + newLatest = prevLatest + newFinalized = prevFinalized + return + } -func (b *BlockPollConnector) getBlockWithTimeout(ctx context.Context, logger *zap.Logger, blockNumber *big.Int, desiredFinality FinalityLevel) (*NewBlock, error) { - timeout, cancel := context.WithTimeout(ctx, 15*time.Second) - defer cancel() - return b.getBlock(timeout, logger, blockNumber, desiredFinality) -} + finalized, err = b.isBlockFinalized(ctx, block) + if err != nil { + err = fmt.Errorf("failed to check finality on block: %w", err) + newLatest = prevLatest + newFinalized = prevFinalized + return + } -func (b *BlockPollConnector) isBlockFinalizedWithTimeout(ctx context.Context, block *NewBlock) (bool, error) { - timeout, cancel := context.WithTimeout(ctx, 15*time.Second) - defer cancel() - return b.finalizer.IsBlockFinalized(timeout, block) -} + if !finalized { + break + } -func (b *BlockPollConnector) SubscribeForBlocks(ctx context.Context, errC chan error, sink chan<- *NewBlock) (ethereum.Subscription, error) { - sub := NewPollSubscription() - blockSub := b.blockFeed.Subscribe(sink) + b.blockFeed.Send(&NewBlock{ + Number: block.Number, + Hash: block.Hash, + L1BlockNumber: block.L1BlockNumber, + Finality: Safe, + }) + + b.blockFeed.Send(&NewBlock{ + Number: block.Number, + Hash: block.Hash, + L1BlockNumber: block.L1BlockNumber, + Finality: Finalized, + }) + + newFinalized = block + } + } - // The feed library does not support error forwarding, so we're emulating that using a custom subscription and - // an error feed. The feed library can't handle interfaces which is why we post strings. - innerErrSink := make(chan string, 10) - innerErrSub := b.errFeed.Subscribe(innerErrSink) + return +} - common.RunWithScissors(ctx, errC, "block_poll_subscribe_for_blocks", func(ctx context.Context) error { - for { - select { - case <-ctx.Done(): - blockSub.Unsubscribe() - innerErrSub.Unsubscribe() - return nil - case <-sub.quit: - blockSub.Unsubscribe() - innerErrSub.Unsubscribe() - sub.unsubDone <- struct{}{} - return nil - case v := <-innerErrSink: - sub.err <- fmt.Errorf(v) - } - } - }) - return sub, nil +func getBlockByNumberUint64(ctx context.Context, logger *zap.Logger, conn Connector, blockNum uint64, desiredFinality FinalityLevel) (*NewBlock, error) { + return getBlockByTag(ctx, logger, conn, "0x"+fmt.Sprintf("%x", blockNum), desiredFinality) } -func (b *BlockPollConnector) getBlock(ctx context.Context, logger *zap.Logger, number *big.Int, desiredFinality FinalityLevel) (*NewBlock, error) { - return getBlock(ctx, logger, b.Connector, number, b.useFinalized, desiredFinality) +func getBlockByNumberBigInt(ctx context.Context, logger *zap.Logger, conn Connector, blockNum *big.Int, desiredFinality FinalityLevel) (*NewBlock, error) { + return getBlockByTag(ctx, logger, conn, ethHexUtils.EncodeBig(blockNum), desiredFinality) } -// getBlock is a free function that can be called from other connectors to get a single block. 
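+// getBlockByTag looks up a single block by tag (e.g. "latest") or by hex-encoded block number and bounds
+// the eth_getBlockByNumber call with a 15 second timeout so that a hung provider cannot stall the poller.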
-func getBlock(ctx context.Context, logger *zap.Logger, conn Connector, number *big.Int, useFinalized bool, desiredFinality FinalityLevel) (*NewBlock, error) { - var numStr string - if number != nil { - numStr = ethHexUtils.EncodeBig(number) - } else if useFinalized { - if desiredFinality == Safe { - numStr = "safe" - } else if desiredFinality == Finalized { - numStr = "finalized" - } else if desiredFinality == Latest { - numStr = "latest" - } else { - panic("invalid finality level") - } - } else { - numStr = "latest" - } +func getBlockByTag(ctx context.Context, logger *zap.Logger, conn Connector, tag string, desiredFinality FinalityLevel) (*NewBlock, error) { + timeout, cancel := context.WithTimeout(ctx, 15*time.Second) + defer cancel() var m BlockMarshaller - err := conn.RawCallContext(ctx, &m, "eth_getBlockByNumber", numStr, false) + err := conn.RawCallContext(timeout, &m, "eth_getBlockByNumber", tag, false) if err != nil { logger.Error("failed to get block", - zap.String("requested_block", numStr), zap.Error(err)) + zap.String("requested_block", tag), zap.Error(err)) return nil, err } if m.Number == nil { logger.Error("failed to unmarshal block", - zap.String("requested_block", numStr), + zap.String("requested_block", tag), ) return nil, fmt.Errorf("failed to unmarshal block: Number is nil") } @@ -255,3 +237,9 @@ func getBlock(ctx context.Context, logger *zap.Logger, conn Connector, number *b Finality: desiredFinality, }, nil } + +func (b *BlockPollConnector) isBlockFinalized(ctx context.Context, block *NewBlock) (bool, error) { + timeout, cancel := context.WithTimeout(ctx, 15*time.Second) + defer cancel() + return b.finalizer.IsBlockFinalized(timeout, block) +} diff --git a/node/pkg/watchers/evm/connectors/poller_test.go b/node/pkg/watchers/evm/connectors/poller_test.go deleted file mode 100644 index ef2e1cc103..0000000000 --- a/node/pkg/watchers/evm/connectors/poller_test.go +++ /dev/null @@ -1,377 +0,0 @@ -package connectors - -import ( - "context" - "encoding/json" - "fmt" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "go.uber.org/zap" - - ethAbi "github.com/certusone/wormhole/node/pkg/watchers/evm/connectors/ethabi" - - ethereum "github.com/ethereum/go-ethereum" - ethCommon "github.com/ethereum/go-ethereum/common" - ethTypes "github.com/ethereum/go-ethereum/core/types" - ethClient "github.com/ethereum/go-ethereum/ethclient" - ethEvent "github.com/ethereum/go-ethereum/event" - ethRpc "github.com/ethereum/go-ethereum/rpc" -) - -// mockConnectorForPoller implements the connector interface for testing purposes. -type mockConnectorForPoller struct { - address ethCommon.Address - client *ethClient.Client - mutex sync.Mutex - err error - persistentError bool - blockNumber uint64 -} - -// setError takes an error which will be returned on the next RPC call. The error will persist until cleared. -func (m *mockConnectorForPoller) setError(err error) { - m.mutex.Lock() - m.err = err - m.persistentError = true - m.mutex.Unlock() -} - -// setSingleError takes an error which will be returned on the next RPC call. After that, the error is reset to nil. 
-func (m *mockConnectorForPoller) setSingleError(err error) { - m.mutex.Lock() - m.err = err - m.persistentError = false - m.mutex.Unlock() -} - -func (e *mockConnectorForPoller) NetworkName() string { - return "mockConnectorForPoller" -} - -func (e *mockConnectorForPoller) ContractAddress() ethCommon.Address { - return e.address -} - -func (e *mockConnectorForPoller) GetCurrentGuardianSetIndex(ctx context.Context) (uint32, error) { - return 0, fmt.Errorf("not implemented") -} - -func (e *mockConnectorForPoller) GetGuardianSet(ctx context.Context, index uint32) (ethAbi.StructsGuardianSet, error) { - return ethAbi.StructsGuardianSet{}, fmt.Errorf("not implemented") -} - -func (e *mockConnectorForPoller) WatchLogMessagePublished(ctx context.Context, errC chan error, sink chan<- *ethAbi.AbiLogMessagePublished) (ethEvent.Subscription, error) { - var s ethEvent.Subscription - return s, fmt.Errorf("not implemented") -} - -func (e *mockConnectorForPoller) TransactionReceipt(ctx context.Context, txHash ethCommon.Hash) (*ethTypes.Receipt, error) { - return nil, fmt.Errorf("not implemented") -} - -func (e *mockConnectorForPoller) TimeOfBlockByHash(ctx context.Context, hash ethCommon.Hash) (uint64, error) { - return 0, fmt.Errorf("not implemented") -} - -func (e *mockConnectorForPoller) ParseLogMessagePublished(log ethTypes.Log) (*ethAbi.AbiLogMessagePublished, error) { - return nil, fmt.Errorf("not implemented") -} - -func (e *mockConnectorForPoller) SubscribeForBlocks(ctx context.Context, errC chan error, sink chan<- *NewBlock) (ethereum.Subscription, error) { - var s ethEvent.Subscription - return s, fmt.Errorf("not implemented") -} - -func (e *mockConnectorForPoller) RawCallContext(ctx context.Context, result interface{}, method string, args ...interface{}) (err error) { - if method != "eth_getBlockByNumber" { - panic("method not implemented by mockConnectorForPoller") - } - - e.mutex.Lock() - // If they set the error, return that immediately. 
- if e.err != nil { - err = e.err - if !e.persistentError { - e.err = nil - } - } else { - str := fmt.Sprintf(`{"author":"0x24c275f0719fdaec6356c4eb9f39ecb9c4d37ce1","baseFeePerGas":"0x3b9aca00","difficulty":"0x0","extraData":"0x","gasLimit":"0xe4e1c0","gasUsed":"0x0","hash":"0xfc8b62a31110121c57cfcccfaf2b147cc2c13b6d01bde4737846cefd29f045cf","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","miner":"0x24c275f0719fdaec6356c4eb9f39ecb9c4d37ce1","nonce":"0x0000000000000000","number":"0x%x","parentHash":"0x09d6d33a658b712f41db7fb9f775f94911ae0132123116aa4f8cf3da9f774e89","receiptsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","sha3Uncles":"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347","size":"0x201","stateRoot":"0x0409ed10e03fd49424ae1489c6fbc6ff1897f45d0e214655ebdb8df94eedc3c0","timestamp":"0x6373ec24","totalDifficulty":"0x0","transactions":[],"transactionsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","uncles":[]}`, e.blockNumber) - err = json.Unmarshal([]byte(str), &result) - } - e.mutex.Unlock() - - return -} - -func (e *mockConnectorForPoller) RawBatchCallContext(ctx context.Context, b []ethRpc.BatchElem) error { - panic("method not implemented by mockConnectorForPoller") -} - -func (e *mockConnectorForPoller) setBlockNumber(blockNumber uint64) { - e.mutex.Lock() - e.blockNumber = blockNumber - e.mutex.Unlock() -} - -func (e *mockConnectorForPoller) expectedHash() ethCommon.Hash { - return ethCommon.HexToHash("0xfc8b62a31110121c57cfcccfaf2b147cc2c13b6d01bde4737846cefd29f045cf") -} - -func (e *mockConnectorForPoller) Client() *ethClient.Client { - return e.client -} - -type mockFinalizerForPoller struct { - mutex sync.Mutex - finalized bool -} - -func newMockFinalizerForPoller(initialState bool) *mockFinalizerForPoller { - return &mockFinalizerForPoller{finalized: initialState} -} - -func (f *mockFinalizerForPoller) setFinalized(finalized bool) { - f.mutex.Lock() - defer f.mutex.Unlock() - f.finalized = finalized -} - -func (f *mockFinalizerForPoller) IsBlockFinalized(ctx context.Context, block *NewBlock) (bool, error) { - f.mutex.Lock() - defer f.mutex.Unlock() - return f.finalized, nil -} - -// TestBlockPoller is one big, ugly test because of all the set up required. -func TestBlockPoller(t *testing.T) { - ctx := context.Background() - logger := zap.NewNop() - baseConnector := mockConnectorForPoller{} - - finalizer := newMockFinalizerForPoller(true) // Start by assuming blocks are finalized. - assert.NotNil(t, finalizer) - - poller := &BlockPollConnector{ - Connector: &baseConnector, - Delay: 1 * time.Millisecond, - useFinalized: false, - finalizer: finalizer, - } - - // Set the starting block. - baseConnector.setBlockNumber(0x309a0c) - - // The go routines will post results here. - var mutex sync.Mutex - var block *NewBlock - var err error - var pollerStatus int - - const pollerRunning = 1 - const pollerExited = 2 - - // Start the poller running. 
- go func() { - mutex.Lock() - pollerStatus = pollerRunning - mutex.Unlock() - err := poller.run(ctx, logger) - require.NoError(t, err) - mutex.Lock() - pollerStatus = pollerExited - mutex.Unlock() - }() - - // Subscribe for events to be processed by our go routine. - headSink := make(chan *NewBlock, 2) - errC := make(chan error) - - headerSubscription, suberr := poller.SubscribeForBlocks(ctx, errC, headSink) - require.NoError(t, suberr) - - go func() { - for { - select { - case <-ctx.Done(): - return - case thisErr := <-errC: - mutex.Lock() - err = thisErr - mutex.Unlock() - case thisErr := <-headerSubscription.Err(): - mutex.Lock() - err = thisErr - mutex.Unlock() - case thisBlock := <-headSink: - require.NotNil(t, thisBlock) - mutex.Lock() - block = thisBlock - mutex.Unlock() - } - } - }() - - // First sleep a bit and make sure there were no start up errors. - time.Sleep(10 * time.Millisecond) - mutex.Lock() - require.Equal(t, pollerRunning, pollerStatus) - require.NoError(t, err) - assert.Nil(t, block) - mutex.Unlock() - - // Post the first new block and verify we get it. - baseConnector.setBlockNumber(0x309a0d) - - time.Sleep(10 * time.Millisecond) - mutex.Lock() - require.Equal(t, pollerRunning, pollerStatus) - require.NoError(t, err) - require.NotNil(t, block) - assert.Equal(t, uint64(0x309a0d), block.Number.Uint64()) - assert.Equal(t, baseConnector.expectedHash(), block.Hash) - block = nil - mutex.Unlock() - - // Sleep some more and verify we don't see any more blocks, since we haven't posted a new one. - baseConnector.setBlockNumber(0x309a0d) - time.Sleep(10 * time.Millisecond) - mutex.Lock() - require.Equal(t, pollerRunning, pollerStatus) - require.NoError(t, err) - require.Nil(t, block) - mutex.Unlock() - - // Post the next block and verify we get it. - baseConnector.setBlockNumber(0x309a0e) - time.Sleep(10 * time.Millisecond) - mutex.Lock() - require.Equal(t, pollerRunning, pollerStatus) - require.NoError(t, err) - require.NotNil(t, block) - assert.Equal(t, uint64(0x309a0e), block.Number.Uint64()) - assert.Equal(t, baseConnector.expectedHash(), block.Hash) - block = nil - mutex.Unlock() - - // Post the next block but mark it as not finalized, so we shouldn't see it yet. - mutex.Lock() - finalizer.setFinalized(false) - baseConnector.setBlockNumber(0x309a0f) - mutex.Unlock() - - time.Sleep(10 * time.Millisecond) - mutex.Lock() - require.Equal(t, pollerRunning, pollerStatus) - require.NoError(t, err) - require.Nil(t, block) - mutex.Unlock() - - // Once it goes finalized we should see it. - mutex.Lock() - finalizer.setFinalized(true) - mutex.Unlock() - - time.Sleep(10 * time.Millisecond) - mutex.Lock() - require.Equal(t, pollerRunning, pollerStatus) - require.NoError(t, err) - require.NotNil(t, block) - assert.Equal(t, uint64(0x309a0f), block.Number.Uint64()) - assert.Equal(t, baseConnector.expectedHash(), block.Hash) - block = nil - mutex.Unlock() - - // An RPC error should be returned to us. - err = nil - baseConnector.setError(fmt.Errorf("RPC failed")) - - time.Sleep(10 * time.Millisecond) - mutex.Lock() - require.Equal(t, pollerRunning, pollerStatus) - assert.Error(t, err) - assert.Nil(t, block) - baseConnector.setError(nil) - err = nil - mutex.Unlock() - - // Post the next block and verify we get it (so we survived the RPC error). - baseConnector.setBlockNumber(0x309a10) - - // There may be a few errors already queued up. Loop for a bit before we give up. 
- success := false - for count := 0; (count < 20) && (!success); count++ { - time.Sleep(10 * time.Millisecond) - mutex.Lock() - if err == nil { - success = true - } else { - err = nil - } - mutex.Unlock() - } - require.True(t, success) - - mutex.Lock() - require.Equal(t, pollerRunning, pollerStatus) - require.NoError(t, err) - require.NotNil(t, block) - assert.Equal(t, uint64(0x309a10), block.Number.Uint64()) - assert.Equal(t, baseConnector.expectedHash(), block.Hash) - block = nil - mutex.Unlock() - - // Post an old block and we should not hear about it. - baseConnector.setBlockNumber(0x309a0c) - - time.Sleep(10 * time.Millisecond) - mutex.Lock() - require.Equal(t, pollerRunning, pollerStatus) - require.NoError(t, err) - require.Nil(t, block) - mutex.Unlock() - - // But we should keep going when we get a new one. - baseConnector.setBlockNumber(0x309a11) - - time.Sleep(10 * time.Millisecond) - mutex.Lock() - require.Equal(t, pollerRunning, pollerStatus) - require.NoError(t, err) - require.NotNil(t, block) - assert.Equal(t, uint64(0x309a11), block.Number.Uint64()) - assert.Equal(t, baseConnector.expectedHash(), block.Hash) - block = nil - mutex.Unlock() - - // If there's a gap in the blocks, we should keep going. - baseConnector.setBlockNumber(0x309a13) - - time.Sleep(10 * time.Millisecond) - mutex.Lock() - require.Equal(t, pollerRunning, pollerStatus) - require.NoError(t, err) - require.NotNil(t, block) - assert.Equal(t, uint64(0x309a13), block.Number.Uint64()) - assert.Equal(t, baseConnector.expectedHash(), block.Hash) - block = nil - mutex.Unlock() - - // Should retry on a transient error and be able to continue. - baseConnector.setSingleError(fmt.Errorf("RPC failed")) - baseConnector.setBlockNumber(0x309a14) - - time.Sleep(10 * time.Millisecond) - mutex.Lock() - require.Equal(t, pollerRunning, pollerStatus) - require.NoError(t, err) - require.NotNil(t, block) - assert.Equal(t, uint64(0x309a14), block.Number.Uint64()) - assert.Equal(t, baseConnector.expectedHash(), block.Hash) - block = nil - mutex.Unlock() -} diff --git a/node/pkg/watchers/evm/connectors/polygon.go b/node/pkg/watchers/evm/connectors/polygon.go index 261d4c98dd..83d970f894 100644 --- a/node/pkg/watchers/evm/connectors/polygon.go +++ b/node/pkg/watchers/evm/connectors/polygon.go @@ -179,7 +179,7 @@ func (c *PolygonConnector) postBlock(ctx context.Context, blockNum *big.Int, sin return fmt.Errorf("blockNum is nil") } - block, err := getBlock(ctx, c.logger, c.Connector, blockNum, false, Finalized) + block, err := getBlockByNumberBigInt(ctx, c.logger, c.Connector, blockNum, Finalized) if err != nil { return fmt.Errorf("failed to get block %s: %w", blockNum.String(), err) } diff --git a/node/pkg/watchers/evm/watcher.go b/node/pkg/watchers/evm/watcher.go index d94923079e..26b65ac4fb 100644 --- a/node/pkg/watchers/evm/watcher.go +++ b/node/pkg/watchers/evm/watcher.go @@ -265,7 +265,7 @@ func (w *Watcher) Run(parentCtx context.Context) error { return fmt.Errorf("dialing eth client failed: %w", err) } finalizer := finalizers.NewNeonFinalizer(logger, w.l1Finalizer) - pollConnector, err := connectors.NewBlockPollConnector(ctx, baseConnector, finalizer, 250*time.Millisecond, false, false) + pollConnector, err := connectors.NewBlockPollConnector(ctx, baseConnector, finalizer, 250*time.Millisecond) if err != nil { ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc() p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)