Skip to content

Commit

Permalink
A channel pool
Browse files Browse the repository at this point in the history
  • Loading branch information
steveluscher committed Oct 4, 2024
1 parent 994441a commit 78d9c24
Show file tree
Hide file tree
Showing 4 changed files with 386 additions and 0 deletions.
10 changes: 10 additions & 0 deletions packages/rpc-subscriptions/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ This package contains types that implement RPC subscriptions as required by the

## Functions

### `getChannelPoolingChannelCreator(createChannel, { maxSubscriptionsPerChannel, minChannels })`

Given a channel creator, will return a new channel creator with the following behavior.

1. When called, returns an `RpcSubscriptionsChannel`. Adds that channel to a pool.
2. When called again, creates and returns new `RpcSubscriptionChannels` up to the number specified by `minChannels`.
3. When `minChannels` channels have been created, subsequent calls vend whichever existing channel from the pool has the fewest subscribers, or the next one in rotation in the event of a tie.
4. Once all channels carry the number of subscribers specified by the number `maxSubscriptionsPerChannel`, new channels in excess of `minChannel` will be created, returned, and added to the pool.
5. A channel will be destroyed once all of its subscribers' abort signals fire.

### `getRpcSubscriptionsChannelWithJSONSerialization(channel)`

Given an `RpcSubscriptionsChannel`, will return a new channel that parses data published to the `'message'` channel as JSON, and JSON-stringifies messages sent via the `send(message)` method.
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
import { RpcSubscriptionsChannel, RpcSubscriptionsChannelCreator } from '@solana/rpc-subscriptions-spec';

import { getChannelPoolingChannelCreator } from '../rpc-subscriptions-channel-pool';
import { ChannelPoolEntry, createChannelPool } from '../rpc-subscriptions-channel-pool-internal';

jest.mock('../rpc-subscriptions-channel-pool-internal.ts');

