Skip to content
This repository has been archived by the owner on Jan 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #15 from itzmeanjan/develop
Browse files Browse the repository at this point in the history
Supervising Block Header Listener Worker
  • Loading branch information
itzmeanjan authored Apr 25, 2021
2 parents 7fe7ba3 + 7482a8d commit cd01e26
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 7 deletions.
45 changes: 42 additions & 3 deletions app/bootup/bootup.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,48 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) {
go pool.Queued.Start(ctx)
// (b)
go pool.Queued.Prune(ctx, confirmedTxsChan, alreadyInPendingPoolChan)
// Listens for new block headers & informs 👆 (a) for pruning
// txs which can be/ need to be
go listen.SubscribeHead(ctx, wsClient, caughtTxsChan, lastSeenBlockChan)

// This worker will supervise block header listener, so that it can keep
// track of their health & if they die due to some abnormal reasons
// it'll spawn a new one after a static delay of x time unit ( see below )
go func() {

var died bool

healthChan := make(chan struct{})
go listen.SubscribeHead(ctx, wsClient, pool.Pending.GetLastSeenBlock().Number, caughtTxsChan, lastSeenBlockChan, healthChan)

for {

if died {
// Wait before we spawn new worker
<-time.After(time.Duration(5) * time.Second)

healthChan = make(chan struct{})
go listen.SubscribeHead(ctx, wsClient, pool.Pending.GetLastSeenBlock().Number, caughtTxsChan, lastSeenBlockChan, healthChan)

died = false
}

select {

case <-ctx.Done():
return

case <-healthChan:
died = true

default:
// sleep for a while
<-time.After(time.Duration(1000) * time.Millisecond)
// and go to work again

}

}

}()

go data.TrackNotFoundTxs(ctx, inPendingPoolChan, notFoundTxsChan, caughtTxsChan)

// Passed this mempool handle to graphql query resolver
Expand Down
25 changes: 21 additions & 4 deletions app/listen/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type CaughtTxs []*CaughtTx
// SubscribeHead - Subscribe to block headers & as soon as new block gets mined
// its txs are picked up & published on a go channel, which will be listened
// to by pending pool watcher, so that it can prune its state
func SubscribeHead(ctx context.Context, client *ethclient.Client, commChan chan<- CaughtTxs, lastSeenBlockChan chan<- uint64) {
func SubscribeHead(ctx context.Context, client *ethclient.Client, lastSeenBlock uint64, commChan chan<- CaughtTxs, lastSeenBlockChan chan<- uint64, healthChan chan struct{}) {

retryTable := make(map[*big.Int]bool)
headerChan := make(chan *types.Header, 64)
Expand All @@ -44,19 +44,36 @@ func SubscribeHead(ctx context.Context, client *ethclient.Client, commChan chan<
return

case err := <-subs.Err():
log.Printf("❗️ Block header subscription failed : %s\n", err.Error())
if err != nil {
log.Printf("❗️ Block header subscription failed : %s\n", err.Error())
} else {
log.Printf("❗️ Block header subscription failed\n")
}

// Notify supervisor this worker is dying
close(healthChan)
return

case header := <-headerChan:

if !ProcessBlock(ctx, client, header.Number, commChan, lastSeenBlockChan) {
// If this go routine dies in mid, supervisor will spawn a new one
// after some delay, which will require processing missed blocks
if lastSeenBlock != 0 && header.Number.Uint64()-lastSeenBlock > 1 {

for i := lastSeenBlock + 1; i < header.Number.Uint64(); i++ {
retryTable[big.NewInt(int64(i))] = true
}

}

if !ProcessBlock(ctx, client, header.Number, commChan, lastSeenBlockChan) {
// Put entry in table that we failed to fetch this block, to be
// attempted in some time future
retryTable[header.Number] = true

}

lastSeenBlock = header.Number.Uint64()

case <-time.After(time.Duration(1) * time.Millisecond):

pendingC := len(retryTable)
Expand Down

0 comments on commit cd01e26

Please sign in to comment.