Commit

query: fix retry query case.

In case the backend is very unstable and times out the batch, we need to make
sure that ongoing queryJobs are dropped and that already registered queryJobs
are removed from the heap as well.
ziggie1984 committed Apr 3, 2024
1 parent 43f5a58 commit 535aa62
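The fix leans on a standard Go cancellation pattern: each batch carries a chan struct{} (the new cancelChan option, stored as cancelWorkChan in batchProgress in the diff below) that is closed when the batch times out, so every queued and in-flight queryJob watching that channel can stop early. The following is a minimal, self-contained sketch of that pattern only; the names runJob, errJobCanceled and backend are assumptions for illustration and are not part of neutrino's API.

package main

import (
    "errors"
    "fmt"
    "time"
)

// errJobCanceled stands in for query.ErrJobCanceled; the name is
// illustrative only.
var errJobCanceled = errors.New("job canceled")

// runJob simulates one in-flight query: it gives up as soon as the
// batch-level cancel channel is closed, instead of waiting on a
// backend that may never answer.
func runJob(backend, cancel <-chan struct{}) error {
    select {
    case <-backend:
        return nil
    case <-cancel:
        return errJobCanceled
    }
}

func main() {
    // One cancel channel per batch; closing it signals every queued
    // and ongoing job that belongs to the batch.
    cancel := make(chan struct{})

    backend := make(chan struct{}) // never answers: an unstable backend
    done := make(chan error, 3)

    for i := 0; i < 3; i++ {
        go func() { done <- runJob(backend, cancel) }()
    }

    // The batch timeout fires: cancel the whole batch once.
    <-time.After(50 * time.Millisecond)
    close(cancel)

    for i := 0; i < 3; i++ {
        fmt.Println(<-done) // each job reports "job canceled"
    }
}

Closing the channel, rather than sending on it, lets any number of queued and ongoing jobs observe the cancellation without the dispatcher needing to know how many listeners there are.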
Showing 2 changed files with 76 additions and 52 deletions.
1 change: 1 addition & 0 deletions query/interface.go
@@ -57,6 +57,7 @@ func defaultQueryOptions() *queryOptions {
timeout: defaultQueryTimeout,
encoding: defaultQueryEncoding,
numRetries: defaultNumRetries,
cancelChan: make(chan struct{}),
}
}

127 changes: 75 additions & 52 deletions query/workmanager.go
@@ -178,11 +178,12 @@ func (w *peerWorkManager) workDispatcher() {
heap.Init(work)

type batchProgress struct {
noRetryMax bool
maxRetries uint8
timeout <-chan time.Time
rem int
errChan chan error
noRetryMax bool
maxRetries uint8
timeout <-chan time.Time
rem int
errChan chan error
cancelWorkChan chan struct{}
}

// We set up a batch index counter to keep track of batches that still
@@ -311,6 +312,19 @@ Loop:
delete(currentQueries, result.job.index)
batch := currentBatches[batchNum]

// In case the batch is already canceled we return
// early.
if batch == nil {
log.Warnf("Query(%d) result from peer %v "+
"discarded with retries %d, because "+
"batch already canceled: %v",
result.job.index,
result.peer.Addr(),
result.job.tries, result.err)

continue Loop
}

switch {
// If the query ended because it was canceled, drop it.
case result.err == ErrJobCanceled:
@@ -322,30 +336,35 @@ Loop:
// was canceled, forward the error on the
// batch's error channel. We do this since a
// cancellation applies to the whole batch.
if batch != nil {
batch.errChan <- result.err
delete(currentBatches, batchNum)
batch.errChan <- result.err
delete(currentBatches, batchNum)

log.Debugf("Canceled batch %v",
batchNum)
continue Loop
}
log.Debugf("Canceled batch %v",
batchNum)
continue Loop

// If the query ended with any other error, put it back
// into the work queue if it has not reached the
// maximum number of retries.
case result.err != nil:
// Punish the peer for the failed query.
w.cfg.Ranking.Punish(result.peer.Addr())
// Refresh peer rank on disconnect.
if result.err == ErrPeerDisconnected {
w.cfg.Ranking.ResetRanking(
result.peer.Addr(),
)
} else {
// Punish the peer for the failed query.
w.cfg.Ranking.Punish(result.peer.Addr())
}

if batch != nil && !batch.noRetryMax {
if !batch.noRetryMax {
result.job.tries++
}

// Check if this query has reached its maximum
// number of retries. If so, remove it from the
// batch and don't reschedule it.
if batch != nil && !batch.noRetryMax &&
if !batch.noRetryMax &&
result.job.tries >= batch.maxRetries {

log.Warnf("Query(%d) from peer %v "+
@@ -380,11 +399,6 @@ Loop:
result.job.timeout = newTimeout
}

// Refresh peer rank on disconnect.
if result.err == ErrPeerDisconnected {
w.cfg.Ranking.ResetRanking(result.peer.Addr())
}

heap.Push(work, result.job)
currentQueries[result.job.index] = batchNum

@@ -396,42 +410,48 @@

// Decrement the number of queries remaining in
// the batch.
if batch != nil {
batch.rem--
log.Tracef("Remaining jobs for batch "+
"%v: %v ", batchNum, batch.rem)

// If this was the last query in flight
// for this batch, we can notify that
// it finished, and delete it.
if batch.rem == 0 {
batch.errChan <- nil
delete(currentBatches, batchNum)

log.Tracef("Batch %v done",
batchNum)
continue Loop
}
batch.rem--
log.Tracef("Remaining jobs for batch "+
"%v: %v ", batchNum, batch.rem)

// If this was the last query in flight
// for this batch, we can notify that
// it finished, and delete it.
if batch.rem == 0 {
batch.errChan <- nil
delete(currentBatches, batchNum)

log.Tracef("Batch %v done",
batchNum)
continue Loop
}

}

// If the total timeout for this batch has passed,
// return an error.
if batch != nil {
select {
case <-batch.timeout:
batch.errChan <- ErrQueryTimeout
delete(currentBatches, batchNum)
select {
case <-batch.timeout:
batch.errChan <- ErrQueryTimeout
delete(currentBatches, batchNum)

// When deleting the particular batch
// number we need to make sure to cancel
// all queued and ongoing queryJobs
// to not waste resources when the batch
// call is already canceled.
if batch.cancelWorkChan != nil {
close(batch.cancelWorkChan)
}

log.Warnf("Query(%d) failed with "+
"error: %v. Timing out.",
result.job.index, result.err)
log.Warnf("Query(%d) failed with "+
"error: %v. Timing out.",
result.job.index, result.err)

log.Debugf("Batch %v timed out",
batchNum)
log.Warnf("Batch %v timed out",
batchNum)

default:
}
default:
}

// A new batch of queries where scheduled.
@@ -457,9 +477,12 @@ Loop:
currentBatches[batchIndex] = &batchProgress{
noRetryMax: batch.options.noRetryMax,
maxRetries: batch.options.numRetries,
timeout: time.After(batch.options.timeout),
rem: len(batch.requests),
errChan: batch.errChan,
timeout: time.After(
batch.options.timeout,
),
rem: len(batch.requests),
errChan: batch.errChan,
cancelWorkChan: batch.options.cancelChan,
}
batchIndex++

