diff --git a/packages/rpc-subscriptions/README.md b/packages/rpc-subscriptions/README.md index 97e49ae19e4..a769b843694 100644 --- a/packages/rpc-subscriptions/README.md +++ b/packages/rpc-subscriptions/README.md @@ -18,6 +18,16 @@ This package contains types that implement RPC subscriptions as required by the ## Functions +### `getChannelPoolingChannelCreator({ createChannel, maxSubscriptionsPerChannel, minChannels })` + +Given a channel creator, will return a new channel creator with the following behavior. + +1. When called, returns an `RpcSubscriptionsChannel`. Adds that channel to a pool. +2. When called again, creates and returns new `RpcSubscriptionChannels` up to the number specified by `minChannels`. +3. When `minChannels` channels have been created, subsequent calls vend whichever existing channel from the pool has the fewest subscribers, or the next one in rotation in the event of a tie. +4. Once all channels carry the number of subscribers specified by the number `maxSubscriptionsPerChannel`, new channels in excess of `minChannel` will be created, returned, and added to the pool. +5. A channel will be destroyed once all of its subscribers' abort signals fire. + ### `getRpcSubscriptionsChannelWithJSONSerialization(channel)` Given an `RpcSubscriptionsChannel`, will return a new channel that parses data published to the `'message'` channel as JSON, and JSON-stringifies messages sent via the `send(message)` method. diff --git a/packages/rpc-subscriptions/src/__tests__/rpc-subscriptions-channel-pool-test.ts b/packages/rpc-subscriptions/src/__tests__/rpc-subscriptions-channel-pool-test.ts new file mode 100644 index 00000000000..f1ec5703801 --- /dev/null +++ b/packages/rpc-subscriptions/src/__tests__/rpc-subscriptions-channel-pool-test.ts @@ -0,0 +1,277 @@ +import { RpcSubscriptionsChannel, RpcSubscriptionsChannelCreator } from '@solana/rpc-subscriptions-spec'; + +import { getChannelPoolingChannelCreator } from '../rpc-subscriptions-channel-pool'; +import { ChannelPoolEntry, createChannelPool } from '../rpc-subscriptions-channel-pool-internal'; + +jest.mock('../rpc-subscriptions-channel-pool-internal.ts'); + +describe('getChannelPoolingChannelCreator', () => { + let channelPool: { entries: ChannelPoolEntry[]; freeChannelIndex: number }; + let createChannel: jest.MockedFunction>; + beforeEach(() => { + channelPool = { entries: [], freeChannelIndex: -1 }; + jest.mocked(createChannelPool).mockReturnValue(channelPool); + createChannel = jest + .fn() + // We need this to return a new promise on every call. + // eslint-disable-next-line jest/prefer-mock-promise-shorthand + .mockImplementation(() => + Promise.resolve({ + on: jest.fn().mockReturnValue(() => {}), + send: jest.fn().mockResolvedValue(void 0), + }), + ); + }); + it('creates a new channel when there are fewer than `minChannels`', async () => { + expect.assertions(2); + const newChannel = {} as RpcSubscriptionsChannel; + createChannel.mockResolvedValue(newChannel); + const poolingChannelCreator = getChannelPoolingChannelCreator({ + createChannel, + maxSubscriptionsPerChannel: Number.POSITIVE_INFINITY, + minChannels: 1, + }); + await poolingChannelCreator({ abortSignal: new AbortController().signal }); + expect(createChannel).toHaveBeenCalledTimes(1); + expect(channelPool).toMatchObject({ + entries: [{ channel: newChannel, subscriptionCount: 1 }], + freeChannelIndex: 0, + }); + }); + it('increments the subscriber count of an existing channel pool entry when vending it', () => { + const newChannel = {} as RpcSubscriptionsChannel; + createChannel.mockResolvedValue(newChannel); + const poolingChannelCreator = getChannelPoolingChannelCreator({ + createChannel, + maxSubscriptionsPerChannel: Number.POSITIVE_INFINITY, + minChannels: 1, + }); + poolingChannelCreator({ abortSignal: new AbortController().signal }); + poolingChannelCreator({ abortSignal: new AbortController().signal }); + expect(channelPool).toMatchObject({ + entries: [{ channel: newChannel, subscriptionCount: 2 }], + freeChannelIndex: 0, + }); + }); + it('creates a new channel pool entry when the existing one already has `maxSubscriptionsPerChannel` consumers', () => { + const newChannelA = {} as RpcSubscriptionsChannel; + const newChannelB = {} as RpcSubscriptionsChannel; + createChannel.mockResolvedValueOnce(newChannelA).mockResolvedValueOnce(newChannelB); + const poolingChannelCreator = getChannelPoolingChannelCreator({ + createChannel, + maxSubscriptionsPerChannel: 1, + minChannels: 1, + }); + poolingChannelCreator({ abortSignal: new AbortController().signal }); + poolingChannelCreator({ abortSignal: new AbortController().signal }); + expect(channelPool).toMatchObject({ + entries: [ + { channel: newChannelA, subscriptionCount: 1 }, + { channel: newChannelB, subscriptionCount: 1 }, + ], + freeChannelIndex: -1, + }); + }); + it('destroys a channel when the last subscriber aborts', () => { + const newChannelA = {} as RpcSubscriptionsChannel; + const newChannelB = {} as RpcSubscriptionsChannel; + createChannel.mockResolvedValueOnce(newChannelA).mockResolvedValueOnce(newChannelB); + const poolingChannelCreator = getChannelPoolingChannelCreator({ + createChannel, + maxSubscriptionsPerChannel: Number.POSITIVE_INFINITY, + minChannels: 1, + }); + const abortController = new AbortController(); + poolingChannelCreator({ abortSignal: abortController.signal }); + expect(channelPool).toMatchObject({ + entries: [{ channel: newChannelA, subscriptionCount: 1 }], + freeChannelIndex: 0, + }); + abortController.abort(); + expect(channelPool).toMatchObject({ + entries: [], + freeChannelIndex: -1, + }); + }); + it('moves the free channel index to the next channel with the most capacity when destroying the existing one', () => { + const channelPool = { entries: [] as ChannelPoolEntry[], freeChannelIndex: -1 }; + jest.mocked(createChannelPool).mockReturnValue(channelPool); + const poolingChannelCreator = getChannelPoolingChannelCreator({ + createChannel, + maxSubscriptionsPerChannel: Number.POSITIVE_INFINITY, + minChannels: 1, + }); + const abortController = new AbortController(); + poolingChannelCreator({ abortSignal: abortController.signal }); + channelPool.entries = [ + { subscriptionCount: 2 }, + ...channelPool.entries, + { subscriptionCount: 3 }, + ] as ChannelPoolEntry[]; + channelPool.freeChannelIndex = 1; + expect(channelPool).toMatchObject({ + entries: [{ subscriptionCount: 2 }, { subscriptionCount: 1 }, { subscriptionCount: 3 }], + freeChannelIndex: 1, + }); + abortController.abort(); + expect(channelPool).toMatchObject({ + entries: [{ subscriptionCount: 2 }, { subscriptionCount: 3 }], + freeChannelIndex: 0, + }); + }); + it('preserves the free channel index when destroying a channel even if that channel is now tied for the highest capacity', () => { + const channelPool = { entries: [] as ChannelPoolEntry[], freeChannelIndex: -1 }; + jest.mocked(createChannelPool).mockReturnValue(channelPool); + const poolingChannelCreator = getChannelPoolingChannelCreator({ + createChannel, + maxSubscriptionsPerChannel: Number.POSITIVE_INFINITY, + minChannels: 1, + }); + const abortController = new AbortController(); + poolingChannelCreator({ abortSignal: abortController.signal }); + poolingChannelCreator({ abortSignal: new AbortController().signal }); + channelPool.entries = [...channelPool.entries, { subscriptionCount: 1 }] as ChannelPoolEntry[]; + channelPool.freeChannelIndex = 1; + expect(channelPool).toMatchObject({ + entries: [{ subscriptionCount: 2 }, { subscriptionCount: 1 }], + freeChannelIndex: 1, + }); + abortController.abort(); + expect(channelPool).toMatchObject({ + entries: [{ subscriptionCount: 1 }, { subscriptionCount: 1 }], + freeChannelIndex: 1, + }); + }); + it('resets the free channel index whenever destroying a channel results in there being fewer than `minChannels`', () => { + const channelPool = { entries: [] as ChannelPoolEntry[], freeChannelIndex: -1 }; + jest.mocked(createChannelPool).mockReturnValue(channelPool); + const poolingChannelCreator = getChannelPoolingChannelCreator({ + createChannel, + maxSubscriptionsPerChannel: 2, + minChannels: 2, + }); + const abortController = new AbortController(); + poolingChannelCreator({ abortSignal: new AbortController().signal }); + poolingChannelCreator({ abortSignal: abortController.signal }); + poolingChannelCreator({ abortSignal: new AbortController().signal }); + expect(channelPool).toMatchObject({ + entries: [{ subscriptionCount: 2 }, { subscriptionCount: 1 }], + freeChannelIndex: 1, + }); + abortController.abort(); + expect(channelPool).toMatchObject({ + entries: [{ subscriptionCount: 2 }], + freeChannelIndex: -1, + }); + }); + it('vends an existing channel when called in a separate runloop', async () => { + expect.assertions(1); + const poolingChannelCreator = getChannelPoolingChannelCreator({ + createChannel, + maxSubscriptionsPerChannel: Number.POSITIVE_INFINITY, + minChannels: 1, + }); + const channelA = await poolingChannelCreator({ abortSignal: new AbortController().signal }); + const channelB = await poolingChannelCreator({ abortSignal: new AbortController().signal }); + expect(channelA).toBe(channelB); + }); + it('vends an existing channel when called concurrently in the same runloop', async () => { + expect.assertions(1); + const poolingChannelCreator = getChannelPoolingChannelCreator({ + createChannel, + maxSubscriptionsPerChannel: Number.POSITIVE_INFINITY, + minChannels: 1, + }); + const [channelA, channelB] = await Promise.all([ + poolingChannelCreator({ abortSignal: new AbortController().signal }), + poolingChannelCreator({ abortSignal: new AbortController().signal }), + ]); + expect(channelA).toBe(channelB); + }); + it("fires a created channel's abort signal when the outer signal is aborted", async () => { + expect.assertions(1); + const poolingChannelCreator = getChannelPoolingChannelCreator({ + createChannel, + maxSubscriptionsPerChannel: Number.POSITIVE_INFINITY, + minChannels: 1, + }); + const abortController = new AbortController(); + await poolingChannelCreator({ abortSignal: abortController.signal }); + abortController.abort(); + expect(createChannel.mock.lastCall?.[0].abortSignal).toHaveProperty('aborted', true); + }); + it("fires a created channel's abort signal when the outer signal is aborted within the runloop", () => { + const poolingChannelCreator = getChannelPoolingChannelCreator({ + createChannel, + maxSubscriptionsPerChannel: Number.POSITIVE_INFINITY, + minChannels: 1, + }); + const abortController = new AbortController(); + poolingChannelCreator({ abortSignal: abortController.signal }); + abortController.abort(); + expect(createChannel.mock.lastCall?.[0].abortSignal).toHaveProperty('aborted', true); + }); + it('vends the next existing channel with the fewest consumers', async () => { + expect.assertions(2); + const poolingChannelCreator = getChannelPoolingChannelCreator({ + createChannel, + maxSubscriptionsPerChannel: 2, + minChannels: 1, + }); + poolingChannelCreator({ abortSignal: new AbortController().signal }); + poolingChannelCreator({ abortSignal: new AbortController().signal }); + poolingChannelCreator({ abortSignal: new AbortController().signal }); + expect(channelPool).toMatchObject({ + entries: [{ subscriptionCount: 2 }, { subscriptionCount: 1 }], + freeChannelIndex: 1, + }); + const channel = poolingChannelCreator({ abortSignal: new AbortController().signal }); + await expect(channel).resolves.toBe(await channelPool.entries[1].channel); + }); + it('does not create a channel pool entry when the channel fails to construct', async () => { + expect.assertions(3); + const poolingChannelCreator = getChannelPoolingChannelCreator({ + createChannel, + maxSubscriptionsPerChannel: Number.POSITIVE_INFINITY, + minChannels: 1, + }); + createChannel.mockRejectedValueOnce('o no'); + const channelA = poolingChannelCreator({ abortSignal: new AbortController().signal }); + const channelB = poolingChannelCreator({ abortSignal: new AbortController().signal }); + await expect(channelA).rejects.toBe('o no'); + await expect(channelB).rejects.toBe('o no'); + expect(channelPool).toMatchObject({ entries: [], freeChannelIndex: -1 }); + }); + it("destroys a channel's pool entry when the channel encounters an error message", async () => { + expect.assertions(2); + jest.useFakeTimers(); + const poolingChannelCreator = getChannelPoolingChannelCreator({ + createChannel, + maxSubscriptionsPerChannel: Number.POSITIVE_INFINITY, + minChannels: 1, + }); + const errorListeners: CallableFunction[] = []; + createChannel.mockResolvedValue({ + on(type, listener) { + // eslint-disable-next-line jest/no-conditional-in-test + if (type === 'error') { + errorListeners.push(listener); + } + return () => {}; + }, + send: jest.fn(), + }); + poolingChannelCreator({ abortSignal: new AbortController().signal }); + poolingChannelCreator({ abortSignal: new AbortController().signal }); + expect(channelPool).toMatchObject({ + entries: [{ subscriptionCount: 2 }], + freeChannelIndex: 0, + }); + // Allow time for the channel to open and the error listener attach. + await jest.runAllTimersAsync(); + errorListeners.forEach(listener => { + listener('o no'); + }); + expect(channelPool).toMatchObject({ entries: [], freeChannelIndex: -1 }); + }); +}); diff --git a/packages/rpc-subscriptions/src/rpc-subscriptions-channel-pool-internal.ts b/packages/rpc-subscriptions/src/rpc-subscriptions-channel-pool-internal.ts new file mode 100644 index 00000000000..c79ef325c7b --- /dev/null +++ b/packages/rpc-subscriptions/src/rpc-subscriptions-channel-pool-internal.ts @@ -0,0 +1,16 @@ +import { RpcSubscriptionsChannel } from '@solana/rpc-subscriptions-spec'; + +export type ChannelPoolEntry = { + channel: PromiseLike> | RpcSubscriptionsChannel; + readonly dispose: () => void; + subscriptionCount: number; +}; + +type ChannelPool = { readonly entries: ChannelPoolEntry[]; freeChannelIndex: number }; + +export function createChannelPool(): ChannelPool { + return { + entries: [], + freeChannelIndex: -1, + }; +} diff --git a/packages/rpc-subscriptions/src/rpc-subscriptions-channel-pool.ts b/packages/rpc-subscriptions/src/rpc-subscriptions-channel-pool.ts new file mode 100644 index 00000000000..efb8c0cc9e8 --- /dev/null +++ b/packages/rpc-subscriptions/src/rpc-subscriptions-channel-pool.ts @@ -0,0 +1,98 @@ +import { RpcSubscriptionsChannelCreator } from '@solana/rpc-subscriptions-spec'; + +import { ChannelPoolEntry, createChannelPool } from './rpc-subscriptions-channel-pool-internal'; + +type Config = { + createChannel: TChannelCreator; + maxSubscriptionsPerChannel: number; + minChannels: number; +}; + +export function getChannelPoolingChannelCreator< + TChannelCreator extends RpcSubscriptionsChannelCreator, +>({ createChannel, maxSubscriptionsPerChannel, minChannels }: Config): TChannelCreator { + const pool = createChannelPool(); + /** + * This function advances the free channel index to the pool entry with the most capacity. It + * sets the index to `-1` if all channels are full. + */ + function recomputeFreeChannelIndex() { + if (pool.entries.length < minChannels) { + // Don't set the free channel index until the pool fills up; we want to keep creating + // channels before we start rotating among them. + pool.freeChannelIndex = -1; + return; + } + let mostFreeChannel: Readonly<{ poolIndex: number; subscriptionCount: number }> | undefined; + for (let ii = 0; ii < pool.entries.length; ii++) { + const nextPoolIndex = (pool.freeChannelIndex + ii + 2) % pool.entries.length; + const nextPoolEntry = + // Start from the item two positions after the current item. This way, the + // search will finish on the item after the current one. This ensures that, if + // any channels tie for having the most capacity, the one that will be chosen is + // the one immediately to the current one's right (wrapping around). + pool.entries[nextPoolIndex]; + if ( + nextPoolEntry.subscriptionCount < maxSubscriptionsPerChannel && + (!mostFreeChannel || mostFreeChannel.subscriptionCount >= nextPoolEntry.subscriptionCount) + ) { + mostFreeChannel = { + poolIndex: nextPoolIndex, + subscriptionCount: nextPoolEntry.subscriptionCount, + }; + } + } + pool.freeChannelIndex = mostFreeChannel?.poolIndex ?? -1; + } + return function getExistingChannelWithMostCapacityOrCreateChannel({ abortSignal }) { + let poolEntry: ChannelPoolEntry; + function destroyPoolEntry() { + const index = pool.entries.findIndex(entry => entry === poolEntry); + pool.entries.splice(index, 1); + poolEntry.dispose(); + recomputeFreeChannelIndex(); + } + if (pool.freeChannelIndex === -1) { + const abortController = new AbortController(); + const newChannelPromise = createChannel({ abortSignal: abortController.signal }); + newChannelPromise + .then(newChannel => { + newChannel.on('error', destroyPoolEntry, { signal: abortController.signal }); + }) + .catch(destroyPoolEntry); + poolEntry = { + channel: newChannelPromise, + dispose() { + abortController.abort(); + }, + subscriptionCount: 0, + }; + pool.entries.push(poolEntry); + } else { + poolEntry = pool.entries[pool.freeChannelIndex]; + } + /** + * A note about subscription counts. + * Because of https://github.com/solana-labs/solana/pull/18943, two subscriptions for + * materially the same notification will be coalesced on the server. This means they will be + * assigned the same subscription id, and will occupy one subscription slot. We can't tell, + * from here, whether a subscription will be treated in this way or not, so we + * unconditionally increment the subscription count every time a subscription request is + * made. This may result in subscription channels being treated as out-of-capacity when in + * fact they are not. + */ + poolEntry.subscriptionCount++; + abortSignal.addEventListener('abort', function destroyConsumer() { + poolEntry.subscriptionCount--; + if (poolEntry.subscriptionCount === 0) { + destroyPoolEntry(); + } else if (pool.freeChannelIndex !== -1) { + // Back the free channel index up one position, and recompute it. + pool.freeChannelIndex--; + recomputeFreeChannelIndex(); + } + }); + recomputeFreeChannelIndex(); + return poolEntry.channel; + } as TChannelCreator; +}