Skip to content

Commit

Permalink
Make finality an enum rather than a bool
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-riley committed Oct 27, 2023
1 parent ab4e415 commit 9b51f0c
Show file tree
Hide file tree
Showing 13 changed files with 754 additions and 718 deletions.
18 changes: 8 additions & 10 deletions node/cmd/guardiand/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1089,19 +1089,17 @@ func runNode(cmd *cobra.Command, args []string) {
}

if shouldStart(polygonRPC) {
// Checkpointing is required in mainnet, so we don't need to wait for confirmations.
waitForConfirmations := *unsafeDevMode || *testnetMode
if !waitForConfirmations && *polygonRootChainRpc == "" {
// Checkpointing is required in mainnet and testnet.
if !*unsafeDevMode && *polygonRootChainRpc == "" {
log.Fatal("Polygon checkpointing is required in mainnet")
}
wc := &evm.WatcherConfig{
NetworkID: "polygon",
ChainID: vaa.ChainIDPolygon,
Rpc: *polygonRPC,
Contract: *polygonContract,
WaitForConfirmations: waitForConfirmations,
RootChainRpc: *polygonRootChainRpc,
RootChainContract: *polygonRootChainContractAddress,
NetworkID: "polygon",
ChainID: vaa.ChainIDPolygon,
Rpc: *polygonRPC,
Contract: *polygonContract,
RootChainRpc: *polygonRootChainRpc,
RootChainContract: *polygonRootChainContractAddress,
}

watcherConfigs = append(watcherConfigs, wc)
Expand Down
5 changes: 5 additions & 0 deletions node/pkg/adminrpc/adminserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
ethcrypto "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/event"
ethRpc "github.com/ethereum/go-ethereum/rpc"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -73,6 +74,10 @@ func (m mockEVMConnector) RawBatchCallContext(ctx context.Context, b []ethRpc.Ba
panic("unimplemented")
}

func (c mockEVMConnector) Client() *ethclient.Client {
panic("unimplemented")
}

func generateGS(num int) (keys []*ecdsa.PrivateKey, addrs []common.Address) {
for i := 0; i < num; i++ {
key, err := ethcrypto.GenerateKey()
Expand Down
329 changes: 176 additions & 153 deletions node/pkg/proto/gossip/v1/gossip.pb.go

Large diffs are not rendered by default.

283 changes: 283 additions & 0 deletions node/pkg/watchers/evm/connectors/batch_poller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,283 @@
package connectors

import (
"context"
"fmt"
"math/big"
"time"

"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/supervisor"
ethEvent "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc"

ethereum "github.com/ethereum/go-ethereum"

"go.uber.org/zap"
)

// BatchPollConnector uses batch requests to poll for latest, safe and finalized blocks.
type BatchPollConnector struct {
Connector
Delay time.Duration
blockFeed ethEvent.Feed
errFeed ethEvent.Feed
batchData []BatchEntry
}

type (
Blocks []*NewBlock

BatchEntry struct {
tag string
finality FinalityLevel
}

BatchResult struct {
result BlockMarshaller
err error
}
)

const MAX_GAP_BATCH_SIZE uint64 = 5

func NewBatchPollConnector(ctx context.Context, baseConnector Connector, delay time.Duration) (*BatchPollConnector, error) {
// Create the batch data in the order we want to report them to the watcher, so finalized is most important, latest is least.
batchData := []BatchEntry{
{tag: "finalized", finality: Finalized},
{tag: "safe", finality: Safe},
{tag: "latest", finality: Latest},
}

connector := &BatchPollConnector{
Connector: baseConnector,
Delay: delay,
batchData: batchData,
}
err := supervisor.Run(ctx, "batchPoller", common.WrapWithScissors(connector.runFromSupervisor, "batchPoller"))
if err != nil {
return nil, err
}
return connector, nil
}

func (b *BatchPollConnector) SubscribeForBlocks(ctx context.Context, errC chan error, sink chan<- *NewBlock) (ethereum.Subscription, error) {
sub := NewPollSubscription()
blockSub := b.blockFeed.Subscribe(sink)

// The feed library does not support error forwarding, so we're emulating that using a custom subscription and
// an error feed. The feed library can't handle interfaces which is why we post strings.
innerErrSink := make(chan string, 10)
innerErrSub := b.errFeed.Subscribe(innerErrSink)

common.RunWithScissors(ctx, errC, "block_poll_subscribe_for_blocks", func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
blockSub.Unsubscribe()
innerErrSub.Unsubscribe()
return nil
case <-sub.quit:
blockSub.Unsubscribe()
innerErrSub.Unsubscribe()
sub.unsubDone <- struct{}{}
return nil
case v := <-innerErrSink:
sub.err <- fmt.Errorf(v)
}
}
})
return sub, nil
}

func (b *BatchPollConnector) runFromSupervisor(ctx context.Context) error {
logger := supervisor.Logger(ctx).With(zap.String("eth_network", b.Connector.NetworkName()))
supervisor.Signal(ctx, supervisor.SignalHealthy)
return b.run(ctx, logger)
}

func (b *BatchPollConnector) run(ctx context.Context, logger *zap.Logger) error {
// Get the initial blocks.
lastBlocks, err := b.getBlocks(ctx, logger)
if err != nil {
return err
}

timer := time.NewTimer(b.Delay)
defer timer.Stop()

errCount := 0
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
lastBlocks, err = b.pollBlocks(ctx, logger, lastBlocks)
if err != nil {
errCount++
if errCount > 3 {
logger.Error("batch polling encountered an error", zap.Int("errCount", errCount), zap.Error(err))
b.errFeed.Send(fmt.Sprint("polling encountered an error: ", err))
return err
}
} else {
errCount = 0
}

timer.Reset(b.Delay)
}
}
}