describe('getChannelPoolingChannelCreator', () => {
let channelPool: { entries: ChannelPoolEntry[]; freeChannelIndex: number };
let createChannel: jest.MockedFunction<RpcSubscriptionsChannelCreator<unknown, unknown>>;
beforeEach(() => {
channelPool = { entries: [], freeChannelIndex: -1 };
jest.mocked(createChannelPool).mockReturnValue(channelPool);
createChannel = jest
.fn()
// We need this to return a new promise on every call.
// eslint-disable-next-line jest/prefer-mock-promise-shorthand
.mockImplementation(() =>
Promise.resolve({
on: jest.fn().mockReturnValue(() => {}),
send: jest.fn().mockResolvedValue(void 0),
}),
);
});
it('creates a new channel when there are fewer than `minChannels`', async () => {
expect.assertions(2);
const newChannel = {} as RpcSubscriptionsChannel<unknown, unknown>;
createChannel.mockResolvedValue(newChannel);
const poolingChannelCreator = getChannelPoolingChannelCreator(createChannel, {
maxSubscriptionsPerChannel: Number.POSITIVE_INFINITY,
minChannels: 1,
});
await poolingChannelCreator({ abortSignal: new AbortController().signal });
expect(createChannel).toHaveBeenCalledTimes(1);
expect(channelPool).toMatchObject({
entries: [{ channel: newChannel, subscriptionCount: 1 }],
freeChannelIndex: 0,
});
});
it('increments the subscriber count of an existing channel pool entry when vending it', () => {
const newChannel = {} as RpcSubscriptionsChannel<unknown, unknown>;
createChannel.mockResolvedValue(newChannel);
const poolingChannelCreator = getChannelPoolingChannelCreator(createChannel, {
maxSubscriptionsPerChannel: Number.POSITIVE_INFINITY,
minChannels: 1,
});
poolingChannelCreator({ abortSignal: new AbortController().signal });
poolingChannelCreator({ abortSignal: new AbortController().signal });
expect(channelPool).toMatchObject({
entries: [{ channel: newChannel, subscriptionCount: 2 }],
freeChannelIndex: 0,
});
});
it('creates a new channel pool entry when the existing one already has `maxSubscriptionsPerChannel` consumers', () => {
const newChannelA = {} as RpcSubscriptionsChannel<unknown, unknown>;
const newChannelB = {} as RpcSubscriptionsChannel<unknown, unknown>;
createChannel.mockResolvedValueOnce(newChannelA).mockResolvedValueOnce(newChannelB);
const poolingChannelCreator = getChannelPoolingChannelCreator(createChannel, {
maxSubscriptionsPerChannel: 1,
minChannels: 1,
});
poolingChannelCreator({ abortSignal: new AbortController().signal });
poolingChannelCreator({ abortSignal: new AbortController().signal });
expect(channelPool).toMatchObject({
entries: [
{ channel: newChannelA, subscriptionCount: 1 },
{ channel: newChannelB, subscriptionCount: 1 },
],
freeChannelIndex: -1,
});
});
it('destroys a channel when the last subscriber aborts', () => {
const newChannelA = {} as RpcSubscriptionsChannel<unknown, unknown>;
const newChannelB = {} as RpcSubscriptionsChannel<unknown, unknown>;
createChannel.mockResolvedValueOnce(newChannelA).mockResolvedValueOnce(newChannelB);
const poolingChannelCreator = getChannelPoolingChannelCreator(createChannel, {
maxSubscriptionsPerChannel: Number.POSITIVE_INFINITY,
minChannels: 1,
});
const abortController = new AbortController();
poolingChannelCreator({ abortSignal: abortController.signal });
expect(channelPool).toMatchObject({
entries: [{ channel: newChannelA, subscriptionCount: 1 }],
freeChannelIndex: 0,
});
abortController.abort();
expect(channelPool).toMatchObject({
entries: [],
freeChannelIndex: -1,
});
});
it('moves the free channel index to the next channel with the most capacity when destroying the existing one', () => {
const channelPool = { entries: [] as ChannelPoolEntry[], freeChannelIndex: -1 };
jest.mocked(createChannelPool).mockReturnValue(channelPool);
const poolingChannelCreator = getChannelPoolingChannelCreator(createChannel, {
maxSubscriptionsPerChannel: Number.POSITIVE_INFINITY,
minChannels: 1,
});
const abortController = new AbortController();
poolingChannelCreator({ abortSignal: abortController.signal });
channelPool.entries = [
{ subscriptionCount: 2 },
...channelPool.entries,
{ subscriptionCount: 3 },
] as ChannelPoolEntry[];
channelPool.freeChannelIndex = 1;
expect(channelPool).toMatchObject({
entries: [{ subscriptionCount: 2 }, { subscriptionCount: 1 }, { subscriptionCount: 3 }],
freeChannelIndex: 1,
});
abortController.abort();
expect(channelPool).toMatchObject({
entries: [{ subscriptionCount: 2 }, { subscriptionCount: 3 }],
freeChannelIndex: 0,
});
});
it('preserves the free channel index when destroying a channel even if that channel is now tied for the highest capacity', () => {
const channelPool = { entries: [] as ChannelPoolEntry[], freeChannelIndex: -1 };
jest.mocked(createChannelPool).mockReturnValue(channelPool);
const poolingChannelCreator = getChannelPoolingChannelCreator(createChannel, {
maxSubscriptionsPerChannel: Number.POSITIVE_INFINITY,
minChannels: 1,
});
const abortController = new AbortController();
poolingChannelCreator({ abortSignal: abortController.signal });
poolingChannelCreator({ abortSignal: new AbortController().signal });
channelPool.entries = [...channelPool.entries, { subscriptionCount: 1 }] as ChannelPoolEntry[];
channelPool.freeChannelIndex = 1;
expect(channelPool).toMatchObject({
entries: [{ subscriptionCount: 2 }, { subscriptionCount: 1 }],
freeChannelIndex: 1,
});
abortController.abort();
expect(channelPool).toMatchObject({
entries: [{ subscriptionCount: 1 }, { subscriptionCount: 1 }],
freeChannelIndex: 1,
});
});
it('resets the free channel index whenever destroying a channel results in there being fewer than `minChannels`', () => {
const channelPool = { entries: [] as ChannelPoolEntry[], freeChannelIndex: -1 };
jest.mocked(createChannelPool).mockReturnValue(channelPool);
const poolingChannelCreator = getChannelPoolingChannelCreator(createChannel, {
maxSubscriptionsPerChannel: 2,
minChannels: 2,
});
const abortController = new AbortController();
poolingChannelCreator({ abortSignal: new AbortController().signal });
poolingChannelCreator({ abortSignal: abortController.signal });
poolingChannelCreator({ abortSignal: new AbortController().signal });
expect(channelPool).toMatchObject({
entries: [{ subscriptionCount: 2 }, { subscriptionCount: 1 }],
freeChannelIndex: 1,
});
abortController.abort();
expect(channelPool).toMatchObject({
entries: [{ subscriptionCount: 2 }],
freeChannelIndex: -1,
});
});
it('vends an existing channel when called in a separate runloop', async () => {
expect.assertions(1);
const poolingChannelCreator = getChannelPoolingChannelCreator(createChannel, {
maxSubscriptionsPerChannel: Number.POSITIVE_INFINITY,
minChannels: 1,
});
const channelA = await poolingChannelCreator({ abortSignal: new AbortController().signal });
const channelB = await poolingChannelCreator({ abortSignal: new AbortController().signal });
expect(channelA).toBe(channelB);
});
it('vends an existing channel when called concurrently in the same runloop', async () => {
expect.assertions(1);
const poolingChannelCreator = getChannelPoolingChannelCreator(createChannel, {
maxSubscriptionsPerChannel: Number.POSITIVE_INFINITY,
minChannels: 1,
});
const [channelA, channelB] = await Promise.all([
poolingChannelCreator({ abortSignal: new AbortController().signal }),
poolingChannelCreator({ abortSignal: new AbortController().signal }),
]);
expect(channelA).toBe(channelB);
});
it("fires a created channel's abort signal when the outer signal is aborted", async () => {
expect.assertions(1);
const poolingChannelCreator = getChannelPoolingChannelCreator(createChannel, {
maxSubscriptionsPerChannel: Number.POSITIVE_INFINITY,
minChannels: 1,
});
const abortController = new AbortController();
await poolingChannelCreator({ abortSignal: abortController.signal });
abortController.abort();
expect(createChannel.mock.lastCall?.[0].abortSignal).toHaveProperty('aborted', true);
});
it("fires a created channel's abort signal when the outer signal is aborted within the runloop", () => {
const poolingChannelCreator = getChannelPoolingChannelCreator(createChannel, {
maxSubscriptionsPerChannel: Number.POSITIVE_INFINITY,
minChannels: 1,
});
const abortController = new AbortController();
poolingChannelCreator({ abortSignal: abortController.signal });
abortController.abort();
expect(createChannel.mock.lastCall?.[0].abortSignal).toHaveProperty('aborted', true);
});
it('vends the next existing channel with the fewest consumers', async () => {
expect.assertions(2);
const poolingChannelCreator = getChannelPoolingChannelCreator(createChannel, {
maxSubscriptionsPerChannel: 2,
minChannels: 1,
});
poolingChannelCreator({ abortSignal: new AbortController().signal });
poolingChannelCreator({ abortSignal: new AbortController().signal });
poolingChannelCreator({ abortSignal: new AbortController().signal });
expect(channelPool).toMatchObject({
entries: [{ subscriptionCount: 2 }, { subscriptionCount: 1 }],
freeChannelIndex: 1,
});
const channel = poolingChannelCreator({ abortSignal: new AbortController().signal });
await expect(channel).resolves.toBe(await channelPool.entries[1].channel);
});
it('does not create a channel pool entry when the channel fails to construct', async () => {
expect.assertions(3);
const poolingChannelCreator = getChannelPoolingChannelCreator(createChannel, {
maxSubscriptionsPerChannel: Number.POSITIVE_INFINITY,
minChannels: 1,
});
createChannel.mockRejectedValueOnce('o no');
const channelA = poolingChannelCreator({ abortSignal: new AbortController().signal });
const channelB = poolingChannelCreator({ abortSignal: new AbortController().signal });
await expect(channelA).rejects.toBe('o no');
await expect(channelB).rejects.toBe('o no');
expect(channelPool).toMatchObject({ entries: [], freeChannelIndex: -1 });
});
it("destroys a channel's pool entry when the channel encounters an error message", async () => {
expect.assertions(2);
jest.useFakeTimers();
const poolingChannelCreator = getChannelPoolingChannelCreator(createChannel, {
maxSubscriptionsPerChannel: Number.POSITIVE_INFINITY,
minChannels: 1,
});
const errorListeners: CallableFunction[] = [];
createChannel.mockResolvedValue({
on(type, listener) {
// eslint-disable-next-line jest/no-conditional-in-test
if (type === 'error') {
errorListeners.push(listener);
}
return () => {};
},
send: jest.fn(),
});
poolingChannelCreator({ abortSignal: new AbortController().signal });
poolingChannelCreator({ abortSignal: new AbortController().signal });
expect(channelPool).toMatchObject({
entries: [{ subscriptionCount: 2 }],
freeChannelIndex: 0,
});
// Allow time for the channel to open and the error listener attach.
await jest.runAllTimersAsync();
errorListeners.forEach(listener => {
listener('o no');
});
expect(channelPool).toMatchObject({ entries: [], freeChannelIndex: -1 });
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { RpcSubscriptionsChannel } from '@solana/rpc-subscriptions-spec';

export type ChannelPoolEntry = {
channel: PromiseLike<RpcSubscriptionsChannel<unknown, unknown>> | RpcSubscriptionsChannel<unknown, unknown>;
readonly dispose: () => void;
subscriptionCount: number;
};

type ChannelPool = { readonly entries: ChannelPoolEntry[]; freeChannelIndex: number };

export function createChannelPool(): ChannelPool {
return {
entries: [],
freeChannelIndex: -1,
};
}
97 changes: 97 additions & 0 deletions packages/rpc-subscriptions/src/rpc-subscriptions-channel-pool.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import { RpcSubscriptionsChannelCreator } from '@solana/rpc-subscriptions-spec';

import { ChannelPoolEntry, createChannelPool } from './rpc-subscriptions-channel-pool-internal';

type Config = Readonly<{
maxSubscriptionsPerChannel: number;
minChannels: number;
}>;

export function getChannelPoolingChannelCreator<
TChannelCreator extends RpcSubscriptionsChannelCreator<unknown, unknown>,
>(createChannel: TChannelCreator, { maxSubscriptionsPerChannel, minChannels }: Config): TChannelCreator {
const pool = createChannelPool();
/**
* This function advances the free channel index to the pool entry with the most capacity. It
* sets the index to `-1` if all channels are full.
*/
function recomputeFreeChannelIndex() {
if (pool.entries.length < minChannels) {
// Don't set the free channel index until the pool fills up; we want to keep creating
// channels before we start rotating among them.
pool.freeChannelIndex = -1;
return;
}
let mostFreeChannel: Readonly<{ poolIndex: number; subscriptionCount: number }> | undefined;
for (let ii = 0; ii < pool.entries.length; ii++) {
const nextPoolIndex = (pool.freeChannelIndex + ii + 2) % pool.entries.length;
const nextPoolEntry =
// Start from the item two positions after the current item. This way, the
// search will finish on the item after the current one. This ensures that, if
// any channels tie for having the most capacity, the one that will be chosen is
// the one immediately to the current one's right (wrapping around).
pool.entries[nextPoolIndex];
if (
nextPoolEntry.subscriptionCount < maxSubscriptionsPerChannel &&
(!mostFreeChannel || mostFreeChannel.subscriptionCount >= nextPoolEntry.subscriptionCount)
) {
mostFreeChannel = {
poolIndex: nextPoolIndex,
subscriptionCount: nextPoolEntry.subscriptionCount,
};
}
}
pool.freeChannelIndex = mostFreeChannel?.poolIndex ?? -1;
}
return function getExistingChannelWithMostCapacityOrCreateChannel({ abortSignal }) {
let poolEntry: ChannelPoolEntry;
function destroyPoolEntry() {
const index = pool.entries.findIndex(entry => entry === poolEntry);
pool.entries.splice(index, 1);
poolEntry.dispose();
recomputeFreeChannelIndex();
}
if (pool.freeChannelIndex === -1) {
const abortController = new AbortController();
const newChannelPromise = createChannel({ abortSignal: abortController.signal });
newChannelPromise
.then(newChannel => {
newChannel.on('error', destroyPoolEntry, { signal: abortController.signal });
})
.catch(destroyPoolEntry);
poolEntry = {
channel: newChannelPromise,
dispose() {
abortController.abort();
},
subscriptionCount: 0,
};
pool.entries.push(poolEntry);
} else {
poolEntry = pool.entries[pool.freeChannelIndex];
}
/**
* A note about subscription counts.
* Because of https://github.com/solana-labs/solana/pull/18943, two subscriptions for
* materially the same notification will be coalesced on the server. This means they will be
* assigned the same subscription id, and will occupy one subscription slot. We can't tell,
* from here, whether a subscription will be treated in this way or not, so we
* unconditionally increment the subscription count every time a subscription request is
* made. This may result in subscription channels being treated as out-of-capacity when in
* fact they are not.
*/
poolEntry.subscriptionCount++;
abortSignal.addEventListener('abort', function destroyConsumer() {
poolEntry.subscriptionCount--;
if (poolEntry.subscriptionCount === 0) {
destroyPoolEntry();
} else if (pool.freeChannelIndex !== -1) {
// Back the free channel index up one position, and recompute it.
pool.freeChannelIndex--;
recomputeFreeChannelIndex();
}
});
recomputeFreeChannelIndex();
return poolEntry.channel;
} as TChannelCreator;
}

0 comments on commit 78d9c24

Please sign in to comment.