diff --git a/packages/rpc-subscriptions/README.md b/packages/rpc-subscriptions/README.md index ccb81fdcd05..aaa19a23e6a 100644 --- a/packages/rpc-subscriptions/README.md +++ b/packages/rpc-subscriptions/README.md @@ -48,3 +48,7 @@ Given an `RpcSubscriptionsChannel`, will return a new channel that parses data p ### `getRpcSubscriptionsChannelWithAutoping(channel)` Given an `RpcSubscriptionsChannel`, will return a new channel that sends a ping message to the inner channel if a message has not been sent or received in the last `intervalMs`. In web browsers, this implementation sends no ping when the network is down, and sends a ping immediately upon the network coming back up. + +### `getRpcSubscriptionsTransportWithSubscriptionCoalescing(transport)` + +Given an `RpcSubscriptionsTransport`, will return a new transport that coalesces identical subscriptions into a single subscription request to the server. The determination of whether a subscription is the same as another is based on the `subscriptionConfigurationHash` returned by its `RpcSubscriptionsPlan`. The subscription will only be aborted once all subscribers abort, or there is an error. diff --git a/packages/rpc-subscriptions/package.json b/packages/rpc-subscriptions/package.json index 0e3cbd7c2a2..459d68ed740 100644 --- a/packages/rpc-subscriptions/package.json +++ b/packages/rpc-subscriptions/package.json @@ -78,7 +78,8 @@ "@solana/rpc-subscriptions-channel-websocket": "workspace:*", "@solana/rpc-subscriptions-spec": "workspace:*", "@solana/rpc-transformers": "workspace:*", - "@solana/rpc-types": "workspace:*" + "@solana/rpc-types": "workspace:*", + "@solana/subscribable": "workspace:*" }, "peerDependencies": { "typescript": ">=5" diff --git a/packages/rpc-subscriptions/src/__tests__/rpc-subscriptions-coalescer-test.ts b/packages/rpc-subscriptions/src/__tests__/rpc-subscriptions-coalescer-test.ts index b41ea77500d..3b1ef624ae3 100644 --- a/packages/rpc-subscriptions/src/__tests__/rpc-subscriptions-coalescer-test.ts +++ b/packages/rpc-subscriptions/src/__tests__/rpc-subscriptions-coalescer-test.ts @@ -1,342 +1,239 @@ -import type { RpcSubscriptions } from '@solana/rpc-subscriptions-spec'; +import type { RpcSubscriptionsTransport } from '@solana/rpc-subscriptions-spec'; -import { getRpcSubscriptionsWithSubscriptionCoalescing } from '../rpc-subscriptions-coalescer'; +import { getRpcSubscriptionsTransportWithSubscriptionCoalescing } from '../rpc-subscriptions-coalescer'; -interface TestRpcSubscriptionNotifications { - nonFunctionProperty: string; - thingNotifications(...args: unknown[]): unknown; -} - -describe('getRpcSubscriptionsWithSubscriptionCoalescing', () => { - let getAsyncIterable: jest.MockedFn<() => AsyncIterable>; - let createPendingSubscription: jest.Mock; - let getDeduplicationKey: jest.Mock; - let subscribe: jest.Mock; - let rpcSubscriptions: RpcSubscriptions; +describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => { + let mockInnerTransport: jest.Mock; + let mockOn: jest.Mock; + let coalescedTransport: RpcSubscriptionsTransport; + function receiveError(err?: unknown) { + mockOn.mock.calls.filter(([type]) => type === 'error').forEach(([_, listener]) => listener(err)); + } beforeEach(() => { + mockOn = jest.fn(); + mockInnerTransport = jest.fn().mockResolvedValue({ on: mockOn }); + coalescedTransport = getRpcSubscriptionsTransportWithSubscriptionCoalescing(mockInnerTransport); + }); + it('returns the inner transport', async () => { + expect.assertions(1); + const expectedDataPublisher = { on: mockOn }; + mockInnerTransport.mockResolvedValue(expectedDataPublisher); + const config = { + executeSubscriptionPlan: jest.fn(), + signal: new AbortController().signal, + subscriptionConfigurationHash: 'MOCK_HASH', + }; + const transportPromise = coalescedTransport(config); + await expect(transportPromise).resolves.toBe(expectedDataPublisher); + }); + it('passes the `executeSubscriptionPlan` config to the inner transport', () => { + const config = { + executeSubscriptionPlan: jest.fn(), + signal: new AbortController().signal, + subscriptionConfigurationHash: 'MOCK_HASH', + }; + coalescedTransport(config); + expect(mockInnerTransport).toHaveBeenCalledWith( + expect.objectContaining({ + executeSubscriptionPlan: config.executeSubscriptionPlan, + }), + ); + }); + it('passes the `subscriptionConfigurationHash` config to the inner transport', () => { + const config = { + executeSubscriptionPlan: jest.fn(), + signal: new AbortController().signal, + subscriptionConfigurationHash: 'MOCK_HASH', + }; + coalescedTransport(config); + expect(mockInnerTransport).toHaveBeenCalledWith( + expect.objectContaining({ + subscriptionConfigurationHash: 'MOCK_HASH', + }), + ); + }); + it('calls the inner transport once per subscriber whose hashes do not match, in the same runloop', () => { + const config = { + executeSubscriptionPlan: jest.fn(), + signal: new AbortController().signal, + }; + coalescedTransport({ ...config, subscriptionConfigurationHash: 'MOCK_HASH_A' }); + coalescedTransport({ ...config, subscriptionConfigurationHash: 'MOCK_HASH_B' }); + expect(mockInnerTransport).toHaveBeenCalledTimes(2); + }); + it('calls the inner transport once per subscriber whose hashes do not match, in different runloops', async () => { + expect.assertions(1); + const config = { + executeSubscriptionPlan: jest.fn(), + signal: new AbortController().signal, + }; + await coalescedTransport({ ...config, subscriptionConfigurationHash: 'MOCK_HASH_A' }); + await coalescedTransport({ ...config, subscriptionConfigurationHash: 'MOCK_HASH_B' }); + expect(mockInnerTransport).toHaveBeenCalledTimes(2); + }); + it("calls the inner transport once per subscriber when both subscribers' hashes are `undefined`, in the same runloop", () => { + const config = { + executeSubscriptionPlan: jest.fn(), + signal: new AbortController().signal, + }; + coalescedTransport({ ...config, subscriptionConfigurationHash: undefined }); + coalescedTransport({ ...config, subscriptionConfigurationHash: undefined }); + expect(mockInnerTransport).toHaveBeenCalledTimes(2); + }); + it("calls the inner transport once per subscriber when both subscribers' hashes are `undefined`, in different runloops", async () => { + expect.assertions(1); + const config = { + executeSubscriptionPlan: jest.fn(), + signal: new AbortController().signal, + }; + await coalescedTransport({ ...config, subscriptionConfigurationHash: undefined }); + await coalescedTransport({ ...config, subscriptionConfigurationHash: undefined }); + expect(mockInnerTransport).toHaveBeenCalledTimes(2); + }); + it('only calls the inner transport once, in the same runloop', () => { + const config = { + executeSubscriptionPlan: jest.fn(), + signal: new AbortController().signal, + subscriptionConfigurationHash: 'MOCK_HASH', + }; + coalescedTransport(config); + coalescedTransport(config); + expect(mockInnerTransport).toHaveBeenCalledTimes(1); + }); + it('only calls the inner transport once, in different runloops', async () => { + expect.assertions(1); + const config = { + executeSubscriptionPlan: jest.fn(), + signal: new AbortController().signal, + subscriptionConfigurationHash: 'MOCK_HASH', + }; + await coalescedTransport(config); + await coalescedTransport(config); + expect(mockInnerTransport).toHaveBeenCalledTimes(1); + }); + it('delivers the same value to each subscriber, in the same runloop', async () => { + expect.assertions(1); + const config = { + executeSubscriptionPlan: jest.fn(), + signal: new AbortController().signal, + subscriptionConfigurationHash: 'MOCK_HASH', + }; + const [publisherA, publisherB] = await Promise.all([coalescedTransport(config), coalescedTransport(config)]); + expect(publisherA).toBe(publisherB); + }); + it('delivers the same value to each subscriber, in different runloops', async () => { + expect.assertions(1); + const config = { + executeSubscriptionPlan: jest.fn(), + signal: new AbortController().signal, + subscriptionConfigurationHash: 'MOCK_HASH', + }; + const publisherA = await coalescedTransport(config); + const publisherB = await coalescedTransport(config); + expect(publisherA).toBe(publisherB); + }); + it('does not fire the inner abort signal if fewer than all subscribers abort, in the same runloop', () => { jest.useFakeTimers(); - getAsyncIterable = jest.fn().mockImplementation(async function* () { - yield await new Promise(() => { - /* never resolve */ - }); - }); - getDeduplicationKey = jest.fn(); - subscribe = jest.fn().mockReturnValue({ - [Symbol.asyncIterator]() { - return getAsyncIterable()[Symbol.asyncIterator](); - }, - }); - createPendingSubscription = jest.fn().mockReturnValue({ subscribe }); - rpcSubscriptions = getRpcSubscriptionsWithSubscriptionCoalescing({ - getDeduplicationKey, - rpcSubscriptions: { - nonFunctionProperty: 'foo', - thingNotifications: createPendingSubscription, - }, - }); + const config = { + executeSubscriptionPlan: jest.fn(), + subscriptionConfigurationHash: 'MOCK_HASH', + }; + const abortControllerB = new AbortController(); + coalescedTransport({ ...config, signal: new AbortController().signal }); + coalescedTransport({ ...config, signal: abortControllerB.signal }); + abortControllerB.abort(); + jest.runAllTicks(); + expect(mockInnerTransport.mock.lastCall?.[0].signal).toHaveProperty('aborted', false); }); - describe('given invocations that produce the same deduplication key', () => { - beforeEach(() => { - getDeduplicationKey.mockReturnValue('deduplication-key'); - }); - it("creates a pending subscription once, with the first invocation's config", async () => { - expect.assertions(2); - await Promise.all([ - rpcSubscriptions - .thingNotifications({ payload: 'hello' }) - .subscribe({ abortSignal: new AbortController().signal }), - rpcSubscriptions - .thingNotifications({ payload: 'world' }) - .subscribe({ abortSignal: new AbortController().signal }), - ]); - expect(createPendingSubscription).toHaveBeenCalledTimes(1); - expect(createPendingSubscription).toHaveBeenCalledWith({ - payload: 'hello', - }); - }); - it('only calls subscribe once, in the same runloop', async () => { - expect.assertions(1); - await Promise.all([ - rpcSubscriptions - .thingNotifications({ payload: 'hello' }) - .subscribe({ abortSignal: new AbortController().signal }), - rpcSubscriptions - .thingNotifications({ payload: 'world' }) - .subscribe({ abortSignal: new AbortController().signal }), - ]); - expect(subscribe).toHaveBeenCalledTimes(1); - }); - it('only calls subscribe once, in different runloops', async () => { - expect.assertions(1); - await rpcSubscriptions - .thingNotifications({ payload: 'hello' }) - .subscribe({ abortSignal: new AbortController().signal }); - await rpcSubscriptions - .thingNotifications({ payload: 'world' }) - .subscribe({ abortSignal: new AbortController().signal }); - expect(subscribe).toHaveBeenCalledTimes(1); - }); - it('delivers different iterables to each subscription, in the same runloop', async () => { - expect.assertions(1); - const [iterableA, iterableB] = await Promise.all([ - rpcSubscriptions - .thingNotifications({ payload: 'hello' }) - .subscribe({ abortSignal: new AbortController().signal }), - rpcSubscriptions - .thingNotifications({ payload: 'world' }) - .subscribe({ abortSignal: new AbortController().signal }), - ]); - expect(iterableA).not.toBe(iterableB); - }); - it('delivers different iterables to each subscription, in different runloops', async () => { - expect.assertions(1); - const iterableA = await rpcSubscriptions - .thingNotifications({ payload: 'hello' }) - .subscribe({ abortSignal: new AbortController().signal }); - const iterableB = await rpcSubscriptions - .thingNotifications({ payload: 'world' }) - .subscribe({ abortSignal: new AbortController().signal }); - expect(iterableA).not.toBe(iterableB); - }); - it('publishes the same messages through both iterables', async () => { - expect.assertions(2); - getAsyncIterable.mockImplementation(async function* () { - yield Promise.resolve('hello'); - }); - const iterableA = await rpcSubscriptions - .thingNotifications({ payload: 'hello' }) - .subscribe({ abortSignal: new AbortController().signal }); - const iterableB = await rpcSubscriptions - .thingNotifications({ payload: 'world' }) - .subscribe({ abortSignal: new AbortController().signal }); - const iteratorA = iterableA[Symbol.asyncIterator](); - const iteratorB = iterableB[Symbol.asyncIterator](); - const messagePromiseA = iteratorA.next(); - const messagePromiseB = iteratorB.next(); - await jest.runAllTimersAsync(); - await expect(messagePromiseA).resolves.toHaveProperty('value', 'hello'); - await expect(messagePromiseB).resolves.toHaveProperty('value', 'hello'); - }); - it('publishes the final message when the iterable returns', async () => { - expect.assertions(1); - getAsyncIterable.mockImplementation( - // eslint-disable-next-line require-yield - async function* () { - return await Promise.resolve('hello'); - }, - ); - const iterable = await rpcSubscriptions - .thingNotifications({ payload: 'hello' }) - .subscribe({ abortSignal: new AbortController().signal }); - const iterator = iterable[Symbol.asyncIterator](); - const messagePromise = iterator.next(); - await expect(messagePromise).resolves.toHaveProperty('value', 'hello'); - }); - it('aborting a subscription causes it to return', async () => { - expect.assertions(1); - getAsyncIterable.mockImplementation(async function* () { - yield Promise.resolve('hello'); - }); - const abortController = new AbortController(); - const iterable = await rpcSubscriptions - .thingNotifications({ payload: 'world' }) - .subscribe({ abortSignal: abortController.signal }); - const iterator = iterable[Symbol.asyncIterator](); - const messagePromise = iterator.next(); - abortController.abort(); - await expect(messagePromise).resolves.toHaveProperty('done', true); - }); - it('aborting one subscription does not abort the other', async () => { - expect.assertions(1); - getAsyncIterable.mockImplementation(async function* () { - yield Promise.resolve('hello'); - }); - const abortControllerA = new AbortController(); - await rpcSubscriptions - .thingNotifications({ payload: 'hello' }) - .subscribe({ abortSignal: abortControllerA.signal }); - const iterableB = await rpcSubscriptions - .thingNotifications({ payload: 'world' }) - .subscribe({ abortSignal: new AbortController().signal }); - const iteratorB = iterableB[Symbol.asyncIterator](); - const messagePromiseB = iteratorB.next(); - abortControllerA.abort(); - await jest.runAllTimersAsync(); - await expect(messagePromiseB).resolves.toHaveProperty('value', 'hello'); - }); + it('fires the inner abort signal if all subscribers abort, in the same runloop', () => { + jest.useFakeTimers(); + const config = { + executeSubscriptionPlan: jest.fn(), + subscriptionConfigurationHash: 'MOCK_HASH', + }; + const abortControllerA = new AbortController(); + const abortControllerB = new AbortController(); + coalescedTransport({ ...config, signal: abortControllerA.signal }); + coalescedTransport({ ...config, signal: abortControllerB.signal }); + abortControllerA.abort(); + abortControllerB.abort(); + jest.runAllTicks(); + expect(mockInnerTransport.mock.lastCall?.[0].signal).toHaveProperty('aborted', true); + }); + it('fires the inner abort signal if all subscribers abort, in different runloops', async () => { + expect.assertions(1); + const config = { + executeSubscriptionPlan: jest.fn(), + subscriptionConfigurationHash: 'MOCK_HASH', + }; + const abortControllerA = new AbortController(); + const abortControllerB = new AbortController(); + coalescedTransport({ ...config, signal: abortControllerA.signal }); + await jest.runAllTimersAsync(); + coalescedTransport({ ...config, signal: abortControllerB.signal }); + await jest.runAllTimersAsync(); + abortControllerA.abort(); + abortControllerB.abort(); + jest.runAllTicks(); + expect(mockInnerTransport.mock.lastCall?.[0].signal).toHaveProperty('aborted', true); }); - describe('given payloads that produce different deduplication keys', () => { - beforeEach(() => { - let deduplicationKey = 0; - getDeduplicationKey.mockImplementation(() => `${++deduplicationKey}`); - }); - it('creates a pending subscription for each', async () => { - expect.assertions(3); - await Promise.all([ - rpcSubscriptions - .thingNotifications({ payload: 'hello' }) - .subscribe({ abortSignal: new AbortController().signal }), - rpcSubscriptions - .thingNotifications({ payload: 'world' }) - .subscribe({ abortSignal: new AbortController().signal }), - ]); - expect(createPendingSubscription).toHaveBeenCalledTimes(2); - expect(createPendingSubscription).toHaveBeenNthCalledWith(1, { - payload: 'hello', - }); - expect(createPendingSubscription).toHaveBeenNthCalledWith(2, { - payload: 'world', - }); - }); - it('calls subscribe once for each subscription, in the same runloop', async () => { - expect.assertions(1); - await Promise.all([ - rpcSubscriptions - .thingNotifications({ payload: 'hello' }) - .subscribe({ abortSignal: new AbortController().signal }), - rpcSubscriptions - .thingNotifications({ payload: 'world' }) - .subscribe({ abortSignal: new AbortController().signal }), - ]); - expect(subscribe).toHaveBeenCalledTimes(2); - }); - it('calls subscribe once for each subscription, in different runloops', async () => { - expect.assertions(1); - await rpcSubscriptions - .thingNotifications({ payload: 'hello' }) - .subscribe({ abortSignal: new AbortController().signal }); - await rpcSubscriptions - .thingNotifications({ payload: 'world' }) - .subscribe({ abortSignal: new AbortController().signal }); - expect(subscribe).toHaveBeenCalledTimes(2); - }); - it('delivers different iterables to each subscription, in the same runloop', async () => { - expect.assertions(1); - const [iterableA, iterableB] = await Promise.all([ - rpcSubscriptions - .thingNotifications({ payload: 'hello' }) - .subscribe({ abortSignal: new AbortController().signal }), - rpcSubscriptions - .thingNotifications({ payload: 'world' }) - .subscribe({ abortSignal: new AbortController().signal }), - ]); - expect(iterableA).not.toBe(iterableB); - }); - it('delivers different iterables to each subscription, in different runloops', async () => { - expect.assertions(1); - const iterableA = await rpcSubscriptions - .thingNotifications({ payload: 'hello' }) - .subscribe({ abortSignal: new AbortController().signal }); - const iterableB = await rpcSubscriptions - .thingNotifications({ payload: 'world' }) - .subscribe({ abortSignal: new AbortController().signal }); - expect(iterableA).not.toBe(iterableB); - }); - it('publishes messages through the correct iterable', async () => { - expect.assertions(2); - subscribe.mockResolvedValueOnce({ - async *[Symbol.asyncIterator]() { - yield Promise.resolve('hello'); - }, - }); - const iterableA = await rpcSubscriptions - .thingNotifications({ payload: 'hello' }) - .subscribe({ abortSignal: new AbortController().signal }); - subscribe.mockResolvedValueOnce({ - async *[Symbol.asyncIterator]() { - yield Promise.resolve('world'); - }, - }); - const iterableB = await rpcSubscriptions - .thingNotifications({ payload: 'world' }) - .subscribe({ abortSignal: new AbortController().signal }); - const iteratorA = iterableA[Symbol.asyncIterator](); - const iteratorB = iterableB[Symbol.asyncIterator](); - const messagePromiseA = iteratorA.next(); - const messagePromiseB = iteratorB.next(); - await jest.runAllTimersAsync(); - await expect(messagePromiseA).resolves.toHaveProperty('value', 'hello'); - await expect(messagePromiseB).resolves.toHaveProperty('value', 'world'); - }); - it('aborting a subscription causes it to return', async () => { - expect.assertions(1); - getAsyncIterable.mockImplementation(async function* () { - yield Promise.resolve('hello'); - }); - const abortController = new AbortController(); - const iterable = await rpcSubscriptions - .thingNotifications({ payload: 'world' }) - .subscribe({ abortSignal: abortController.signal }); - const iterator = iterable[Symbol.asyncIterator](); - const messagePromise = iterator.next(); - abortController.abort(); - await expect(messagePromise).resolves.toHaveProperty('done', true); - }); - it('aborting one subscription does not abort the other', async () => { - expect.assertions(1); - getAsyncIterable.mockImplementation(async function* () { - yield Promise.resolve('hello'); - }); - const abortControllerA = new AbortController(); - await rpcSubscriptions - .thingNotifications({ payload: 'hello' }) - .subscribe({ abortSignal: abortControllerA.signal }); - const iterableB = await rpcSubscriptions - .thingNotifications({ payload: 'world' }) - .subscribe({ abortSignal: new AbortController().signal }); - const iteratorB = iterableB[Symbol.asyncIterator](); - const messagePromiseB = iteratorB.next(); - abortControllerA.abort(); - await jest.runAllTimersAsync(); - await expect(messagePromiseB).resolves.toHaveProperty('value', 'hello'); - }); + it('does not fire the inner abort signal if the subscriber count is non zero at the end of the runloop, despite having aborted all in the middle of it', () => { + const config = { + executeSubscriptionPlan: jest.fn(), + subscriptionConfigurationHash: 'MOCK_HASH', + }; + const abortControllerA = new AbortController(); + coalescedTransport({ ...config, signal: abortControllerA.signal }); + abortControllerA.abort(); + coalescedTransport({ ...config, signal: new AbortController().signal }); + jest.runAllTicks(); + expect(mockInnerTransport.mock.lastCall?.[0].signal).toHaveProperty('aborted', false); }); - describe('given payloads that produce no deduplcation key', () => { - beforeEach(() => { - getDeduplicationKey.mockReturnValue(undefined); - }); - it('creates a pending subscription for each', async () => { - expect.assertions(3); - await Promise.all([ - rpcSubscriptions - .thingNotifications({ payload: 'hello' }) - .subscribe({ abortSignal: new AbortController().signal }), - rpcSubscriptions - .thingNotifications({ payload: 'world' }) - .subscribe({ abortSignal: new AbortController().signal }), - ]); - expect(createPendingSubscription).toHaveBeenCalledTimes(2); - expect(createPendingSubscription).toHaveBeenNthCalledWith(1, { - payload: 'hello', - }); - expect(createPendingSubscription).toHaveBeenNthCalledWith(2, { - payload: 'world', - }); - }); - it('calls subscribe once for each subscription, in the same runloop', async () => { - expect.assertions(1); - await Promise.all([ - rpcSubscriptions - .thingNotifications({ payload: 'hello' }) - .subscribe({ abortSignal: new AbortController().signal }), - rpcSubscriptions - .thingNotifications({ payload: 'world' }) - .subscribe({ abortSignal: new AbortController().signal }), - ]); - expect(subscribe).toHaveBeenCalledTimes(2); - }); - it('calls subscribe once for each subscription, in different runloops', async () => { - expect.assertions(1); - await rpcSubscriptions - .thingNotifications({ payload: 'hello' }) - .subscribe({ abortSignal: new AbortController().signal }); - await rpcSubscriptions - .thingNotifications({ payload: 'world' }) - .subscribe({ abortSignal: new AbortController().signal }); - expect(subscribe).toHaveBeenCalledTimes(2); - }); + it('does not re-coalesce new requests behind an errored transport', async () => { + expect.assertions(1); + jest.useFakeTimers(); + const config = { + executeSubscriptionPlan: jest.fn(), + subscriptionConfigurationHash: 'MOCK_HASH', + }; + coalescedTransport({ ...config, signal: new AbortController().signal }); + await jest.runAllTimersAsync(); + receiveError('o no'); + coalescedTransport({ ...config, signal: new AbortController().signal }); + expect(mockInnerTransport).toHaveBeenCalledTimes(2); }); - it('does not shim non-function properties on the RPC', () => { - expect(rpcSubscriptions.nonFunctionProperty).toBe('foo'); + it('does not cancel a newly-coalesced transport when an old errored one is aborted', async () => { + expect.assertions(2); + jest.useFakeTimers(); + const config = { + executeSubscriptionPlan: jest.fn(), + subscriptionConfigurationHash: 'MOCK_HASH', + }; + const abortControllerA = new AbortController(); + /** + * PHASE 1 + * Create and fail a transport. + */ + await coalescedTransport({ ...config, signal: abortControllerA.signal }); + receiveError('o no'); + mockInnerTransport.mockClear(); + /** + * PHASE 2 + * Create a new transport + */ + const publisherA = await coalescedTransport({ ...config, signal: new AbortController().signal }); + /** + * PHASE 3 + * Abort the original subscriber + */ + abortControllerA.abort(); + jest.runAllTicks(); + /** + * PHASE 4 + * Create a new transport and expect it to coalesce behind the one in phase 2 + */ + const publisherB = await coalescedTransport({ ...config, signal: new AbortController().signal }); + expect(publisherA).toBe(publisherB); + expect(mockInnerTransport).toHaveBeenCalledTimes(1); }); }); diff --git a/packages/rpc-subscriptions/src/rpc-subscriptions-coalescer.ts b/packages/rpc-subscriptions/src/rpc-subscriptions-coalescer.ts index bfcf66fa344..3245a273df5 100644 --- a/packages/rpc-subscriptions/src/rpc-subscriptions-coalescer.ts +++ b/packages/rpc-subscriptions/src/rpc-subscriptions-coalescer.ts @@ -1,132 +1,63 @@ -import { safeRace } from '@solana/promises'; -import { PendingRpcSubscriptionsRequest, RpcSubscriptions } from '@solana/rpc-subscriptions-spec'; +import { RpcSubscriptionsTransport } from '@solana/rpc-subscriptions-spec'; +import { DataPublisher } from '@solana/subscribable'; -import { getCachedAbortableIterableFactory } from './cached-abortable-iterable'; +type CacheEntry = { + readonly abortController: AbortController; + readonly dataPublisherPromise: Promise; + numSubscribers: number; +}; -type CacheKey = string | undefined; -type Config = Readonly<{ - getDeduplicationKey: GetDeduplicationKeyFn; - rpcSubscriptions: RpcSubscriptions; -}>; -type GetDeduplicationKeyFn = (subscriptionMethod: string | symbol, payload: unknown) => CacheKey; - -let EXPLICIT_ABORT_TOKEN: symbol; -function createExplicitAbortToken() { - // This function is an annoying workaround to prevent `process.env.NODE_ENV` from appearing at - // the top level of this module and thwarting an optimizing compiler's attempt to tree-shake. - return Symbol( - __DEV__ - ? "This symbol is thrown from a subscription's iterator when the subscription is " + - 'explicitly aborted by the user' - : undefined, - ); -} - -function registerIterableCleanup(iterable: AsyncIterable, cleanupFn: CallableFunction) { - (async () => { - try { - // eslint-disable-next-line @typescript-eslint/no-unused-vars - for await (const _ of iterable); - } catch { - /* empty */ - } finally { - // Run the cleanup function. - cleanupFn(); +export function getRpcSubscriptionsTransportWithSubscriptionCoalescing( + transport: TTransport, +): TTransport { + const cache = new Map(); + return function rpcSubscriptionsTransportWithSubscriptionCoalescing(config) { + const { subscriptionConfigurationHash, signal } = config; + if (subscriptionConfigurationHash === undefined) { + return transport(config); } - })(); -} - -export function getRpcSubscriptionsWithSubscriptionCoalescing({ - getDeduplicationKey, - rpcSubscriptions, -}: Config): RpcSubscriptions { - const cache = new Map>(); - return new Proxy(rpcSubscriptions, { - defineProperty() { - return false; - }, - deleteProperty() { - return false; - }, - get(target, p, receiver) { - const subscriptionMethod = Reflect.get(target, p, receiver); - if (typeof subscriptionMethod !== 'function') { - return subscriptionMethod; - } - return function (...rawParams: unknown[]) { - const deduplicationKey = getDeduplicationKey(p, rawParams); - if (deduplicationKey === undefined) { - return (subscriptionMethod as CallableFunction)(...rawParams); - } - if (cache.has(deduplicationKey)) { - return cache.get(deduplicationKey)!; - } - const iterableFactory = getCachedAbortableIterableFactory< - Parameters['subscribe']>, - AsyncIterable - >({ - getAbortSignalFromInputArgs: ({ abortSignal }) => abortSignal, - getCacheKeyFromInputArgs: () => deduplicationKey, - async onCacheHit(_iterable, _config) { - /** - * This transport's goal is to prevent duplicate subscriptions from - * being made. If a cached iterable] is found, do not send the subscribe - * message again. - */ - }, - async onCreateIterable(abortSignal, config) { - const pendingSubscription = (subscriptionMethod as CallableFunction)( - ...rawParams, - ) as PendingRpcSubscriptionsRequest; - const iterable = await pendingSubscription.subscribe({ - ...config, - abortSignal, - }); - registerIterableCleanup(iterable, () => { - cache.delete(deduplicationKey); - }); - return iterable; - }, - }); - const pendingSubscription: PendingRpcSubscriptionsRequest = { - async subscribe(...args) { - const iterable = await iterableFactory(...args); - const { abortSignal } = args[0]; - let abortPromise; - return { - ...iterable, - async *[Symbol.asyncIterator]() { - abortPromise ||= abortSignal.aborted - ? Promise.reject((EXPLICIT_ABORT_TOKEN ||= createExplicitAbortToken())) - : new Promise((_, reject) => { - abortSignal.addEventListener('abort', () => { - reject((EXPLICIT_ABORT_TOKEN ||= createExplicitAbortToken())); - }); - }); - try { - const iterator = iterable[Symbol.asyncIterator](); - while (true) { - const iteratorResult = await safeRace([iterator.next(), abortPromise]); - if (iteratorResult.done) { - return iteratorResult.value; - } else { - yield iteratorResult.value; - } - } - } catch (e) { - if (e === (EXPLICIT_ABORT_TOKEN ||= createExplicitAbortToken())) { - return; - } - cache.delete(deduplicationKey); - throw e; - } - }, - }; + let cachedDataPublisherPromise = cache.get(subscriptionConfigurationHash); + if (!cachedDataPublisherPromise) { + const abortController = new AbortController(); + const dataPublisherPromise = transport({ + ...config, + signal: abortController.signal, + }); + dataPublisherPromise.then(dataPublisher => { + dataPublisher.on( + 'error', + () => { + cache.delete(subscriptionConfigurationHash); + abortController.abort(); }, - }; - cache.set(deduplicationKey, pendingSubscription); - return pendingSubscription; - }; - }, - }); + { signal: abortController.signal }, + ); + }); + cache.set( + subscriptionConfigurationHash, + (cachedDataPublisherPromise = { + abortController, + dataPublisherPromise, + numSubscribers: 0, + }), + ); + } + cachedDataPublisherPromise.numSubscribers++; + signal.addEventListener( + 'abort', + () => { + cachedDataPublisherPromise.numSubscribers--; + if (cachedDataPublisherPromise.numSubscribers === 0) { + queueMicrotask(() => { + if (cachedDataPublisherPromise.numSubscribers === 0) { + cache.delete(subscriptionConfigurationHash); + cachedDataPublisherPromise.abortController.abort(); + } + }); + } + }, + { signal: cachedDataPublisherPromise.abortController.signal }, + ); + return cachedDataPublisherPromise.dataPublisherPromise; + } as TTransport; } diff --git a/packages/rpc-subscriptions/src/rpc-subscriptions-transport.ts b/packages/rpc-subscriptions/src/rpc-subscriptions-transport.ts index 354947bc948..4da366a71e8 100644 --- a/packages/rpc-subscriptions/src/rpc-subscriptions-transport.ts +++ b/packages/rpc-subscriptions/src/rpc-subscriptions-transport.ts @@ -12,6 +12,7 @@ import { RpcSubscriptionsTransportMainnet, RpcSubscriptionsTransportTestnet, } from './rpc-subscriptions-clusters'; +import { getRpcSubscriptionsTransportWithSubscriptionCoalescing } from './rpc-subscriptions-coalescer'; export type DefaultRpcSubscriptionsTransportConfig = Readonly<{ createChannel: RpcSubscriptionsChannelCreatorFromClusterUrl; @@ -24,6 +25,7 @@ export function createDefaultRpcSubscriptionsTransport, + transport => getRpcSubscriptionsTransportWithSubscriptionCoalescing(transport), ); } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 7ff1d02d90f..f5080da5359 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -878,6 +878,9 @@ importers: '@solana/rpc-types': specifier: workspace:* version: link:../rpc-types + '@solana/subscribable': + specifier: workspace:* + version: link:../subscribable typescript: specifier: '>=5' version: 5.5.2