Skip to content

Commit

Permalink
add cancellation support to async iterable iteration (#4274)
Browse files Browse the repository at this point in the history
When the abort signal is triggered, any pending `.next()` calls should
return immediately
  • Loading branch information
yaacovCR authored Nov 6, 2024
1 parent cca3f98 commit 1d98a6a
Show file tree
Hide file tree
Showing 6 changed files with 437 additions and 55 deletions.
29 changes: 26 additions & 3 deletions src/execution/PromiseCanceller.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js';

/**
* A PromiseCanceller object can be used to cancel multiple promises
* using a single AbortSignal.
* A PromiseCanceller object can be used to trigger multiple responses
* in response to a single AbortSignal.
*
* @internal
*/
Expand All @@ -28,7 +28,7 @@ export class PromiseCanceller {
this.abortSignal.removeEventListener('abort', this.abort);
}

withCancellation<T>(originalPromise: Promise<T>): Promise<T> {
cancellablePromise<T>(originalPromise: Promise<T>): Promise<T> {
if (this.abortSignal.aborted) {
// eslint-disable-next-line @typescript-eslint/prefer-promise-reject-errors
return Promise.reject(this.abortSignal.reason);
Expand All @@ -50,4 +50,27 @@ export class PromiseCanceller {

return promise;
}

cancellableIterable<T>(iterable: AsyncIterable<T>): AsyncIterable<T> {
const iterator = iterable[Symbol.asyncIterator]();

const _next = iterator.next.bind(iterator);

if (iterator.return) {
const _return = iterator.return.bind(iterator);

return {
[Symbol.asyncIterator]: () => ({
next: () => this.cancellablePromise(_next()),
return: () => this.cancellablePromise(_return()),
}),
};
}

return {
[Symbol.asyncIterator]: () => ({
next: () => this.cancellablePromise(_next()),
}),
};
}
}
119 changes: 92 additions & 27 deletions src/execution/__tests__/PromiseCanceller-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,52 +5,117 @@ import { expectPromise } from '../../__testUtils__/expectPromise.js';
import { PromiseCanceller } from '../PromiseCanceller.js';

describe('PromiseCanceller', () => {
it('works to cancel an already resolved promise', async () => {
const abortController = new AbortController();
const abortSignal = abortController.signal;
describe('cancellablePromise', () => {
it('works to cancel an already resolved promise', async () => {
const abortController = new AbortController();
const abortSignal = abortController.signal;

const promiseCanceller = new PromiseCanceller(abortSignal);
const promiseCanceller = new PromiseCanceller(abortSignal);

const promise = Promise.resolve(1);
const promise = Promise.resolve(1);

const withCancellation = promiseCanceller.withCancellation(promise);
const withCancellation = promiseCanceller.cancellablePromise(promise);

abortController.abort(new Error('Cancelled!'));
abortController.abort(new Error('Cancelled!'));

await expectPromise(withCancellation).toRejectWith('Cancelled!');
});
await expectPromise(withCancellation).toRejectWith('Cancelled!');
});

it('works to cancel an already resolved promise after abort signal triggered', async () => {
const abortController = new AbortController();
const abortSignal = abortController.signal;

abortController.abort(new Error('Cancelled!'));

it('works to cancel a hanging promise', async () => {
const abortController = new AbortController();
const abortSignal = abortController.signal;
const promiseCanceller = new PromiseCanceller(abortSignal);

const promiseCanceller = new PromiseCanceller(abortSignal);
const promise = Promise.resolve(1);

const promise = new Promise(() => {
/* never resolves */
const withCancellation = promiseCanceller.cancellablePromise(promise);

await expectPromise(withCancellation).toRejectWith('Cancelled!');
});

const withCancellation = promiseCanceller.withCancellation(promise);
it('works to cancel a hanging promise', async () => {
const abortController = new AbortController();
const abortSignal = abortController.signal;

const promiseCanceller = new PromiseCanceller(abortSignal);

const promise = new Promise(() => {
/* never resolves */
});

const withCancellation = promiseCanceller.cancellablePromise(promise);

abortController.abort(new Error('Cancelled!'));

await expectPromise(withCancellation).toRejectWith('Cancelled!');
});

it('works to cancel a hanging promise created after abort signal triggered', async () => {
const abortController = new AbortController();
const abortSignal = abortController.signal;

abortController.abort(new Error('Cancelled!'));

abortController.abort(new Error('Cancelled!'));
const promiseCanceller = new PromiseCanceller(abortSignal);

await expectPromise(withCancellation).toRejectWith('Cancelled!');
const promise = new Promise(() => {
/* never resolves */
});

const withCancellation = promiseCanceller.cancellablePromise(promise);

await expectPromise(withCancellation).toRejectWith('Cancelled!');
});
});

it('works to cancel a hanging promise created after abort signal triggered', async () => {
const abortController = new AbortController();
const abortSignal = abortController.signal;
describe('cancellableAsyncIterable', () => {
it('works to abort a next call', async () => {
const abortController = new AbortController();
const abortSignal = abortController.signal;

const promiseCanceller = new PromiseCanceller(abortSignal);

const asyncIterable = {
[Symbol.asyncIterator]: () => ({
next: () => Promise.resolve({ value: 1, done: false }),
}),
};

const cancellableAsyncIterable =
promiseCanceller.cancellableIterable(asyncIterable);

abortController.abort(new Error('Cancelled!'));
const nextPromise =
cancellableAsyncIterable[Symbol.asyncIterator]().next();

const promiseCanceller = new PromiseCanceller(abortSignal);
abortController.abort(new Error('Cancelled!'));

const promise = new Promise(() => {
/* never resolves */
await expectPromise(nextPromise).toRejectWith('Cancelled!');
});

const withCancellation = promiseCanceller.withCancellation(promise);
it('works to abort a next call when already aborted', async () => {
const abortController = new AbortController();
const abortSignal = abortController.signal;

await expectPromise(withCancellation).toRejectWith('Cancelled!');
abortController.abort(new Error('Cancelled!'));

const promiseCanceller = new PromiseCanceller(abortSignal);

const asyncIterable = {
[Symbol.asyncIterator]: () => ({
next: () => Promise.resolve({ value: 1, done: false }),
}),
};

const cancellableAsyncIterable =
promiseCanceller.cancellableIterable(asyncIterable);

const nextPromise =
cancellableAsyncIterable[Symbol.asyncIterator]().next();

await expectPromise(nextPromise).toRejectWith('Cancelled!');
});
});
});
Loading

0 comments on commit 1d98a6a

Please sign in to comment.