diff --git a/README.md b/README.md index bcc714bf2fc..6628cf6273b 100644 --- a/README.md +++ b/README.md @@ -624,13 +624,23 @@ try { ### Using Custom RPC Subscriptions Transports -The `createSolanaRpcSubscriptions` function communicates with the RPC server using a default WebSocket transport that should satisfy most use cases. However, you may here as well provide your own transport or decorate existing ones to communicate with RPC servers in any way you see fit. In the example below, we explicitly create a WebSocket transport and use it to create a new RPC Subscriptions client via the `createSolanaRpcSubscriptionsFromTransport` function. +The `createSolanaRpcSubscriptions` function communicates with the RPC server using a default `WebSocket` channel that should satisfy most use cases. However, you may here as well provide your own channel creator or decorate existing ones to communicate with RPC servers in any way you see fit. In the example below, we supply a custom `WebSocket` channel creator and use it to create a new RPC Subscriptions client via the `createSolanaRpcSubscriptionsFromTransport` function. ```ts import { createDefaultRpcSubscriptionsTransport, createSolanaRpcSubscriptionsFromTransport } from '@solana/web3.js'; -// Create a WebSocket transport or any custom transport of your choice. -const transport = createDefaultRpcSubscriptionsTransport({ url: 'ws://127.0.0.1:8900' }); +// Create a transport with a custom channel creator of your choice. +const transport = createDefaultRpcSubscriptionsTransport({ + createChannel({ abortSignal }) { + return createWebSocketChannel({ + maxSubscriptionsPerChannel: 100, + minChannels: 25, + sendBufferHighWatermark: 32_768, + signal: abortSignal, + url: 'ws://127.0.0.1:8900', + }); + }, +}); // Create an RPC client using that transport. const rpcSubscriptions = createSolanaRpcSubscriptionsFromTransport(transport); @@ -661,16 +671,22 @@ If your app needs access to [unstable RPC Subscriptions](https://docs.solana.com ```ts import { + createDefaultRpcSubscriptionsChannelCreator, + createDefaultRpcSubscriptionsTransport, createSolanaRpcSubscriptions_UNSTABLE, createSolanaRpcSubscriptionsFromTransport_UNSTABLE, } from '@solana/web3.js'; -// Using the default WebSocket transport. +// Using the default WebSocket channel. const rpcSubscriptions = createSolanaRpcSubscriptions_UNSTABLE('ws://127.0.0.1:8900'); // ^? RpcSubscriptions // Using a custom transport. -const transport = createDefaultRpcSubscriptionsTransport({ url: 'ws://127.0.0.1:8900' }); +const transport = createDefaultRpcSubscriptionsTransport({ + createChannel: createDefaultRpcSubscriptionsChannelCreator({ + url: 'ws://127.0.0.1:8900', + }), +}); const rpcSubscriptions = createSolanaRpcSubscriptionsFromTransport_UNSTABLE(transport); // ^? RpcSubscriptions ``` @@ -696,6 +712,7 @@ Alternatively, you may explicitly create the RPC Subscriptions API using the `cr ```ts import { + createDefaultRpcSubscriptionsChannelCreator, createDefaultRpcSubscriptionsTransport, createSubscriptionRpc, createSolanaRpcSubscriptionsApi, @@ -705,7 +722,11 @@ import { } from '@solana/web3.js'; const api = createSolanaRpcSubscriptionsApi(DEFAULT_RPC_CONFIG); -const transport = createDefaultRpcSubscriptionsTransport({ url: 'ws://127.0.0.1:8900' }); +const transport = createDefaultRpcSubscriptionsTransport({ + createChannel: createDefaultRpcSubscriptionsChannelCreator({ + url: 'ws://127.0.0.1:8900', + }), +}); const rpcSubscriptions = createSubscriptionRpc({ api, transport }); ``` diff --git a/packages/errors/src/codes.ts b/packages/errors/src/codes.ts index e3fb2cf5616..7935c770e66 100644 --- a/packages/errors/src/codes.ts +++ b/packages/errors/src/codes.ts @@ -284,7 +284,7 @@ export const SOLANA_ERROR__RPC__API_PLAN_MISSING_FOR_RPC_METHOD = 8100003 as con // RPC-Subscriptions-related errors. // Reserve error codes in the range [8190000-8190999]. -export const SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_REQUEST = 8190000 as const; +export const SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_PLAN = 8190000 as const; export const SOLANA_ERROR__RPC_SUBSCRIPTIONS__EXPECTED_SERVER_SUBSCRIPTION_ID = 8190001 as const; export const SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CLOSED_BEFORE_MESSAGE_BUFFERED = 8190002 as const; export const SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CONNECTION_CLOSED = 8190003 as const; @@ -459,7 +459,7 @@ export type SolanaErrorCode = | typeof SOLANA_ERROR__RPC__INTEGER_OVERFLOW | typeof SOLANA_ERROR__RPC__TRANSPORT_HTTP_ERROR | typeof SOLANA_ERROR__RPC__TRANSPORT_HTTP_HEADER_FORBIDDEN - | typeof SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_REQUEST + | typeof SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_PLAN | typeof SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CLOSED_BEFORE_MESSAGE_BUFFERED | typeof SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CONNECTION_CLOSED | typeof SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_FAILED_TO_CONNECT diff --git a/packages/errors/src/context.ts b/packages/errors/src/context.ts index d7a3c5b48a4..7f2cb6cc600 100644 --- a/packages/errors/src/context.ts +++ b/packages/errors/src/context.ts @@ -123,7 +123,7 @@ import { SOLANA_ERROR__RPC__INTEGER_OVERFLOW, SOLANA_ERROR__RPC__TRANSPORT_HTTP_ERROR, SOLANA_ERROR__RPC__TRANSPORT_HTTP_HEADER_FORBIDDEN, - SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_REQUEST, + SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_PLAN, SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_FAILED_TO_CONNECT, SOLANA_ERROR__SIGNER__ADDRESS_CANNOT_HAVE_MULTIPLE_SIGNERS, SOLANA_ERROR__SIGNER__EXPECTED_KEY_PAIR_SIGNER, @@ -487,7 +487,7 @@ export type SolanaErrorContext = DefaultUnspecifiedErrorContextToUndefined< [SOLANA_ERROR__NONCE_ACCOUNT_NOT_FOUND]: { nonceAccountAddress: string; }; - [SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_REQUEST]: { + [SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_PLAN]: { notificationName: string; }; [SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_FAILED_TO_CONNECT]: { diff --git a/packages/errors/src/messages.ts b/packages/errors/src/messages.ts index 172dcc59a0d..2320b77b85d 100644 --- a/packages/errors/src/messages.ts +++ b/packages/errors/src/messages.ts @@ -140,7 +140,7 @@ import { SOLANA_ERROR__RPC__INTEGER_OVERFLOW, SOLANA_ERROR__RPC__TRANSPORT_HTTP_ERROR, SOLANA_ERROR__RPC__TRANSPORT_HTTP_HEADER_FORBIDDEN, - SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_REQUEST, + SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_PLAN, SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CLOSED_BEFORE_MESSAGE_BUFFERED, SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CONNECTION_CLOSED, SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_FAILED_TO_CONNECT, @@ -447,10 +447,9 @@ export const SolanaErrorMessages: Readonly<{ [SOLANA_ERROR__MALFORMED_BIGINT_STRING]: '`$value` cannot be parsed as a `BigInt`', [SOLANA_ERROR__MALFORMED_NUMBER_STRING]: '`$value` cannot be parsed as a `Number`', [SOLANA_ERROR__NONCE_ACCOUNT_NOT_FOUND]: 'No nonce account could be found at address `$nonceAccountAddress`', - [SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_REQUEST]: - "Either the notification name must end in 'Notifications' or the API must supply a " + - "subscription creator function for the notification '$notificationName' to map between " + - 'the notification name and the subscribe/unsubscribe method names.', + [SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_PLAN]: + "The notification name must end in 'Notifications' and the API must supply a " + + "subscription plan creator function for the notification '$notificationName'.", [SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CLOSED_BEFORE_MESSAGE_BUFFERED]: 'WebSocket was closed before payload could be added to the send buffer', [SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CONNECTION_CLOSED]: 'WebSocket connection closed', diff --git a/packages/rpc-subscriptions-api/README.md b/packages/rpc-subscriptions-api/README.md index 563aa50f50f..6d30e35d93e 100644 --- a/packages/rpc-subscriptions-api/README.md +++ b/packages/rpc-subscriptions-api/README.md @@ -14,4 +14,55 @@ # @solana/rpc-subscriptions-api -TODO +This package contains types that describe the [methods](https://solana.com/docs/rpc/websocket) of the Solana JSON RPC Subscriptions API, and utilities for creating a `RpcSubscriptionsApi` implementation with sensible defaults. It can be used standalone, but it is also exported as part of the Solana JavaScript SDK [`@solana/web3.js@rc`](https://github.com/solana-labs/solana-web3.js/tree/master/packages/library). + +Each RPC subscriptions method is described in terms of a TypeScript type of the following form: + +```ts +type ExampleApi = { + thingNotifications(address: Address): Thing; +}; +``` + +A `RpcSubscriptionsApi` that implements `ExampleApi` will ultimately expose its defined methods on any `RpcSubscriptions` that uses it. + +```ts +const rpcSubscriptions: RpcSubscriptions = createExampleRpcSubscriptions(/* ... */); +const thingNotifications = await rpc + .thingNotifications(address('95DpK3y3GF7U8s1k4EvZ7xqyeCkhsHeZaE97iZpHUGMN')) + .subscribe({ abortSignal: AbortSignal.timeout(5_000) }); +try { + for await (const thing of thingNotifications) { + console.log('Got a thing', thing); + } +} catch (e) { + console.error('Our subscription to `Thing` notifications has failed', e); +} finally { + console.log('We are done listening for `Thing` notifications'); +} +``` + +## Types + +### `SolanaRpcSubscriptionsApi{Devnet|Testnet|Mainnet}` + +These types represent the RPC subscription methods available on a specific Solana cluster. + +## Functions + +### `createSolanaRpcSubscriptionsApi(config)` + +Creates a `RpcSubscriptionsApi` implementation of the Solana JSON RPC Subscriptions API with some default behaviours. + +The default behaviours include: + +- A transform that converts `bigint` inputs to `number` for compatiblity with version 1.0 of the Solana JSON RPC. +- A transform that calls the config's `onIntegerOverflow` handler whenever a `bigint` input would overflow a JavaScript IEEE 754 number. See [this](https://github.com/solana-labs/solana-web3.js/issues/1116) GitHub issue for more information. +- A transform that applies a default commitment wherever not specified + +#### Arguments + +A config object with the following properties: + +- `defaultCommitment`: An optional default `Commitment` value. Given an RPC method that takes `commitment` as a parameter, this value will be used when the caller does not supply one. +- `onIntegerOverflow(request, keyPath, value): void`: An optional function that will be called whenever a `bigint` input exceeds that which can be expressed using JavaScript numbers. This is used in the default `SolanaRpcSubscriptionsApi` to throw an exception rather than to allow truncated values to propagate through a program. diff --git a/packages/rpc-subscriptions-api/package.json b/packages/rpc-subscriptions-api/package.json index 695edc2ea70..9e998187831 100644 --- a/packages/rpc-subscriptions-api/package.json +++ b/packages/rpc-subscriptions-api/package.json @@ -72,6 +72,7 @@ ], "dependencies": { "@solana/addresses": "workspace:*", + "@solana/fast-stable-stringify": "workspace:*", "@solana/keys": "workspace:*", "@solana/rpc-types": "workspace:*", "@solana/rpc-subscriptions-spec": "workspace:*", @@ -80,7 +81,7 @@ "@solana/transactions": "workspace:*" }, "devDependencies": { - "@solana/rpc-subscriptions-transport-websocket": "workspace:*" + "@solana/rpc-subscriptions-channel-websocket": "workspace:*" }, "peerDependencies": { "typescript": ">=5" diff --git a/packages/rpc-subscriptions-api/src/__tests__/__setup__.ts b/packages/rpc-subscriptions-api/src/__tests__/__setup__.ts index f1319ebd647..7970cc373f6 100644 --- a/packages/rpc-subscriptions-api/src/__tests__/__setup__.ts +++ b/packages/rpc-subscriptions-api/src/__tests__/__setup__.ts @@ -1,5 +1,5 @@ -import { createSubscriptionRpc, RpcSubscriptions } from '@solana/rpc-subscriptions-spec'; -import { createWebSocketTransport } from '@solana/rpc-subscriptions-transport-websocket'; +import { createWebSocketChannel } from '@solana/rpc-subscriptions-channel-websocket'; +import { createSubscriptionRpc, RpcSubscriptions, RpcSubscriptionsChannel } from '@solana/rpc-subscriptions-spec'; import { createSolanaRpcSubscriptionsApi_UNSTABLE, @@ -12,9 +12,33 @@ export function createLocalhostSolanaRpcSubscriptions(): RpcSubscriptions< > { return createSubscriptionRpc({ api: createSolanaRpcSubscriptionsApi_UNSTABLE(), - transport: createWebSocketTransport({ - sendBufferHighWatermark: Number.POSITIVE_INFINITY, - url: 'ws://127.0.0.1:8900', - }), + async transport({ executeSubscriptionPlan, signal }) { + const webSocketChannel = await createWebSocketChannel({ + sendBufferHighWatermark: Number.POSITIVE_INFINITY, + signal, + url: 'ws://127.0.0.1:8900', + }); + const channel = { + ...webSocketChannel, + on(type, listener, options) { + if (type !== 'message') { + return webSocketChannel.on(type, listener, options); + } + return webSocketChannel.on( + 'message', + function deserializingListener(message: string) { + const deserializedMessage = JSON.parse(message); + listener(deserializedMessage); + }, + options, + ); + }, + send(message) { + const serializedMessage = JSON.stringify(message); + return webSocketChannel.send(serializedMessage); + }, + } as RpcSubscriptionsChannel; + return await executeSubscriptionPlan({ channel, signal }); + }, }); } diff --git a/packages/rpc-subscriptions-api/src/__tests__/index-test.ts b/packages/rpc-subscriptions-api/src/__tests__/index-test.ts index b83f2da5931..654dad01d86 100644 --- a/packages/rpc-subscriptions-api/src/__tests__/index-test.ts +++ b/packages/rpc-subscriptions-api/src/__tests__/index-test.ts @@ -1,20 +1,39 @@ -import type { RpcSubscriptionsApi } from '@solana/rpc-subscriptions-spec'; +import { executeRpcPubSubSubscriptionPlan, type RpcSubscriptionsApi } from '@solana/rpc-subscriptions-spec'; -import { createSolanaRpcSubscriptionsApi } from '..'; +import { createSolanaRpcSubscriptionsApi } from '../index'; type TestRpcSubscriptionNotifications = { thingNotifications(...args: unknown[]): unknown; }; -describe('RpcSubscriptionsApi', () => { +jest.mock('@solana/rpc-subscriptions-spec', () => ({ + ...jest.requireActual('@solana/rpc-subscriptions-spec'), + executeRpcPubSubSubscriptionPlan: jest.fn().mockReturnValue( + new Promise(() => { + /* never resolves */ + }), + ), +})); + +describe('createSolanaRpcSubscriptionsApi', () => { let api: RpcSubscriptionsApi; beforeEach(() => { api = createSolanaRpcSubscriptionsApi(); }); - it('synthesizes subscribe/unsubscribe method names from the name of the notification', () => { - expect(api.thingNotifications()).toMatchObject({ - subscribeMethodName: 'thingSubscribe', - unsubscribeMethodName: 'thingUnsubscribe', + it('creates a subscription plan that synthesizes the correct subscribe/unsubscribe method names from the name of the notification', () => { + const { executeSubscriptionPlan } = api.thingNotifications(); + executeSubscriptionPlan({ + channel: { + on: jest.fn(), + send: jest.fn(), + }, + signal: new AbortController().signal, }); + expect(executeRpcPubSubSubscriptionPlan).toHaveBeenCalledWith( + expect.objectContaining({ + subscribeMethodName: 'thingSubscribe', + unsubscribeMethodName: 'thingUnsubscribe', + }), + ); }); }); diff --git a/packages/rpc-subscriptions-api/src/index.ts b/packages/rpc-subscriptions-api/src/index.ts index 344ef0a0590..f7b415dfd3e 100644 --- a/packages/rpc-subscriptions-api/src/index.ts +++ b/packages/rpc-subscriptions-api/src/index.ts @@ -1,5 +1,7 @@ +import fastStableStringify from '@solana/fast-stable-stringify'; import { createRpcSubscriptionsApi, + executeRpcPubSubSubscriptionPlan, RpcSubscriptionsApi, RpcSubscriptionsApiMethods, } from '@solana/rpc-subscriptions-spec'; @@ -49,19 +51,27 @@ type Config = RequestTransformerConfig; function createSolanaRpcSubscriptionsApi_INTERNAL( config?: Config, ): RpcSubscriptionsApi { + const responseTransformer = getDefaultResponseTransformerForSolanaRpcSubscriptions({ + allowedNumericKeyPaths: getAllowedNumericKeypaths(), + }); + // TODO(loris): Replace with request transformer. + const parametersTransformer = (notificationName: string, params?: T) => { + return getDefaultRequestTransformerForSolanaRpc(config)({ methodName: notificationName, params }) + .params as unknown[]; + }; return createRpcSubscriptionsApi({ - // TODO(loris): Replace with request transformer. - parametersTransformer: (params: T, notificationName: string) => { - return getDefaultRequestTransformerForSolanaRpc(config)({ methodName: notificationName, params }) - .params as unknown[]; + getSubscriptionConfigurationHash({ notificationName, params }) { + return fastStableStringify([notificationName, params]); + }, + planExecutor({ notificationName, params, ...rest }) { + return executeRpcPubSubSubscriptionPlan({ + ...rest, + responseTransformer, + subscribeMethodName: notificationName.replace(/Notifications$/, 'Subscribe'), + subscribeParams: parametersTransformer(notificationName, params), + unsubscribeMethodName: notificationName.replace(/Notifications$/, 'Unsubscribe'), + }); }, - responseTransformer: getDefaultResponseTransformerForSolanaRpcSubscriptions({ - allowedNumericKeyPaths: getAllowedNumericKeypaths(), - }), - subscribeNotificationNameTransformer: (notificationName: string) => - notificationName.replace(/Notifications$/, 'Subscribe'), - unsubscribeNotificationNameTransformer: (notificationName: string) => - notificationName.replace(/Notifications$/, 'Unsubscribe'), }); } diff --git a/packages/rpc-subscriptions-spec/src/__tests__/rpc-subscription-test.ts b/packages/rpc-subscriptions-spec/src/__tests__/rpc-subscription-test.ts deleted file mode 100644 index 35f115855b4..00000000000 --- a/packages/rpc-subscriptions-spec/src/__tests__/rpc-subscription-test.ts +++ /dev/null @@ -1,369 +0,0 @@ -import { - SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_REQUEST, - SOLANA_ERROR__RPC_SUBSCRIPTIONS__EXPECTED_SERVER_SUBSCRIPTION_ID, - SolanaError, -} from '@solana/errors'; -import { createRpcMessage } from '@solana/rpc-spec-types'; - -import { createSubscriptionRpc, RpcSubscriptions } from '../rpc-subscriptions'; -import { RpcSubscriptionsApi } from '../rpc-subscriptions-api'; -import { RpcSubscriptionsRequest } from '../rpc-subscriptions-request'; -import { RpcSubscriptionsTransport } from '../rpc-subscriptions-transport'; - -// Partially mock the rpc-spec-types package. -jest.mock('@solana/rpc-spec-types', () => ({ - ...jest.requireActual('@solana/rpc-spec-types'), - createRpcMessage: jest.fn(), -})); - -interface TestRpcSubscriptionNotifications { - nonConformingNotif(...args: unknown[]): unknown; - thingNotifications(...args: unknown[]): unknown; -} - -describe('JSON-RPC 2.0 Subscriptions', () => { - let createWebSocketConnection: jest.MockedFn; - let iterable: jest.Mock>; - let rpc: RpcSubscriptions; - let send: jest.Mock<(payload: unknown) => Promise>; - beforeEach(() => { - jest.mocked(createRpcMessage).mockImplementation((method: string, params: TParams) => ({ - id: 0, - jsonrpc: '2.0', - method, - params, - })); - iterable = jest.fn().mockImplementation(async function* () { - yield await new Promise(() => { - /* never resolve */ - }); - }); - send = jest.fn().mockResolvedValue(undefined); - createWebSocketConnection = jest.fn().mockResolvedValue({ - [Symbol.asyncIterator]: iterable, - send_DO_NOT_USE_OR_YOU_WILL_BE_FIRED: send, - }); - rpc = createSubscriptionRpc({ - api: { - // Note the lack of method implementations in the base case. - } as RpcSubscriptionsApi, - transport: createWebSocketConnection, - }); - }); - it('sends a subscription request to the transport', () => { - rpc.thingNotifications(123).subscribe({ abortSignal: new AbortController().signal }); - expect(createWebSocketConnection).toHaveBeenCalledWith( - expect.objectContaining({ - payload: { - ...createRpcMessage('thingSubscribe', [123]), - id: expect.any(Number), - }, - }), - ); - }); - it('returns from the iterator when the connection iterator returns', async () => { - expect.assertions(1); - iterable.mockImplementation(async function* () { - yield Promise.resolve({ id: 0, result: 42 /* subscription id */ }); - return; - }); - const thingNotifications = await rpc - .thingNotifications(123) - .subscribe({ abortSignal: new AbortController().signal }); - const iterator = thingNotifications[Symbol.asyncIterator](); - const thingNotificationPromise = iterator.next(); - await expect(thingNotificationPromise).resolves.toMatchObject({ - done: true, - value: undefined, - }); - }); - it('throws from the iterator when the connection iterator throws', async () => { - expect.assertions(1); - iterable.mockImplementation(async function* () { - yield Promise.resolve({ id: 0, result: 42 /* subscription id */ }); - throw new Error('o no'); - }); - const thingNotifications = await rpc - .thingNotifications(123) - .subscribe({ abortSignal: new AbortController().signal }); - const iterator = thingNotifications[Symbol.asyncIterator](); - const thingNotificationPromise = iterator.next(); - await expect(thingNotificationPromise).rejects.toThrow('o no'); - }); - it('aborts the connection when aborting the subscription before the subscription has been established', async () => { - expect.assertions(2); - jest.useFakeTimers(); - iterable.mockImplementation(async function* () { - yield await new Promise(() => { - /* never resolve */ - }); - }); - const abortController = new AbortController(); - rpc.thingNotifications(123).subscribe({ abortSignal: abortController.signal }); - const [{ signal: connectionAbortSignal }] = createWebSocketConnection.mock.lastCall!; - expect(connectionAbortSignal).toHaveProperty('aborted', false); - abortController.abort(); - await jest.runAllTimersAsync(); - expect(connectionAbortSignal).toHaveProperty('aborted', true); - }); - it('aborts the connection when aborting given an established subscription', async () => { - expect.assertions(2); - jest.useFakeTimers(); - iterable.mockImplementation(async function* () { - yield { id: 0, result: 42 /* subscription id */ }; - yield await new Promise(() => { - /* never resolve */ - }); - }); - const abortController = new AbortController(); - await rpc.thingNotifications(123).subscribe({ abortSignal: abortController.signal }); - const [{ signal: connectionAbortSignal }] = createWebSocketConnection.mock.lastCall!; - expect(connectionAbortSignal).toHaveProperty('aborted', false); - abortController.abort(); - await jest.runAllTimersAsync(); - expect(connectionAbortSignal).toHaveProperty('aborted', true); - }); - it('sends an unsubscribe request to the transport when aborted given an established subscription', async () => { - expect.assertions(1); - const abortController = new AbortController(); - iterable.mockImplementation(async function* () { - yield { id: 0, result: 42 /* subscription id */ }; - yield await new Promise(() => { - /* never resolve */ - }); - }); - await rpc.thingNotifications(123).subscribe({ abortSignal: abortController.signal }); - abortController.abort(); - expect(send).toHaveBeenCalledWith( - expect.objectContaining({ - method: 'thingUnsubscribe', - params: [42], - }), - ); - }); - it('does not send an unsubscribe request to the transport when aborted if the subscription has not yet been established', () => { - jest.useFakeTimers(); - const abortController = new AbortController(); - rpc.thingNotifications(123).subscribe({ abortSignal: abortController.signal }); - abortController.abort(); - expect(send).not.toHaveBeenCalledWith( - expect.objectContaining({ - method: 'thingUnsubscribe', - }), - ); - }); - it('does not send an unsubscribe request to the transport when aborted after the connection iterator returns given an established subscription', async () => { - expect.assertions(1); - jest.useFakeTimers(); - const abortController = new AbortController(); - let returnFromConnection: () => void; - iterable.mockImplementation(async function* () { - yield { id: 0, result: 42 /* subscription id */ }; - try { - yield await new Promise((_, reject) => { - returnFromConnection = reject; - }); - } catch { - return; - } - }); - await rpc.thingNotifications(123).subscribe({ abortSignal: abortController.signal }); - // FIXME: https://github.com/microsoft/TypeScript/issues/11498 - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-ignore - returnFromConnection(); - await jest.runAllTimersAsync(); - abortController.abort(); - expect(send).not.toHaveBeenCalledWith( - expect.objectContaining({ - method: 'thingUnsubscribe', - }), - ); - }); - it('does not send an unsubscribe request to the transport when aborted after the connection iterator fatals given an established subscription', async () => { - expect.assertions(1); - jest.useFakeTimers(); - const abortController = new AbortController(); - let killConnection: () => void; - iterable.mockImplementation(async function* () { - yield { id: 0, result: 42 /* subscription id */ }; - yield await new Promise((_, reject) => { - killConnection = reject; - }); - }); - await rpc.thingNotifications(123).subscribe({ abortSignal: abortController.signal }); - // FIXME: https://github.com/microsoft/TypeScript/issues/11498 - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-ignore - killConnection(new Error('o no')); - await jest.runAllTimersAsync(); - abortController.abort(); - expect(send).not.toHaveBeenCalledWith( - expect.objectContaining({ - method: 'thingUnsubscribe', - }), - ); - }); - it('delivers only messages destined for a particular subscription', async () => { - expect.assertions(1); - iterable.mockImplementation(async function* () { - yield Promise.resolve({ id: 0, result: 42 /* subscription id */ }); - yield Promise.resolve({ params: { result: 123, subscription: 41 } }); - yield Promise.resolve({ params: { result: 456, subscription: 42 } }); - }); - const thingNotifications = await rpc - .thingNotifications() - .subscribe({ abortSignal: new AbortController().signal }); - const iterator = thingNotifications[Symbol.asyncIterator](); - await expect(iterator.next()).resolves.toHaveProperty('value', { result: 456, subscription: 42 }); - }); - it.each([null, undefined])( - 'fatals when the subscription id returned from the server is `%s`', - async subscriptionId => { - expect.assertions(1); - iterable.mockImplementation(async function* () { - yield Promise.resolve({ id: 0, result: subscriptionId /* subscription id */ }); - }); - const thingNotificationsPromise = rpc - .thingNotifications() - .subscribe({ abortSignal: new AbortController().signal }); - await expect(thingNotificationsPromise).rejects.toThrow( - new SolanaError(SOLANA_ERROR__RPC_SUBSCRIPTIONS__EXPECTED_SERVER_SUBSCRIPTION_ID), - ); - }, - ); - it("fatals when called with a method that does not end in 'Notifications'", () => { - expect(() => { - rpc.nonConformingNotif().subscribe({ abortSignal: new AbortController().signal }); - }).toThrow( - new SolanaError(SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_REQUEST, { - notificationName: 'nonConformingNotif', - }), - ); - }); - it('fatals when called with an already aborted signal', async () => { - expect.assertions(1); - const abortController = new AbortController(); - abortController.abort(); - const subscribePromise = rpc.thingNotifications().subscribe({ abortSignal: abortController.signal }); - await expect(subscribePromise).rejects.toThrow(/operation was aborted/); - }); - it('fatals when the server fails to respond with a subscription id', async () => { - expect.assertions(1); - iterable.mockImplementation(async function* () { - yield Promise.resolve({ id: 0, result: undefined /* subscription id */ }); - }); - const subscribePromise = rpc.thingNotifications().subscribe({ abortSignal: new AbortController().signal }); - await expect(subscribePromise).rejects.toThrow( - new SolanaError(SOLANA_ERROR__RPC_SUBSCRIPTIONS__EXPECTED_SERVER_SUBSCRIPTION_ID), - ); - }); - it('fatals when the server responds with an error', async () => { - expect.assertions(1); - iterable.mockImplementation(async function* () { - yield Promise.resolve({ - error: { code: SOLANA_ERROR__RPC_SUBSCRIPTIONS__EXPECTED_SERVER_SUBSCRIPTION_ID }, - id: 0, - }); - }); - const subscribePromise = rpc.thingNotifications().subscribe({ abortSignal: new AbortController().signal }); - await expect(subscribePromise).rejects.toThrow( - new SolanaError(SOLANA_ERROR__RPC_SUBSCRIPTIONS__EXPECTED_SERVER_SUBSCRIPTION_ID), - ); - }); - it('throws errors when the connection fails to construct', async () => { - expect.assertions(1); - createWebSocketConnection.mockRejectedValue(new Error('o no')); - const subscribePromise = rpc.thingNotifications().subscribe({ abortSignal: new AbortController().signal }); - await expect(subscribePromise).rejects.toThrow(/o no/); - }); - describe('when calling a method having a concrete implementation', () => { - let rpc: RpcSubscriptions; - beforeEach(() => { - rpc = createSubscriptionRpc({ - api: { - nonConformingNotif(...params: unknown[]): RpcSubscriptionsRequest { - return { - params: [...params, 'augmented', 'params'], - subscribeMethodName: 'nonConformingSubscribeAugmented', - unsubscribeMethodName: 'nonConformingUnsubscribeAugmented', - }; - }, - } as RpcSubscriptionsApi, - transport: createWebSocketConnection, - }); - }); - it('converts the returned subscription to a JSON-RPC 2.0 message and sends it to the transport', () => { - rpc.nonConformingNotif(123).subscribe({ abortSignal: new AbortController().signal }); - expect(createWebSocketConnection).toHaveBeenCalledWith( - expect.objectContaining({ - payload: { - ...createRpcMessage('nonConformingSubscribeAugmented', [123, 'augmented', 'params']), - id: expect.any(Number), - }, - }), - ); - }); - it('uses the returned unsubscribe method name when unsubscribing', async () => { - expect.assertions(1); - jest.useFakeTimers(); - const abortController = new AbortController(); - iterable.mockImplementation(async function* () { - yield { id: 0, result: 42 /* subscription id */ }; - yield new Promise(() => { - /* never resolve */ - }); - }); - await rpc.nonConformingNotif(123).subscribe({ abortSignal: abortController.signal }); - await jest.runAllTimersAsync(); - abortController.abort(); - expect(send).toHaveBeenCalledWith(createRpcMessage('nonConformingUnsubscribeAugmented', [42])); - }); - }); - describe('when calling a method whose concrete implementation returns a response processor', () => { - let responseTransformer: jest.Mock; - let rpc: RpcSubscriptions; - beforeEach(() => { - responseTransformer = jest.fn(response => `${response.result} processed response`); - rpc = createSubscriptionRpc({ - api: { - thingNotifications(...params: unknown[]): RpcSubscriptionsRequest { - return { - params, - responseTransformer, - subscribeMethodName: 'thingSubscribe', - unsubscribeMethodName: 'thingUnsubscribe', - }; - }, - } as RpcSubscriptionsApi, - transport: createWebSocketConnection, - }); - }); - it('calls the response processor with the response from the JSON-RPC 2.0 endpoint', async () => { - expect.assertions(1); - iterable.mockImplementation(async function* () { - yield Promise.resolve({ id: 0, result: 42 /* subscription id */ }); - yield Promise.resolve({ params: { result: 123, subscription: 42 } }); - }); - const thingNotifications = await rpc - .thingNotifications() - .subscribe({ abortSignal: new AbortController().signal }); - await thingNotifications[Symbol.asyncIterator]().next(); - expect(responseTransformer).toHaveBeenCalledWith({ result: 123, subscription: 42 }, 'thingSubscribe'); - }); - it('returns the processed response', async () => { - expect.assertions(1); - iterable.mockImplementation(async function* () { - yield Promise.resolve({ id: 0, result: 42 /* subscription id */ }); - yield Promise.resolve({ params: { result: 123, subscription: 42 } }); - }); - const thingNotifications = await rpc - .thingNotifications() - .subscribe({ abortSignal: new AbortController().signal }); - await expect(thingNotifications[Symbol.asyncIterator]().next()).resolves.toHaveProperty( - 'value', - '123 processed response', - ); - }); - }); -}); diff --git a/packages/rpc-subscriptions-spec/src/__tests__/rpc-subscriptions-api-test.ts b/packages/rpc-subscriptions-spec/src/__tests__/rpc-subscriptions-api-test.ts new file mode 100644 index 00000000000..46d2db0ada2 --- /dev/null +++ b/packages/rpc-subscriptions-spec/src/__tests__/rpc-subscriptions-api-test.ts @@ -0,0 +1,71 @@ +import { createRpcSubscriptionsApi } from '../rpc-subscriptions-api'; +import { RpcSubscriptionsChannel } from '../rpc-subscriptions-channel'; + +describe('createRpcSubscriptionsApi', () => { + let mockChannel: RpcSubscriptionsChannel; + beforeEach(() => { + mockChannel = { on: jest.fn(), send: jest.fn() }; + }); + describe('executeSubscriptionPlan', () => { + it('calls the plan executor with the expected params', () => { + const mockPlanExecutor = jest.fn(); + const api = createRpcSubscriptionsApi({ planExecutor: mockPlanExecutor }); + const expectedParams = [1, 'hi', 3]; + const expectedSignal = new AbortController().signal; + api.foo(...expectedParams).executeSubscriptionPlan({ + channel: mockChannel, + signal: expectedSignal, + }); + expect(mockPlanExecutor).toHaveBeenCalledWith({ + channel: mockChannel, + notificationName: 'foo', + params: expectedParams, + signal: expectedSignal, + }); + }); + }); + describe('subscriptionConfigurationHash', () => { + it('does not call the hash creator before it is accessed', () => { + const mockGetSubscriptionConfigurationHash = jest.fn(); + const api = createRpcSubscriptionsApi({ + getSubscriptionConfigurationHash: mockGetSubscriptionConfigurationHash, + planExecutor: jest.fn(), + }); + api.foo('hi'); + expect(mockGetSubscriptionConfigurationHash).not.toHaveBeenCalled(); + }); + it('calls the hash creator when it is accessed', () => { + const mockGetSubscriptionConfigurationHash = jest.fn(); + const api = createRpcSubscriptionsApi({ + getSubscriptionConfigurationHash: mockGetSubscriptionConfigurationHash, + planExecutor: jest.fn(), + }); + const result = api.foo('hi'); + result.subscriptionConfigurationHash; + expect(mockGetSubscriptionConfigurationHash).toHaveBeenCalledWith({ + notificationName: 'foo', + params: ['hi'], + }); + }); + it('memoizes the result of the hash creator', () => { + const mockGetSubscriptionConfigurationHash = jest.fn(); + const api = createRpcSubscriptionsApi({ + getSubscriptionConfigurationHash: mockGetSubscriptionConfigurationHash, + planExecutor: jest.fn(), + }); + const result = api.foo('hi'); + result.subscriptionConfigurationHash; + result.subscriptionConfigurationHash; + expect(mockGetSubscriptionConfigurationHash).toHaveBeenCalledTimes(1); + }); + it('returns the result of the hash creator', () => { + const mockGetSubscriptionConfigurationHash = jest.fn().mockReturnValue('MOCK_HASH'); + const api = createRpcSubscriptionsApi({ + getSubscriptionConfigurationHash: mockGetSubscriptionConfigurationHash, + planExecutor: jest.fn(), + }); + const result = api.foo('hi'); + expect(result.subscriptionConfigurationHash).toBe('MOCK_HASH'); + }); + }); +}); diff --git a/packages/rpc-subscriptions-spec/src/__tests__/rpc-subscriptions-test.ts b/packages/rpc-subscriptions-spec/src/__tests__/rpc-subscriptions-test.ts new file mode 100644 index 00000000000..b162d55490e --- /dev/null +++ b/packages/rpc-subscriptions-spec/src/__tests__/rpc-subscriptions-test.ts @@ -0,0 +1,24 @@ +import { SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_PLAN, SolanaError } from '@solana/errors'; + +import { createSubscriptionRpc } from '../rpc-subscriptions'; + +interface TestRpcSubscriptionNotifications { + thingNotifications(...args: unknown[]): unknown; +} + +describe('createSubscriptionRpc', () => { + it('throws when the API produces no subscription plan', () => { + const rpcSubscriptions = createSubscriptionRpc({ + // @ts-expect-error Does not implement API on purpose + api: {}, + transport: jest.fn(), + }); + expect(() => { + rpcSubscriptions.thingNotifications(); + }).toThrow( + new SolanaError(SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_PLAN, { + notificationName: 'thingNotifications', + }), + ); + }); +}); diff --git a/packages/rpc-subscriptions-spec/src/__typetests__/rpc-subscriptions-api-typetest.ts b/packages/rpc-subscriptions-spec/src/__typetests__/rpc-subscriptions-api-typetest.ts index e9be18701f8..57f7818636b 100644 --- a/packages/rpc-subscriptions-spec/src/__typetests__/rpc-subscriptions-api-typetest.ts +++ b/packages/rpc-subscriptions-spec/src/__typetests__/rpc-subscriptions-api-typetest.ts @@ -18,4 +18,6 @@ type NftCollectionDetailsApi = { type QuickNodeRpcMethods = NftCollectionDetailsApi; -createRpcSubscriptionsApi() satisfies RpcSubscriptionsApi; +createRpcSubscriptionsApi( + ...(null as unknown as Parameters), +) satisfies RpcSubscriptionsApi; diff --git a/packages/rpc-subscriptions-spec/src/__typetests__/rpc-subscriptions-typetest.ts b/packages/rpc-subscriptions-spec/src/__typetests__/rpc-subscriptions-typetest.ts index 2ee298ba4b9..e81dcba1eec 100644 --- a/packages/rpc-subscriptions-spec/src/__typetests__/rpc-subscriptions-typetest.ts +++ b/packages/rpc-subscriptions-spec/src/__typetests__/rpc-subscriptions-typetest.ts @@ -7,7 +7,9 @@ type MySubscriptionApiMethods = { foo(): number; }; -const api = createRpcSubscriptionsApi(); +const api = createRpcSubscriptionsApi( + ...(null as unknown as Parameters), +); const transport = null as unknown as RpcSubscriptionsTransport; createSubscriptionRpc({ api, transport }) satisfies RpcSubscriptions; diff --git a/packages/rpc-subscriptions-spec/src/rpc-subscriptions-api.ts b/packages/rpc-subscriptions-spec/src/rpc-subscriptions-api.ts index 11044c5c004..c0e8117dab7 100644 --- a/packages/rpc-subscriptions-spec/src/rpc-subscriptions-api.ts +++ b/packages/rpc-subscriptions-spec/src/rpc-subscriptions-api.ts @@ -1,12 +1,43 @@ import { Callable } from '@solana/rpc-spec-types'; +import { DataPublisher } from '@solana/subscribable'; -import { RpcSubscriptionsRequest } from './rpc-subscriptions-request'; +import { RpcSubscriptionsChannel } from './rpc-subscriptions-channel'; +import { RpcSubscriptionsTransportDataEvents } from './rpc-subscriptions-transport'; -export type RpcSubscriptionsApiConfig = Readonly<{ - parametersTransformer?: (params: T, notificationName: string) => unknown[]; - responseTransformer?: (response: unknown, notificationName: string) => T; - subscribeNotificationNameTransformer?: (notificationName: string) => string; - unsubscribeNotificationNameTransformer?: (notificationName: string) => string; +export type RpcSubscriptionsApiConfig = Readonly<{ + getSubscriptionConfigurationHash?: ( + details: Readonly<{ + notificationName: string; + params: unknown; + }>, + ) => string | undefined; + planExecutor: RpcSubscriptionsPlanExecutor>; +}>; + +type RpcSubscriptionsPlanExecutor = ( + config: Readonly<{ + channel: RpcSubscriptionsChannel; + notificationName: string; + params?: unknown[]; + signal: AbortSignal; + }>, +) => Promise>>; + +export type RpcSubscriptionsPlan = Readonly<{ + /** + * This method may be called with a newly-opened channel or a pre-established channel. + */ + executeSubscriptionPlan: ( + config: Readonly<{ + channel: RpcSubscriptionsChannel; + signal: AbortSignal; + }>, + ) => Promise>>; + /** + * This hash uniquely identifies the configuration of a subscription. It is typically used by + * consumers of this API to deduplicate multiple subscriptions for the same notification. + */ + subscriptionConfigurationHash: string | undefined; }>; export type RpcSubscriptionsApi = { @@ -16,7 +47,7 @@ export type RpcSubscriptionsApi = { }; type RpcSubscriptionsReturnTypeMapper = TRpcMethod extends Callable - ? (...rawParams: unknown[]) => RpcSubscriptionsRequest> + ? (...rawParams: unknown[]) => RpcSubscriptionsPlan> : never; // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -25,8 +56,10 @@ export interface RpcSubscriptionsApiMethods { [methodName: string]: RpcSubscriptionsApiMethod; } +const UNINITIALIZED = Symbol(); + export function createRpcSubscriptionsApi( - config?: RpcSubscriptionsApiConfig, + config: RpcSubscriptionsApiConfig, ): RpcSubscriptionsApi { return new Proxy({} as RpcSubscriptionsApi, { defineProperty() { @@ -41,30 +74,30 @@ export function createRpcSubscriptionsApi - ): RpcSubscriptionsRequest> { - const params = config?.parametersTransformer - ? config?.parametersTransformer(rawParams, notificationName) - : rawParams; - const responseTransformer = config?.responseTransformer - ? config?.responseTransformer> - : (rawResponse: unknown) => - rawResponse as ReturnType; - const subscribeMethodName = config?.subscribeNotificationNameTransformer - ? config?.subscribeNotificationNameTransformer(notificationName) - : notificationName; - const unsubscribeMethodName = config?.unsubscribeNotificationNameTransformer - ? config?.unsubscribeNotificationNameTransformer(notificationName) - : notificationName; + ): RpcSubscriptionsPlan> { + let _cachedSubscriptionHash: string | typeof UNINITIALIZED | undefined = UNINITIALIZED; return { - params, - responseTransformer, - subscribeMethodName, - unsubscribeMethodName, + executeSubscriptionPlan(planConfig) { + return config.planExecutor({ + ...planConfig, + notificationName, + params, + }); + }, + get subscriptionConfigurationHash() { + if (_cachedSubscriptionHash === UNINITIALIZED) { + _cachedSubscriptionHash = config?.getSubscriptionConfigurationHash?.({ + notificationName, + params, + }); + } + return _cachedSubscriptionHash; + }, }; }; }, diff --git a/packages/rpc-subscriptions-spec/src/rpc-subscriptions-transport.ts b/packages/rpc-subscriptions-spec/src/rpc-subscriptions-transport.ts index cac6d084230..84166531135 100644 --- a/packages/rpc-subscriptions-spec/src/rpc-subscriptions-transport.ts +++ b/packages/rpc-subscriptions-spec/src/rpc-subscriptions-transport.ts @@ -1,14 +1,19 @@ -type RpcSubscriptionsTransportConfig = Readonly<{ - payload: unknown; +import { SolanaError } from '@solana/errors'; +import { DataPublisher } from '@solana/subscribable'; + +import { RpcSubscriptionsPlan } from './rpc-subscriptions-api'; + +export type RpcSubscriptionsTransportDataEvents = { + error: SolanaError; + notification: TNotification; +}; + +interface RpcSubscriptionsTransportConfig extends RpcSubscriptionsPlan { signal: AbortSignal; -}>; +} export interface RpcSubscriptionsTransport { - (config: RpcSubscriptionsTransportConfig): Promise< - Readonly< - AsyncIterable & { - send_DO_NOT_USE_OR_YOU_WILL_BE_FIRED: (payload: unknown) => Promise; - } - > - >; + ( + config: RpcSubscriptionsTransportConfig, + ): Promise>>; } diff --git a/packages/rpc-subscriptions-spec/src/rpc-subscriptions.ts b/packages/rpc-subscriptions-spec/src/rpc-subscriptions.ts index 4c956e65838..7f13eb15be2 100644 --- a/packages/rpc-subscriptions-spec/src/rpc-subscriptions.ts +++ b/packages/rpc-subscriptions-spec/src/rpc-subscriptions.ts @@ -1,32 +1,14 @@ -import { - SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_REQUEST, - SOLANA_ERROR__RPC_SUBSCRIPTIONS__EXPECTED_SERVER_SUBSCRIPTION_ID, - SolanaError, -} from '@solana/errors'; -import { getSolanaErrorFromJsonRpcError } from '@solana/errors'; -import { - Callable, - createRpcMessage, - Flatten, - OverloadImplementations, - RpcResponseData, - UnionToIntersection, -} from '@solana/rpc-spec-types'; +import { SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_PLAN, SolanaError } from '@solana/errors'; +import { Callable, Flatten, OverloadImplementations, UnionToIntersection } from '@solana/rpc-spec-types'; +import { createAsyncIterableFromDataPublisher } from '@solana/subscribable'; -import { RpcSubscriptionsApi } from './rpc-subscriptions-api'; -import { - PendingRpcSubscriptionsRequest, - RpcSubscribeOptions, - RpcSubscriptionsRequest, -} from './rpc-subscriptions-request'; +import { RpcSubscriptionsApi, RpcSubscriptionsPlan } from './rpc-subscriptions-api'; +import { PendingRpcSubscriptionsRequest, RpcSubscribeOptions } from './rpc-subscriptions-request'; import { RpcSubscriptionsTransport } from './rpc-subscriptions-transport'; -export type RpcSubscriptionsConfig< - TRpcMethods, - TRpcSubscriptionsTransport extends RpcSubscriptionsTransport, -> = Readonly<{ +export type RpcSubscriptionsConfig = Readonly<{ api: RpcSubscriptionsApi; - transport: TRpcSubscriptionsTransport; + transport: RpcSubscriptionsTransport; }>; export type RpcSubscriptions = { @@ -51,26 +33,8 @@ type PendingRpcSubscriptionsRequestReturnTypeMapper PendingRpcSubscriptionsRequest> : never; -type RpcNotification = Readonly<{ - params: Readonly<{ - result: TNotification; - subscription: number; - }>; -}>; - -type RpcSubscriptionId = number; - -export function createSubscriptionRpc< - TRpcSubscriptionsApiMethods, - TRpcSubscriptionsTransport extends RpcSubscriptionsTransport, ->( - rpcConfig: RpcSubscriptionsConfig, -): RpcSubscriptions { - return makeProxy(rpcConfig); -} - -function makeProxy( - rpcConfig: RpcSubscriptionsConfig, +export function createSubscriptionRpc( + rpcConfig: RpcSubscriptionsConfig, ): RpcSubscriptions { return new Proxy(rpcConfig.api, { defineProperty() { @@ -82,111 +46,35 @@ function makeProxy; } -function registerIterableCleanup(iterable: AsyncIterable, cleanupFn: CallableFunction) { - (async () => { - try { - // eslint-disable-next-line @typescript-eslint/no-unused-vars - for await (const _ of iterable); - } catch { - /* empty */ - } finally { - // Run the cleanup function. - cleanupFn(); - } - })(); -} - -function createPendingRpcSubscription< - TRpcSubscriptionsApiMethods, - TRpcSubscriptionsTransport extends RpcSubscriptionsTransport, - TNotification, ->( - rpcConfig: RpcSubscriptionsConfig, - { params, subscribeMethodName, unsubscribeMethodName, responseTransformer }: RpcSubscriptionsRequest, +function createPendingRpcSubscription( + transport: RpcSubscriptionsTransport, + subscriptionsPlan: RpcSubscriptionsPlan, ): PendingRpcSubscriptionsRequest { return { async subscribe({ abortSignal }: RpcSubscribeOptions): Promise> { - abortSignal.throwIfAborted(); - let subscriptionId: number | undefined; - function handleCleanup() { - if (subscriptionId !== undefined) { - const payload = createRpcMessage(unsubscribeMethodName, [subscriptionId]); - connection.send_DO_NOT_USE_OR_YOU_WILL_BE_FIRED(payload).finally(() => { - connectionAbortController.abort(); - }); - } else { - connectionAbortController.abort(); - } - } - abortSignal.addEventListener('abort', handleCleanup); - /** - * STEP 1: Send the subscribe message. - */ - const connectionAbortController = new AbortController(); - const subscribeMessage = createRpcMessage(subscribeMethodName, params); - const connection = await rpcConfig.transport({ - payload: subscribeMessage, - signal: connectionAbortController.signal, + const notificationsDataPublisher = await transport({ + signal: abortSignal, + ...subscriptionsPlan, + }); + return createAsyncIterableFromDataPublisher({ + abortSignal, + dataChannelName: 'notification', + dataPublisher: notificationsDataPublisher, + errorChannelName: 'error', }); - function handleConnectionCleanup() { - abortSignal.removeEventListener('abort', handleCleanup); - } - registerIterableCleanup(connection, handleConnectionCleanup); - /** - * STEP 2: Wait for the acknowledgement from the server with the subscription id. - */ - for await (const message of connection as AsyncIterable< - RpcNotification | RpcResponseData - >) { - if ('id' in message && message.id === subscribeMessage.id) { - if ('error' in message) { - throw getSolanaErrorFromJsonRpcError(message.error); - } else { - subscriptionId = message.result as RpcSubscriptionId; - break; - } - } - } - if (subscriptionId == null) { - throw new SolanaError(SOLANA_ERROR__RPC_SUBSCRIPTIONS__EXPECTED_SERVER_SUBSCRIPTION_ID); - } - /** - * STEP 3: Return an iterable that yields notifications for this subscription id. - */ - return { - async *[Symbol.asyncIterator]() { - for await (const message of connection as AsyncIterable< - RpcNotification | RpcResponseData - >) { - if (!('params' in message) || message.params.subscription !== subscriptionId) { - continue; - } - const notification = message.params as TNotification; - yield responseTransformer - ? responseTransformer(notification, subscribeMethodName) - : notification; - } - }, - }; }, }; } diff --git a/packages/rpc-subscriptions/CHANGELOG.md b/packages/rpc-subscriptions/CHANGELOG.md index 0461120a32e..87a655241de 100644 --- a/packages/rpc-subscriptions/CHANGELOG.md +++ b/packages/rpc-subscriptions/CHANGELOG.md @@ -13,7 +13,7 @@ - @solana/fast-stable-stringify@2.0.0-rc.1 - @solana/functional@2.0.0-rc.1 - @solana/rpc-subscriptions-spec@2.0.0-rc.1 - - @solana/rpc-subscriptions-transport-websocket@2.0.0-rc.1 + - @solana/rpc-subscriptions-channel-websocket@2.0.0-rc.1 - @solana/rpc-transformers@2.0.0-rc.1 - @solana/rpc-types@2.0.0-rc.1 @@ -26,7 +26,7 @@ - Updated dependencies [[`29821df`](https://github.com/solana-labs/solana-web3.js/commit/29821df246b14eb41dd4606913f44fac40183957), [`677a9c4`](https://github.com/solana-labs/solana-web3.js/commit/677a9c4eb88a8ac6a9ede8d82f367c5ac8d69ff4)]: - @solana/rpc-transformers@2.0.0-rc.0 - @solana/errors@2.0.0-rc.0 - - @solana/rpc-subscriptions-transport-websocket@2.0.0-rc.0 + - @solana/rpc-subscriptions-channel-websocket@2.0.0-rc.0 - @solana/rpc-subscriptions-api@2.0.0-rc.0 - @solana/rpc-subscriptions-spec@2.0.0-rc.0 - @solana/rpc-types@2.0.0-rc.0 @@ -42,7 +42,7 @@ - Updated dependencies [[`4f19842`](https://github.com/solana-labs/solana-web3.js/commit/4f198423997d28d927f982333d268e19940656df), [`73bd5a9`](https://github.com/solana-labs/solana-web3.js/commit/73bd5a9e0b32846cd5d76f2d2d1b21661eab0677), [`be36bab`](https://github.com/solana-labs/solana-web3.js/commit/be36babd752b1c987a2f53b4ff83ac8c045a3418), [`367b8ad`](https://github.com/solana-labs/solana-web3.js/commit/367b8ad0cce55a916abfb0125f36b6e844333b2b)]: - @solana/errors@2.0.0-preview.4 - @solana/rpc-types@2.0.0-preview.4 - - @solana/rpc-subscriptions-transport-websocket@2.0.0-preview.4 + - @solana/rpc-subscriptions-channel-websocket@2.0.0-preview.4 - @solana/rpc-subscriptions-spec@2.0.0-preview.4 - @solana/fast-stable-stringify@2.0.0-preview.4 - @solana/rpc-subscriptions-api@2.0.0-preview.4 @@ -61,7 +61,7 @@ - @solana/rpc-types@2.0.0-preview.3 - @solana/rpc-transformers@2.0.0-preview.3 - @solana/rpc-subscriptions-spec@2.0.0-preview.3 - - @solana/rpc-subscriptions-transport-websocket@2.0.0-preview.3 + - @solana/rpc-subscriptions-channel-websocket@2.0.0-preview.3 - @solana/rpc-subscriptions-api@2.0.0-preview.3 - @solana/functional@2.0.0-preview.3 @@ -118,6 +118,6 @@ - @solana/functional@2.0.0-preview.2 - @solana/rpc-subscriptions-api@2.0.0-preview.2 - @solana/rpc-subscriptions-spec@2.0.0-preview.2 - - @solana/rpc-subscriptions-transport-websocket@2.0.0-preview.2 + - @solana/rpc-subscriptions-channel-websocket@2.0.0-preview.2 - @solana/rpc-transformers@2.0.0-preview.2 - @solana/rpc-types@2.0.0-preview.2 diff --git a/packages/rpc-subscriptions/README.md b/packages/rpc-subscriptions/README.md index a0ab99fc6ce..45cf4e6364b 100644 --- a/packages/rpc-subscriptions/README.md +++ b/packages/rpc-subscriptions/README.md @@ -18,6 +18,19 @@ This package contains types that implement RPC subscriptions as required by the ## Functions +### `createDefaultRpcSubscriptionsChannelCreator(config)` + +Creates a function that returns new subscription channels when called. + +#### Arguments + +A config object with the following properties: + +- `intervalMs`: The number of milliseconds to wait since the last message sent or received over the channel before sending a ping message to keep the channel open. +- `maxSubscriptionsPerChannel`: The number of subscribers that may share a channel before a new channel must be created. Set this to the maximum number of subscriptions that your RPC provider recommends making over a single connection. +- `minChannels`: The number of channels to create before reusing a channel for a new subscription. +- `sendBufferHighWatermark`: The number of bytes of data to admint into the `WebSocket` buffer before buffering data on the client. -`url`: The URL of the web socket server. Must use the `ws` or `wss` protocols. + ### `demultiplexDataPublisher(publisher, sourceChannelName, messageTransformer)` Given a channel that carries messages for multiple subscribers on a single channel name, this function returns a new `DataPublisher` that splits them into multiple channel names. diff --git a/packages/rpc-subscriptions/package.json b/packages/rpc-subscriptions/package.json index 886dbcd5c0b..0e3cbd7c2a2 100644 --- a/packages/rpc-subscriptions/package.json +++ b/packages/rpc-subscriptions/package.json @@ -72,10 +72,11 @@ ], "dependencies": { "@solana/errors": "workspace:*", + "@solana/functional": "workspace:*", "@solana/promises": "workspace:*", "@solana/rpc-subscriptions-api": "workspace:*", + "@solana/rpc-subscriptions-channel-websocket": "workspace:*", "@solana/rpc-subscriptions-spec": "workspace:*", - "@solana/rpc-subscriptions-transport-websocket": "workspace:*", "@solana/rpc-transformers": "workspace:*", "@solana/rpc-types": "workspace:*" }, diff --git a/packages/rpc-subscriptions/src/__tests__/rpc-subscriptions-transport-test.ts b/packages/rpc-subscriptions/src/__tests__/rpc-subscriptions-transport-test.ts new file mode 100644 index 00000000000..e2755d3a82a --- /dev/null +++ b/packages/rpc-subscriptions/src/__tests__/rpc-subscriptions-transport-test.ts @@ -0,0 +1,51 @@ +import { createRpcSubscriptionsTransportFromChannelCreator } from '../rpc-subscriptions-transport'; + +describe('createRpcSubscriptionsTransportFromChannelCreator', () => { + beforeEach(() => { + jest.useFakeTimers(); + }); + it('creates a function that calls `createChannel` with the abort signal', () => { + const mockCreateChannel = jest.fn(); + const creator = createRpcSubscriptionsTransportFromChannelCreator(mockCreateChannel); + const abortSignal = new AbortController().signal; + creator({ + executeSubscriptionPlan: jest.fn(), + signal: abortSignal, + subscriptionConfigurationHash: undefined, + }); + expect(mockCreateChannel).toHaveBeenCalledWith({ abortSignal }); + }); + it('creates a function that calls `executeSubscriptionPlan` with the created channel', async () => { + expect.assertions(1); + const creator = createRpcSubscriptionsTransportFromChannelCreator(jest.fn().mockResolvedValue('MOCK_CHANNEL')); + const mockExecuteSubscriptionPlan = jest.fn(); + creator({ + executeSubscriptionPlan: mockExecuteSubscriptionPlan, + signal: new AbortController().signal, + subscriptionConfigurationHash: undefined, + }); + await jest.runAllTimersAsync(); + expect(mockExecuteSubscriptionPlan).toHaveBeenCalledWith( + expect.objectContaining({ + channel: 'MOCK_CHANNEL', + }), + ); + }); + it('creates a function that calls `executeSubscriptionPlan` with the abort signal', async () => { + expect.assertions(1); + const creator = createRpcSubscriptionsTransportFromChannelCreator(jest.fn()); + const mockExecuteSubscriptionPlan = jest.fn(); + const signal = new AbortController().signal; + creator({ + executeSubscriptionPlan: mockExecuteSubscriptionPlan, + signal, + subscriptionConfigurationHash: undefined, + }); + await jest.runAllTimersAsync(); + expect(mockExecuteSubscriptionPlan).toHaveBeenCalledWith( + expect.objectContaining({ + signal, + }), + ); + }); +}); diff --git a/packages/rpc-subscriptions/src/__typetests__/rpc-subscriptions-clusters-typetest.ts b/packages/rpc-subscriptions/src/__typetests__/rpc-subscriptions-clusters-typetest.ts index 9c614cc01d2..ea819f9e239 100644 --- a/packages/rpc-subscriptions/src/__typetests__/rpc-subscriptions-clusters-typetest.ts +++ b/packages/rpc-subscriptions/src/__typetests__/rpc-subscriptions-clusters-typetest.ts @@ -1,14 +1,21 @@ import type { SolanaRpcSubscriptionsApi, SolanaRpcSubscriptionsApiUnstable } from '@solana/rpc-subscriptions-api'; -import type { RpcSubscriptions, RpcSubscriptionsTransport } from '@solana/rpc-subscriptions-spec'; +import type { + RpcSubscriptions, + RpcSubscriptionsChannelCreator, + RpcSubscriptionsTransport, +} from '@solana/rpc-subscriptions-spec'; import { devnet, mainnet, testnet } from '@solana/rpc-types'; import { createSolanaRpcSubscriptions, createSolanaRpcSubscriptions_UNSTABLE, createSolanaRpcSubscriptionsFromTransport, - createSolanaRpcSubscriptionsFromTransport_UNSTABLE, } from '../rpc-subscriptions'; +import { createDefaultRpcSubscriptionsChannelCreator } from '../rpc-subscriptions-channel'; import type { + RpcSubscriptionsChannelCreatorDevnet, + RpcSubscriptionsChannelCreatorMainnet, + RpcSubscriptionsChannelCreatorTestnet, RpcSubscriptionsDevnet, RpcSubscriptionsMainnet, RpcSubscriptionsTestnet, @@ -16,7 +23,7 @@ import type { RpcSubscriptionsTransportMainnet, RpcSubscriptionsTransportTestnet, } from '../rpc-subscriptions-clusters'; -import { createDefaultRpcSubscriptionsTransport } from '../rpc-subscriptions-transport'; +import { createRpcSubscriptionsTransportFromChannelCreator } from '../rpc-subscriptions-transport'; // Define cluster-aware URLs and transports. @@ -25,64 +32,118 @@ const devnetUrl = devnet('https://api.devnet.solana.com'); const testnetUrl = testnet('https://api.testnet.solana.com'); const mainnetUrl = mainnet('https://api.mainnet-beta.solana.com'); -const genericTransport = createDefaultRpcSubscriptionsTransport({ url: genericUrl }); -const devnetTransport = createDefaultRpcSubscriptionsTransport({ url: devnetUrl }); -const testnetTransport = createDefaultRpcSubscriptionsTransport({ url: testnetUrl }); -const mainnetTransport = createDefaultRpcSubscriptionsTransport({ url: mainnetUrl }); +// [DESCRIBE] createDefaultRpcSubscriptionsChannelCreator. +{ + const genericChannelCreator = createDefaultRpcSubscriptionsChannelCreator({ url: genericUrl }); + const devnetChannelCreator = createDefaultRpcSubscriptionsChannelCreator({ url: devnetUrl }); + const testnetChannelCreator = createDefaultRpcSubscriptionsChannelCreator({ url: testnetUrl }); + const mainnetChannelCreator = createDefaultRpcSubscriptionsChannelCreator({ url: mainnetUrl }); + + // When no cluster is specified, it should be a generic `RpcSubscriptionsChannel`. + { + genericChannelCreator satisfies RpcSubscriptionsChannelCreator; + // @ts-expect-error Should not be a testnet channel + genericChannelCreator satisfies RpcSubscriptionsChannelCreatorDevnet; + // @ts-expect-error Should not be a testnet channel + genericChannelCreator satisfies RpcSubscriptionsChannelCreatorTestnet; + // @ts-expect-error Should not be a mainnet channel + genericChannelCreator satisfies RpcSubscriptionsChannelCreatorMainnet; + } + + // Devnet cluster should be `RpcSubscriptionsChannelCreatorDevnet`. + { + devnetChannelCreator satisfies RpcSubscriptionsChannelCreatorDevnet; + // @ts-expect-error Should not be a testnet channel + devnetChannelCreator satisfies RpcSubscriptionsChannelCreatorTestnet; + // @ts-expect-error Should not be a mainnet channel + devnetChannelCreator satisfies RpcSubscriptionsChannelCreatorMainnet; + } + + // Testnet cluster should be `RpcSubscriptionsChannelCreatorTestnet`. + { + testnetChannelCreator satisfies RpcSubscriptionsChannelCreatorTestnet; + // @ts-expect-error Should not be a devnet channel + testnetChannelCreator satisfies RpcSubscriptionsChannelCreatorDevnet; + // @ts-expect-error Should not be a mainnet channel + testnetChannelCreator satisfies RpcSubscriptionsChannelCreatorMainnet; + } + + // Mainnet cluster should be `RpcSubscriptionsChannelCreatorMainnet`. + { + mainnetChannelCreator satisfies RpcSubscriptionsChannelCreatorMainnet; + // @ts-expect-error Should not be a devnet channel + mainnetChannelCreator satisfies RpcSubscriptionsChannelCreatorDevnet; + // @ts-expect-error Should not be a testnet channel + mainnetChannelCreator satisfies RpcSubscriptionsChannelCreatorTestnet; + } +} -// [DESCRIBE] createDefaultRpcSubscriptionsTransport. +// [DESCRIBE] createRpcSubscriptionsTransportFromChannelCreator. { - // No cluster specified should be generic `RpcSubscriptionsTransport`. + const genericTransport = createRpcSubscriptionsTransportFromChannelCreator( + null as unknown as RpcSubscriptionsChannelCreator, + ); + const devnetTransport = createRpcSubscriptionsTransportFromChannelCreator( + null as unknown as RpcSubscriptionsChannelCreatorDevnet, + ); + const testnetTransport = createRpcSubscriptionsTransportFromChannelCreator( + null as unknown as RpcSubscriptionsChannelCreatorTestnet, + ); + const mainnetTransport = createRpcSubscriptionsTransportFromChannelCreator( + null as unknown as RpcSubscriptionsChannelCreatorMainnet, + ); + + // When no cluster is specified, it should be a generic `RpcSubscriptionsTransport{ { genericTransport satisfies RpcSubscriptionsTransport; - //@ts-expect-error Should not be a devnet transport + // @ts-expect-error Should not be a testnet channel genericTransport satisfies RpcSubscriptionsTransportDevnet; - //@ts-expect-error Should not be a testnet transport + // @ts-expect-error Should not be a testnet channel genericTransport satisfies RpcSubscriptionsTransportTestnet; - //@ts-expect-error Should not be a mainnet transport + // @ts-expect-error Should not be a mainnet channel genericTransport satisfies RpcSubscriptionsTransportMainnet; } // Devnet cluster should be `RpcSubscriptionsTransportDevnet`. { devnetTransport satisfies RpcSubscriptionsTransportDevnet; - //@ts-expect-error Should not be a testnet transport + // @ts-expect-error Should not be a testnet channel devnetTransport satisfies RpcSubscriptionsTransportTestnet; - //@ts-expect-error Should not be a mainnet transport + // @ts-expect-error Should not be a mainnet channel devnetTransport satisfies RpcSubscriptionsTransportMainnet; } // Testnet cluster should be `RpcSubscriptionsTransportTestnet`. { testnetTransport satisfies RpcSubscriptionsTransportTestnet; - //@ts-expect-error Should not be a devnet transport + // @ts-expect-error Should not be a devnet channel testnetTransport satisfies RpcSubscriptionsTransportDevnet; - //@ts-expect-error Should not be a mainnet transport + // @ts-expect-error Should not be a mainnet channel testnetTransport satisfies RpcSubscriptionsTransportMainnet; } // Mainnet cluster should be `RpcSubscriptionsTransportMainnet`. { mainnetTransport satisfies RpcSubscriptionsTransportMainnet; - //@ts-expect-error Should not be a devnet transport + // @ts-expect-error Should not be a devnet channel mainnetTransport satisfies RpcSubscriptionsTransportDevnet; - //@ts-expect-error Should not be a testnet transport + // @ts-expect-error Should not be a testnet channel mainnetTransport satisfies RpcSubscriptionsTransportTestnet; } } // [DESCRIBE] createSolanaRpcSubscriptionsFromTransport. { - const genericRpc = createSolanaRpcSubscriptionsFromTransport(genericTransport); - const devnetRpc = createSolanaRpcSubscriptionsFromTransport(devnetTransport); - const tesnetRpc = createSolanaRpcSubscriptionsFromTransport(testnetTransport); - const mainnetRpc = createSolanaRpcSubscriptionsFromTransport(mainnetTransport); + const genericRpc = createSolanaRpcSubscriptionsFromTransport(null as unknown as RpcSubscriptionsTransport); + const devnetRpc = createSolanaRpcSubscriptionsFromTransport(null as unknown as RpcSubscriptionsTransportDevnet); + const testnetRpc = createSolanaRpcSubscriptionsFromTransport(null as unknown as RpcSubscriptionsTransportTestnet); + const mainnetRpc = createSolanaRpcSubscriptionsFromTransport(null as unknown as RpcSubscriptionsTransportMainnet); // Checking stable subscriptions. { genericRpc satisfies RpcSubscriptions; devnetRpc satisfies RpcSubscriptions; - tesnetRpc satisfies RpcSubscriptions; + testnetRpc satisfies RpcSubscriptions; mainnetRpc satisfies RpcSubscriptions; // @ts-expect-error Should not have unstable subscriptions @@ -90,19 +151,19 @@ const mainnetTransport = createDefaultRpcSubscriptionsTransport({ url: mainnetUr // @ts-expect-error Should not have unstable subscriptions devnetRpc satisfies RpcSubscriptions; // @ts-expect-error Should not have unstable subscriptions - tesnetRpc satisfies RpcSubscriptions; + testnetRpc satisfies RpcSubscriptions; // @ts-expect-error Should not have unstable subscriptions mainnetRpc satisfies RpcSubscriptions; } - // No cluster specified should be generic `RpcSubscriptions`. + // When no cluster is specified, it should be a generic `RpcSubscriptions`. { genericRpc satisfies RpcSubscriptions; - //@ts-expect-error Should not be a devnet RPC + // @ts-expect-error Should not be a devnet RPC genericRpc satisfies RpcSubscriptionsDevnet; - //@ts-expect-error Should not be a testnet RPC + // @ts-expect-error Should not be a testnet RPC genericRpc satisfies RpcSubscriptionsTestnet; - //@ts-expect-error Should not be a mainnet RPC + // @ts-expect-error Should not be a mainnet RPC genericRpc satisfies RpcSubscriptionsMainnet; } @@ -110,61 +171,45 @@ const mainnetTransport = createDefaultRpcSubscriptionsTransport({ url: mainnetUr { devnetRpc satisfies RpcSubscriptions; devnetRpc satisfies RpcSubscriptionsDevnet; - //@ts-expect-error Should not be a testnet RPC + // @ts-expect-error Should not be a testnet RPC devnetRpc satisfies RpcSubscriptionsTestnet; - //@ts-expect-error Should not be a mainnet RPC + // @ts-expect-error Should not be a mainnet RPC devnetRpc satisfies RpcSubscriptionsMainnet; } // Testnet cluster should be `RpcSubscriptionsTestnet`. { - tesnetRpc satisfies RpcSubscriptions; - tesnetRpc satisfies RpcSubscriptionsTestnet; - //@ts-expect-error Should not be a devnet RPC - tesnetRpc satisfies RpcSubscriptionsDevnet; - //@ts-expect-error Should not be a mainnet RPC - tesnetRpc satisfies RpcSubscriptionsMainnet; + testnetRpc satisfies RpcSubscriptions; + testnetRpc satisfies RpcSubscriptionsTestnet; + // @ts-expect-error Should not be a devnet RPC + testnetRpc satisfies RpcSubscriptionsDevnet; + // @ts-expect-error Should not be a mainnet RPC + testnetRpc satisfies RpcSubscriptionsMainnet; } // Mainnet cluster should be `RpcSubscriptionsMainnet`. { mainnetRpc satisfies RpcSubscriptions; mainnetRpc satisfies RpcSubscriptionsMainnet; - //@ts-expect-error Should not be a devnet RPC + // @ts-expect-error Should not be a devnet RPC mainnetRpc satisfies RpcSubscriptionsDevnet; - //@ts-expect-error Should not be a testnet RPC + // @ts-expect-error Should not be a testnet RPC mainnetRpc satisfies RpcSubscriptionsTestnet; } } -// [DESCRIBE] createSolanaRpcSubscriptionsFromTransport_UNSTABLE. -{ - const genericRpc = createSolanaRpcSubscriptionsFromTransport_UNSTABLE(genericTransport); - const devnetRpc = createSolanaRpcSubscriptionsFromTransport_UNSTABLE(devnetTransport); - const tesnetRpc = createSolanaRpcSubscriptionsFromTransport_UNSTABLE(testnetTransport); - const mainnetRpc = createSolanaRpcSubscriptionsFromTransport_UNSTABLE(mainnetTransport); - - // Checking unstable subscriptions. - { - genericRpc satisfies RpcSubscriptions; - devnetRpc satisfies RpcSubscriptionsDevnet; - tesnetRpc satisfies RpcSubscriptionsTestnet; - mainnetRpc satisfies RpcSubscriptionsMainnet; - } -} - // [DESCRIBE] createSolanaRpcSubscriptions. { const genericRpc = createSolanaRpcSubscriptions(genericUrl); const devnetRpc = createSolanaRpcSubscriptions(devnetUrl); - const tesnetRpc = createSolanaRpcSubscriptions(testnetUrl); + const testnetRpc = createSolanaRpcSubscriptions(testnetUrl); const mainnetRpc = createSolanaRpcSubscriptions(mainnetUrl); // Checking stable subscriptions. { genericRpc satisfies RpcSubscriptions; devnetRpc satisfies RpcSubscriptions; - tesnetRpc satisfies RpcSubscriptions; + testnetRpc satisfies RpcSubscriptions; mainnetRpc satisfies RpcSubscriptions; // @ts-expect-error Should not have unstable subscriptions @@ -172,19 +217,19 @@ const mainnetTransport = createDefaultRpcSubscriptionsTransport({ url: mainnetUr // @ts-expect-error Should not have unstable subscriptions devnetRpc satisfies RpcSubscriptions; // @ts-expect-error Should not have unstable subscriptions - tesnetRpc satisfies RpcSubscriptions; + testnetRpc satisfies RpcSubscriptions; // @ts-expect-error Should not have unstable subscriptions mainnetRpc satisfies RpcSubscriptions; } - // No cluster specified should be generic `RpcSubscriptions`. + // When no cluster is specified, it should be a generic `RpcSubscriptions`. { genericRpc satisfies RpcSubscriptions; - //@ts-expect-error Should not be a devnet RPC + // @ts-expect-error Should not be a devnet RPC genericRpc satisfies RpcSubscriptionsDevnet; - //@ts-expect-error Should not be a testnet RPC + // @ts-expect-error Should not be a testnet RPC genericRpc satisfies RpcSubscriptionsTestnet; - //@ts-expect-error Should not be a mainnet RPC + // @ts-expect-error Should not be a mainnet RPC genericRpc satisfies RpcSubscriptionsMainnet; } @@ -192,29 +237,29 @@ const mainnetTransport = createDefaultRpcSubscriptionsTransport({ url: mainnetUr { devnetRpc satisfies RpcSubscriptions; devnetRpc satisfies RpcSubscriptionsDevnet; - //@ts-expect-error Should not be a testnet RPC + // @ts-expect-error Should not be a testnet RPC devnetRpc satisfies RpcSubscriptionsTestnet; - //@ts-expect-error Should not be a mainnet RPC + // @ts-expect-error Should not be a mainnet RPC devnetRpc satisfies RpcSubscriptionsMainnet; } // Testnet cluster should be `RpcSubscriptionsTestnet`. { - tesnetRpc satisfies RpcSubscriptions; - tesnetRpc satisfies RpcSubscriptionsTestnet; - //@ts-expect-error Should not be a devnet RPC - tesnetRpc satisfies RpcSubscriptionsDevnet; - //@ts-expect-error Should not be a mainnet RPC - tesnetRpc satisfies RpcSubscriptionsMainnet; + testnetRpc satisfies RpcSubscriptions; + testnetRpc satisfies RpcSubscriptionsTestnet; + // @ts-expect-error Should not be a devnet RPC + testnetRpc satisfies RpcSubscriptionsDevnet; + // @ts-expect-error Should not be a mainnet RPC + testnetRpc satisfies RpcSubscriptionsMainnet; } // Mainnet cluster should be `RpcSubscriptionsMainnet`. { mainnetRpc satisfies RpcSubscriptions; mainnetRpc satisfies RpcSubscriptionsMainnet; - //@ts-expect-error Should not be a devnet RPC + // @ts-expect-error Should not be a devnet RPC mainnetRpc satisfies RpcSubscriptionsDevnet; - //@ts-expect-error Should not be a testnet RPC + // @ts-expect-error Should not be a testnet RPC mainnetRpc satisfies RpcSubscriptionsTestnet; } } @@ -223,14 +268,14 @@ const mainnetTransport = createDefaultRpcSubscriptionsTransport({ url: mainnetUr { const genericRpc = createSolanaRpcSubscriptions_UNSTABLE(genericUrl); const devnetRpc = createSolanaRpcSubscriptions_UNSTABLE(devnetUrl); - const tesnetRpc = createSolanaRpcSubscriptions_UNSTABLE(testnetUrl); + const testnetRpc = createSolanaRpcSubscriptions_UNSTABLE(testnetUrl); const mainnetRpc = createSolanaRpcSubscriptions_UNSTABLE(mainnetUrl); // Checking unstable subscriptions. { genericRpc satisfies RpcSubscriptions; devnetRpc satisfies RpcSubscriptionsDevnet; - tesnetRpc satisfies RpcSubscriptionsTestnet; + testnetRpc satisfies RpcSubscriptionsTestnet; mainnetRpc satisfies RpcSubscriptionsMainnet; } } diff --git a/packages/rpc-subscriptions/src/index.ts b/packages/rpc-subscriptions/src/index.ts index 1c8a70849fa..526cddb901f 100644 --- a/packages/rpc-subscriptions/src/index.ts +++ b/packages/rpc-subscriptions/src/index.ts @@ -4,4 +4,6 @@ export * from '@solana/rpc-subscriptions-spec'; export * from './rpc-default-config'; export * from './rpc-subscriptions'; export * from './rpc-subscriptions-clusters'; +export * from './rpc-subscriptions-json'; +export * from './rpc-subscriptions-channel'; export * from './rpc-subscriptions-transport'; diff --git a/packages/rpc-subscriptions/src/rpc-subscriptions-channel.ts b/packages/rpc-subscriptions/src/rpc-subscriptions-channel.ts new file mode 100644 index 00000000000..7ee64a893b1 --- /dev/null +++ b/packages/rpc-subscriptions/src/rpc-subscriptions-channel.ts @@ -0,0 +1,55 @@ +import { createWebSocketChannel } from '@solana/rpc-subscriptions-channel-websocket'; +import type { ClusterUrl } from '@solana/rpc-types'; + +import { getRpcSubscriptionsChannelWithAutoping } from './rpc-subscriptions-autopinger'; +import { getChannelPoolingChannelCreator } from './rpc-subscriptions-channel-pool'; +import { RpcSubscriptionsChannelCreatorFromClusterUrl } from './rpc-subscriptions-clusters'; +import { getRpcSubscriptionsChannelWithJSONSerialization } from './rpc-subscriptions-json'; + +export type DefaultRpcSubscriptionsChannelConfig = Readonly<{ + intervalMs?: number; + maxSubscriptionsPerChannel?: number; + minChannels?: number; + sendBufferHighWatermark?: number; + url: TClusterUrl; +}>; + +export function createDefaultRpcSubscriptionsChannelCreator( + config: DefaultRpcSubscriptionsChannelConfig, +): RpcSubscriptionsChannelCreatorFromClusterUrl { + if (/^wss?:/i.test(config.url) === false) { + const protocolMatch = config.url.match(/^([^:]+):/); + throw new DOMException( + protocolMatch + ? "Failed to construct 'WebSocket': The URL's scheme must be either 'ws' or " + + `'wss'. '${protocolMatch[1]}:' is not allowed.` + : `Failed to construct 'WebSocket': The URL '${config.url}' is invalid.`, + ); + } + const { intervalMs, ...rest } = config; + const createDefaultRpcSubscriptionsChannel = (({ abortSignal }) => { + return createWebSocketChannel({ + ...rest, + sendBufferHighWatermark: + config.sendBufferHighWatermark ?? + // Let 128KB of data into the WebSocket buffer before buffering it in the app. + 131_072, + signal: abortSignal, + }) + .then(getRpcSubscriptionsChannelWithJSONSerialization) + .then(channel => + getRpcSubscriptionsChannelWithAutoping({ + abortSignal, + channel, + intervalMs: intervalMs ?? 5_000, + }), + ); + }) as RpcSubscriptionsChannelCreatorFromClusterUrl; + return getChannelPoolingChannelCreator(createDefaultRpcSubscriptionsChannel, { + maxSubscriptionsPerChannel: + config.maxSubscriptionsPerChannel ?? + // TODO: Determine this experimentally + 1_000, + minChannels: config.minChannels ?? 1, + }); +} diff --git a/packages/rpc-subscriptions/src/rpc-subscriptions-clusters.ts b/packages/rpc-subscriptions/src/rpc-subscriptions-clusters.ts index 34fee35baa2..1ba7d3d1303 100644 --- a/packages/rpc-subscriptions/src/rpc-subscriptions-clusters.ts +++ b/packages/rpc-subscriptions/src/rpc-subscriptions-clusters.ts @@ -1,6 +1,73 @@ -import type { RpcSubscriptions, RpcSubscriptionsTransport } from '@solana/rpc-subscriptions-spec'; +import type { + RpcSubscriptions, + RpcSubscriptionsChannel, + RpcSubscriptionsChannelCreator, + RpcSubscriptionsTransport, +} from '@solana/rpc-subscriptions-spec'; import type { ClusterUrl, DevnetUrl, MainnetUrl, TestnetUrl } from '@solana/rpc-types'; +export type RpcSubscriptionsChannelCreatorDevnet = RpcSubscriptionsChannelCreator< + TOutboundMessage, + TInboundMessage +> & { + '~cluster': 'devnet'; +}; +export type RpcSubscriptionsChannelCreatorTestnet = RpcSubscriptionsChannelCreator< + TOutboundMessage, + TInboundMessage +> & { + '~cluster': 'testnet'; +}; +export type RpcSubscriptionsChannelCreatorMainnet = RpcSubscriptionsChannelCreator< + TOutboundMessage, + TInboundMessage +> & { + '~cluster': 'mainnet'; +}; +export type RpcSubscriptionsChannelCreatorWithCluster = + | RpcSubscriptionsChannelCreatorDevnet + | RpcSubscriptionsChannelCreatorMainnet + | RpcSubscriptionsChannelCreatorTestnet; +export type RpcSubscriptionsChannelCreatorFromClusterUrl< + TClusterUrl extends ClusterUrl, + TOutboundMessage, + TInboundMessage, +> = TClusterUrl extends DevnetUrl + ? RpcSubscriptionsChannelCreatorDevnet + : TClusterUrl extends TestnetUrl + ? RpcSubscriptionsChannelCreatorTestnet + : TClusterUrl extends MainnetUrl + ? RpcSubscriptionsChannelCreatorMainnet + : RpcSubscriptionsChannelCreator; + +export type RpcSubscriptionsChannelDevnet = RpcSubscriptionsChannel< + TOutboundMessage, + TInboundMessage +> & { '~cluster': 'devnet' }; +export type RpcSubscriptionsChannelTestnet = RpcSubscriptionsChannel< + TOutboundMessage, + TInboundMessage +> & { '~cluster': 'testnet' }; +export type RpcSubscriptionsChannelMainnet = RpcSubscriptionsChannel< + TOutboundMessage, + TInboundMessage +> & { '~cluster': 'mainnet' }; +export type RpcSubscriptionsChannelWithCluster = + | RpcSubscriptionsChannelDevnet + | RpcSubscriptionsChannelMainnet + | RpcSubscriptionsChannelTestnet; +export type RpcSubscriptionsChannelFromClusterUrl< + TClusterUrl extends ClusterUrl, + TOutboundMessage, + TInboundMessage, +> = TClusterUrl extends DevnetUrl + ? RpcSubscriptionsChannelDevnet + : TClusterUrl extends TestnetUrl + ? RpcSubscriptionsChannelTestnet + : TClusterUrl extends MainnetUrl + ? RpcSubscriptionsChannelMainnet + : RpcSubscriptionsChannel; + export type RpcSubscriptionsTransportDevnet = RpcSubscriptionsTransport & { '~cluster': 'devnet' }; export type RpcSubscriptionsTransportTestnet = RpcSubscriptionsTransport & { '~cluster': 'testnet' }; export type RpcSubscriptionsTransportMainnet = RpcSubscriptionsTransport & { '~cluster': 'mainnet' }; diff --git a/packages/rpc-subscriptions/src/rpc-subscriptions-transport.ts b/packages/rpc-subscriptions/src/rpc-subscriptions-transport.ts index e22d03694ed..354947bc948 100644 --- a/packages/rpc-subscriptions/src/rpc-subscriptions-transport.ts +++ b/packages/rpc-subscriptions/src/rpc-subscriptions-transport.ts @@ -1,23 +1,45 @@ -import { createWebSocketTransport } from '@solana/rpc-subscriptions-transport-websocket'; -import type { ClusterUrl } from '@solana/rpc-types'; +import { pipe } from '@solana/functional'; +import { RpcSubscriptionsChannelCreator, RpcSubscriptionsTransport } from '@solana/rpc-subscriptions-spec'; +import { ClusterUrl } from '@solana/rpc-types'; -import { RpcSubscriptionsTransportFromClusterUrl } from './rpc-subscriptions-clusters'; +import { + RpcSubscriptionsChannelCreatorDevnet, + RpcSubscriptionsChannelCreatorFromClusterUrl, + RpcSubscriptionsChannelCreatorMainnet, + RpcSubscriptionsChannelCreatorTestnet, + RpcSubscriptionsTransportDevnet, + RpcSubscriptionsTransportFromClusterUrl, + RpcSubscriptionsTransportMainnet, + RpcSubscriptionsTransportTestnet, +} from './rpc-subscriptions-clusters'; export type DefaultRpcSubscriptionsTransportConfig = Readonly<{ - intervalMs?: number; - sendBufferHighWatermark?: number; - url: TClusterUrl; + createChannel: RpcSubscriptionsChannelCreatorFromClusterUrl; }>; -export function createDefaultRpcSubscriptionsTransport( - config: DefaultRpcSubscriptionsTransportConfig, -): RpcSubscriptionsTransportFromClusterUrl { - const { /* `intervalMs` will make a comeback; stay tuned */ ...rest } = config; - return createWebSocketTransport({ - ...rest, - sendBufferHighWatermark: - config.sendBufferHighWatermark ?? - // Let 128KB of data into the WebSocket buffer before buffering it in the app. - 131_072, - }) as RpcSubscriptionsTransportFromClusterUrl; +export function createDefaultRpcSubscriptionsTransport({ + createChannel, +}: DefaultRpcSubscriptionsTransportConfig) { + return pipe( + createRpcSubscriptionsTransportFromChannelCreator( + createChannel, + ) as RpcSubscriptionsTransport as RpcSubscriptionsTransportFromClusterUrl, + ); +} + +export function createRpcSubscriptionsTransportFromChannelCreator< + TChannelCreator extends RpcSubscriptionsChannelCreator, + TInboundMessage, + TOutboundMessage, +>(createChannel: TChannelCreator) { + return (async ({ executeSubscriptionPlan, signal }) => { + const channel = await createChannel({ abortSignal: signal }); + return await executeSubscriptionPlan({ channel, signal }); + }) as TChannelCreator extends RpcSubscriptionsChannelCreatorDevnet + ? RpcSubscriptionsTransportDevnet + : TChannelCreator extends RpcSubscriptionsChannelCreatorTestnet + ? RpcSubscriptionsTransportTestnet + : TChannelCreator extends RpcSubscriptionsChannelCreatorMainnet + ? RpcSubscriptionsTransportMainnet + : RpcSubscriptionsTransport; } diff --git a/packages/rpc-subscriptions/src/rpc-subscriptions.ts b/packages/rpc-subscriptions/src/rpc-subscriptions.ts index 2b75b0e37d8..e755ad707a0 100644 --- a/packages/rpc-subscriptions/src/rpc-subscriptions.ts +++ b/packages/rpc-subscriptions/src/rpc-subscriptions.ts @@ -8,25 +8,42 @@ import { import { ClusterUrl } from '@solana/rpc-types'; import { DEFAULT_RPC_SUBSCRIPTIONS_CONFIG } from './rpc-default-config'; +import { + createDefaultRpcSubscriptionsChannelCreator, + DefaultRpcSubscriptionsChannelConfig, +} from './rpc-subscriptions-channel'; import type { RpcSubscriptionsFromTransport } from './rpc-subscriptions-clusters'; import { createDefaultRpcSubscriptionsTransport, DefaultRpcSubscriptionsTransportConfig, } from './rpc-subscriptions-transport'; -export function createSolanaRpcSubscriptions< - TClusterUrl extends ClusterUrl, - TApi extends RpcSubscriptionsApiMethods = SolanaRpcSubscriptionsApi, ->(clusterUrl: TClusterUrl, config?: Omit, 'url'>) { - const transport = createDefaultRpcSubscriptionsTransport({ url: clusterUrl, ...config }); +interface DefaultRpcSubscriptionsConfig + extends DefaultRpcSubscriptionsTransportConfig, + DefaultRpcSubscriptionsChannelConfig {} + +function createSolanaRpcSubscriptionsImpl( + clusterUrl: TClusterUrl, + config?: Omit, 'url'>, +) { + const transport = createDefaultRpcSubscriptionsTransport({ + createChannel: createDefaultRpcSubscriptionsChannelCreator({ ...config, url: clusterUrl }), + }); return createSolanaRpcSubscriptionsFromTransport(transport); } +export function createSolanaRpcSubscriptions( + clusterUrl: TClusterUrl, + config?: Omit, 'url'>, +) { + return createSolanaRpcSubscriptionsImpl(clusterUrl, config); +} + export function createSolanaRpcSubscriptions_UNSTABLE( clusterUrl: TClusterUrl, config?: Omit, 'url'>, ) { - return createSolanaRpcSubscriptions( + return createSolanaRpcSubscriptionsImpl( clusterUrl, config, ); @@ -41,12 +58,3 @@ export function createSolanaRpcSubscriptionsFromTransport< transport, }) as RpcSubscriptionsFromTransport; } - -export function createSolanaRpcSubscriptionsFromTransport_UNSTABLE( - transport: TTransport, -) { - return createSolanaRpcSubscriptionsFromTransport< - TTransport, - SolanaRpcSubscriptionsApi & SolanaRpcSubscriptionsApiUnstable - >(transport); -} diff --git a/packages/rpc-transformers/src/response-transformer.ts b/packages/rpc-transformers/src/response-transformer.ts index e24997e64e9..40a86d59816 100644 --- a/packages/rpc-transformers/src/response-transformer.ts +++ b/packages/rpc-transformers/src/response-transformer.ts @@ -1,4 +1,3 @@ -import { getSolanaErrorFromJsonRpcError } from '@solana/errors'; import { pipe } from '@solana/functional'; import { RpcRequest, RpcResponse, RpcResponseTransformer } from '@solana/rpc-spec'; @@ -28,16 +27,10 @@ export function getDefaultResponseTransformerForSolanaRpc( }; } -type JsonRpcResponse = { error: Parameters[0] } | { result: unknown }; - export function getDefaultResponseTransformerForSolanaRpcSubscriptions( config?: ResponseTransformerConfig, -): (response: unknown, notificationName: string) => T { - return (rawResponse: unknown, notificationName: string): T => { - const rawData = rawResponse as JsonRpcResponse; - if ('error' in rawData) { - throw getSolanaErrorFromJsonRpcError(rawData.error); - } +): (notification: unknown, notificationName: string) => T { + return (notification: unknown, notificationName: string): T => { const keyPaths = config?.allowedNumericKeyPaths && notificationName ? config.allowedNumericKeyPaths[notificationName as keyof TApi] @@ -46,6 +39,6 @@ export function getDefaultResponseTransformerForSolanaRpcSubscriptions( const initialState = { keyPath: [], }; - return traverse(rawData.result, initialState) as T; + return traverse(notification, initialState) as T; }; } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 5c4251034b9..ec88d44eeb2 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -857,18 +857,21 @@ importers: '@solana/errors': specifier: workspace:* version: link:../errors + '@solana/functional': + specifier: workspace:* + version: link:../functional '@solana/promises': specifier: workspace:* version: link:../promises '@solana/rpc-subscriptions-api': specifier: workspace:* version: link:../rpc-subscriptions-api + '@solana/rpc-subscriptions-channel-websocket': + specifier: workspace:* + version: link:../rpc-subscriptions-channel-websocket '@solana/rpc-subscriptions-spec': specifier: workspace:* version: link:../rpc-subscriptions-spec - '@solana/rpc-subscriptions-transport-websocket': - specifier: workspace:* - version: link:../rpc-subscriptions-transport-websocket '@solana/rpc-transformers': specifier: workspace:* version: link:../rpc-transformers @@ -884,6 +887,9 @@ importers: '@solana/addresses': specifier: workspace:* version: link:../addresses + '@solana/fast-stable-stringify': + specifier: workspace:* + version: link:../fast-stable-stringify '@solana/keys': specifier: workspace:* version: link:../keys @@ -906,39 +912,27 @@ importers: specifier: '>=5' version: 5.5.2 devDependencies: - '@solana/rpc-subscriptions-transport-websocket': + '@solana/rpc-subscriptions-channel-websocket': specifier: workspace:* - version: link:../rpc-subscriptions-transport-websocket + version: link:../rpc-subscriptions-channel-websocket - packages/rpc-subscriptions-spec: + packages/rpc-subscriptions-channel-websocket: dependencies: '@solana/errors': specifier: workspace:* version: link:../errors - '@solana/promises': + '@solana/functional': specifier: workspace:* - version: link:../promises - '@solana/rpc-spec-types': + version: link:../functional + '@solana/rpc-subscriptions-spec': specifier: workspace:* - version: link:../rpc-spec-types + version: link:../rpc-subscriptions-spec '@solana/subscribable': specifier: workspace:* version: link:../subscribable typescript: specifier: '>=5' - version: 5.5.2 - - packages/rpc-subscriptions-transport-websocket: - dependencies: - '@solana/errors': - specifier: workspace:* - version: link:../errors - '@solana/rpc-subscriptions-spec': - specifier: workspace:* - version: link:../rpc-subscriptions-spec - typescript: - specifier: '>=5' - version: 5.5.2 + version: 5.6.2 ws: specifier: ^8.14.0 version: 8.14.2(bufferutil@4.0.8)(utf-8-validate@5.0.10) @@ -950,6 +944,24 @@ importers: specifier: ^2.5.0 version: 2.5.0 + packages/rpc-subscriptions-spec: + dependencies: + '@solana/errors': + specifier: workspace:* + version: link:../errors + '@solana/promises': + specifier: workspace:* + version: link:../promises + '@solana/rpc-spec-types': + specifier: workspace:* + version: link:../rpc-spec-types + '@solana/subscribable': + specifier: workspace:* + version: link:../subscribable + typescript: + specifier: '>=5' + version: 5.5.2 + packages/rpc-transformers: dependencies: '@solana/errors':