Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement a timeline of highest blocks #136

Merged
merged 28 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
b324762
implement a timeline of highest blocks
canercidam Nov 20, 2022
6362833
add ability to disable logs
canercidam Nov 21, 2022
c84a886
Merge branch 'master' into caner/block-timeline
canercidam Oct 16, 2023
1ab11b7
add functionality to calculate lag in a given minute
canercidam Oct 16, 2023
34a40d6
add experiment as optional test
canercidam Oct 16, 2023
9f891ea
add distance number per chain
canercidam Oct 16, 2023
151b354
reduce optimism threshold
canercidam Oct 16, 2023
ff76c57
add method to check timeline size
canercidam Oct 16, 2023
a321998
log delay
canercidam Oct 17, 2023
56cab57
return delay
canercidam Oct 17, 2023
1f88ce4
fix the timeline construction
canercidam Oct 17, 2023
81372b6
remove distance stuff
canercidam Oct 17, 2023
3ed77f8
add sorting and fix the experiment test
canercidam Oct 18, 2023
d3a368a
remove the size method
canercidam Oct 18, 2023
44281ae
incorporate the score estimation
canercidam Oct 18, 2023
a75a2c5
remove redundant method and increase coverage
canercidam Oct 18, 2023
bf10332
fix tests
canercidam Oct 18, 2023
1105118
fix issues
canercidam Oct 18, 2023
70dfdf5
shorten the tests
canercidam Oct 18, 2023
ba668cb
fix typo
canercidam Oct 18, 2023
4ae9aa5
one more short test
canercidam Oct 18, 2023
9946960
add one more short test
canercidam Oct 19, 2023
55b69f6
try changing cache key
canercidam Oct 19, 2023
e77a27a
reset cache key
canercidam Oct 19, 2023
7ab4acc
add skip to timeline experiment
canercidam Oct 19, 2023
cbfba0f
Merge branch 'master' into caner/block-timeline
canercidam Oct 19, 2023
12c6c82
fix tests
canercidam Oct 19, 2023
3dfb95f
remove some debug lines
canercidam Oct 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ mocks:

.PHONY: test
test:
go test -v -count=1 -coverprofile=coverage.out ./...
go test -v -count=1 -short -coverprofile=coverage.out ./...