// pollBlocks polls for the latest blocks (finalized, safe and latest), compares them to the last ones, and publishes any new ones.
// In the case of an error, it returns the last blocks that were passed in, otherwise it returns the new blocks.
func (b *BatchPollConnector) pollBlocks(ctx context.Context, logger *zap.Logger, prevBlocks Blocks) (Blocks, error) {
newBlocks, err := b.getBlocks(ctx, logger)
if err != nil {
return prevBlocks, err
}

if len(newBlocks) != len(prevBlocks) {
panic(fmt.Sprintf("getBlocks returned %d entries when there should be %d", len(newBlocks), len(prevBlocks)))
}

for idx, newBlock := range newBlocks {
if newBlock.Number.Cmp(prevBlocks[idx].Number) > 0 {
// If there is a gap between prev and new, we have to look up the transaction hashes for the missing ones. Do that in batches.
newBlockNum := newBlock.Number.Uint64()
blockNum := prevBlocks[idx].Number.Uint64() + 1
for blockNum < newBlockNum {
batchSize := newBlockNum - blockNum
if batchSize > MAX_GAP_BATCH_SIZE {
batchSize = MAX_GAP_BATCH_SIZE
}
gapBlocks, err := b.getBlockRange(ctx, logger, blockNum, batchSize, b.batchData[idx].finality)
if err != nil {
return prevBlocks, fmt.Errorf("failed to get gap blocks: %w", err)
}
for _, block := range gapBlocks {
b.blockFeed.Send(block)
}
blockNum += batchSize
}

b.blockFeed.Send(newBlock)
}
}

return newBlocks, nil
}

// getBlocks gets the current batch of configured blocks (finalized, safe, latest).
func (b *BatchPollConnector) getBlocks(ctx context.Context, logger *zap.Logger) (Blocks, error) {
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()

batch := make([]rpc.BatchElem, len(b.batchData))
results := make([]BatchResult, len(b.batchData))
for idx, bd := range b.batchData {
batch[idx] = rpc.BatchElem{
Method: "eth_getBlockByNumber",
Args: []interface{}{
bd.tag,
false, // no full transaction details
},
Result: &results[idx].result,
Error: results[idx].err,
}
}

err := b.Connector.RawBatchCallContext(timeout, batch)
if err != nil {
logger.Error("failed to get blocks", zap.Error(err))
return nil, err
}

ret := make(Blocks, len(b.batchData))
for idx, result := range results {
finality := b.batchData[idx].finality
if result.err != nil {
logger.Error("failed to get block", zap.Stringer("finality", finality), zap.Error(result.err))
return nil, err
}

m := &result.result
if m.Number == nil {
logger.Error("failed to unmarshal block: Number is nil", zap.Stringer("finality", finality), zap.String("tag", b.batchData[idx].tag))
return nil, fmt.Errorf("failed to unmarshal block: Number is nil")
}
n := big.Int(*m.Number)

var l1bn *big.Int
if m.L1BlockNumber != nil {
bn := big.Int(*m.L1BlockNumber)
l1bn = &bn
}

ret[idx] = &NewBlock{
Number: &n,
Hash: m.Hash,
L1BlockNumber: l1bn,
Finality: finality,
}
}

return ret, nil
}

