Skip to content

Commit

Permalink
Merge pull request #136 from forta-network/caner/block-timeline
Browse files Browse the repository at this point in the history
Implement a timeline of highest blocks
  • Loading branch information
canercidam authored Oct 19, 2023
2 parents deace24 + 3dfb95f commit 3b887fe
Show file tree
Hide file tree
Showing 26 changed files with 1,205 additions and 261 deletions.
3 changes: 1 addition & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,10 @@ mocks:
$(MOCKGEN) -source manifest/client.go -destination manifest/mocks/mock_client.go
$(MOCKGEN) -source clients/graphql/client.go -destination clients/mocks/mock_graphql_client.go
$(MOCKGEN) -source utils/ethutils/iterator.go -destination utils/ethutils/mocks/mock_iterator.go
$(MOCKGEN) -source inspect/proxy_api.go -destination inspect/mocks/mock_proxy_api.go

.PHONY: test
test:
go test -v -count=1 -coverprofile=coverage.out ./...
go test -v -count=1 -short -coverprofile=coverage.out ./...

.PHONY: coverage
coverage:
Expand Down
26 changes: 22 additions & 4 deletions ethereum/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,28 @@ import (
log "github.com/sirupsen/logrus"
)

// EthClient is the original interface from go-ethereum.
type EthClient interface {
ethereum.ChainReader
ethereum.ChainStateReader
ethereum.TransactionReader
ethereum.LogFilterer
ethereum.ContractCaller
BlockNumber(ctx context.Context) (uint64, error)
BlockByNumber(ctx context.Context, blockNum *big.Int) (*types.Block, error)
}

// RPCClient is a wrapper implementation of the RPC client.
type RPCClient interface {
Close()
Call(result interface{}, method string, args ...interface{}) error
CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error
Subscribe(ctx context.Context, channel interface{}, args ...interface{}) (domain.ClientSubscription, error)
}

// Subscriber subscribes to Ethereum namespaces.
type Subscriber interface {
RPCClient
Subscribe(ctx context.Context, namespace string, channel interface{}, args ...interface{}) (domain.ClientSubscription, error)
}

// Client is an interface encompassing all ethereum actions
Expand Down Expand Up @@ -82,6 +99,7 @@ var maxBackoff = 1 * time.Minute
type streamEthClient struct {
apiName string
rpcClient RPCClient
subscriber Subscriber
retryInterval time.Duration
isWebsocket bool

Expand Down Expand Up @@ -331,7 +349,7 @@ func (e *streamEthClient) SubscribeToHead(ctx context.Context) (domain.HeaderCh,
log.Debug("subscribing to blockchain head")
recvCh := make(chan *types.Header)
sendCh := make(chan *types.Header)
sub, err := e.rpcClient.Subscribe(ctx, recvCh, "newHeads")
sub, err := e.subscriber.Subscribe(ctx, "eth", recvCh, "newHeads")
if err != nil {
return nil, fmt.Errorf("failed to subscribe: %v", err)
}
Expand Down Expand Up @@ -384,8 +402,8 @@ type rpcClient struct {
*rpc.Client
}

func (rc *rpcClient) Subscribe(ctx context.Context, channel interface{}, args ...interface{}) (domain.ClientSubscription, error) {
sub, err := rc.EthSubscribe(ctx, channel, args...)
func (rc *rpcClient) Subscribe(ctx context.Context, namespace string, channel interface{}, args ...interface{}) (domain.ClientSubscription, error) {
sub, err := rc.Subscribe(ctx, namespace, channel, args...)

Check failure on line 406 in ethereum/client.go

View workflow job for this annotation

GitHub Actions / Validate Go code

SA5007: infinite recursive call (staticcheck)
return sub, err
}

Expand Down
14 changes: 9 additions & 5 deletions ethereum/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,21 @@ const testBlockHash = "0x4fc0862e76691f5312964883954d5c2db35e2b8f7a4f191775a4f50

var testErr = errors.New("test err")

func initClient(t *testing.T) (*streamEthClient, *mocks.MockRPCClient, context.Context) {
func initClient(t *testing.T) (*streamEthClient, *mocks.MockRPCClient, *mocks.MockSubscriber, context.Context) {
minBackoff = 1 * time.Millisecond
maxBackoff = 1 * time.Millisecond
ctx := context.Background()
ctrl := gomock.NewController(t)
client := mocks.NewMockRPCClient(ctrl)
subscriber := mocks.NewMockSubscriber(ctrl)

return &streamEthClient{rpcClient: client}, client, ctx
return &streamEthClient{rpcClient: client, subscriber: subscriber}, client, subscriber, ctx
}

func TestEthClient_BlockByHash(t *testing.T) {
r := require.New(t)

ethClient, client, ctx := initClient(t)
ethClient, client, _, ctx := initClient(t)
hash := testBlockHash
// verify retry
client.EXPECT().CallContext(gomock.Any(), gomock.Any(), blocksByHash, testBlockHash).Return(testErr).Times(1)
Expand All @@ -50,10 +51,10 @@ func TestEthClient_BlockByHash(t *testing.T) {
func TestEthClient_SubscribeToHeader_Err(t *testing.T) {
r := require.New(t)

ethClient, client, ctx := initClient(t)
ethClient, _, subscriber, ctx := initClient(t)
sub := mock_domain.NewMockClientSubscription(gomock.NewController(t))

client.EXPECT().Subscribe(gomock.Any(), gomock.Any(), "newHeads").Return(sub, nil).Times(2)
subscriber.EXPECT().Subscribe(gomock.Any(), "eth", gomock.Any(), "newHeads").Return(sub, nil).Times(2)
errCh := make(chan error, 1)
errCh <- errors.New("subscription encountered some error")
sub.EXPECT().Err().Return(errCh).Times(2)
Expand All @@ -64,9 +65,12 @@ func TestEthClient_SubscribeToHeader_Err(t *testing.T) {

headerCh, err = ethClient.SubscribeToHead(ctx)
r.NoError(err)
var blocked bool
select {
case <-time.After(time.Second):
// should continue from here
blocked = true
case <-headerCh: // should block
}
r.True(blocked)
}
Loading

0 comments on commit 3b887fe

Please sign in to comment.