diff --git a/packages/errors/src/codes.ts b/packages/errors/src/codes.ts index 08a4ddcd74f..e3fb2cf5616 100644 --- a/packages/errors/src/codes.ts +++ b/packages/errors/src/codes.ts @@ -286,9 +286,9 @@ export const SOLANA_ERROR__RPC__API_PLAN_MISSING_FOR_RPC_METHOD = 8100003 as con // Reserve error codes in the range [8190000-8190999]. export const SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_REQUEST = 8190000 as const; export const SOLANA_ERROR__RPC_SUBSCRIPTIONS__EXPECTED_SERVER_SUBSCRIPTION_ID = 8190001 as const; -export const SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_CLOSED_BEFORE_MESSAGE_BUFFERED = 8190002 as const; -export const SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_CONNECTION_CLOSED = 8190003 as const; -export const SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_FAILED_TO_CONNECT = 8190004 as const; +export const SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CLOSED_BEFORE_MESSAGE_BUFFERED = 8190002 as const; +export const SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CONNECTION_CLOSED = 8190003 as const; +export const SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_FAILED_TO_CONNECT = 8190004 as const; // Invariant violation errors. // Reserve error codes in the range [9900000-9900999]. @@ -460,10 +460,10 @@ export type SolanaErrorCode = | typeof SOLANA_ERROR__RPC__TRANSPORT_HTTP_ERROR | typeof SOLANA_ERROR__RPC__TRANSPORT_HTTP_HEADER_FORBIDDEN | typeof SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_REQUEST + | typeof SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CLOSED_BEFORE_MESSAGE_BUFFERED + | typeof SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CONNECTION_CLOSED + | typeof SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_FAILED_TO_CONNECT | typeof SOLANA_ERROR__RPC_SUBSCRIPTIONS__EXPECTED_SERVER_SUBSCRIPTION_ID - | typeof SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_CLOSED_BEFORE_MESSAGE_BUFFERED - | typeof SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_CONNECTION_CLOSED - | typeof SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_FAILED_TO_CONNECT | typeof SOLANA_ERROR__SIGNER__ADDRESS_CANNOT_HAVE_MULTIPLE_SIGNERS | typeof SOLANA_ERROR__SIGNER__EXPECTED_KEY_PAIR_SIGNER | typeof SOLANA_ERROR__SIGNER__EXPECTED_MESSAGE_MODIFYING_SIGNER diff --git a/packages/errors/src/context.ts b/packages/errors/src/context.ts index 7bcb9d536c0..d7a3c5b48a4 100644 --- a/packages/errors/src/context.ts +++ b/packages/errors/src/context.ts @@ -124,7 +124,7 @@ import { SOLANA_ERROR__RPC__TRANSPORT_HTTP_ERROR, SOLANA_ERROR__RPC__TRANSPORT_HTTP_HEADER_FORBIDDEN, SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_REQUEST, - SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_FAILED_TO_CONNECT, + SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_FAILED_TO_CONNECT, SOLANA_ERROR__SIGNER__ADDRESS_CANNOT_HAVE_MULTIPLE_SIGNERS, SOLANA_ERROR__SIGNER__EXPECTED_KEY_PAIR_SIGNER, SOLANA_ERROR__SIGNER__EXPECTED_MESSAGE_MODIFYING_SIGNER, @@ -490,7 +490,7 @@ export type SolanaErrorContext = DefaultUnspecifiedErrorContextToUndefined< [SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_REQUEST]: { notificationName: string; }; - [SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_FAILED_TO_CONNECT]: { + [SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_FAILED_TO_CONNECT]: { errorEvent: Event; }; [SOLANA_ERROR__RPC__API_PLAN_MISSING_FOR_RPC_METHOD]: { diff --git a/packages/errors/src/messages.ts b/packages/errors/src/messages.ts index 6e57f42393d..172dcc59a0d 100644 --- a/packages/errors/src/messages.ts +++ b/packages/errors/src/messages.ts @@ -141,10 +141,10 @@ import { SOLANA_ERROR__RPC__TRANSPORT_HTTP_ERROR, SOLANA_ERROR__RPC__TRANSPORT_HTTP_HEADER_FORBIDDEN, SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_REQUEST, + SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CLOSED_BEFORE_MESSAGE_BUFFERED, + SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CONNECTION_CLOSED, + SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_FAILED_TO_CONNECT, SOLANA_ERROR__RPC_SUBSCRIPTIONS__EXPECTED_SERVER_SUBSCRIPTION_ID, - SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_CLOSED_BEFORE_MESSAGE_BUFFERED, - SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_CONNECTION_CLOSED, - SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_FAILED_TO_CONNECT, SOLANA_ERROR__SIGNER__ADDRESS_CANNOT_HAVE_MULTIPLE_SIGNERS, SOLANA_ERROR__SIGNER__EXPECTED_KEY_PAIR_SIGNER, SOLANA_ERROR__SIGNER__EXPECTED_MESSAGE_MODIFYING_SIGNER, @@ -451,12 +451,12 @@ export const SolanaErrorMessages: Readonly<{ "Either the notification name must end in 'Notifications' or the API must supply a " + "subscription creator function for the notification '$notificationName' to map between " + 'the notification name and the subscribe/unsubscribe method names.', + [SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CLOSED_BEFORE_MESSAGE_BUFFERED]: + 'WebSocket was closed before payload could be added to the send buffer', + [SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CONNECTION_CLOSED]: 'WebSocket connection closed', + [SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_FAILED_TO_CONNECT]: 'WebSocket failed to connect', [SOLANA_ERROR__RPC_SUBSCRIPTIONS__EXPECTED_SERVER_SUBSCRIPTION_ID]: 'Failed to obtain a subscription id from the server', - [SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_CLOSED_BEFORE_MESSAGE_BUFFERED]: - 'WebSocket was closed before payload could be added to the send buffer', - [SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_CONNECTION_CLOSED]: 'WebSocket connection closed', - [SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_FAILED_TO_CONNECT]: 'WebSocket failed to connect', [SOLANA_ERROR__RPC__API_PLAN_MISSING_FOR_RPC_METHOD]: 'Could not find an API plan for RPC method: `$method`', [SOLANA_ERROR__RPC__INTEGER_OVERFLOW]: 'The $argumentLabel argument to the `$methodName` RPC method$optionalPathLabel was ' + diff --git a/packages/rpc-subscriptions-spec/README.md b/packages/rpc-subscriptions-spec/README.md index 78fb082b198..2250bbd82d7 100644 --- a/packages/rpc-subscriptions-spec/README.md +++ b/packages/rpc-subscriptions-spec/README.md @@ -14,4 +14,44 @@ # @solana/rpc-subscriptions-spec -TODO +This package contains types that describe the implementation of the JSON RPC Subscriptions API, as well as methods to create one. It can be used standalone, but it is also exported as part of the Solana JavaScript SDK [`@solana/web3.js@rc`](https://github.com/solana-labs/solana-web3.js/tree/master/packages/library). + +This API is designed to be used as follows: + +```ts +const rpcSubscriptions = + // Step 1 - Create an `RpcSubscriptions` instance. This may be stateful. + createSolanaRpcSubscriptions(mainnet('wss://api.mainnet-beta.solana.com')); +const response = await rpcSubscriptions + // Step 2 - Call supported methods on it to produce `PendingRpcSubscriptionsRequest` objects. + .slotNotifications({ commitment: 'confirmed' }) + // Step 3 - Call the `subscribe()` method on those pending requests to trigger them. + .subscribe({ abortSignal: AbortSignal.timeout(10_000) }); +// Step 4 - Iterate over the result. +try { + for await (const slotNotification of slotNotifications) { + console.log('Got a slot notification', slotNotification); + } +} catch (e) { + console.error('The subscription closed unexpectedly', e); +} finally { + console.log('We have stopped listening for notifications'); +} +``` + +## Types + +### `RpcSubscriptionsChannel` + +A channel is a `DataPublisher` that you can subscribe to events of type `RpcSubscriptionChannelEvents`. Additionally, you can use it to send messages of type `TOutboundMessage` back to the remote end by calling the `send(message)` method. + +### `RpcSubscriptionsChannelCreator` + +A channel creator is a function that accepts an `AbortSignal`, returns a new `RpcSubscriptionsChannel`, and tears down the channel when the abort signal fires. + +### `RpcSubscriptionChannelEvents` + +Subscription channels publish events on two channel names: + +- `error`: Fires when the channel closes unexpectedly +- `message`: Fires on every message received from the remote end diff --git a/packages/rpc-subscriptions-spec/package.json b/packages/rpc-subscriptions-spec/package.json index 16dfcdcf87c..98eac9cf322 100644 --- a/packages/rpc-subscriptions-spec/package.json +++ b/packages/rpc-subscriptions-spec/package.json @@ -72,7 +72,8 @@ ], "dependencies": { "@solana/errors": "workspace:*", - "@solana/rpc-spec-types": "workspace:*" + "@solana/rpc-spec-types": "workspace:*", + "@solana/subscribable": "workspace:*" }, "peerDependencies": { "typescript": ">=5" diff --git a/packages/rpc-subscriptions-spec/src/index.ts b/packages/rpc-subscriptions-spec/src/index.ts index de4ed76c1f6..33520b89898 100644 --- a/packages/rpc-subscriptions-spec/src/index.ts +++ b/packages/rpc-subscriptions-spec/src/index.ts @@ -1,4 +1,5 @@ export * from './rpc-subscriptions-request'; export * from './rpc-subscriptions'; export * from './rpc-subscriptions-api'; +export * from './rpc-subscriptions-channel'; export * from './rpc-subscriptions-transport'; diff --git a/packages/rpc-subscriptions-spec/src/rpc-subscriptions-channel.ts b/packages/rpc-subscriptions-spec/src/rpc-subscriptions-channel.ts new file mode 100644 index 00000000000..3a168cd9c05 --- /dev/null +++ b/packages/rpc-subscriptions-spec/src/rpc-subscriptions-channel.ts @@ -0,0 +1,28 @@ +import { + SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CLOSED_BEFORE_MESSAGE_BUFFERED, + SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CONNECTION_CLOSED, + SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_FAILED_TO_CONNECT, + SolanaError, +} from '@solana/errors'; +import { DataPublisher } from '@solana/subscribable'; + +type RpcSubscriptionsChannelSolanaErrorCode = + | typeof SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CLOSED_BEFORE_MESSAGE_BUFFERED + | typeof SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CONNECTION_CLOSED + | typeof SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_FAILED_TO_CONNECT; + +export type RpcSubscriptionChannelEvents = { + error: SolanaError; + message: TInboundMessage; +}; + +export interface RpcSubscriptionsChannel + extends DataPublisher> { + send(message: TOutboundMessage): Promise; +} + +export type RpcSubscriptionsChannelCreator = ( + config: Readonly<{ + abortSignal: AbortSignal; + }>, +) => Promise>; diff --git a/packages/rpc-subscriptions-transport-websocket/src/__tests__/websocket-connection-test.ts b/packages/rpc-subscriptions-transport-websocket/src/__tests__/websocket-connection-test.ts index 05fcb9f5af7..00b08487da4 100644 --- a/packages/rpc-subscriptions-transport-websocket/src/__tests__/websocket-connection-test.ts +++ b/packages/rpc-subscriptions-transport-websocket/src/__tests__/websocket-connection-test.ts @@ -1,6 +1,6 @@ import { - SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_CLOSED_BEFORE_MESSAGE_BUFFERED, - SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_FAILED_TO_CONNECT, + SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CLOSED_BEFORE_MESSAGE_BUFFERED, + SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_FAILED_TO_CONNECT, SolanaError, } from '@solana/errors'; import WS from 'jest-websocket-mock'; @@ -44,7 +44,7 @@ describe('createWebSocketConnection', () => { url: 'ws://fake', // Wrong URL! }); await expect(connectionPromise).rejects.toThrow( - new SolanaError(SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_FAILED_TO_CONNECT, { + new SolanaError(SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_FAILED_TO_CONNECT, { errorEvent: {} as Event, }), ); @@ -61,7 +61,7 @@ describe('createWebSocketConnection', () => { expect(client).toHaveProperty('readyState', WebSocket.CONNECTING); abortController.abort(); await expect(connectionPromise).rejects.toThrow( - new SolanaError(SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_FAILED_TO_CONNECT, { + new SolanaError(SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_FAILED_TO_CONNECT, { errorEvent: {} as Event, }), ); @@ -231,7 +231,7 @@ describe('RpcWebSocketConnection', () => { const sendPromise = connection.send({ some: 'message' }); abortController.abort(); await expect(sendPromise).rejects.toThrow( - new SolanaError(SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_CLOSED_BEFORE_MESSAGE_BUFFERED), + new SolanaError(SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CLOSED_BEFORE_MESSAGE_BUFFERED), ); }); it('fatals when the connection encounters an error while a message is queued', async () => { @@ -243,7 +243,7 @@ describe('RpcWebSocketConnection', () => { wasClean: false, }); await expect(sendPromise).rejects.toThrow( - new SolanaError(SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_CLOSED_BEFORE_MESSAGE_BUFFERED), + new SolanaError(SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CLOSED_BEFORE_MESSAGE_BUFFERED), ); }); }); diff --git a/packages/rpc-subscriptions-transport-websocket/src/websocket-connection.ts b/packages/rpc-subscriptions-transport-websocket/src/websocket-connection.ts index 8a205a48feb..fd8ef4b9fe0 100644 --- a/packages/rpc-subscriptions-transport-websocket/src/websocket-connection.ts +++ b/packages/rpc-subscriptions-transport-websocket/src/websocket-connection.ts @@ -1,9 +1,9 @@ import { SOLANA_ERROR__INVARIANT_VIOLATION__SUBSCRIPTION_ITERATOR_MUST_NOT_POLL_BEFORE_RESOLVING_EXISTING_MESSAGE_PROMISE, SOLANA_ERROR__INVARIANT_VIOLATION__SUBSCRIPTION_ITERATOR_STATE_MISSING, - SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_CLOSED_BEFORE_MESSAGE_BUFFERED, - SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_CONNECTION_CLOSED, - SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_FAILED_TO_CONNECT, + SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CLOSED_BEFORE_MESSAGE_BUFFERED, + SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CONNECTION_CLOSED, + SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_FAILED_TO_CONNECT, SolanaError, } from '@solana/errors'; import WebSocket from '@solana/ws-impl'; @@ -78,7 +78,7 @@ export async function createWebSocketConnection({ function handleError(ev: Event) { if (!hasConnected) { reject( - new SolanaError(SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_FAILED_TO_CONNECT, { + new SolanaError(SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_FAILED_TO_CONNECT, { errorEvent: ev, }), ); @@ -113,7 +113,7 @@ export async function createWebSocketConnection({ clearInterval(intervalId); reject( new SolanaError( - SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_CLOSED_BEFORE_MESSAGE_BUFFERED, + SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CLOSED_BEFORE_MESSAGE_BUFFERED, ), ); }; @@ -166,7 +166,7 @@ export async function createWebSocketConnection({ return; } else { throw new SolanaError( - SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_CONNECTION_CLOSED, + SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CONNECTION_CLOSED, { cause: e, }, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 30aa554f44a..c322764e45c 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -918,6 +918,9 @@ importers: '@solana/rpc-spec-types': specifier: workspace:* version: link:../rpc-spec-types + '@solana/subscribable': + specifier: workspace:* + version: link:../subscribable typescript: specifier: '>=5' version: 5.5.2