From 9b51f0c1d064b6ce3ad24cb50cc6d7925e101268 Mon Sep 17 00:00:00 2001 From: Bruce Riley Date: Wed, 25 Oct 2023 11:51:56 -0500 Subject: [PATCH] Make finality an enum rather than a bool --- node/cmd/guardiand/node.go | 18 +- node/pkg/adminrpc/adminserver_test.go | 5 + node/pkg/proto/gossip/v1/gossip.pb.go | 329 ++++++++------- .../watchers/evm/connectors/batch_poller.go | 283 +++++++++++++ node/pkg/watchers/evm/connectors/celo.go | 21 +- node/pkg/watchers/evm/connectors/common.go | 13 +- node/pkg/watchers/evm/connectors/ethereum.go | 15 +- node/pkg/watchers/evm/connectors/finality.go | 26 ++ node/pkg/watchers/evm/connectors/poller.go | 272 ++++++------- .../watchers/evm/connectors/poller_test.go | 377 ------------------ node/pkg/watchers/evm/connectors/polygon.go | 29 +- node/pkg/watchers/evm/watcher.go | 80 ++-- proto/gossip/v1/gossip.proto | 4 + 13 files changed, 754 insertions(+), 718 deletions(-) create mode 100644 node/pkg/watchers/evm/connectors/batch_poller.go create mode 100644 node/pkg/watchers/evm/connectors/finality.go delete mode 100644 node/pkg/watchers/evm/connectors/poller_test.go diff --git a/node/cmd/guardiand/node.go b/node/cmd/guardiand/node.go index d17f46d716..ee54ef1261 100644 --- a/node/cmd/guardiand/node.go +++ b/node/cmd/guardiand/node.go @@ -1089,19 +1089,17 @@ func runNode(cmd *cobra.Command, args []string) { } if shouldStart(polygonRPC) { - // Checkpointing is required in mainnet, so we don't need to wait for confirmations. - waitForConfirmations := *unsafeDevMode || *testnetMode - if !waitForConfirmations && *polygonRootChainRpc == "" { + // Checkpointing is required in mainnet and testnet. + if !*unsafeDevMode && *polygonRootChainRpc == "" { log.Fatal("Polygon checkpointing is required in mainnet") } wc := &evm.WatcherConfig{ - NetworkID: "polygon", - ChainID: vaa.ChainIDPolygon, - Rpc: *polygonRPC, - Contract: *polygonContract, - WaitForConfirmations: waitForConfirmations, - RootChainRpc: *polygonRootChainRpc, - RootChainContract: *polygonRootChainContractAddress, + NetworkID: "polygon", + ChainID: vaa.ChainIDPolygon, + Rpc: *polygonRPC, + Contract: *polygonContract, + RootChainRpc: *polygonRootChainRpc, + RootChainContract: *polygonRootChainContractAddress, } watcherConfigs = append(watcherConfigs, wc) diff --git a/node/pkg/adminrpc/adminserver_test.go b/node/pkg/adminrpc/adminserver_test.go index 7426f842d3..78c7fa1e70 100644 --- a/node/pkg/adminrpc/adminserver_test.go +++ b/node/pkg/adminrpc/adminserver_test.go @@ -14,6 +14,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" ethcrypto "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/event" ethRpc "github.com/ethereum/go-ethereum/rpc" "github.com/stretchr/testify/require" @@ -73,6 +74,10 @@ func (m mockEVMConnector) RawBatchCallContext(ctx context.Context, b []ethRpc.Ba panic("unimplemented") } +func (c mockEVMConnector) Client() *ethclient.Client { + panic("unimplemented") +} + func generateGS(num int) (keys []*ecdsa.PrivateKey, addrs []common.Address) { for i := 0; i < num; i++ { key, err := ethcrypto.GenerateKey() diff --git a/node/pkg/proto/gossip/v1/gossip.pb.go b/node/pkg/proto/gossip/v1/gossip.pb.go index 2165fe345d..de59fd6103 100644 --- a/node/pkg/proto/gossip/v1/gossip.pb.go +++ b/node/pkg/proto/gossip/v1/gossip.pb.go @@ -1282,6 +1282,10 @@ type Heartbeat_Network struct { ContractAddress string `protobuf:"bytes,3,opt,name=contract_address,json=contractAddress,proto3" json:"contract_address,omitempty"` // Connection error count ErrorCount uint64 `protobuf:"varint,4,opt,name=error_count,json=errorCount,proto3" json:"error_count,omitempty"` + // Safe block height of the node, if supported. + SafeHeight int64 `protobuf:"varint,5,opt,name=safe_height,json=safeHeight,proto3" json:"safe_height,omitempty"` + // Finalized block height of the node, if supported. + FinalizedHeight int64 `protobuf:"varint,6,opt,name=finalized_height,json=finalizedHeight,proto3" json:"finalized_height,omitempty"` } func (x *Heartbeat_Network) Reset() { @@ -1344,6 +1348,20 @@ func (x *Heartbeat_Network) GetErrorCount() uint64 { return 0 } +func (x *Heartbeat_Network) GetSafeHeight() int64 { + if x != nil { + return x.SafeHeight + } + return 0 +} + +func (x *Heartbeat_Network) GetFinalizedHeight() int64 { + if x != nil { + return x.FinalizedHeight + } + return 0 +} + type ChainGovernorConfig_Chain struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -1738,7 +1756,7 @@ var file_gossip_v1_gossip_proto_rawDesc = []byte{ 0x75, 0x72, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x67, 0x75, 0x61, 0x72, 0x64, 0x69, 0x61, 0x6e, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0c, 0x67, 0x75, 0x61, - 0x72, 0x64, 0x69, 0x61, 0x6e, 0x41, 0x64, 0x64, 0x72, 0x22, 0xbb, 0x03, 0x0a, 0x09, 0x48, 0x65, + 0x72, 0x64, 0x69, 0x61, 0x6e, 0x41, 0x64, 0x64, 0x72, 0x22, 0x88, 0x04, 0x0a, 0x09, 0x48, 0x65, 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x6e, 0x6f, 0x64, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6e, 0x6f, 0x64, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x65, 0x72, 0x18, @@ -1758,164 +1776,169 @@ var file_gossip_v1_gossip_proto_rawDesc = []byte{ 0x66, 0x65, 0x61, 0x74, 0x75, 0x72, 0x65, 0x73, 0x18, 0x08, 0x20, 0x03, 0x28, 0x09, 0x52, 0x08, 0x66, 0x65, 0x61, 0x74, 0x75, 0x72, 0x65, 0x73, 0x12, 0x1e, 0x0a, 0x0b, 0x70, 0x32, 0x70, 0x5f, 0x6e, 0x6f, 0x64, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x70, - 0x32, 0x70, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x1a, 0x7d, 0x0a, 0x07, 0x4e, 0x65, 0x74, 0x77, - 0x6f, 0x72, 0x6b, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, - 0x02, 0x69, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x68, 0x65, 0x69, 0x67, 0x68, 0x74, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x03, 0x52, 0x06, 0x68, 0x65, 0x69, 0x67, 0x68, 0x74, 0x12, 0x29, 0x0a, 0x10, 0x63, - 0x6f, 0x6e, 0x74, 0x72, 0x61, 0x63, 0x74, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, - 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x61, 0x63, 0x74, 0x41, - 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x5f, - 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0a, 0x65, 0x72, 0x72, - 0x6f, 0x72, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x91, 0x01, 0x0a, 0x11, 0x53, 0x69, 0x67, 0x6e, - 0x65, 0x64, 0x4f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x12, 0x0a, - 0x04, 0x61, 0x64, 0x64, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x61, 0x64, 0x64, - 0x72, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x61, 0x73, 0x68, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, - 0x04, 0x68, 0x61, 0x73, 0x68, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, - 0x72, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, - 0x75, 0x72, 0x65, 0x12, 0x17, 0x0a, 0x07, 0x74, 0x78, 0x5f, 0x68, 0x61, 0x73, 0x68, 0x18, 0x04, - 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x74, 0x78, 0x48, 0x61, 0x73, 0x68, 0x12, 0x1d, 0x0a, 0x0a, - 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x09, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x49, 0x64, 0x22, 0x27, 0x0a, 0x13, 0x53, - 0x69, 0x67, 0x6e, 0x65, 0x64, 0x56, 0x41, 0x41, 0x57, 0x69, 0x74, 0x68, 0x51, 0x75, 0x6f, 0x72, - 0x75, 0x6d, 0x12, 0x10, 0x0a, 0x03, 0x76, 0x61, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, - 0x03, 0x76, 0x61, 0x61, 0x22, 0x8e, 0x01, 0x0a, 0x18, 0x53, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x4f, - 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x12, 0x2f, 0x0a, 0x13, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, - 0x5f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x12, - 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, - 0x12, 0x23, 0x0a, 0x0d, 0x67, 0x75, 0x61, 0x72, 0x64, 0x69, 0x61, 0x6e, 0x5f, 0x61, 0x64, 0x64, - 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0c, 0x67, 0x75, 0x61, 0x72, 0x64, 0x69, 0x61, - 0x6e, 0x41, 0x64, 0x64, 0x72, 0x22, 0x48, 0x0a, 0x12, 0x4f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, - 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x63, - 0x68, 0x61, 0x69, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x63, - 0x68, 0x61, 0x69, 0x6e, 0x49, 0x64, 0x12, 0x17, 0x0a, 0x07, 0x74, 0x78, 0x5f, 0x68, 0x61, 0x73, - 0x68, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x74, 0x78, 0x48, 0x61, 0x73, 0x68, 0x22, - 0xbf, 0x01, 0x0a, 0x16, 0x53, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x42, 0x61, 0x74, 0x63, 0x68, 0x4f, + 0x32, 0x70, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x1a, 0xc9, 0x01, 0x0a, 0x07, 0x4e, 0x65, 0x74, + 0x77, 0x6f, 0x72, 0x6b, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, + 0x52, 0x02, 0x69, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x68, 0x65, 0x69, 0x67, 0x68, 0x74, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x68, 0x65, 0x69, 0x67, 0x68, 0x74, 0x12, 0x29, 0x0a, 0x10, + 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x61, 0x63, 0x74, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x61, 0x63, 0x74, + 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x65, 0x72, 0x72, 0x6f, 0x72, + 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0a, 0x65, 0x72, + 0x72, 0x6f, 0x72, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x61, 0x66, 0x65, + 0x5f, 0x68, 0x65, 0x69, 0x67, 0x68, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x73, + 0x61, 0x66, 0x65, 0x48, 0x65, 0x69, 0x67, 0x68, 0x74, 0x12, 0x29, 0x0a, 0x10, 0x66, 0x69, 0x6e, + 0x61, 0x6c, 0x69, 0x7a, 0x65, 0x64, 0x5f, 0x68, 0x65, 0x69, 0x67, 0x68, 0x74, 0x18, 0x06, 0x20, + 0x01, 0x28, 0x03, 0x52, 0x0f, 0x66, 0x69, 0x6e, 0x61, 0x6c, 0x69, 0x7a, 0x65, 0x64, 0x48, 0x65, + 0x69, 0x67, 0x68, 0x74, 0x22, 0x91, 0x01, 0x0a, 0x11, 0x53, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x4f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x61, 0x64, 0x64, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x61, 0x64, 0x64, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x61, 0x73, 0x68, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x68, 0x61, 0x73, 0x68, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, - 0x12, 0x13, 0x0a, 0x05, 0x74, 0x78, 0x5f, 0x69, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, - 0x04, 0x74, 0x78, 0x49, 0x64, 0x12, 0x19, 0x0a, 0x08, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x5f, 0x69, - 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x49, 0x64, - 0x12, 0x14, 0x0a, 0x05, 0x6e, 0x6f, 0x6e, 0x63, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0d, 0x52, - 0x05, 0x6e, 0x6f, 0x6e, 0x63, 0x65, 0x12, 0x19, 0x0a, 0x08, 0x62, 0x61, 0x74, 0x63, 0x68, 0x5f, - 0x69, 0x64, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x62, 0x61, 0x74, 0x63, 0x68, 0x49, - 0x64, 0x22, 0x98, 0x01, 0x0a, 0x18, 0x53, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x42, 0x61, 0x74, 0x63, - 0x68, 0x56, 0x41, 0x41, 0x57, 0x69, 0x74, 0x68, 0x51, 0x75, 0x6f, 0x72, 0x75, 0x6d, 0x12, 0x1b, - 0x0a, 0x09, 0x62, 0x61, 0x74, 0x63, 0x68, 0x5f, 0x76, 0x61, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x0c, 0x52, 0x08, 0x62, 0x61, 0x74, 0x63, 0x68, 0x56, 0x61, 0x61, 0x12, 0x19, 0x0a, 0x08, 0x63, - 0x68, 0x61, 0x69, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x63, - 0x68, 0x61, 0x69, 0x6e, 0x49, 0x64, 0x12, 0x13, 0x0a, 0x05, 0x74, 0x78, 0x5f, 0x69, 0x64, 0x18, - 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x74, 0x78, 0x49, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x6e, - 0x6f, 0x6e, 0x63, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x6e, 0x6f, 0x6e, 0x63, - 0x65, 0x12, 0x19, 0x0a, 0x08, 0x62, 0x61, 0x74, 0x63, 0x68, 0x5f, 0x69, 0x64, 0x18, 0x05, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x07, 0x62, 0x61, 0x74, 0x63, 0x68, 0x49, 0x64, 0x22, 0x76, 0x0a, 0x19, - 0x53, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x47, 0x6f, 0x76, 0x65, 0x72, - 0x6e, 0x6f, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x16, 0x0a, 0x06, 0x63, 0x6f, 0x6e, - 0x66, 0x69, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, - 0x67, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x12, - 0x23, 0x0a, 0x0d, 0x67, 0x75, 0x61, 0x72, 0x64, 0x69, 0x61, 0x6e, 0x5f, 0x61, 0x64, 0x64, 0x72, - 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0c, 0x67, 0x75, 0x61, 0x72, 0x64, 0x69, 0x61, 0x6e, - 0x41, 0x64, 0x64, 0x72, 0x22, 0xd1, 0x03, 0x0a, 0x13, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x47, 0x6f, - 0x76, 0x65, 0x72, 0x6e, 0x6f, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x1b, 0x0a, 0x09, - 0x6e, 0x6f, 0x64, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x08, 0x6e, 0x6f, 0x64, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x75, - 0x6e, 0x74, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x63, 0x6f, 0x75, 0x6e, - 0x74, 0x65, 0x72, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, - 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, - 0x70, 0x12, 0x3c, 0x0a, 0x06, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, - 0x0b, 0x32, 0x24, 0x2e, 0x67, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x68, - 0x61, 0x69, 0x6e, 0x47, 0x6f, 0x76, 0x65, 0x72, 0x6e, 0x6f, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, - 0x67, 0x2e, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x52, 0x06, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x73, 0x12, - 0x3c, 0x0a, 0x06, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, - 0x24, 0x2e, 0x67, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x68, 0x61, 0x69, - 0x6e, 0x47, 0x6f, 0x76, 0x65, 0x72, 0x6e, 0x6f, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, - 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x52, 0x06, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x73, 0x1a, 0x7b, 0x0a, - 0x05, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x12, 0x19, 0x0a, 0x08, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x5f, - 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x49, - 0x64, 0x12, 0x25, 0x0a, 0x0e, 0x6e, 0x6f, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x6c, 0x5f, 0x6c, 0x69, - 0x6d, 0x69, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0d, 0x6e, 0x6f, 0x74, 0x69, 0x6f, - 0x6e, 0x61, 0x6c, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x12, 0x30, 0x0a, 0x14, 0x62, 0x69, 0x67, 0x5f, - 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x73, 0x69, 0x7a, 0x65, - 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, 0x52, 0x12, 0x62, 0x69, 0x67, 0x54, 0x72, 0x61, 0x6e, 0x73, - 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x69, 0x7a, 0x65, 0x1a, 0x6c, 0x0a, 0x05, 0x54, 0x6f, - 0x6b, 0x65, 0x6e, 0x12, 0x26, 0x0a, 0x0f, 0x6f, 0x72, 0x69, 0x67, 0x69, 0x6e, 0x5f, 0x63, 0x68, - 0x61, 0x69, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0d, 0x6f, 0x72, - 0x69, 0x67, 0x69, 0x6e, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x49, 0x64, 0x12, 0x25, 0x0a, 0x0e, 0x6f, - 0x72, 0x69, 0x67, 0x69, 0x6e, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x0d, 0x6f, 0x72, 0x69, 0x67, 0x69, 0x6e, 0x41, 0x64, 0x64, 0x72, 0x65, - 0x73, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x70, 0x72, 0x69, 0x63, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, - 0x02, 0x52, 0x05, 0x70, 0x72, 0x69, 0x63, 0x65, 0x22, 0x76, 0x0a, 0x19, 0x53, 0x69, 0x67, 0x6e, - 0x65, 0x64, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x47, 0x6f, 0x76, 0x65, 0x72, 0x6e, 0x6f, 0x72, 0x53, - 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x1c, 0x0a, - 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, - 0x52, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x67, - 0x75, 0x61, 0x72, 0x64, 0x69, 0x61, 0x6e, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x18, 0x03, 0x20, 0x01, - 0x28, 0x0c, 0x52, 0x0c, 0x67, 0x75, 0x61, 0x72, 0x64, 0x69, 0x61, 0x6e, 0x41, 0x64, 0x64, 0x72, - 0x22, 0x98, 0x05, 0x0a, 0x13, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x47, 0x6f, 0x76, 0x65, 0x72, 0x6e, - 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x1b, 0x0a, 0x09, 0x6e, 0x6f, 0x64, 0x65, - 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6e, 0x6f, 0x64, - 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x65, 0x72, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x65, 0x72, 0x12, - 0x1c, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x03, 0x20, 0x01, - 0x28, 0x03, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x3c, 0x0a, - 0x06, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x24, 0x2e, - 0x67, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x47, - 0x6f, 0x76, 0x65, 0x72, 0x6e, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2e, 0x43, 0x68, - 0x61, 0x69, 0x6e, 0x52, 0x06, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x73, 0x1a, 0x8c, 0x01, 0x0a, 0x0b, - 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x64, 0x56, 0x41, 0x41, 0x12, 0x1a, 0x0a, 0x08, 0x73, - 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x73, - 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x72, 0x65, 0x6c, 0x65, 0x61, - 0x73, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0b, 0x72, - 0x65, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x6e, 0x6f, - 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x6c, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x03, 0x20, 0x01, - 0x28, 0x04, 0x52, 0x0d, 0x6e, 0x6f, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x6c, 0x56, 0x61, 0x6c, 0x75, - 0x65, 0x12, 0x17, 0x0a, 0x07, 0x74, 0x78, 0x5f, 0x68, 0x61, 0x73, 0x68, 0x18, 0x04, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x06, 0x74, 0x78, 0x48, 0x61, 0x73, 0x68, 0x1a, 0xb3, 0x01, 0x0a, 0x07, 0x45, - 0x6d, 0x69, 0x74, 0x74, 0x65, 0x72, 0x12, 0x27, 0x0a, 0x0f, 0x65, 0x6d, 0x69, 0x74, 0x74, 0x65, - 0x72, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x0e, 0x65, 0x6d, 0x69, 0x74, 0x74, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, - 0x2e, 0x0a, 0x13, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x5f, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, - 0x64, 0x5f, 0x76, 0x61, 0x61, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x11, 0x74, 0x6f, - 0x74, 0x61, 0x6c, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x64, 0x56, 0x61, 0x61, 0x73, 0x12, - 0x4f, 0x0a, 0x0d, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x64, 0x5f, 0x76, 0x61, 0x61, 0x73, - 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2a, 0x2e, 0x67, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x2e, + 0x12, 0x17, 0x0a, 0x07, 0x74, 0x78, 0x5f, 0x68, 0x61, 0x73, 0x68, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x0c, 0x52, 0x06, 0x74, 0x78, 0x48, 0x61, 0x73, 0x68, 0x12, 0x1d, 0x0a, 0x0a, 0x6d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x6d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x49, 0x64, 0x22, 0x27, 0x0a, 0x13, 0x53, 0x69, 0x67, 0x6e, + 0x65, 0x64, 0x56, 0x41, 0x41, 0x57, 0x69, 0x74, 0x68, 0x51, 0x75, 0x6f, 0x72, 0x75, 0x6d, 0x12, + 0x10, 0x0a, 0x03, 0x76, 0x61, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x76, 0x61, + 0x61, 0x22, 0x8e, 0x01, 0x0a, 0x18, 0x53, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x4f, 0x62, 0x73, 0x65, + 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2f, + 0x0a, 0x13, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x72, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x12, 0x6f, 0x62, 0x73, + 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, + 0x1c, 0x0a, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x0c, 0x52, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x12, 0x23, 0x0a, + 0x0d, 0x67, 0x75, 0x61, 0x72, 0x64, 0x69, 0x61, 0x6e, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0c, 0x67, 0x75, 0x61, 0x72, 0x64, 0x69, 0x61, 0x6e, 0x41, 0x64, + 0x64, 0x72, 0x22, 0x48, 0x0a, 0x12, 0x4f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x63, 0x68, 0x61, 0x69, + 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x63, 0x68, 0x61, 0x69, + 0x6e, 0x49, 0x64, 0x12, 0x17, 0x0a, 0x07, 0x74, 0x78, 0x5f, 0x68, 0x61, 0x73, 0x68, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x74, 0x78, 0x48, 0x61, 0x73, 0x68, 0x22, 0xbf, 0x01, 0x0a, + 0x16, 0x53, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x42, 0x61, 0x74, 0x63, 0x68, 0x4f, 0x62, 0x73, 0x65, + 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x61, 0x64, 0x64, 0x72, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x61, 0x64, 0x64, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x68, + 0x61, 0x73, 0x68, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x68, 0x61, 0x73, 0x68, 0x12, + 0x1c, 0x0a, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x0c, 0x52, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x12, 0x13, 0x0a, + 0x05, 0x74, 0x78, 0x5f, 0x69, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x74, 0x78, + 0x49, 0x64, 0x12, 0x19, 0x0a, 0x08, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x05, + 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x49, 0x64, 0x12, 0x14, 0x0a, + 0x05, 0x6e, 0x6f, 0x6e, 0x63, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x6e, 0x6f, + 0x6e, 0x63, 0x65, 0x12, 0x19, 0x0a, 0x08, 0x62, 0x61, 0x74, 0x63, 0x68, 0x5f, 0x69, 0x64, 0x18, + 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x62, 0x61, 0x74, 0x63, 0x68, 0x49, 0x64, 0x22, 0x98, + 0x01, 0x0a, 0x18, 0x53, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x42, 0x61, 0x74, 0x63, 0x68, 0x56, 0x41, + 0x41, 0x57, 0x69, 0x74, 0x68, 0x51, 0x75, 0x6f, 0x72, 0x75, 0x6d, 0x12, 0x1b, 0x0a, 0x09, 0x62, + 0x61, 0x74, 0x63, 0x68, 0x5f, 0x76, 0x61, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x08, + 0x62, 0x61, 0x74, 0x63, 0x68, 0x56, 0x61, 0x61, 0x12, 0x19, 0x0a, 0x08, 0x63, 0x68, 0x61, 0x69, + 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x63, 0x68, 0x61, 0x69, + 0x6e, 0x49, 0x64, 0x12, 0x13, 0x0a, 0x05, 0x74, 0x78, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x0c, 0x52, 0x04, 0x74, 0x78, 0x49, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x6e, 0x6f, 0x6e, 0x63, + 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x6e, 0x6f, 0x6e, 0x63, 0x65, 0x12, 0x19, + 0x0a, 0x08, 0x62, 0x61, 0x74, 0x63, 0x68, 0x5f, 0x69, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x07, 0x62, 0x61, 0x74, 0x63, 0x68, 0x49, 0x64, 0x22, 0x76, 0x0a, 0x19, 0x53, 0x69, 0x67, + 0x6e, 0x65, 0x64, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x47, 0x6f, 0x76, 0x65, 0x72, 0x6e, 0x6f, 0x72, + 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x16, 0x0a, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x1c, + 0x0a, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x0c, 0x52, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x12, 0x23, 0x0a, 0x0d, + 0x67, 0x75, 0x61, 0x72, 0x64, 0x69, 0x61, 0x6e, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x0c, 0x52, 0x0c, 0x67, 0x75, 0x61, 0x72, 0x64, 0x69, 0x61, 0x6e, 0x41, 0x64, 0x64, + 0x72, 0x22, 0xd1, 0x03, 0x0a, 0x13, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x47, 0x6f, 0x76, 0x65, 0x72, + 0x6e, 0x6f, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x1b, 0x0a, 0x09, 0x6e, 0x6f, 0x64, + 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6e, 0x6f, + 0x64, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x65, + 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x65, 0x72, + 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x03, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x3c, + 0x0a, 0x06, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x24, + 0x2e, 0x67, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x68, 0x61, 0x69, 0x6e, + 0x47, 0x6f, 0x76, 0x65, 0x72, 0x6e, 0x6f, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x43, + 0x68, 0x61, 0x69, 0x6e, 0x52, 0x06, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x73, 0x12, 0x3c, 0x0a, 0x06, + 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x24, 0x2e, 0x67, + 0x6f, 0x73, 0x73, 0x69, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x47, 0x6f, + 0x76, 0x65, 0x72, 0x6e, 0x6f, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x54, 0x6f, 0x6b, + 0x65, 0x6e, 0x52, 0x06, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x73, 0x1a, 0x7b, 0x0a, 0x05, 0x43, 0x68, + 0x61, 0x69, 0x6e, 0x12, 0x19, 0x0a, 0x08, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x5f, 0x69, 0x64, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x49, 0x64, 0x12, 0x25, + 0x0a, 0x0e, 0x6e, 0x6f, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x6c, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0d, 0x6e, 0x6f, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x6c, + 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x12, 0x30, 0x0a, 0x14, 0x62, 0x69, 0x67, 0x5f, 0x74, 0x72, 0x61, + 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x04, 0x52, 0x12, 0x62, 0x69, 0x67, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, + 0x69, 0x6f, 0x6e, 0x53, 0x69, 0x7a, 0x65, 0x1a, 0x6c, 0x0a, 0x05, 0x54, 0x6f, 0x6b, 0x65, 0x6e, + 0x12, 0x26, 0x0a, 0x0f, 0x6f, 0x72, 0x69, 0x67, 0x69, 0x6e, 0x5f, 0x63, 0x68, 0x61, 0x69, 0x6e, + 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0d, 0x6f, 0x72, 0x69, 0x67, 0x69, + 0x6e, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x49, 0x64, 0x12, 0x25, 0x0a, 0x0e, 0x6f, 0x72, 0x69, 0x67, + 0x69, 0x6e, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x0d, 0x6f, 0x72, 0x69, 0x67, 0x69, 0x6e, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, + 0x14, 0x0a, 0x05, 0x70, 0x72, 0x69, 0x63, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x02, 0x52, 0x05, + 0x70, 0x72, 0x69, 0x63, 0x65, 0x22, 0x76, 0x0a, 0x19, 0x53, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x43, + 0x68, 0x61, 0x69, 0x6e, 0x47, 0x6f, 0x76, 0x65, 0x72, 0x6e, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, + 0x75, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x69, + 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x73, + 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x67, 0x75, 0x61, 0x72, + 0x64, 0x69, 0x61, 0x6e, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, + 0x0c, 0x67, 0x75, 0x61, 0x72, 0x64, 0x69, 0x61, 0x6e, 0x41, 0x64, 0x64, 0x72, 0x22, 0x98, 0x05, + 0x0a, 0x13, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x47, 0x6f, 0x76, 0x65, 0x72, 0x6e, 0x6f, 0x72, 0x53, + 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x1b, 0x0a, 0x09, 0x6e, 0x6f, 0x64, 0x65, 0x5f, 0x6e, 0x61, + 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6e, 0x6f, 0x64, 0x65, 0x4e, 0x61, + 0x6d, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x65, 0x72, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x03, 0x52, 0x07, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x65, 0x72, 0x12, 0x1c, 0x0a, 0x09, + 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, + 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x3c, 0x0a, 0x06, 0x63, 0x68, + 0x61, 0x69, 0x6e, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x24, 0x2e, 0x67, 0x6f, 0x73, + 0x73, 0x69, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x47, 0x6f, 0x76, 0x65, + 0x72, 0x6e, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2e, 0x43, 0x68, 0x61, 0x69, 0x6e, + 0x52, 0x06, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x73, 0x1a, 0x8c, 0x01, 0x0a, 0x0b, 0x45, 0x6e, 0x71, + 0x75, 0x65, 0x75, 0x65, 0x64, 0x56, 0x41, 0x41, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x71, 0x75, + 0x65, 0x6e, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x73, 0x65, 0x71, 0x75, + 0x65, 0x6e, 0x63, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x72, 0x65, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x5f, + 0x74, 0x69, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0b, 0x72, 0x65, 0x6c, 0x65, + 0x61, 0x73, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x6e, 0x6f, 0x74, 0x69, 0x6f, + 0x6e, 0x61, 0x6c, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, 0x52, + 0x0d, 0x6e, 0x6f, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x6c, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x17, + 0x0a, 0x07, 0x74, 0x78, 0x5f, 0x68, 0x61, 0x73, 0x68, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x06, 0x74, 0x78, 0x48, 0x61, 0x73, 0x68, 0x1a, 0xb3, 0x01, 0x0a, 0x07, 0x45, 0x6d, 0x69, 0x74, + 0x74, 0x65, 0x72, 0x12, 0x27, 0x0a, 0x0f, 0x65, 0x6d, 0x69, 0x74, 0x74, 0x65, 0x72, 0x5f, 0x61, + 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x65, 0x6d, + 0x69, 0x74, 0x74, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, 0x2e, 0x0a, 0x13, + 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x5f, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x64, 0x5f, 0x76, + 0x61, 0x61, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x11, 0x74, 0x6f, 0x74, 0x61, 0x6c, + 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x64, 0x56, 0x61, 0x61, 0x73, 0x12, 0x4f, 0x0a, 0x0d, + 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x64, 0x5f, 0x76, 0x61, 0x61, 0x73, 0x18, 0x03, 0x20, + 0x03, 0x28, 0x0b, 0x32, 0x2a, 0x2e, 0x67, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x2e, 0x76, 0x31, 0x2e, + 0x43, 0x68, 0x61, 0x69, 0x6e, 0x47, 0x6f, 0x76, 0x65, 0x72, 0x6e, 0x6f, 0x72, 0x53, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x2e, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x64, 0x56, 0x41, 0x41, 0x52, + 0x0c, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x64, 0x56, 0x61, 0x61, 0x73, 0x1a, 0xa8, 0x01, + 0x0a, 0x05, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x12, 0x19, 0x0a, 0x08, 0x63, 0x68, 0x61, 0x69, 0x6e, + 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x63, 0x68, 0x61, 0x69, 0x6e, + 0x49, 0x64, 0x12, 0x40, 0x0a, 0x1c, 0x72, 0x65, 0x6d, 0x61, 0x69, 0x6e, 0x69, 0x6e, 0x67, 0x5f, + 0x61, 0x76, 0x61, 0x69, 0x6c, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6e, 0x6f, 0x74, 0x69, 0x6f, 0x6e, + 0x61, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x1a, 0x72, 0x65, 0x6d, 0x61, 0x69, 0x6e, + 0x69, 0x6e, 0x67, 0x41, 0x76, 0x61, 0x69, 0x6c, 0x61, 0x62, 0x6c, 0x65, 0x4e, 0x6f, 0x74, 0x69, + 0x6f, 0x6e, 0x61, 0x6c, 0x12, 0x42, 0x0a, 0x08, 0x65, 0x6d, 0x69, 0x74, 0x74, 0x65, 0x72, 0x73, + 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x26, 0x2e, 0x67, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x47, 0x6f, 0x76, 0x65, 0x72, 0x6e, 0x6f, 0x72, - 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2e, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x64, 0x56, - 0x41, 0x41, 0x52, 0x0c, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x64, 0x56, 0x61, 0x61, 0x73, - 0x1a, 0xa8, 0x01, 0x0a, 0x05, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x12, 0x19, 0x0a, 0x08, 0x63, 0x68, - 0x61, 0x69, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x63, 0x68, - 0x61, 0x69, 0x6e, 0x49, 0x64, 0x12, 0x40, 0x0a, 0x1c, 0x72, 0x65, 0x6d, 0x61, 0x69, 0x6e, 0x69, - 0x6e, 0x67, 0x5f, 0x61, 0x76, 0x61, 0x69, 0x6c, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6e, 0x6f, 0x74, - 0x69, 0x6f, 0x6e, 0x61, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x1a, 0x72, 0x65, 0x6d, - 0x61, 0x69, 0x6e, 0x69, 0x6e, 0x67, 0x41, 0x76, 0x61, 0x69, 0x6c, 0x61, 0x62, 0x6c, 0x65, 0x4e, - 0x6f, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x6c, 0x12, 0x42, 0x0a, 0x08, 0x65, 0x6d, 0x69, 0x74, 0x74, - 0x65, 0x72, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x26, 0x2e, 0x67, 0x6f, 0x73, 0x73, - 0x69, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x47, 0x6f, 0x76, 0x65, 0x72, - 0x6e, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2e, 0x45, 0x6d, 0x69, 0x74, 0x74, 0x65, - 0x72, 0x52, 0x08, 0x65, 0x6d, 0x69, 0x74, 0x74, 0x65, 0x72, 0x73, 0x22, 0x57, 0x0a, 0x12, 0x53, - 0x69, 0x67, 0x6e, 0x65, 0x64, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x12, 0x23, 0x0a, 0x0d, 0x71, 0x75, 0x65, 0x72, 0x79, 0x5f, 0x72, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0c, 0x71, 0x75, 0x65, 0x72, 0x79, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, - 0x75, 0x72, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, - 0x74, 0x75, 0x72, 0x65, 0x22, 0x5a, 0x0a, 0x13, 0x53, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x51, 0x75, - 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x71, - 0x75, 0x65, 0x72, 0x79, 0x5f, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x0c, 0x52, 0x0d, 0x71, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, - 0x42, 0x41, 0x5a, 0x3f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, - 0x65, 0x72, 0x74, 0x75, 0x73, 0x6f, 0x6e, 0x65, 0x2f, 0x77, 0x6f, 0x72, 0x6d, 0x68, 0x6f, 0x6c, - 0x65, 0x2f, 0x6e, 0x6f, 0x64, 0x65, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x2f, 0x67, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x2f, 0x76, 0x31, 0x3b, 0x67, 0x6f, 0x73, 0x73, 0x69, - 0x70, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2e, 0x45, 0x6d, 0x69, 0x74, 0x74, 0x65, 0x72, 0x52, 0x08, + 0x65, 0x6d, 0x69, 0x74, 0x74, 0x65, 0x72, 0x73, 0x22, 0x57, 0x0a, 0x12, 0x53, 0x69, 0x67, 0x6e, + 0x65, 0x64, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x23, + 0x0a, 0x0d, 0x71, 0x75, 0x65, 0x72, 0x79, 0x5f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0c, 0x71, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, + 0x65, 0x22, 0x5a, 0x0a, 0x13, 0x53, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x51, 0x75, 0x65, 0x72, 0x79, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x71, 0x75, 0x65, 0x72, + 0x79, 0x5f, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, + 0x52, 0x0d, 0x71, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x1c, 0x0a, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x0c, 0x52, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x42, 0x41, 0x5a, + 0x3f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x65, 0x72, 0x74, + 0x75, 0x73, 0x6f, 0x6e, 0x65, 0x2f, 0x77, 0x6f, 0x72, 0x6d, 0x68, 0x6f, 0x6c, 0x65, 0x2f, 0x6e, + 0x6f, 0x64, 0x65, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, + 0x73, 0x73, 0x69, 0x70, 0x2f, 0x76, 0x31, 0x3b, 0x67, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x76, 0x31, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/node/pkg/watchers/evm/connectors/batch_poller.go b/node/pkg/watchers/evm/connectors/batch_poller.go new file mode 100644 index 0000000000..c16f9acd2a --- /dev/null +++ b/node/pkg/watchers/evm/connectors/batch_poller.go @@ -0,0 +1,283 @@ +package connectors + +import ( + "context" + "fmt" + "math/big" + "time" + + "github.com/certusone/wormhole/node/pkg/common" + "github.com/certusone/wormhole/node/pkg/supervisor" + ethEvent "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/rpc" + + ethereum "github.com/ethereum/go-ethereum" + + "go.uber.org/zap" +) + +// BatchPollConnector uses batch requests to poll for latest, safe and finalized blocks. +type BatchPollConnector struct { + Connector + Delay time.Duration + blockFeed ethEvent.Feed + errFeed ethEvent.Feed + batchData []BatchEntry +} + +type ( + Blocks []*NewBlock + + BatchEntry struct { + tag string + finality FinalityLevel + } + + BatchResult struct { + result BlockMarshaller + err error + } +) + +const MAX_GAP_BATCH_SIZE uint64 = 5 + +func NewBatchPollConnector(ctx context.Context, baseConnector Connector, delay time.Duration) (*BatchPollConnector, error) { + // Create the batch data in the order we want to report them to the watcher, so finalized is most important, latest is least. + batchData := []BatchEntry{ + {tag: "finalized", finality: Finalized}, + {tag: "safe", finality: Safe}, + {tag: "latest", finality: Latest}, + } + + connector := &BatchPollConnector{ + Connector: baseConnector, + Delay: delay, + batchData: batchData, + } + err := supervisor.Run(ctx, "batchPoller", common.WrapWithScissors(connector.runFromSupervisor, "batchPoller")) + if err != nil { + return nil, err + } + return connector, nil +} + +func (b *BatchPollConnector) SubscribeForBlocks(ctx context.Context, errC chan error, sink chan<- *NewBlock) (ethereum.Subscription, error) { + sub := NewPollSubscription() + blockSub := b.blockFeed.Subscribe(sink) + + // The feed library does not support error forwarding, so we're emulating that using a custom subscription and + // an error feed. The feed library can't handle interfaces which is why we post strings. + innerErrSink := make(chan string, 10) + innerErrSub := b.errFeed.Subscribe(innerErrSink) + + common.RunWithScissors(ctx, errC, "block_poll_subscribe_for_blocks", func(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + blockSub.Unsubscribe() + innerErrSub.Unsubscribe() + return nil + case <-sub.quit: + blockSub.Unsubscribe() + innerErrSub.Unsubscribe() + sub.unsubDone <- struct{}{} + return nil + case v := <-innerErrSink: + sub.err <- fmt.Errorf(v) + } + } + }) + return sub, nil +} + +func (b *BatchPollConnector) runFromSupervisor(ctx context.Context) error { + logger := supervisor.Logger(ctx).With(zap.String("eth_network", b.Connector.NetworkName())) + supervisor.Signal(ctx, supervisor.SignalHealthy) + return b.run(ctx, logger) +} + +func (b *BatchPollConnector) run(ctx context.Context, logger *zap.Logger) error { + // Get the initial blocks. + lastBlocks, err := b.getBlocks(ctx, logger) + if err != nil { + return err + } + + timer := time.NewTimer(b.Delay) + defer timer.Stop() + + errCount := 0 + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + lastBlocks, err = b.pollBlocks(ctx, logger, lastBlocks) + if err != nil { + errCount++ + if errCount > 3 { + logger.Error("batch polling encountered an error", zap.Int("errCount", errCount), zap.Error(err)) + b.errFeed.Send(fmt.Sprint("polling encountered an error: ", err)) + return err + } + } else { + errCount = 0 + } + + timer.Reset(b.Delay) + } + } +} + +// pollBlocks polls for the latest blocks (finalized, safe and latest), compares them to the last ones, and publishes any new ones. +// In the case of an error, it returns the last blocks that were passed in, otherwise it returns the new blocks. +func (b *BatchPollConnector) pollBlocks(ctx context.Context, logger *zap.Logger, prevBlocks Blocks) (Blocks, error) { + newBlocks, err := b.getBlocks(ctx, logger) + if err != nil { + return prevBlocks, err + } + + if len(newBlocks) != len(prevBlocks) { + panic(fmt.Sprintf("getBlocks returned %d entries when there should be %d", len(newBlocks), len(prevBlocks))) + } + + for idx, newBlock := range newBlocks { + if newBlock.Number.Cmp(prevBlocks[idx].Number) > 0 { + // If there is a gap between prev and new, we have to look up the transaction hashes for the missing ones. Do that in batches. + newBlockNum := newBlock.Number.Uint64() + blockNum := prevBlocks[idx].Number.Uint64() + 1 + for blockNum < newBlockNum { + batchSize := newBlockNum - blockNum + if batchSize > MAX_GAP_BATCH_SIZE { + batchSize = MAX_GAP_BATCH_SIZE + } + gapBlocks, err := b.getBlockRange(ctx, logger, blockNum, batchSize, b.batchData[idx].finality) + if err != nil { + return prevBlocks, fmt.Errorf("failed to get gap blocks: %w", err) + } + for _, block := range gapBlocks { + b.blockFeed.Send(block) + } + blockNum += batchSize + } + + b.blockFeed.Send(newBlock) + } + } + + return newBlocks, nil +} + +// getBlocks gets the current batch of configured blocks (finalized, safe, latest). +func (b *BatchPollConnector) getBlocks(ctx context.Context, logger *zap.Logger) (Blocks, error) { + timeout, cancel := context.WithTimeout(ctx, 15*time.Second) + defer cancel() + + batch := make([]rpc.BatchElem, len(b.batchData)) + results := make([]BatchResult, len(b.batchData)) + for idx, bd := range b.batchData { + batch[idx] = rpc.BatchElem{ + Method: "eth_getBlockByNumber", + Args: []interface{}{ + bd.tag, + false, // no full transaction details + }, + Result: &results[idx].result, + Error: results[idx].err, + } + } + + err := b.Connector.RawBatchCallContext(timeout, batch) + if err != nil { + logger.Error("failed to get blocks", zap.Error(err)) + return nil, err + } + + ret := make(Blocks, len(b.batchData)) + for idx, result := range results { + finality := b.batchData[idx].finality + if result.err != nil { + logger.Error("failed to get block", zap.Stringer("finality", finality), zap.Error(result.err)) + return nil, err + } + + m := &result.result + if m.Number == nil { + logger.Error("failed to unmarshal block: Number is nil", zap.Stringer("finality", finality), zap.String("tag", b.batchData[idx].tag)) + return nil, fmt.Errorf("failed to unmarshal block: Number is nil") + } + n := big.Int(*m.Number) + + var l1bn *big.Int + if m.L1BlockNumber != nil { + bn := big.Int(*m.L1BlockNumber) + l1bn = &bn + } + + ret[idx] = &NewBlock{ + Number: &n, + Hash: m.Hash, + L1BlockNumber: l1bn, + Finality: finality, + } + } + + return ret, nil +} + +// getBlockRange gets a range of blocks, starting at blockNum, including the next numBlocks. It passes back an array of those blocks. +func (b *BatchPollConnector) getBlockRange(ctx context.Context, logger *zap.Logger, blockNum uint64, numBlocks uint64, finality FinalityLevel) (Blocks, error) { + timeout, cancel := context.WithTimeout(ctx, 15*time.Second) + defer cancel() + + batch := make([]rpc.BatchElem, numBlocks) + results := make([]BatchResult, numBlocks) + for idx := 0; idx < int(numBlocks); idx++ { + batch[idx] = rpc.BatchElem{ + Method: "eth_getBlockByNumber", + Args: []interface{}{ + "0x" + fmt.Sprintf("%x", blockNum), + false, // no full transaction details + }, + Result: &results[idx].result, + Error: results[idx].err, + } + blockNum++ + } + + err := b.Connector.RawBatchCallContext(timeout, batch) + if err != nil { + logger.Error("failed to get blocks", zap.Error(err)) + return nil, err + } + + ret := make(Blocks, numBlocks) + for idx, result := range results { + if result.err != nil { + logger.Error("failed to get block", zap.Int("idx", idx), zap.Stringer("finality", finality), zap.Error(result.err)) + return nil, err + } + + m := &result.result + if m.Number == nil { + logger.Error("failed to unmarshal block: Number is nil") + return nil, fmt.Errorf("failed to unmarshal block: Number is nil") + } + n := big.Int(*m.Number) + + var l1bn *big.Int + if m.L1BlockNumber != nil { + bn := big.Int(*m.L1BlockNumber) + l1bn = &bn + } + + ret[idx] = &NewBlock{ + Number: &n, + Hash: m.Hash, + L1BlockNumber: l1bn, + Finality: finality, + } + } + + return ret, nil +} diff --git a/node/pkg/watchers/evm/connectors/celo.go b/node/pkg/watchers/evm/connectors/celo.go index d1b5d355df..ff5a310522 100644 --- a/node/pkg/watchers/evm/connectors/celo.go +++ b/node/pkg/watchers/evm/connectors/celo.go @@ -16,6 +16,7 @@ import ( ethereum "github.com/ethereum/go-ethereum" ethCommon "github.com/ethereum/go-ethereum/common" ethTypes "github.com/ethereum/go-ethereum/core/types" + ethClient "github.com/ethereum/go-ethereum/ethclient" ethEvent "github.com/ethereum/go-ethereum/event" ethRpc "github.com/ethereum/go-ethereum/rpc" @@ -167,9 +168,21 @@ func (c *CeloConnector) SubscribeForBlocks(ctx context.Context, errC chan error, c.logger.Error("new header block number is nil") continue } + hash := ethCommon.BytesToHash(ev.Hash().Bytes()) sink <- &NewBlock{ - Number: ev.Number, - Hash: ethCommon.BytesToHash(ev.Hash().Bytes()), + Number: ev.Number, + Hash: hash, + Finality: Finalized, + } + sink <- &NewBlock{ + Number: ev.Number, + Hash: hash, + Finality: Safe, + } + sink <- &NewBlock{ + Number: ev.Number, + Hash: hash, + Finality: Latest, } } } @@ -195,6 +208,10 @@ func (c *CeloConnector) RawBatchCallContext(ctx context.Context, b []ethRpc.Batc return c.rawClient.BatchCallContext(ctx, celoB) } +func (c *CeloConnector) Client() *ethClient.Client { + panic("unimplemented") +} + func convertCeloEventToEth(ev *celoAbi.AbiLogMessagePublished) *ethAbi.AbiLogMessagePublished { return ðAbi.AbiLogMessagePublished{ Sender: ethCommon.BytesToAddress(ev.Sender.Bytes()), diff --git a/node/pkg/watchers/evm/connectors/common.go b/node/pkg/watchers/evm/connectors/common.go index f177c8a539..dc6618cedc 100644 --- a/node/pkg/watchers/evm/connectors/common.go +++ b/node/pkg/watchers/evm/connectors/common.go @@ -12,6 +12,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" + ethClient "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/rpc" ) @@ -30,7 +31,16 @@ type NewBlock struct { Number *big.Int Hash common.Hash L1BlockNumber *big.Int // This is only populated on some chains (Arbitrum) - Safe bool + Finality FinalityLevel +} + +func (b *NewBlock) Copy(f FinalityLevel) *NewBlock { + return &NewBlock{ + Number: b.Number, + Hash: b.Hash, + L1BlockNumber: b.L1BlockNumber, + Finality: f, + } } // Connector exposes Wormhole-specific interactions with an EVM-based network @@ -46,6 +56,7 @@ type Connector interface { SubscribeForBlocks(ctx context.Context, errC chan error, sink chan<- *NewBlock) (ethereum.Subscription, error) RawCallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error RawBatchCallContext(ctx context.Context, b []rpc.BatchElem) error + Client() *ethClient.Client } type PollSubscription struct { diff --git a/node/pkg/watchers/evm/connectors/ethereum.go b/node/pkg/watchers/evm/connectors/ethereum.go index e28d53ef10..17e440f7c9 100644 --- a/node/pkg/watchers/evm/connectors/ethereum.go +++ b/node/pkg/watchers/evm/connectors/ethereum.go @@ -121,8 +121,19 @@ func (e *EthereumConnector) SubscribeForBlocks(ctx context.Context, errC chan er continue } sink <- &NewBlock{ - Number: ev.Number, - Hash: ev.Hash(), + Number: ev.Number, + Hash: ev.Hash(), + Finality: Finalized, + } + sink <- &NewBlock{ + Number: ev.Number, + Hash: ev.Hash(), + Finality: Safe, + } + sink <- &NewBlock{ + Number: ev.Number, + Hash: ev.Hash(), + Finality: Latest, } } } diff --git a/node/pkg/watchers/evm/connectors/finality.go b/node/pkg/watchers/evm/connectors/finality.go new file mode 100644 index 0000000000..1015dd63fd --- /dev/null +++ b/node/pkg/watchers/evm/connectors/finality.go @@ -0,0 +1,26 @@ +package connectors + +import ( + "fmt" +) + +type FinalityLevel uint8 + +const ( + Latest FinalityLevel = iota + Safe + Finalized +) + +func (f FinalityLevel) String() string { + if f == Latest { + return "Latest" + } + if f == Safe { + return "Safe" + } + if f == Finalized { + return "Finalized" + } + return fmt.Sprintf("unknown(%d)", f) +} diff --git a/node/pkg/watchers/evm/connectors/poller.go b/node/pkg/watchers/evm/connectors/poller.go index 4e5506b7e7..f5ed5ab3b3 100644 --- a/node/pkg/watchers/evm/connectors/poller.go +++ b/node/pkg/watchers/evm/connectors/poller.go @@ -8,10 +8,10 @@ import ( "github.com/certusone/wormhole/node/pkg/common" "github.com/certusone/wormhole/node/pkg/supervisor" - ethEvent "github.com/ethereum/go-ethereum/event" ethereum "github.com/ethereum/go-ethereum" ethHexUtils "github.com/ethereum/go-ethereum/common/hexutil" + ethEvent "github.com/ethereum/go-ethereum/event" "go.uber.org/zap" ) @@ -24,27 +24,20 @@ type PollFinalizer interface { // finalizer which will be used to only return finalized blocks on subscriptions. type BlockPollConnector struct { Connector - Delay time.Duration - useFinalized bool - publishSafeBlocks bool - finalizer PollFinalizer - blockFeed ethEvent.Feed - errFeed ethEvent.Feed + Delay time.Duration + finalizer PollFinalizer + blockFeed ethEvent.Feed + errFeed ethEvent.Feed } -func NewBlockPollConnector(ctx context.Context, baseConnector Connector, finalizer PollFinalizer, delay time.Duration, useFinalized bool, publishSafeBlocks bool) (*BlockPollConnector, error) { - if publishSafeBlocks && !useFinalized { - return nil, fmt.Errorf("publishSafeBlocks may only be enabled if useFinalized is enabled") - } +func NewBlockPollConnector(ctx context.Context, baseConnector Connector, finalizer PollFinalizer, delay time.Duration) (*BlockPollConnector, error) { if finalizer == nil { - return nil, fmt.Errorf("finalizer must not be nil; Use finalizers.NewDefaultFinalizer() if you want to have no finalizer.") + panic("finalizer must not be nil; Use finalizers.NewDefaultFinalizer() if you want to have no finalizer.") } connector := &BlockPollConnector{ - Connector: baseConnector, - Delay: delay, - useFinalized: useFinalized, - publishSafeBlocks: publishSafeBlocks, - finalizer: finalizer, + Connector: baseConnector, + Delay: delay, + finalizer: finalizer, } err := supervisor.Run(ctx, "blockPoller", common.WrapWithScissors(connector.runFromSupervisor, "blockPoller")) if err != nil { @@ -53,6 +46,35 @@ func NewBlockPollConnector(ctx context.Context, baseConnector Connector, finaliz return connector, nil } +func (b *BlockPollConnector) SubscribeForBlocks(ctx context.Context, errC chan error, sink chan<- *NewBlock) (ethereum.Subscription, error) { + sub := NewPollSubscription() + blockSub := b.blockFeed.Subscribe(sink) + + // The feed library does not support error forwarding, so we're emulating that using a custom subscription and + // an error feed. The feed library can't handle interfaces which is why we post strings. + innerErrSink := make(chan string, 10) + innerErrSub := b.errFeed.Subscribe(innerErrSink) + + common.RunWithScissors(ctx, errC, "block_poll_subscribe_for_blocks", func(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + blockSub.Unsubscribe() + innerErrSub.Unsubscribe() + return nil + case <-sub.quit: + blockSub.Unsubscribe() + innerErrSub.Unsubscribe() + sub.unsubDone <- struct{}{} + return nil + case v := <-innerErrSink: + sub.err <- fmt.Errorf(v) + } + } + }) + return sub, nil +} + func (b *BlockPollConnector) runFromSupervisor(ctx context.Context) error { logger := supervisor.Logger(ctx).With(zap.String("eth_network", b.Connector.NetworkName())) supervisor.Signal(ctx, supervisor.SignalHealthy) @@ -60,179 +82,131 @@ func (b *BlockPollConnector) runFromSupervisor(ctx context.Context) error { } func (b *BlockPollConnector) run(ctx context.Context, logger *zap.Logger) error { - lastBlock, err := b.getBlock(ctx, logger, nil, false) + prevLatest, err := getBlockByTag(ctx, logger, b.Connector, "latest", Latest) if err != nil { return err } - var lastSafeBlock *NewBlock - if b.publishSafeBlocks { - lastSafeBlock, err = b.getBlock(ctx, logger, nil, true) - if err != nil { - return err - } + prevFinalized := &NewBlock{ + Number: prevLatest.Number, + Hash: prevLatest.Hash, + Finality: Finalized, } - timer := time.NewTimer(time.Millisecond) // Start immediately. + timer := time.NewTimer(b.Delay) + defer timer.Stop() + errCount := 0 for { select { case <-ctx.Done(): - timer.Stop() return ctx.Err() case <-timer.C: - for count := 0; count < 3; count++ { - lastBlock, err = b.pollBlocks(ctx, logger, lastBlock, false) - if err == nil { - break - } - logger.Error("polling of block encountered an error", zap.Error(err)) - - // Wait an interval before trying again. We stay in this loop so that we - // try up to three times before causing the watcher to restart. - time.Sleep(b.Delay) - } - - if err == nil && b.publishSafeBlocks { - for count := 0; count < 3; count++ { - lastSafeBlock, err = b.pollBlocks(ctx, logger, lastSafeBlock, true) - if err == nil { - break - } - logger.Error("polling of safe block encountered an error", zap.Error(err)) - - // Same wait as above. - time.Sleep(b.Delay) + prevLatest, prevFinalized, err = b.pollBlock(ctx, logger, prevLatest, prevFinalized) + if err != nil { + errCount++ + if errCount > 3 { + logger.Error("polling encountered an error", zap.Int("errCount", errCount), zap.Error(err)) + b.errFeed.Send(fmt.Sprint("polling encountered an error: ", err)) + return err } + } else { + errCount = 0 } - if err != nil { - b.errFeed.Send(fmt.Sprint("polling encountered an error: ", err)) - } timer.Reset(b.Delay) } } } -func (b *BlockPollConnector) pollBlocks(ctx context.Context, logger *zap.Logger, lastBlock *NewBlock, safe bool) (lastPublishedBlock *NewBlock, retErr error) { - // Some of the testnet providers (like the one we are using for Arbitrum) limit how many transactions we can do. When that happens, the call hangs. - // Use a timeout so that the call will fail and the runable will get restarted. This should not happen in mainnet, but if it does, we will need to - // investigate why the runable is dying and fix the underlying problem. - - lastPublishedBlock = lastBlock - - // Fetch the latest block on the chain - // We could do this on every iteration such that if a new block is created while this function is being executed, - // it would automatically fetch new blocks but in order to reduce API load this will be done on the next iteration. - latestBlock, err := b.getBlockWithTimeout(ctx, logger, nil, safe) +// pollBlock poll for the latest block, compares them to the last one, and publishes any new ones. +// In the case of an error, it returns the last block that were passed in, otherwise it returns the new block. +func (b *BlockPollConnector) pollBlock(ctx context.Context, logger *zap.Logger, prevLatest *NewBlock, prevFinalized *NewBlock) (newLatest *NewBlock, newFinalized *NewBlock, err error) { + newLatest, err = getBlockByTag(ctx, logger, b.Connector, "latest", Latest) if err != nil { - logger.Error("failed to look up latest block", - zap.Uint64("lastSeenBlock", lastBlock.Number.Uint64()), zap.Error(err)) - return lastPublishedBlock, fmt.Errorf("failed to look up latest block: %w", err) + err = fmt.Errorf("failed to get latest block: %w", err) + newLatest = prevLatest + newFinalized = prevFinalized + return } - for { - if lastPublishedBlock.Number.Cmp(latestBlock.Number) >= 0 { - // We have to wait for a new block to become available - return - } - // Try to fetch the next block between lastBlock and latestBlock - nextBlockNumber := new(big.Int).Add(lastPublishedBlock.Number, big.NewInt(1)) - block, err := b.getBlockWithTimeout(ctx, logger, nextBlockNumber, safe) - if err != nil { - logger.Error("failed to fetch next block", - zap.Uint64("block", nextBlockNumber.Uint64()), zap.Error(err)) - return lastPublishedBlock, fmt.Errorf("failed to fetch next block (%d): %w", nextBlockNumber.Uint64(), err) - } - - finalized, err := b.isBlockFinalizedWithTimeout(ctx, block) - if err != nil { - logger.Error("failed to check block finalization", - zap.Uint64("block", block.Number.Uint64()), zap.Error(err)) - return lastPublishedBlock, fmt.Errorf("failed to check block finalization (%d): %w", block.Number.Uint64(), err) - } + // First see if there might be some newly finalized ones to publish + var block *NewBlock + if newLatest.Number.Cmp(prevLatest.Number) > 0 { + // If there is a gap between prev and new, we have to look up the transaction hashes for the missing ones. Do that in batches. + newBlockNum := newLatest.Number.Uint64() + for blockNum := prevLatest.Number.Uint64() + 1; blockNum < newBlockNum; blockNum++ { + block, err = getBlockByNumberUint64(ctx, logger, b.Connector, blockNum, Latest) + if err != nil { + err = fmt.Errorf("failed to get gap block: %w", err) + newLatest = prevLatest + newFinalized = prevFinalized + return + } - if !finalized { - break + b.blockFeed.Send(block) } - b.blockFeed.Send(block) - lastPublishedBlock = block + b.blockFeed.Send(newLatest) } - return -} + newFinalized = prevFinalized + if newLatest.Number.Cmp(prevFinalized.Number) > 0 { + var finalized bool + // If there is a gap between prev and new, we have to look up the transaction hashes for the missing ones. Do that in batches. + newBlockNum := newLatest.Number.Uint64() + for blockNum := prevFinalized.Number.Uint64() + 1; blockNum <= newBlockNum; blockNum++ { + block, err = getBlockByNumberUint64(ctx, logger, b.Connector, blockNum, Finalized) + if err != nil { + err = fmt.Errorf("failed to get gap block: %w", err) + newLatest = prevLatest + newFinalized = prevFinalized + return + } -func (b *BlockPollConnector) getBlockWithTimeout(ctx context.Context, logger *zap.Logger, blockNumber *big.Int, safe bool) (*NewBlock, error) { - timeout, cancel := context.WithTimeout(ctx, 15*time.Second) - defer cancel() - return b.getBlock(timeout, logger, blockNumber, safe) -} + finalized, err = b.isBlockFinalized(ctx, block) + if err != nil { + err = fmt.Errorf("failed to check finality on block: %w", err) + newLatest = prevLatest + newFinalized = prevFinalized + return + } -func (b *BlockPollConnector) isBlockFinalizedWithTimeout(ctx context.Context, block *NewBlock) (bool, error) { - timeout, cancel := context.WithTimeout(ctx, 15*time.Second) - defer cancel() - return b.finalizer.IsBlockFinalized(timeout, block) -} + if !finalized { + break + } -func (b *BlockPollConnector) SubscribeForBlocks(ctx context.Context, errC chan error, sink chan<- *NewBlock) (ethereum.Subscription, error) { - sub := NewPollSubscription() - blockSub := b.blockFeed.Subscribe(sink) + b.blockFeed.Send(block.Copy(Safe)) + b.blockFeed.Send(block.Copy(Finalized)) + newFinalized = block + } + } - // The feed library does not support error forwarding, so we're emulating that using a custom subscription and - // an error feed. The feed library can't handle interfaces which is why we post strings. - innerErrSink := make(chan string, 10) - innerErrSub := b.errFeed.Subscribe(innerErrSink) + return +} - common.RunWithScissors(ctx, errC, "block_poll_subscribe_for_blocks", func(ctx context.Context) error { - for { - select { - case <-ctx.Done(): - blockSub.Unsubscribe() - innerErrSub.Unsubscribe() - return nil - case <-sub.quit: - blockSub.Unsubscribe() - innerErrSub.Unsubscribe() - sub.unsubDone <- struct{}{} - return nil - case v := <-innerErrSink: - sub.err <- fmt.Errorf(v) - } - } - }) - return sub, nil +func getBlockByNumberUint64(ctx context.Context, logger *zap.Logger, conn Connector, blockNum uint64, desiredFinality FinalityLevel) (*NewBlock, error) { + return getBlockByTag(ctx, logger, conn, "0x"+fmt.Sprintf("%x", blockNum), desiredFinality) } -func (b *BlockPollConnector) getBlock(ctx context.Context, logger *zap.Logger, number *big.Int, safe bool) (*NewBlock, error) { - return getBlock(ctx, logger, b.Connector, number, b.useFinalized, safe) +func getBlockByNumberBigInt(ctx context.Context, logger *zap.Logger, conn Connector, blockNum *big.Int, desiredFinality FinalityLevel) (*NewBlock, error) { + return getBlockByTag(ctx, logger, conn, ethHexUtils.EncodeBig(blockNum), desiredFinality) } -// getBlock is a free function that can be called from other connectors to get a single block. -func getBlock(ctx context.Context, logger *zap.Logger, conn Connector, number *big.Int, useFinalized bool, safe bool) (*NewBlock, error) { - var numStr string - if number != nil { - numStr = ethHexUtils.EncodeBig(number) - } else if useFinalized { - if safe { - numStr = "safe" - } else { - numStr = "finalized" - } - } else { - numStr = "latest" - } +func getBlockByTag(ctx context.Context, logger *zap.Logger, conn Connector, tag string, desiredFinality FinalityLevel) (*NewBlock, error) { + timeout, cancel := context.WithTimeout(ctx, 15*time.Second) + defer cancel() var m BlockMarshaller - err := conn.RawCallContext(ctx, &m, "eth_getBlockByNumber", numStr, false) + err := conn.RawCallContext(timeout, &m, "eth_getBlockByNumber", tag, false) if err != nil { logger.Error("failed to get block", - zap.String("requested_block", numStr), zap.Error(err)) + zap.String("requested_block", tag), zap.Error(err)) return nil, err } if m.Number == nil { logger.Error("failed to unmarshal block", - zap.String("requested_block", numStr), + zap.String("requested_block", tag), ) return nil, fmt.Errorf("failed to unmarshal block: Number is nil") } @@ -248,6 +222,12 @@ func getBlock(ctx context.Context, logger *zap.Logger, conn Connector, number *b Number: &n, Hash: m.Hash, L1BlockNumber: l1bn, - Safe: safe, + Finality: desiredFinality, }, nil } + +func (b *BlockPollConnector) isBlockFinalized(ctx context.Context, block *NewBlock) (bool, error) { + timeout, cancel := context.WithTimeout(ctx, 15*time.Second) + defer cancel() + return b.finalizer.IsBlockFinalized(timeout, block) +} diff --git a/node/pkg/watchers/evm/connectors/poller_test.go b/node/pkg/watchers/evm/connectors/poller_test.go deleted file mode 100644 index ef2e1cc103..0000000000 --- a/node/pkg/watchers/evm/connectors/poller_test.go +++ /dev/null @@ -1,377 +0,0 @@ -package connectors - -import ( - "context" - "encoding/json" - "fmt" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "go.uber.org/zap" - - ethAbi "github.com/certusone/wormhole/node/pkg/watchers/evm/connectors/ethabi" - - ethereum "github.com/ethereum/go-ethereum" - ethCommon "github.com/ethereum/go-ethereum/common" - ethTypes "github.com/ethereum/go-ethereum/core/types" - ethClient "github.com/ethereum/go-ethereum/ethclient" - ethEvent "github.com/ethereum/go-ethereum/event" - ethRpc "github.com/ethereum/go-ethereum/rpc" -) - -// mockConnectorForPoller implements the connector interface for testing purposes. -type mockConnectorForPoller struct { - address ethCommon.Address - client *ethClient.Client - mutex sync.Mutex - err error - persistentError bool - blockNumber uint64 -} - -// setError takes an error which will be returned on the next RPC call. The error will persist until cleared. -func (m *mockConnectorForPoller) setError(err error) { - m.mutex.Lock() - m.err = err - m.persistentError = true - m.mutex.Unlock() -} - -// setSingleError takes an error which will be returned on the next RPC call. After that, the error is reset to nil. -func (m *mockConnectorForPoller) setSingleError(err error) { - m.mutex.Lock() - m.err = err - m.persistentError = false - m.mutex.Unlock() -} - -func (e *mockConnectorForPoller) NetworkName() string { - return "mockConnectorForPoller" -} - -func (e *mockConnectorForPoller) ContractAddress() ethCommon.Address { - return e.address -} - -func (e *mockConnectorForPoller) GetCurrentGuardianSetIndex(ctx context.Context) (uint32, error) { - return 0, fmt.Errorf("not implemented") -} - -func (e *mockConnectorForPoller) GetGuardianSet(ctx context.Context, index uint32) (ethAbi.StructsGuardianSet, error) { - return ethAbi.StructsGuardianSet{}, fmt.Errorf("not implemented") -} - -func (e *mockConnectorForPoller) WatchLogMessagePublished(ctx context.Context, errC chan error, sink chan<- *ethAbi.AbiLogMessagePublished) (ethEvent.Subscription, error) { - var s ethEvent.Subscription - return s, fmt.Errorf("not implemented") -} - -func (e *mockConnectorForPoller) TransactionReceipt(ctx context.Context, txHash ethCommon.Hash) (*ethTypes.Receipt, error) { - return nil, fmt.Errorf("not implemented") -} - -func (e *mockConnectorForPoller) TimeOfBlockByHash(ctx context.Context, hash ethCommon.Hash) (uint64, error) { - return 0, fmt.Errorf("not implemented") -} - -func (e *mockConnectorForPoller) ParseLogMessagePublished(log ethTypes.Log) (*ethAbi.AbiLogMessagePublished, error) { - return nil, fmt.Errorf("not implemented") -} - -func (e *mockConnectorForPoller) SubscribeForBlocks(ctx context.Context, errC chan error, sink chan<- *NewBlock) (ethereum.Subscription, error) { - var s ethEvent.Subscription - return s, fmt.Errorf("not implemented") -} - -func (e *mockConnectorForPoller) RawCallContext(ctx context.Context, result interface{}, method string, args ...interface{}) (err error) { - if method != "eth_getBlockByNumber" { - panic("method not implemented by mockConnectorForPoller") - } - - e.mutex.Lock() - // If they set the error, return that immediately. - if e.err != nil { - err = e.err - if !e.persistentError { - e.err = nil - } - } else { - str := fmt.Sprintf(`{"author":"0x24c275f0719fdaec6356c4eb9f39ecb9c4d37ce1","baseFeePerGas":"0x3b9aca00","difficulty":"0x0","extraData":"0x","gasLimit":"0xe4e1c0","gasUsed":"0x0","hash":"0xfc8b62a31110121c57cfcccfaf2b147cc2c13b6d01bde4737846cefd29f045cf","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","miner":"0x24c275f0719fdaec6356c4eb9f39ecb9c4d37ce1","nonce":"0x0000000000000000","number":"0x%x","parentHash":"0x09d6d33a658b712f41db7fb9f775f94911ae0132123116aa4f8cf3da9f774e89","receiptsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","sha3Uncles":"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347","size":"0x201","stateRoot":"0x0409ed10e03fd49424ae1489c6fbc6ff1897f45d0e214655ebdb8df94eedc3c0","timestamp":"0x6373ec24","totalDifficulty":"0x0","transactions":[],"transactionsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","uncles":[]}`, e.blockNumber) - err = json.Unmarshal([]byte(str), &result) - } - e.mutex.Unlock() - - return -} - -func (e *mockConnectorForPoller) RawBatchCallContext(ctx context.Context, b []ethRpc.BatchElem) error { - panic("method not implemented by mockConnectorForPoller") -} - -func (e *mockConnectorForPoller) setBlockNumber(blockNumber uint64) { - e.mutex.Lock() - e.blockNumber = blockNumber - e.mutex.Unlock() -} - -func (e *mockConnectorForPoller) expectedHash() ethCommon.Hash { - return ethCommon.HexToHash("0xfc8b62a31110121c57cfcccfaf2b147cc2c13b6d01bde4737846cefd29f045cf") -} - -func (e *mockConnectorForPoller) Client() *ethClient.Client { - return e.client -} - -type mockFinalizerForPoller struct { - mutex sync.Mutex - finalized bool -} - -func newMockFinalizerForPoller(initialState bool) *mockFinalizerForPoller { - return &mockFinalizerForPoller{finalized: initialState} -} - -func (f *mockFinalizerForPoller) setFinalized(finalized bool) { - f.mutex.Lock() - defer f.mutex.Unlock() - f.finalized = finalized -} - -func (f *mockFinalizerForPoller) IsBlockFinalized(ctx context.Context, block *NewBlock) (bool, error) { - f.mutex.Lock() - defer f.mutex.Unlock() - return f.finalized, nil -} - -// TestBlockPoller is one big, ugly test because of all the set up required. -func TestBlockPoller(t *testing.T) { - ctx := context.Background() - logger := zap.NewNop() - baseConnector := mockConnectorForPoller{} - - finalizer := newMockFinalizerForPoller(true) // Start by assuming blocks are finalized. - assert.NotNil(t, finalizer) - - poller := &BlockPollConnector{ - Connector: &baseConnector, - Delay: 1 * time.Millisecond, - useFinalized: false, - finalizer: finalizer, - } - - // Set the starting block. - baseConnector.setBlockNumber(0x309a0c) - - // The go routines will post results here. - var mutex sync.Mutex - var block *NewBlock - var err error - var pollerStatus int - - const pollerRunning = 1 - const pollerExited = 2 - - // Start the poller running. - go func() { - mutex.Lock() - pollerStatus = pollerRunning - mutex.Unlock() - err := poller.run(ctx, logger) - require.NoError(t, err) - mutex.Lock() - pollerStatus = pollerExited - mutex.Unlock() - }() - - // Subscribe for events to be processed by our go routine. - headSink := make(chan *NewBlock, 2) - errC := make(chan error) - - headerSubscription, suberr := poller.SubscribeForBlocks(ctx, errC, headSink) - require.NoError(t, suberr) - - go func() { - for { - select { - case <-ctx.Done(): - return - case thisErr := <-errC: - mutex.Lock() - err = thisErr - mutex.Unlock() - case thisErr := <-headerSubscription.Err(): - mutex.Lock() - err = thisErr - mutex.Unlock() - case thisBlock := <-headSink: - require.NotNil(t, thisBlock) - mutex.Lock() - block = thisBlock - mutex.Unlock() - } - } - }() - - // First sleep a bit and make sure there were no start up errors. - time.Sleep(10 * time.Millisecond) - mutex.Lock() - require.Equal(t, pollerRunning, pollerStatus) - require.NoError(t, err) - assert.Nil(t, block) - mutex.Unlock() - - // Post the first new block and verify we get it. - baseConnector.setBlockNumber(0x309a0d) - - time.Sleep(10 * time.Millisecond) - mutex.Lock() - require.Equal(t, pollerRunning, pollerStatus) - require.NoError(t, err) - require.NotNil(t, block) - assert.Equal(t, uint64(0x309a0d), block.Number.Uint64()) - assert.Equal(t, baseConnector.expectedHash(), block.Hash) - block = nil - mutex.Unlock() - - // Sleep some more and verify we don't see any more blocks, since we haven't posted a new one. - baseConnector.setBlockNumber(0x309a0d) - time.Sleep(10 * time.Millisecond) - mutex.Lock() - require.Equal(t, pollerRunning, pollerStatus) - require.NoError(t, err) - require.Nil(t, block) - mutex.Unlock() - - // Post the next block and verify we get it. - baseConnector.setBlockNumber(0x309a0e) - time.Sleep(10 * time.Millisecond) - mutex.Lock() - require.Equal(t, pollerRunning, pollerStatus) - require.NoError(t, err) - require.NotNil(t, block) - assert.Equal(t, uint64(0x309a0e), block.Number.Uint64()) - assert.Equal(t, baseConnector.expectedHash(), block.Hash) - block = nil - mutex.Unlock() - - // Post the next block but mark it as not finalized, so we shouldn't see it yet. - mutex.Lock() - finalizer.setFinalized(false) - baseConnector.setBlockNumber(0x309a0f) - mutex.Unlock() - - time.Sleep(10 * time.Millisecond) - mutex.Lock() - require.Equal(t, pollerRunning, pollerStatus) - require.NoError(t, err) - require.Nil(t, block) - mutex.Unlock() - - // Once it goes finalized we should see it. - mutex.Lock() - finalizer.setFinalized(true) - mutex.Unlock() - - time.Sleep(10 * time.Millisecond) - mutex.Lock() - require.Equal(t, pollerRunning, pollerStatus) - require.NoError(t, err) - require.NotNil(t, block) - assert.Equal(t, uint64(0x309a0f), block.Number.Uint64()) - assert.Equal(t, baseConnector.expectedHash(), block.Hash) - block = nil - mutex.Unlock() - - // An RPC error should be returned to us. - err = nil - baseConnector.setError(fmt.Errorf("RPC failed")) - - time.Sleep(10 * time.Millisecond) - mutex.Lock() - require.Equal(t, pollerRunning, pollerStatus) - assert.Error(t, err) - assert.Nil(t, block) - baseConnector.setError(nil) - err = nil - mutex.Unlock() - - // Post the next block and verify we get it (so we survived the RPC error). - baseConnector.setBlockNumber(0x309a10) - - // There may be a few errors already queued up. Loop for a bit before we give up. - success := false - for count := 0; (count < 20) && (!success); count++ { - time.Sleep(10 * time.Millisecond) - mutex.Lock() - if err == nil { - success = true - } else { - err = nil - } - mutex.Unlock() - } - require.True(t, success) - - mutex.Lock() - require.Equal(t, pollerRunning, pollerStatus) - require.NoError(t, err) - require.NotNil(t, block) - assert.Equal(t, uint64(0x309a10), block.Number.Uint64()) - assert.Equal(t, baseConnector.expectedHash(), block.Hash) - block = nil - mutex.Unlock() - - // Post an old block and we should not hear about it. - baseConnector.setBlockNumber(0x309a0c) - - time.Sleep(10 * time.Millisecond) - mutex.Lock() - require.Equal(t, pollerRunning, pollerStatus) - require.NoError(t, err) - require.Nil(t, block) - mutex.Unlock() - - // But we should keep going when we get a new one. - baseConnector.setBlockNumber(0x309a11) - - time.Sleep(10 * time.Millisecond) - mutex.Lock() - require.Equal(t, pollerRunning, pollerStatus) - require.NoError(t, err) - require.NotNil(t, block) - assert.Equal(t, uint64(0x309a11), block.Number.Uint64()) - assert.Equal(t, baseConnector.expectedHash(), block.Hash) - block = nil - mutex.Unlock() - - // If there's a gap in the blocks, we should keep going. - baseConnector.setBlockNumber(0x309a13) - - time.Sleep(10 * time.Millisecond) - mutex.Lock() - require.Equal(t, pollerRunning, pollerStatus) - require.NoError(t, err) - require.NotNil(t, block) - assert.Equal(t, uint64(0x309a13), block.Number.Uint64()) - assert.Equal(t, baseConnector.expectedHash(), block.Hash) - block = nil - mutex.Unlock() - - // Should retry on a transient error and be able to continue. - baseConnector.setSingleError(fmt.Errorf("RPC failed")) - baseConnector.setBlockNumber(0x309a14) - - time.Sleep(10 * time.Millisecond) - mutex.Lock() - require.Equal(t, pollerRunning, pollerStatus) - require.NoError(t, err) - require.NotNil(t, block) - assert.Equal(t, uint64(0x309a14), block.Number.Uint64()) - assert.Equal(t, baseConnector.expectedHash(), block.Hash) - block = nil - mutex.Unlock() -} diff --git a/node/pkg/watchers/evm/connectors/polygon.go b/node/pkg/watchers/evm/connectors/polygon.go index 9867001984..6b6a102b05 100644 --- a/node/pkg/watchers/evm/connectors/polygon.go +++ b/node/pkg/watchers/evm/connectors/polygon.go @@ -31,6 +31,7 @@ import ( ethereum "github.com/ethereum/go-ethereum" ethBind "github.com/ethereum/go-ethereum/accounts/abi/bind" ethCommon "github.com/ethereum/go-ethereum/common" + ethTypes "github.com/ethereum/go-ethereum/core/types" ethClient "github.com/ethereum/go-ethereum/ethclient" ethRpc "github.com/ethereum/go-ethereum/rpc" @@ -115,6 +116,13 @@ func (c *PolygonConnector) SubscribeForBlocks(ctx context.Context, errC chan err return nil, fmt.Errorf("failed to post initial block: %w", err) } + // Use the standard geth head sink to get latest blocks. + headSink := make(chan *ethTypes.Header, 2) + _, err = c.Connector.Client().SubscribeNewHead(ctx, headSink) + if err != nil { + return nil, fmt.Errorf("failed to subscribe for latest blocks: %w", err) + } + common.RunWithScissors(ctx, errC, "polygon_subscribe_for_block", func(ctx context.Context) error { for { select { @@ -128,6 +136,20 @@ func (c *PolygonConnector) SubscribeForBlocks(ctx context.Context, errC chan err if err := c.processCheckpoint(ctx, sink, checkpoint); err != nil { sub.err <- fmt.Errorf("failed to process checkpoint: %w", err) } + case ev := <-headSink: + if ev == nil { + c.logger.Error("new latest header event is nil") + continue + } + if ev.Number == nil { + c.logger.Error("new latest header block number is nil") + continue + } + sink <- &NewBlock{ + Number: ev.Number, + Hash: ev.Hash(), + Finality: Latest, + } case <-sub.quit: messageSub.Unsubscribe() sub.unsubDone <- struct{}{} @@ -157,11 +179,16 @@ func (c *PolygonConnector) postBlock(ctx context.Context, blockNum *big.Int, sin return fmt.Errorf("blockNum is nil") } - block, err := getBlock(ctx, c.logger, c.Connector, blockNum, false, false) + block, err := getBlockByNumberBigInt(ctx, c.logger, c.Connector, blockNum, Finalized) if err != nil { return fmt.Errorf("failed to get block %s: %w", blockNum.String(), err) } + // Publish the finalized block. sink <- block + + // Publish same thing for the safe block. + sink <- block.Copy(Safe) + return nil } diff --git a/node/pkg/watchers/evm/watcher.go b/node/pkg/watchers/evm/watcher.go index 1cc33dcbb3..26b65ac4fb 100644 --- a/node/pkg/watchers/evm/watcher.go +++ b/node/pkg/watchers/evm/watcher.go @@ -59,6 +59,16 @@ var ( Name: "wormhole_eth_current_height", Help: "Current Ethereum block height", }, []string{"eth_network"}) + currentEthSafeHeight = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "wormhole_eth_current_safe_height", + Help: "Current Ethereum safe block height", + }, []string{"eth_network"}) + currentEthFinalizedHeight = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "wormhole_eth_current_finalized_height", + Help: "Current Ethereum finalized block height", + }, []string{"eth_network"}) queryLatency = promauto.NewHistogramVec( prometheus.HistogramOpts{ Name: "wormhole_eth_query_latency", @@ -121,9 +131,12 @@ type ( maxWaitConfirmations uint64 // Interface to the chain specific ethereum library. - ethConn connectors.Connector - unsafeDevMode bool + ethConn connectors.Connector + unsafeDevMode bool + safeBlocksSupported bool + latestBlockNumber uint64 + latestSafeBlockNumber uint64 latestFinalizedBlockNumber uint64 l1Finalizer interfaces.L1Finalizer @@ -225,7 +238,7 @@ func (w *Watcher) Run(parentCtx context.Context) error { p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) return fmt.Errorf("dialing eth client failed: %w", err) } - w.ethConn, err = connectors.NewBlockPollConnector(ctx, baseConnector, finalizers.NewDefaultFinalizer(), 250*time.Millisecond, true, safeBlocksSupported) + w.ethConn, err = connectors.NewBatchPollConnector(ctx, baseConnector, 250*time.Millisecond) if err != nil { ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc() p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) @@ -252,7 +265,7 @@ func (w *Watcher) Run(parentCtx context.Context) error { return fmt.Errorf("dialing eth client failed: %w", err) } finalizer := finalizers.NewNeonFinalizer(logger, w.l1Finalizer) - pollConnector, err := connectors.NewBlockPollConnector(ctx, baseConnector, finalizer, 250*time.Millisecond, false, false) + pollConnector, err := connectors.NewBlockPollConnector(ctx, baseConnector, finalizer, 250*time.Millisecond) if err != nil { ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc() p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) @@ -561,7 +574,7 @@ func (w *Watcher) Run(parentCtx context.Context) error { }) // Watch headers - headSink := make(chan *connectors.NewBlock, 2) + headSink := make(chan *connectors.NewBlock, 100) headerSubscription, err := w.ethConn.SubscribeForBlocks(ctx, errC, headSink) if err != nil { ethConnectionErrors.WithLabelValues(w.networkName, "header_subscribe_error").Inc() @@ -586,7 +599,7 @@ func (w *Watcher) Run(parentCtx context.Context) error { continue } if ev.Number == nil { - logger.Error("new header block number is nil", zap.String("eth_network", w.networkName), zap.Bool("is_safe_block", ev.Safe)) + logger.Error("new header block number is nil", zap.String("eth_network", w.networkName), zap.Stringer("finality", ev.Finality)) continue } @@ -595,36 +608,42 @@ func (w *Watcher) Run(parentCtx context.Context) error { logger.Debug("processing new header", zap.Stringer("current_block", ev.Number), zap.Stringer("current_blockhash", currentHash), - zap.Bool("is_safe_block", ev.Safe), + zap.Stringer("finality", ev.Finality), zap.String("eth_network", w.networkName)) readiness.SetReady(w.readinessSync) - w.pendingMu.Lock() - blockNumberU := ev.Number.Uint64() - if ev.Safe { - atomic.StoreUint64(¤tSafeBlockNumber, blockNumberU) + if ev.Finality == connectors.Latest { + atomic.StoreUint64(&w.latestBlockNumber, blockNumberU) + currentEthHeight.WithLabelValues(w.networkName).Set(float64(blockNumberU)) + w.updateRegistry() + continue + } + + // The only blocks that get here are safe and finalized. + + if ev.Finality == connectors.Safe { + atomic.StoreUint64(&w.latestSafeBlockNumber, blockNumberU) + currentEthSafeHeight.WithLabelValues(w.networkName).Set(float64(blockNumberU)) } else { - p2p.DefaultRegistry.SetNetworkStats(w.chainID, &gossipv1.Heartbeat_Network{ - Height: ev.Number.Int64(), - ContractAddress: w.contract.Hex(), - }) atomic.StoreUint64(¤tBlockNumber, blockNumberU) atomic.StoreUint64(&w.latestFinalizedBlockNumber, blockNumberU) - currentEthHeight.WithLabelValues(w.networkName).Set(float64(ev.Number.Int64())) + currentEthFinalizedHeight.WithLabelValues(w.networkName).Set(float64(blockNumberU)) } + w.updateRegistry() + w.pendingMu.Lock() for key, pLock := range w.pending { // If this block is safe, only process messages wanting safe. // If it's not safe, only process messages wanting finalized. - if safeBlocksSupported { - if ev.Safe != (pLock.message.ConsistencyLevel == vaa.ConsistencyLevelSafe) { + if w.safeBlocksSupported { + if (ev.Finality == connectors.Safe) != (pLock.message.ConsistencyLevel == vaa.ConsistencyLevelSafe) { continue } } var expectedConfirmations uint64 - if w.waitForConfirmations && !ev.Safe { + if w.waitForConfirmations && ev.Finality != connectors.Safe { expectedConfirmations = uint64(pLock.message.ConsistencyLevel) } @@ -636,7 +655,7 @@ func (w *Watcher) Run(parentCtx context.Context) error { zap.Stringer("emitter_address", key.EmitterAddress), zap.Uint64("sequence", key.Sequence), zap.Stringer("current_block", ev.Number), - zap.Bool("is_safe_block", ev.Safe), + zap.Stringer("finality", ev.Finality), zap.Stringer("current_blockhash", currentHash), zap.String("eth_network", w.networkName), zap.Uint64("expectedConfirmations", expectedConfirmations), @@ -668,7 +687,7 @@ func (w *Watcher) Run(parentCtx context.Context) error { zap.Stringer("emitter_address", key.EmitterAddress), zap.Uint64("sequence", key.Sequence), zap.Stringer("current_block", ev.Number), - zap.Bool("is_safe_block", ev.Safe), + zap.Stringer("finality", ev.Finality), zap.Stringer("current_blockhash", currentHash), zap.String("eth_network", w.networkName), zap.Error(err)) @@ -687,7 +706,7 @@ func (w *Watcher) Run(parentCtx context.Context) error { zap.Stringer("emitter_address", key.EmitterAddress), zap.Uint64("sequence", key.Sequence), zap.Stringer("current_block", ev.Number), - zap.Bool("is_safe_block", ev.Safe), + zap.Stringer("finality", ev.Finality), zap.Stringer("current_blockhash", currentHash), zap.String("eth_network", w.networkName), zap.Error(err)) @@ -704,7 +723,7 @@ func (w *Watcher) Run(parentCtx context.Context) error { zap.Stringer("emitter_address", key.EmitterAddress), zap.Uint64("sequence", key.Sequence), zap.Stringer("current_block", ev.Number), - zap.Bool("is_safe_block", ev.Safe), + zap.Stringer("finality", ev.Finality), zap.Stringer("current_blockhash", currentHash), zap.String("eth_network", w.networkName), zap.Error(err)) @@ -721,7 +740,7 @@ func (w *Watcher) Run(parentCtx context.Context) error { zap.Stringer("emitter_address", key.EmitterAddress), zap.Uint64("sequence", key.Sequence), zap.Stringer("current_block", ev.Number), - zap.Bool("is_safe_block", ev.Safe), + zap.Stringer("finality", ev.Finality), zap.Stringer("current_blockhash", currentHash), zap.String("eth_network", w.networkName)) delete(w.pending, key) @@ -735,7 +754,7 @@ func (w *Watcher) Run(parentCtx context.Context) error { zap.Stringer("emitter_address", key.EmitterAddress), zap.Uint64("sequence", key.Sequence), zap.Stringer("current_block", ev.Number), - zap.Bool("is_safe_block", ev.Safe), + zap.Stringer("finality", ev.Finality), zap.Stringer("current_blockhash", currentHash), zap.String("eth_network", w.networkName)) delete(w.pending, key) @@ -747,7 +766,7 @@ func (w *Watcher) Run(parentCtx context.Context) error { w.pendingMu.Unlock() logger.Debug("processed new header", zap.Stringer("current_block", ev.Number), - zap.Bool("is_safe_block", ev.Safe), + zap.Stringer("finality", ev.Finality), zap.Stringer("current_blockhash", currentHash), zap.Duration("took", time.Since(start)), zap.String("eth_network", w.networkName)) @@ -908,3 +927,12 @@ func (w *Watcher) SetWaitForConfirmations(waitForConfirmations bool) { func (w *Watcher) SetMaxWaitConfirmations(maxWaitConfirmations uint64) { w.maxWaitConfirmations = maxWaitConfirmations } + +func (w *Watcher) updateRegistry() { + p2p.DefaultRegistry.SetNetworkStats(w.chainID, &gossipv1.Heartbeat_Network{ + Height: int64(atomic.LoadUint64(&w.latestBlockNumber)), + SafeHeight: int64(atomic.LoadUint64(&w.latestSafeBlockNumber)), + FinalizedHeight: int64(atomic.LoadUint64(&w.latestFinalizedBlockNumber)), + ContractAddress: w.contract.Hex(), + }) +} diff --git a/proto/gossip/v1/gossip.proto b/proto/gossip/v1/gossip.proto index 04685a831d..206865a0b4 100644 --- a/proto/gossip/v1/gossip.proto +++ b/proto/gossip/v1/gossip.proto @@ -50,6 +50,10 @@ message Heartbeat { string contract_address = 3; // Connection error count uint64 error_count = 4; + // Safe block height of the node, if supported. + int64 safe_height = 5; + // Finalized block height of the node, if supported. + int64 finalized_height = 6; } repeated Network networks = 4;