Skip to content

Commit

Permalink
Use write-lock in (*TxPriorityQueue).ReapMax funcs (#209)
Browse files Browse the repository at this point in the history
ReapMaxBytesMaxGas and ReapMaxTxs funcs in TxPriorityQueue claim
> Transactions returned are not removed from the mempool transaction
> store or indexes.

However, they use a priority queue to accomplish the claim
> Transaction are retrieved in priority order.

This is accomplished by popping all items out of the whole heap, and
then pushing then back in sequentially. A copy of the heap cannot be
obtained otherwise. Both of the mentioned functions use a read-lock
(RLock) when doing this. This results in a potential scenario where
multiple executions of the ReapMax can be started in parallel, and
both would be popping items out of the priority queue.

In practice, this can be abused by executing the `unconfirmed_txs` RPC
call repeatedly. Based on our observations, running it multiple times
per millisecond results in multiple threads picking it up at the same
time. Such a scenario can be obtained via the WebSocket interface, and
spamming `unconfirmed_txs` calls there. The behavior that happens is a
`Panic in WSJSONRPC handler` when a queue item unexpectedly disappears
for `mempool.(*TxPriorityQueue).Swap`.
(`runtime error: index out of range [0] with length 0`)

This can additionally lead to a `CONSENSUS FAILURE!!!` if the race
condition occurs for `internal/consensus.(*State).finalizeCommit`
when it tries to do `mempool.(*TxPriorityQueue).RemoveTx`, but
the ReapMax has already removed all elements from the underlying
heap. (`runtime error: index out of range [-1]`)

This commit switches the lock type to a write-lock (Lock) to ensure
no parallel modifications take place. This commit additionally updates
the tests to allow parallel execution of the func calls in testing,
as to prevent regressions (in case someone wants to downgrade the locks
without considering the implications from the underlying heap usage).
  • Loading branch information
sigv authored Mar 12, 2024
1 parent 8061a47 commit 3cc1293
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 34 deletions.
8 changes: 4 additions & 4 deletions internal/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,8 +362,8 @@ func (txmp *TxMempool) Flush() {
// - Transactions returned are not removed from the mempool transaction
// store or indexes.
func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
txmp.mtx.RLock()
defer txmp.mtx.RUnlock()
txmp.mtx.Lock()
defer txmp.mtx.Unlock()

var (
totalGas int64
Expand Down Expand Up @@ -417,8 +417,8 @@ func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
// - Transactions returned are not removed from the mempool transaction
// store or indexes.
func (txmp *TxMempool) ReapMaxTxs(max int) types.Txs {
txmp.mtx.RLock()
defer txmp.mtx.RUnlock()
txmp.mtx.Lock()
defer txmp.mtx.Unlock()

numTxs := txmp.priorityIndex.NumTxs()
if max < 0 {
Expand Down
92 changes: 62 additions & 30 deletions internal/mempool/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,27 +304,43 @@ func TestTxMempool_ReapMaxBytesMaxGas(t *testing.T) {
require.Equal(t, priorities[:len(reapedPriorities)], reapedPriorities)
}

var wg sync.WaitGroup

// reap by gas capacity only
reapedTxs := txmp.ReapMaxBytesMaxGas(-1, 50)
ensurePrioritized(reapedTxs)
require.Equal(t, len(tTxs), txmp.Size())
require.Equal(t, int64(5690), txmp.SizeBytes())
require.Len(t, reapedTxs, 50)
wg.Add(1)
go func() {
defer wg.Done()
reapedTxs := txmp.ReapMaxBytesMaxGas(-1, 50)
ensurePrioritized(reapedTxs)
require.Equal(t, len(tTxs), txmp.Size())
require.Equal(t, int64(5690), txmp.SizeBytes())
require.Len(t, reapedTxs, 50)
}()

// reap by transaction bytes only
reapedTxs = txmp.ReapMaxBytesMaxGas(1000, -1)
ensurePrioritized(reapedTxs)
require.Equal(t, len(tTxs), txmp.Size())
require.Equal(t, int64(5690), txmp.SizeBytes())
require.GreaterOrEqual(t, len(reapedTxs), 16)
wg.Add(1)
go func() {
defer wg.Done()
reapedTxs := txmp.ReapMaxBytesMaxGas(1000, -1)
ensurePrioritized(reapedTxs)
require.Equal(t, len(tTxs), txmp.Size())
require.Equal(t, int64(5690), txmp.SizeBytes())
require.GreaterOrEqual(t, len(reapedTxs), 16)
}()

// Reap by both transaction bytes and gas, where the size yields 31 reaped
// transactions and the gas limit reaps 25 transactions.
reapedTxs = txmp.ReapMaxBytesMaxGas(1500, 30)
ensurePrioritized(reapedTxs)
require.Equal(t, len(tTxs), txmp.Size())
require.Equal(t, int64(5690), txmp.SizeBytes())
require.Len(t, reapedTxs, 25)
wg.Add(1)
go func() {
defer wg.Done()
reapedTxs := txmp.ReapMaxBytesMaxGas(1500, 30)
ensurePrioritized(reapedTxs)
require.Equal(t, len(tTxs), txmp.Size())
require.Equal(t, int64(5690), txmp.SizeBytes())
require.Len(t, reapedTxs, 25)
}()

wg.Wait()
}

func TestTxMempool_ReapMaxTxs(t *testing.T) {
Expand Down Expand Up @@ -363,26 +379,42 @@ func TestTxMempool_ReapMaxTxs(t *testing.T) {
require.Equal(t, priorities[:len(reapedPriorities)], reapedPriorities)
}

var wg sync.WaitGroup

// reap all transactions
reapedTxs := txmp.ReapMaxTxs(-1)
ensurePrioritized(reapedTxs)
require.Equal(t, len(tTxs), txmp.Size())
require.Equal(t, int64(5690), txmp.SizeBytes())
require.Len(t, reapedTxs, len(tTxs))
wg.Add(1)
go func() {
defer wg.Done()
reapedTxs := txmp.ReapMaxTxs(-1)
ensurePrioritized(reapedTxs)
require.Equal(t, len(tTxs), txmp.Size())
require.Equal(t, int64(5690), txmp.SizeBytes())
require.Len(t, reapedTxs, len(tTxs))
}()

// reap a single transaction
reapedTxs = txmp.ReapMaxTxs(1)
ensurePrioritized(reapedTxs)
require.Equal(t, len(tTxs), txmp.Size())
require.Equal(t, int64(5690), txmp.SizeBytes())
require.Len(t, reapedTxs, 1)
wg.Add(1)
go func() {
defer wg.Done()
reapedTxs := txmp.ReapMaxTxs(1)
ensurePrioritized(reapedTxs)
require.Equal(t, len(tTxs), txmp.Size())
require.Equal(t, int64(5690), txmp.SizeBytes())
require.Len(t, reapedTxs, 1)
}()

// reap half of the transactions
reapedTxs = txmp.ReapMaxTxs(len(tTxs) / 2)
ensurePrioritized(reapedTxs)
require.Equal(t, len(tTxs), txmp.Size())
require.Equal(t, int64(5690), txmp.SizeBytes())
require.Len(t, reapedTxs, len(tTxs)/2)
wg.Add(1)
go func() {
defer wg.Done()
reapedTxs := txmp.ReapMaxTxs(len(tTxs) / 2)
ensurePrioritized(reapedTxs)
require.Equal(t, len(tTxs), txmp.Size())
require.Equal(t, int64(5690), txmp.SizeBytes())
require.Len(t, reapedTxs, len(tTxs)/2)
}()

wg.Wait()
}

func TestTxMempool_CheckTxExceedsMaxSize(t *testing.T) {
Expand Down

0 comments on commit 3cc1293

Please sign in to comment.