From 93f75409715208fd8c8c163bc1e7a2e0c9f6d1ce Mon Sep 17 00:00:00 2001 From: Vlad Frangu Date: Mon, 2 Sep 2024 18:07:51 +0300 Subject: [PATCH] fix(RequestQueueV2): implement an isFinished escape hatch if the queue ends up in what seems to be a deadlock state --- packages/core/src/storages/request_provider.ts | 10 ++++++++++ packages/core/src/storages/request_queue_v2.ts | 7 +++++++ 2 files changed, 17 insertions(+) diff --git a/packages/core/src/storages/request_provider.ts b/packages/core/src/storages/request_provider.ts index 45f1466d4a78..9558d37a14ad 100644 --- a/packages/core/src/storages/request_provider.ts +++ b/packages/core/src/storages/request_provider.ts @@ -57,6 +57,8 @@ export abstract class RequestProvider implements IStorage { protected isFinishedCalledWhileHeadWasNotEmpty = 0; + protected isFinishedWarningsEmitted = 0; + constructor( options: InternalRequestProviderOptions, readonly config = Configuration.getGlobalConfig(), @@ -612,6 +614,8 @@ export abstract class RequestProvider implements IStorage { clientKey: this.clientKey, }, ); + + this.isFinishedWarningsEmitted++; } else { this.log.debug( 'Queue head still returned requests that need to be processed (or that are locked by other clients)', @@ -622,6 +626,12 @@ export abstract class RequestProvider implements IStorage { } } + // Temporary deadlock exit: somehow, we can end up in a state where the isEmpty function says its true, but then the head is not only not empty, + // but never empties either! This is a temporary fix to let crawlers exit after a while if they're stuck in this state. + if (this.isFinishedWarningsEmitted >= 3) { + return true; + } + return currentHead.items.length === 0; } diff --git a/packages/core/src/storages/request_queue_v2.ts b/packages/core/src/storages/request_queue_v2.ts index 41dfc5e711e4..2d4daffb1767 100644 --- a/packages/core/src/storages/request_queue_v2.ts +++ b/packages/core/src/storages/request_queue_v2.ts @@ -177,14 +177,21 @@ export class RequestQueue extends RequestProvider { // Stop fetching if we are paused for migration if (this.queuePausedForMigration) { + this.log.debug('Queue is paused for migration, skipping fetching of new requests'); return; } // We want to fetch ahead of time to minimize dead time if (this.queueHeadIds.length() > 1) { + // Not logging here because that'll be _real_ spammy return; } + // TODO: if this logs multiple times in a row, we need to double check that we don't need a mutex! + if (!this._listHeadAndLockPromise) { + this.log.debug('Scheduling fetching of new requests from queue head'); + } + this._listHeadAndLockPromise ??= this._listHeadAndLock().finally(() => { this._listHeadAndLockPromise = null; });