// getBlockRange gets a range of blocks, starting at blockNum, including the next numBlocks. It passes back an array of those blocks.
func (b *BatchPollConnector) getBlockRange(ctx context.Context, logger *zap.Logger, blockNum uint64, numBlocks uint64, finality FinalityLevel) (Blocks, error) {
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()

batch := make([]rpc.BatchElem, numBlocks)
results := make([]BatchResult, numBlocks)
for idx := 0; idx < int(numBlocks); idx++ {
batch[idx] = rpc.BatchElem{
Method: "eth_getBlockByNumber",
Args: []interface{}{
"0x" + fmt.Sprintf("%x", blockNum),
false, // no full transaction details
},
Result: &results[idx].result,
Error: results[idx].err,
}
blockNum++
}

err := b.Connector.RawBatchCallContext(timeout, batch)
if err != nil {
logger.Error("failed to get blocks", zap.Error(err))
return nil, err
}

ret := make(Blocks, numBlocks)
for idx, result := range results {
if result.err != nil {
logger.Error("failed to get block", zap.Int("idx", idx), zap.Stringer("finality", finality), zap.Error(result.err))
return nil, err
}

m := &result.result
if m.Number == nil {
logger.Error("failed to unmarshal block: Number is nil")
return nil, fmt.Errorf("failed to unmarshal block: Number is nil")
}
n := big.Int(*m.Number)

var l1bn *big.Int
if m.L1BlockNumber != nil {
bn := big.Int(*m.L1BlockNumber)
l1bn = &bn
}

ret[idx] = &NewBlock{
Number: &n,
Hash: m.Hash,
L1BlockNumber: l1bn,
Finality: finality,
}
}

return ret, nil
}
21 changes: 19 additions & 2 deletions node/pkg/watchers/evm/connectors/celo.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
ethereum "github.com/ethereum/go-ethereum"
ethCommon "github.com/ethereum/go-ethereum/common"
ethTypes "github.com/ethereum/go-ethereum/core/types"
ethClient "github.com/ethereum/go-ethereum/ethclient"
ethEvent "github.com/ethereum/go-ethereum/event"
ethRpc "github.com/ethereum/go-ethereum/rpc"

Expand Down Expand Up @@ -167,9 +168,21 @@ func (c *CeloConnector) SubscribeForBlocks(ctx context.Context, errC chan error,
c.logger.Error("new header block number is nil")
continue
}
hash := ethCommon.BytesToHash(ev.Hash().Bytes())
sink <- &NewBlock{
Number: ev.Number,
Hash: ethCommon.BytesToHash(ev.Hash().Bytes()),
Number: ev.Number,
Hash: hash,
Finality: Finalized,
}
sink <- &NewBlock{
Number: ev.Number,
Hash: hash,
Finality: Safe,
}
sink <- &NewBlock{
Number: ev.Number,
Hash: hash,
Finality: Latest,
}
}
}
Expand All @@ -195,6 +208,10 @@ func (c *CeloConnector) RawBatchCallContext(ctx context.Context, b []ethRpc.Batc
return c.rawClient.BatchCallContext(ctx, celoB)
}

func (c *CeloConnector) Client() *ethClient.Client {
panic("unimplemented")
}

func convertCeloEventToEth(ev *celoAbi.AbiLogMessagePublished) *ethAbi.AbiLogMessagePublished {
return &ethAbi.AbiLogMessagePublished{
Sender: ethCommon.BytesToAddress(ev.Sender.Bytes()),
Expand Down
Loading

0 comments on commit 9b51f0c

Please sign in to comment.