Skip to content
This repository has been archived by the owner on Jan 13, 2023. It is now read-only.

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
itzmeanjan committed May 8, 2021
2 parents 0b9c720 + a8ccd50 commit c219ac5
Show file tree
Hide file tree
Showing 13 changed files with 196 additions and 303 deletions.
40 changes: 16 additions & 24 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,6 @@ During my journey of exploring Ethereum MemPool, I found good initiative from [B
## Prerequisite

- Make sure you've _`Go ( >= 1.16)`_, _`make`_ installed
- You need to also have _`Redis ( >= 5.x )`_

> Note : Consider setting up Redis instance with password protection
- Get one Ethereum Node up & running, with `txpool` RPC API enabled. You can always use SaaS Ethereum node.

## Installation
Expand Down Expand Up @@ -112,10 +108,6 @@ PendingTxEntryTopic=pending_pool_entry
PendingTxExitTopic=pending_pool_exit
QueuedTxEntryTopic=queued_pool_entry
QueuedTxExitTopic=queued_pool_exit
RedisConnection=tcp
RedisAddress=127.0.0.1:6379
RedisPassword=password
RedisDB=1
ConcurrencyFactor=10
Port=7000
```
Expand All @@ -127,14 +119,10 @@ WSUrl | To be used for listening to newly mined block headers
MemPoolPollingPeriod | RPC node's mempool to be checked every `X` milliseconds
PendingPoolSize | #-of pending tx(s) to be kept in-memory at a time
QueuedPoolSize | #-of queued tx(s) to be kept in-memory at a time
PendingTxEntryTopic | Whenever tx enters pending pool, it'll be published on Redis topic `t`
PendingTxExitTopic | Whenever tx leaves pending pool, it'll be published on Redis topic `t`
QueuedTxEntryTopic | Whenever tx enters queued pool, it'll be published on Redis topic `t`
QueuedTxExitTopic | Whenever tx leaves queued pool, it'll be published on Redis topic `t`
RedisConnection | Communicate with Redis over transport protocol
RedisAddress | `address:port` combination of Redis
RedisPassword | Authentication details for talking to Redis. **[ Not mandatory ]**
RedisDB | Redis database to be used. **[ By default there're 16 of them ]**
PendingTxEntryTopic | Whenever tx enters pending pool, it'll be published on Pub/Sub topic `t`
PendingTxExitTopic | Whenever tx leaves pending pool, it'll be published on Pub/Sub topic `t`
QueuedTxEntryTopic | Whenever tx enters queued pool, it'll be published on Pub/Sub topic `t`
QueuedTxExitTopic | Whenever tx leaves queued pool, it'll be published on Pub/Sub topic `t`
ConcurrencyFactor | Whenever concurrency can be leveraged, `harmony` will create worker pool with `#-of logical CPUs x ConcurrencyFactor` go routines. **[ Can be float too ]**
Port | Starts HTTP server on this port ( > 1024 )

Expand All @@ -157,7 +145,7 @@ NetworkingStream=this-is-stream
NetworkingBootstrap=
```

As `harmony` nodes will form a P2P network, you need to **first** switch networking on, by setting `NetworkingEnabled` to `true` ( default value is `false` ).
As `harmony` nodes will form a P2P mesh network, you need to **first** switch networking on, by setting `NetworkingEnabled` to `true` ( default value is `false` ).

> If you explicitly set this field to `false`, all `Networking*` fields to be ignored.
Expand Down Expand Up @@ -186,8 +174,6 @@ This way you can keep adding `N`-many nodes to your cluster.

**This is recommended practice, but you can always test multi-node set up, while relying on same Ethereum Node. In that case your interest can be putting all these `harmony` instances behind load balancer & serving client requests in better fashion & it's perfectly okay.**

> ❗️ If you're using same Redis instance for multiple `harmony` nodes, make sure you've changed DB identifier or Pub/Sub topic names, to avoid any kind of clash.
---

- Let's build & run `harmony`
Expand Down Expand Up @@ -215,18 +201,24 @@ You'll receive response like 👇

```json
{
"pendingPoolSize": 67,
"queuedPoolSize": 0,
"uptime": "29.214603s",
"networkID": 137
"pendingPoolSize": 257530,
"queuedPoolSize": 55278,
"uptime": "271h54m7.240520958s",
"processed": 15808071,
"latestBlock": 12359655,
"latestSeenAgo": "8.46197605s",
"networkID": 1
}
```

Field | Interpretation
--- | ---
pendingPoolSize | Currently these many tx(s) are in pending state i.e. waiting to be picked up by some miner when next block gets mined
queuedPoolSize | These tx(s) are stuck, will only be eligible for mining when lower nonce tx(s) of same wallet gets mined
queuedPoolSize | These tx(s) are stuck, will only be eligible for mining when lower nonce tx(s) of same wallet joins pending pool
uptime | This mempool monitoring engine is alive for last `t` time unit
processed | Mempool has seen `N` tx(s) getting confirmed/ dropped i.e. permanently leaving pool
latestBlock | Last block, mempool heard of, from RPC Node
lastestSeenAgo | Last block was seen `t` time unit ago
networkID | The mempool monitoring engine keeps track of mempool of this network

### Mempool
Expand Down
51 changes: 12 additions & 39 deletions app/bootup/bootup.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,19 @@ package bootup

import (
"context"
"errors"
"strconv"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
"github.com/go-redis/redis/v8"
"github.com/itzmeanjan/harmony/app/config"
"github.com/itzmeanjan/harmony/app/data"
"github.com/itzmeanjan/harmony/app/graph"
"github.com/itzmeanjan/harmony/app/listen"
"github.com/itzmeanjan/harmony/app/networking"
"github.com/itzmeanjan/pubsub"
)

// GetNetwork - Make RPC call for reading network ID
Expand Down Expand Up @@ -53,47 +54,19 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) {
return nil, err
}

var options *redis.Options

// If password is given in config file
if config.Get("RedisPassword") != "" {

options = &redis.Options{
Network: config.Get("RedisConnection"),
Addr: config.Get("RedisAddress"),
Password: config.Get("RedisPassword"),
DB: int(config.GetRedisDBIndex()),
}

} else {
// If password is not given, attempting to connect with out it
//
// Though this is not recommended in production environment
options = &redis.Options{
Network: config.Get("RedisConnection"),
Addr: config.Get("RedisAddress"),
DB: int(config.GetRedisDBIndex()),
}

_pubsub := pubsub.New(ctx)
if !_pubsub.Alive {
return nil, errors.New("failed to start pub/sub hub")
}

_redis := redis.NewClient(options)
// Checking whether connection was successful or not
if err := _redis.Ping(ctx).Err(); err != nil {
return nil, err
}

// Passed this redis client handle to graphql query resolver
//
// To be used when subscription requests are received from clients
if err := graph.InitRedisClient(_redis); err != nil {
if err := graph.InitPubSub(_pubsub); err != nil {
return nil, err
}

// Redis client to be used in p2p networking communication
// handling section for letting clients know of some newly
// seen mempool tx
if err := networking.InitRedisClient(_redis); err != nil {
// Pubsub to be used in p2p networking handling section
// for letting clients know of some newly seen mempool tx
if err := networking.InitPubSub(_pubsub); err != nil {
return nil, err
}

Expand Down Expand Up @@ -134,7 +107,7 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) {
DoneChan: make(chan chan uint64, 1),
SetLastSeenBlockChan: lastSeenBlockChan,
LastSeenBlockChan: make(chan chan data.LastSeenBlock, 1),
PubSub: _redis,
PubSub: _pubsub,
RPC: client,
}

Expand All @@ -153,7 +126,7 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) {
CountTxsChan: make(chan data.CountRequest, 1),
ListTxsChan: make(chan data.ListRequest, 1),
TxsFromAChan: make(chan data.TxsFromARequest, 1),
PubSub: _redis,
PubSub: _pubsub,
RPC: client,
PendingPool: pendingPool,
}
Expand Down Expand Up @@ -242,7 +215,7 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) {
RPCClient: client,
WSClient: wsClient,
Pool: pool,
Redis: _redis,
PubSub: _pubsub,
StartedAt: time.Now().UTC(),
NetworkID: network}, nil

Expand Down
19 changes: 0 additions & 19 deletions app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"log"
"math"
"runtime"
"strconv"

"github.com/spf13/viper"
)
Expand Down Expand Up @@ -130,24 +129,6 @@ func GetQueuedTxExitPublishTopic() string {

}

// GetRedisDBIndex - Read desired redis database index, which
// user asked `harmony` to use
//
// If nothing is provided, it'll use `1`, by default
func GetRedisDBIndex() uint8 {

db := Get("RedisDB")

_db, err := strconv.ParseUint(db, 10, 8)
if err != nil {
log.Printf("[❗️] Failed to parse redis database index : `%s`, using 1\n", err.Error())
return 1
}

return uint8(_db)

}

// GetConcurrencyFactor - Size of worker pool, is dictated by rule below
//
// @note You can set floating point value for `ConcurrencyFactor` ( > 0 )
Expand Down
31 changes: 16 additions & 15 deletions app/data/pending.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/rpc"
"github.com/gammazero/workerpool"
"github.com/go-redis/redis/v8"
"github.com/itzmeanjan/harmony/app/config"
"github.com/itzmeanjan/harmony/app/listen"
"github.com/itzmeanjan/pubsub"
)

// PendingPool - Currently present pending tx(s) i.e. which are ready to
Expand Down Expand Up @@ -40,7 +40,7 @@ type PendingPool struct {
DoneChan chan chan uint64
SetLastSeenBlockChan chan uint64
LastSeenBlockChan chan chan LastSeenBlock
PubSub *redis.Client
PubSub *pubsub.PubSub
RPC *rpc.Client
}

Expand Down Expand Up @@ -163,7 +163,7 @@ func (p *PendingPool) Start(ctx context.Context) {
tx.Pool = "pending"

addTx(tx)
p.PublishAdded(ctx, p.PubSub, tx)
p.PublishAdded(ctx, tx)

return true

Expand Down Expand Up @@ -193,7 +193,7 @@ func (p *PendingPool) Start(ctx context.Context) {
}

removeTx(tx)
p.PublishRemoved(ctx, p.PubSub, tx)
p.PublishRemoved(ctx, tx)

return true

Expand Down Expand Up @@ -1022,18 +1022,19 @@ func (p *PendingPool) VerifiedAdd(ctx context.Context, tx *MemPoolTx) bool {

// PublishAdded - Publish new pending tx pool content ( in messagepack serialized format )
// to pubsub topic
func (p *PendingPool) PublishAdded(ctx context.Context, pubsub *redis.Client, msg *MemPoolTx) {
func (p *PendingPool) PublishAdded(ctx context.Context, msg *MemPoolTx) {

_msg, err := msg.ToMessagePack()
if err != nil {

log.Printf("[❗️] Failed to serialize into messagepack : %s\n", err.Error())
return

}

if err := pubsub.Publish(ctx, config.GetPendingTxEntryPublishTopic(), _msg).Err(); err != nil {
log.Printf("[❗️] Failed to publish new pending tx : %s\n", err.Error())
if ok, _ := p.PubSub.Publish(&pubsub.Message{
Topics: []pubsub.String{pubsub.String(config.GetPendingTxEntryPublishTopic())},
Data: _msg,
}); !ok {
log.Printf("[❗️] Failed to publish new pending tx\n")
}

}
Expand All @@ -1043,7 +1044,6 @@ func (p *PendingPool) PublishAdded(ctx context.Context, pubsub *redis.Client, ms
func (p *PendingPool) Remove(ctx context.Context, txStat *TxStatus) bool {

respChan := make(chan bool)

p.RemoveTxChan <- RemoveRequest{TxStat: txStat, ResponseChan: respChan}

return <-respChan
Expand All @@ -1054,18 +1054,19 @@ func (p *PendingPool) Remove(ctx context.Context, txStat *TxStatus) bool {
// to pubsub topic
//
// These tx(s) are leaving pending pool i.e. they're confirmed now
func (p *PendingPool) PublishRemoved(ctx context.Context, pubsub *redis.Client, msg *MemPoolTx) {
func (p *PendingPool) PublishRemoved(ctx context.Context, msg *MemPoolTx) {

_msg, err := msg.ToMessagePack()
if err != nil {

log.Printf("[❗️] Failed to serialize into messagepack : %s\n", err.Error())
return

}

if err := pubsub.Publish(ctx, config.GetPendingTxExitPublishTopic(), _msg).Err(); err != nil {
log.Printf("[❗️] Failed to publish confirmed tx : %s\n", err.Error())
if ok, _ := p.PubSub.Publish(&pubsub.Message{
Topics: []pubsub.String{pubsub.String(config.GetPendingTxExitPublishTopic())},
Data: _msg,
}); !ok {
log.Printf("[❗️] Failed to publish confirmed/dropped tx\n")
}

}
Expand Down
3 changes: 1 addition & 2 deletions app/data/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/go-redis/redis/v8"
)

// MemPool - Current state of mempool, where all pending/ queued tx(s)
Expand Down Expand Up @@ -179,7 +178,7 @@ func (m *MemPool) Stat(start time.Time) {
// is received from any `harmony` peer, it will be checked against latest state
// of local mempool view, to decide whether this tx can be acted upon
// somehow or not
func (m *MemPool) HandleTxFromPeer(ctx context.Context, pubsub *redis.Client, tx *MemPoolTx) bool {
func (m *MemPool) HandleTxFromPeer(ctx context.Context, tx *MemPoolTx) bool {

// Checking whether we already have this tx included in pool
// or not
Expand Down
Loading

0 comments on commit c219ac5

Please sign in to comment.