Skip to content
This repository has been archived by the owner on Jan 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #12 from itzmeanjan/develop
Browse files Browse the repository at this point in the history
Limiting Pending/ Queued Pool Size
  • Loading branch information
itzmeanjan authored Apr 14, 2021
2 parents b130077 + 9d7e7ac commit 20e7368
Show file tree
Hide file tree
Showing 13 changed files with 748 additions and 444 deletions.
14 changes: 10 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# harmony
Reduce Chaos in MemPool 😌

**Production grade release : >= v0.8.0**

![banner](./sc/banner.png)

## Table of Contents
Expand Down Expand Up @@ -74,6 +76,8 @@ During my journey of exploring Ethereum MemPool, I found good initiative from [B

![architecture](./sc/architecture.jpg)

![internals](./sc/internals.jpg)

## Prerequisite

- Make sure you've _`Go ( >= 1.16)`_, _`make`_ installed
Expand Down Expand Up @@ -102,6 +106,8 @@ touch .env
RPCUrl=https://<rpc-node>
WSUrl=wss://<rpc-node>
MemPoolPollingPeriod=1000
PendingPoolSize=4096
QueuedPoolSize=4096
PendingTxEntryTopic=pending_pool_entry
PendingTxExitTopic=pending_pool_exit
QueuedTxEntryTopic=queued_pool_entry
Expand All @@ -119,6 +125,8 @@ Environment Variable | Interpretation
RPCUrl | `txpool` RPC API enabled Ethereum Node's URI
WSUrl | To be used for listening to newly mined block headers
MemPoolPollingPeriod | RPC node's mempool to be checked every `X` milliseconds
PendingPoolSize | #-of pending tx(s) to be kept in-memory at a time
QueuedPoolSize | #-of queued tx(s) to be kept in-memory at a time
PendingTxEntryTopic | Whenever tx enters pending pool, it'll be published on Redis topic `t`
PendingTxExitTopic | Whenever tx leaves pending pool, it'll be published on Redis topic `t`
QueuedTxEntryTopic | Whenever tx enters queued pool, it'll be published on Redis topic `t`
Expand All @@ -130,6 +138,8 @@ RedisDB | Redis database to be used. **[ By default there're 16 of them ]**
ConcurrencyFactor | Whenever concurrency can be leveraged, `harmony` will create worker pool with `#-of logical CPUs x ConcurrencyFactor` go routines. **[ Can be float too ]**
Port | Starts HTTP server on this port ( > 1024 )

> Note : When a pool's size limit is exceeded, the tx paying the lowest gas price is dropped. Consider setting pool sizes to higher values if you have enough memory on the machine; otherwise it may crash.
---

### Multi-Node Cluster Setup
Expand Down Expand Up @@ -1226,7 +1236,3 @@ python3 subscribe_1.py
```bash
deactivate
```

---
> Note: `harmony` is not recommended for use in production environment at time of writing this. It's under active development.
---
59 changes: 37 additions & 22 deletions app/bootup/bootup.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package bootup
import (
"context"
"strconv"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -104,36 +103,47 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) {
return nil, err
}

// This is communication channel to be used between pending pool
// & queued pool, so that when new tx gets added into pending pool
// queued pool also gets notified & gets to update state if required
alreadyInPendingPoolChan := make(chan *data.MemPoolTx, 4096)

// initialising pending pool
pendingPool := &data.PendingPool{
Transactions: make(map[common.Hash]*data.MemPoolTx),
AscTxsByGasPrice: make(data.MemPoolTxsAsc, 0, 1024),
DescTxsByGasPrice: make(data.MemPoolTxsDesc, 0, 1024),
Lock: &sync.RWMutex{},
IsPruning: false,
AddTxChan: make(chan data.AddRequest, 1),
RemoveTxChan: make(chan data.RemoveRequest, 1),
TxExistsChan: make(chan data.ExistsRequest, 1),
GetTxChan: make(chan data.GetRequest, 1),
CountTxsChan: make(chan data.CountRequest, 1),
ListTxsChan: make(chan data.ListRequest, 1),
PubSub: _redis,
RPC: client,
Transactions: make(map[common.Hash]*data.MemPoolTx),
TxsFromAddress: make(map[common.Address]data.TxList),
DroppedTxs: make(map[common.Hash]bool),
RemovedTxs: make(map[common.Hash]bool),
AscTxsByGasPrice: make(data.MemPoolTxsAsc, 0, config.GetPendingPoolSize()),
DescTxsByGasPrice: make(data.MemPoolTxsDesc, 0, config.GetPendingPoolSize()),
AddTxChan: make(chan data.AddRequest, 1),
AddFromQueuedPoolChan: make(chan data.AddRequest, 1),
RemoveTxChan: make(chan data.RemoveRequest, 1),
AlreadyInPendingPoolChan: alreadyInPendingPoolChan,
TxExistsChan: make(chan data.ExistsRequest, 1),
GetTxChan: make(chan data.GetRequest, 1),
CountTxsChan: make(chan data.CountRequest, 1),
ListTxsChan: make(chan data.ListRequest, 1),
TxsFromAChan: make(chan data.TxsFromARequest, 1),
PubSub: _redis,
RPC: client,
}

// initialising queued pool
queuedPool := &data.QueuedPool{
Transactions: make(map[common.Hash]*data.MemPoolTx),
AscTxsByGasPrice: make(data.MemPoolTxsAsc, 0, 1024),
DescTxsByGasPrice: make(data.MemPoolTxsDesc, 0, 1024),
Lock: &sync.RWMutex{},
IsPruning: false,
TxsFromAddress: make(map[common.Address]data.TxList),
DroppedTxs: make(map[common.Hash]bool),
RemovedTxs: make(map[common.Hash]bool),
AscTxsByGasPrice: make(data.MemPoolTxsAsc, 0, config.GetQueuedPoolSize()),
DescTxsByGasPrice: make(data.MemPoolTxsDesc, 0, config.GetQueuedPoolSize()),
AddTxChan: make(chan data.AddRequest, 1),
RemoveTxChan: make(chan data.RemovedUnstuckTx, 1),
TxExistsChan: make(chan data.ExistsRequest, 1),
GetTxChan: make(chan data.GetRequest, 1),
CountTxsChan: make(chan data.CountRequest, 1),
ListTxsChan: make(chan data.ListRequest, 1),
TxsFromAChan: make(chan data.TxsFromARequest, 1),
PubSub: _redis,
RPC: client,
PendingPool: pendingPool,
Expand All @@ -146,17 +156,22 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) {

// Block head listener & pending pool pruner
// talks over this buffered channel
commChan := make(chan listen.CaughtTxs, 1024)
caughtTxsChan := make(chan listen.CaughtTxs, 16)
confirmedTxsChan := make(chan data.ConfirmedTx, 4096)

// Starting pool life cycle manager go routine
go pool.Pending.Start(ctx)
// (a)
go pool.Pending.Prune(ctx, commChan)
//
// After that this pool will also let (b) know that it can
// update state of txs, which have become unstuck
go pool.Pending.Prune(ctx, caughtTxsChan, confirmedTxsChan)
go pool.Queued.Start(ctx)
go pool.Queued.Prune(ctx)
// (b)
go pool.Queued.Prune(ctx, confirmedTxsChan, alreadyInPendingPoolChan)
// Listens for new block headers & informs 👆 (a) for pruning
// txs which can be/ need to be
go listen.SubscribeHead(ctx, wsClient, commChan)
go listen.SubscribeHead(ctx, wsClient, caughtTxsChan)

// Passed this mempool handle to graphql query resolver
if err := graph.InitMemPool(pool); err != nil {
Expand Down
30 changes: 24 additions & 6 deletions app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,33 @@ func GetBool(key string) bool {
// 1000ms & again get to work
func GetMemPoolPollingPeriod() uint64 {

period := Get("MemPoolPollingPeriod")
if period := GetUint("MemPoolPollingPeriod"); period != 0 {
return period
}

_period, err := strconv.ParseUint(period, 10, 64)
if err != nil {
log.Printf("[❗️] Failed to parse mempool polling period : `%s`, using 1000 ms\n", err.Error())
return 1000
return 1000

}

// GetPendingPoolSize - Max #-of pending pool txs can be living in memory
func GetPendingPoolSize() uint64 {

	// A zero value from GetUint means `PendingPoolSize` env variable
	// was unset or unparseable — fall back to default capacity
	size := GetUint("PendingPoolSize")
	if size == 0 {
		return 1024
	}

	return size

}

// GetQueuedPoolSize - Max #-of queued pool txs can be living in memory
func GetQueuedPoolSize() uint64 {

	// GetUint yields 0 when `QueuedPoolSize` is unset or not a valid
	// unsigned integer, in which case default capacity is used
	if size := GetUint("QueuedPoolSize"); size != 0 {
		return size
	}

	// Default queued pool capacity
	return 1024

}

Expand Down
7 changes: 3 additions & 4 deletions app/data/asc_gasprice.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,11 @@ func (m MemPoolTxsAsc) findTx(low int, high int, tx *MemPoolTx) int {

if low == high {

idx := findTxFromSlice(m[low:], tx)
if idx == -1 {
return -1
if idx := findTxFromSlice(m[low:], tx); idx != -1 {
return low + idx
}

return low + idx
return -1

}

Expand Down
77 changes: 77 additions & 0 deletions app/data/asc_nonce.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package data

// TxsFromAddressAsc - List of txs, sent from same address
// sorted by their nonce ( ascending ), so per-sender ordering
// of mempool txs can be answered efficiently
type TxsFromAddressAsc []*MemPoolTx

// len - Number of tx(s) living in pool, from this address
func (t TxsFromAddressAsc) len() int {
	count := len(t)
	return count
}

// cap - How many txs can be kept in slice, without further allocation
func (t TxsFromAddressAsc) cap() int {
	capacity := cap(t)
	return capacity
}

// get - Return all txs living in pool, sent from specific address
func (t TxsFromAddressAsc) get() []*MemPoolTx {
	// Plain type conversion back to the underlying slice — no copy made
	txs := []*MemPoolTx(t)
	return txs
}

// findInsertionPoint - When attempting to insert new tx into this slice,
// find index where to insert, so that it stays sorted ( ascending ), as per
// nonce field of tx
func (t TxsFromAddressAsc) findInsertionPoint(low int, high int, tx *MemPoolTx) int {

	// Degenerate range ( e.g. empty slice ) — insert at front
	if low > high {
		return 0
	}

	// Iterative binary search : narrow [low, high] down to a single
	// candidate index
	for low < high {

		mid := (low + high) / 2
		if t[mid].Nonce > tx.Nonce {
			high = mid
		} else {
			low = mid + 1
		}

	}

	// Insert before first entry holding a strictly larger nonce,
	// after it otherwise ( equal nonces keep arrival order )
	if t[low].Nonce > tx.Nonce {
		return low
	}

	return low + 1

}

// findTx - Find index of tx, which is already present in this sorted slice
// of txs, sent from some specific address
func (t TxsFromAddressAsc) findTx(low int, high int, tx *MemPoolTx) int {

	// Empty/ invalid search range — tx can't be present
	if low > high {
		return -1
	}

	// Iterative binary search over nonce, converging on the left-most
	// position which could hold this tx's nonce
	for low < high {

		mid := (low + high) / 2
		if t[mid].Nonce >= tx.Nonce {
			high = mid
		} else {
			low = mid + 1
		}

	}

	// Linear scan of remaining tail resolves exact identity among
	// txs possibly sharing the same nonce
	if idx := findTxFromSlice(t[low:], tx); idx != -1 {
		return low + idx
	}

	return -1

}
7 changes: 3 additions & 4 deletions app/data/desc_gasprice.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,11 @@ func (m MemPoolTxsDesc) findTx(low int, high int, tx *MemPoolTx) int {

if low == high {

idx := findTxFromSlice(m[low:], tx)
if idx == -1 {
return -1
if idx := findTxFromSlice(m[low:], tx); idx != -1 {
return low + idx
}

return low + idx
return -1

}

Expand Down
7 changes: 7 additions & 0 deletions app/data/interaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,10 @@ type ListRequest struct {
Order int
ResponseChan chan []*MemPoolTx
}

// TxsFromARequest - When requesting for txs living in pool
// sent from some specific address, use this construct
type TxsFromARequest struct {
	// From - sender address whose txs are being queried
	From common.Address
	// ResponseChan - matching txs are delivered back on this channel
	ResponseChan chan []*MemPoolTx
}
Loading

0 comments on commit 20e7368

Please sign in to comment.