Skip to content
This repository has been archived by the owner on Jan 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #16 from itzmeanjan/develop
Browse files Browse the repository at this point in the history
Reduce overtime memory usage
  • Loading branch information
itzmeanjan authored May 3, 2021
2 parents cd01e26 + f34a3e6 commit 0b9c720
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 23 deletions.
8 changes: 4 additions & 4 deletions app/bootup/bootup.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) {
pendingPool := &data.PendingPool{
Transactions: make(map[common.Hash]*data.MemPoolTx),
TxsFromAddress: make(map[common.Address]data.TxList),
DroppedTxs: make(map[common.Hash]bool),
RemovedTxs: make(map[common.Hash]bool),
DroppedTxs: make(map[common.Hash]time.Time),
RemovedTxs: make(map[common.Hash]time.Time),
AscTxsByGasPrice: make(data.MemPoolTxsAsc, 0, config.GetPendingPoolSize()),
DescTxsByGasPrice: make(data.MemPoolTxsDesc, 0, config.GetPendingPoolSize()),
Done: 0,
Expand All @@ -142,8 +142,8 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) {
queuedPool := &data.QueuedPool{
Transactions: make(map[common.Hash]*data.MemPoolTx),
TxsFromAddress: make(map[common.Address]data.TxList),
DroppedTxs: make(map[common.Hash]bool),
RemovedTxs: make(map[common.Hash]bool),
DroppedTxs: make(map[common.Hash]time.Time),
RemovedTxs: make(map[common.Hash]time.Time),
AscTxsByGasPrice: make(data.MemPoolTxsAsc, 0, config.GetQueuedPoolSize()),
DescTxsByGasPrice: make(data.MemPoolTxsDesc, 0, config.GetQueuedPoolSize()),
AddTxChan: make(chan data.AddRequest, 1),
Expand Down
44 changes: 36 additions & 8 deletions app/data/pending.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
type PendingPool struct {
Transactions map[common.Hash]*MemPoolTx
TxsFromAddress map[common.Address]TxList
DroppedTxs map[common.Hash]bool
RemovedTxs map[common.Hash]bool
DroppedTxs map[common.Hash]time.Time
RemovedTxs map[common.Hash]time.Time
AscTxsByGasPrice TxList
DescTxsByGasPrice TxList
Done uint64
Expand Down Expand Up @@ -133,7 +133,7 @@ func (p *PendingPool) Start(ctx context.Context) {
// is due to the fact, no other competing
// worker attempting to read from/ write to
// this one, now
p.DroppedTxs[tx.Hash] = true
p.DroppedTxs[tx.Hash] = time.Now().UTC()

}

Expand All @@ -145,19 +145,17 @@ func (p *PendingPool) Start(ctx context.Context) {
}

if _, ok := p.DroppedTxs[tx.Hash]; ok {
p.DroppedTxs[tx.Hash] = time.Now().UTC()
return false
}

if _, ok := p.RemovedTxs[tx.Hash]; ok {
p.RemovedTxs[tx.Hash] = time.Now().UTC()
return false
}

if needToDropTxs() {
dropTx(pickTxWithLowestGasPrice())

if len(p.DroppedTxs)%10 == 0 {
log.Printf("🧹 Dropped 10 pending txs, was about to hit limit\n")
}
}

// Marking we found this tx in mempool now
Expand Down Expand Up @@ -235,7 +233,7 @@ func (p *PendingPool) Start(ctx context.Context) {
if removed {
// Marking that tx has been removed, so that
// it won't get picked up next time
p.RemovedTxs[req.TxStat.Hash] = true
p.RemovedTxs[req.TxStat.Hash] = time.Now().UTC()
p.Done++
}

Expand Down Expand Up @@ -334,6 +332,36 @@ func (p *PendingPool) Start(ctx context.Context) {

req <- LastSeenBlock{Number: p.LastSeenBlock, At: p.LastSeenAt}

case <-time.After(time.Duration(1) * time.Millisecond):
// After 1 hour of keeping entries which were previously removed
// are now being deleted from memory, so that memory usage for keeping track of
// which were removed in past doesn't become a problem for us.
//
// 1 hour is just a random time period, it can be possibly improved
//
// Just hoping after 1 hour of last time this tx was seen to be added
// into this pool, it has been either dropped/ confirmed, so it won't
// be attempted to be added here again

for k := range p.DroppedTxs {

if time.Now().UTC().Sub(p.DroppedTxs[k]) > time.Duration(1)*time.Hour {
delete(p.DroppedTxs, k)
}

}

case <-time.After(time.Duration(1) * time.Millisecond):
// Read 👆 comment

for k := range p.RemovedTxs {

if time.Now().UTC().Sub(p.RemovedTxs[k]) > time.Duration(1)*time.Hour {
delete(p.RemovedTxs, k)
}

}

}

}
Expand Down
53 changes: 42 additions & 11 deletions app/data/queued.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (
type QueuedPool struct {
Transactions map[common.Hash]*MemPoolTx
TxsFromAddress map[common.Address]TxList
DroppedTxs map[common.Hash]bool
RemovedTxs map[common.Hash]bool
DroppedTxs map[common.Hash]time.Time
RemovedTxs map[common.Hash]time.Time
AscTxsByGasPrice TxList
DescTxsByGasPrice TxList
AddTxChan chan AddRequest
Expand Down Expand Up @@ -118,7 +118,7 @@ func (q *QueuedPool) Start(ctx context.Context) {
removeTx(tx)
// Marking that tx has been dropped, so that
// it won't get picked up next time
q.DroppedTxs[tx.Hash] = true
q.DroppedTxs[tx.Hash] = time.Now().UTC()

}

Expand All @@ -129,19 +129,17 @@ func (q *QueuedPool) Start(ctx context.Context) {
}

if _, ok := q.DroppedTxs[tx.Hash]; ok {
q.DroppedTxs[tx.Hash] = time.Now().UTC()
return false
}

if _, ok := q.RemovedTxs[tx.Hash]; ok {
q.RemovedTxs[tx.Hash] = time.Now().UTC()
return false
}

if needToDropTxs() {
dropTx(pickTxWithLowestGasPrice())

if len(q.DroppedTxs)%10 == 0 {
log.Printf("🧹 Dropped 10 queued txs, was about to hit limit\n")
}
}

// Marking we found this tx in mempool now
Expand Down Expand Up @@ -184,11 +182,14 @@ func (q *QueuedPool) Start(ctx context.Context) {
case req := <-q.RemoveTxChan:

// if removed will return non-nil reference to removed tx
req.ResponseChan <- txRemover(req.Hash)
removed := txRemover(req.Hash)
req.ResponseChan <- removed

// Marking that tx has been removed, so that
// it won't get picked up next time
q.RemovedTxs[req.Hash] = true
if removed != nil {
// Marking that tx has been removed, so that
// it won't get picked up next time
q.RemovedTxs[req.Hash] = time.Now().UTC()
}

case req := <-q.TxExistsChan:

Expand Down Expand Up @@ -260,6 +261,36 @@ func (q *QueuedPool) Start(ctx context.Context) {

req.ResponseChan <- nil

case <-time.After(time.Duration(1) * time.Millisecond):
// After 1 hour of keeping entries which were previously removed
// are now being deleted from memory, so that memory usage for keeping track of
// which were removed in past doesn't become a problem for us.
//
// 1 hour is just a random time period, it can be possibly improved
//
// Just hoping after 1 hour of last time this tx was seen to be added
// into this pool, it has been either dropped/ confirmed/ unstuck, so it won't
// be attempted to be added here again

for k := range q.DroppedTxs {

if time.Now().UTC().Sub(q.DroppedTxs[k]) > time.Duration(1)*time.Hour {
delete(q.DroppedTxs, k)
}

}

case <-time.After(time.Duration(1) * time.Millisecond):
// Read 👆 comment

for k := range q.RemovedTxs {

if time.Now().UTC().Sub(q.RemovedTxs[k]) > time.Duration(1)*time.Hour {
delete(q.RemovedTxs, k)
}

}

}

}
Expand Down

0 comments on commit 0b9c720

Please sign in to comment.