Skip to content

Commit

Permalink
fix: Use correct limit when retrying a limit query stream with a curs…
Browse files Browse the repository at this point in the history
…or (#2203)

* fix: Use correct limit when retrying a limit query stream with a cursor

When a query is streamed and the stream fails after receiving part of the
result set, the client retries the stream with a cursor. If the original
query has a limit, the retry logic should use a modified limit for the
remainder of the stream to ensure the final result contains the correct
number of documents.

* Update recursiveDelete test.

* Add tests.

* Address feedback.

* Address feedback (2).
  • Loading branch information
ehsannas authored Oct 2, 2024
1 parent 1775ce6 commit ab94092
Show file tree
Hide file tree
Showing 3 changed files with 360 additions and 67 deletions.
30 changes: 28 additions & 2 deletions dev/src/reference/query-util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ export class QueryUtil<
const startTime = Date.now();
const isExplain = explainOptions !== undefined;

let numDocumentsReceived = 0;
let lastReceivedDocument: QueryDocumentSnapshot<
AppModelType,
DbModelType
Expand Down Expand Up @@ -239,6 +240,7 @@ export class QueryUtil<
);
}

++numDocumentsReceived;
callback(undefined, output);

if (proto.done) {
Expand Down Expand Up @@ -317,6 +319,12 @@ export class QueryUtil<
stream.destroy(err);
streamActive.resolve(/* active= */ false);
} else if (lastReceivedDocument && retryWithCursor) {
if (query instanceof VectorQuery) {
throw new Error(
'Unimplemented: Vector query does not support cursors yet.'
);
}

logger(
'Query._stream',
tag,
Expand All @@ -330,12 +338,30 @@ export class QueryUtil<
// the query cursor. Note that we do not use backoff here. The
// call to `requestStream()` will backoff should the restart
// fail before delivering any results.
let newQuery: Query<AppModelType, DbModelType>;
if (!this._queryOptions.limit) {
newQuery = query;
} else {
const newLimit =
this._queryOptions.limit - numDocumentsReceived;
if (
this._queryOptions.limitType === undefined ||
this._queryOptions.limitType === LimitType.First
) {
newQuery = query.limit(newLimit);
} else {
newQuery = query.limitToLast(newLimit);
}
}

if (this._queryOptions.requireConsistency) {
request = query
request = newQuery
.startAfter(lastReceivedDocument)
.toProto(lastReceivedDocument.readTime);
} else {
request = query.startAfter(lastReceivedDocument).toProto();
request = newQuery
.startAfter(lastReceivedDocument)
.toProto();
}

// Set lastReceivedDocument to null before each retry attempt to ensure the retry makes progress
Expand Down
Loading

0 comments on commit ab94092

Please sign in to comment.