.PHONY: coverage
coverage:
Expand Down
14 changes: 10 additions & 4 deletions feeds/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type blockFeed struct {
cache utils.Cache
chainID *big.Int
tracing bool
logs bool
started bool
rateLimit *time.Ticker
maxBlockAge *time.Duration
Expand All @@ -56,6 +57,7 @@ type BlockFeedConfig struct {
ChainID *big.Int
RateLimit *time.Ticker
Tracing bool
DisableLogs bool
SkipBlocksOlderThan *time.Duration
}

Expand Down Expand Up @@ -313,10 +315,13 @@ func (bf *blockFeed) forEachBlock() error {
traces = nil
}

logs, err := bf.logsForBlock(blockNumToAnalyze)
if err != nil {
logger.WithError(err).Errorf("error getting logs for block")
continue
var logs []domain.LogEntry
if bf.logs {
logs, err = bf.logsForBlock(blockNumToAnalyze)
if err != nil {
logger.WithError(err).Errorf("error getting logs for block")
continue
}
}

blockTs, err := block.GetTimestamp()
Expand Down Expand Up @@ -390,6 +395,7 @@ func NewBlockFeed(ctx context.Context, client ethereum.Client, traceClient ether
cache: utils.NewCache(10000),
chainID: cfg.ChainID,
tracing: cfg.Tracing,
logs: !cfg.DisableLogs,
rateLimit: cfg.RateLimit,
maxBlockAge: cfg.SkipBlocksOlderThan,
subscriptionMode: client.IsWebsocket(),
Expand Down
1 change: 1 addition & 0 deletions feeds/blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func getTestBlockFeed(t *testing.T) (*blockFeed, *mocks.MockClient, *mocks.MockC
traceClient: traceClient,
cache: cache,
tracing: true,
logs: true,
maxBlockAge: &maxBlockAge,
}, client, traceClient, ctx, cancel
}
Expand Down
200 changes: 200 additions & 0 deletions feeds/timeline/timeline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
package timeline

import (
"sort"
"sync"
"time"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/forta-network/forta-core-go/domain"
"github.com/forta-network/forta-core-go/protocol/settings"
log "github.com/sirupsen/logrus"
)

// BlockTimeline implements a block feed subscriber and keeps track of the
// latest block in every minute.
type BlockTimeline struct {
threshold int
maxMinutes int
chainMinutes []*Minute // when the block was produced
localMinutes []*Minute // when we handled the block
delay *time.Duration
mu sync.RWMutex
}

// Minute represents a minute in a chain.
type Minute struct {
HighestBlockNumber uint64
Timestamp time.Time
}

// NewBlockTimeline creates a new block timeline.
func NewBlockTimeline(chainID, maxMinutes int) *BlockTimeline {
bt := &BlockTimeline{
threshold: settings.GetChainSettings(chainID).BlockThreshold,
maxMinutes: maxMinutes,
}

go bt.cleanup()

return bt
}

func (bt *BlockTimeline) cleanup() {
ticker := time.NewTicker(time.Minute)
for {
<-ticker.C
bt.doCleanup()
}
}

func (bt *BlockTimeline) doCleanup() {
bt.mu.Lock()
defer bt.mu.Unlock()

currSize := len(bt.chainMinutes)
if currSize > bt.maxMinutes {
extra := currSize - bt.maxMinutes
bt.chainMinutes = bt.chainMinutes[extra:] // release oldest
}

currSize = len(bt.localMinutes)
if currSize > bt.maxMinutes {
extra := currSize - bt.maxMinutes
bt.localMinutes = bt.localMinutes[extra:] // release oldest
}
}

// HandleBlock handles a block incoming from block feed.
func (bt *BlockTimeline) HandleBlock(evt *domain.BlockEvent) error {
bt.mu.Lock()
defer bt.mu.Unlock()

blockTs, err := evt.Block.GetTimestamp()
if err != nil {
log.WithError(err).Error("failed to get block timestamp")
return nil
}
delay := time.Since(*blockTs)
bt.delay = &delay

localMinuteTs := time.Now().Truncate(time.Minute)

blockMinuteTs := blockTs.Truncate(time.Minute)
blockNum, err := hexutil.DecodeUint64(evt.Block.Number)
if err != nil {
log.WithError(err).Error("failed to decode block number")
}

var foundBlockMinute bool
for _, minute := range bt.chainMinutes {
if minute.Timestamp.Equal(blockMinuteTs) {
if blockNum > minute.HighestBlockNumber {
minute.HighestBlockNumber = blockNum
}
foundBlockMinute = true
break
}
}
if !foundBlockMinute {
bt.chainMinutes = append(bt.chainMinutes, &Minute{
HighestBlockNumber: blockNum,
Timestamp: blockMinuteTs,
})
}

var foundLocalMinute bool
for _, minute := range bt.localMinutes {
if minute.Timestamp.Equal(localMinuteTs) {
if blockNum > minute.HighestBlockNumber {
minute.HighestBlockNumber = blockNum
}
foundLocalMinute = true
break
}
}
if !foundLocalMinute {
bt.localMinutes = append(bt.localMinutes, &Minute{
HighestBlockNumber: blockNum,
Timestamp: localMinuteTs,
})
}

sort.Slice(bt.chainMinutes, func(i, j int) bool {
return bt.chainMinutes[i].Timestamp.Before(bt.chainMinutes[j].Timestamp)
})
sort.Slice(bt.localMinutes, func(i, j int) bool {
return bt.localMinutes[i].Timestamp.Before(bt.localMinutes[j].Timestamp)
})

return nil
}

func (bt *BlockTimeline) GetDelay() (time.Duration, bool) {
bt.mu.RLock()
defer bt.mu.RUnlock()

if bt.delay == nil {
return 0, false
}
return *bt.delay, true
}

func (bt *BlockTimeline) getLatestUpTo(minutes []*Minute, ts time.Time) (uint64, bool) {
ts = ts.Truncate(time.Minute)
var foundMinute *Minute
for _, minute := range minutes {
if minute.Timestamp.After(ts) {
break
}
foundMinute = minute
}
if foundMinute != nil {
return foundMinute.HighestBlockNumber, true
}
return 0, false
}

// CalculateLag calculates the block number lag by taking the average of each minute.
func (bt *BlockTimeline) CalculateLag() (float64, bool) {
bt.mu.RLock()
defer bt.mu.RUnlock()

var (
total float64
count float64
)
for i, chainMinute := range bt.chainMinutes {
// exclude the last minute
if i == len(bt.chainMinutes)-1 {
break
}
// avoid calculation if we can't find a highest
localMinuteHighest, ok := bt.getLatestUpTo(bt.localMinutes, chainMinute.Timestamp)
if !ok {
continue
}
total += float64(chainMinute.HighestBlockNumber - localMinuteHighest)
count++
}
if count == 0 {
return 0, false
}
return total / count, true
}

// EstimateBlockScore estimates the block score based on the lag and the block threshold.
func (bt *BlockTimeline) EstimateBlockScore() (float64, bool) {
lag, ok := bt.CalculateLag()
if !ok {
return 0, false
}
estimate := (float64(bt.threshold) - float64(lag)) / float64(bt.threshold)
if estimate < 0 {
estimate = 0
}
if estimate > 1 {
estimate = 1
}
return estimate, true
}
Loading
Loading