Skip to content

Commit

Permalink
Convert the web socket transport to a channel
Browse files Browse the repository at this point in the history
  • Loading branch information
steveluscher committed Oct 4, 2024
1 parent d341abb commit 859d964
Show file tree
Hide file tree
Showing 29 changed files with 782 additions and 732 deletions.
33 changes: 27 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -624,13 +624,23 @@ try {

### Using Custom RPC Subscriptions Transports

The `createSolanaRpcSubscriptions` function communicates with the RPC server using a default WebSocket transport that should satisfy most use cases. However, you may here as well provide your own transport or decorate existing ones to communicate with RPC servers in any way you see fit. In the example below, we explicitly create a WebSocket transport and use it to create a new RPC Subscriptions client via the `createSolanaRpcSubscriptionsFromTransport` function.
The `createSolanaRpcSubscriptions` function communicates with the RPC server using a default `WebSocket` channel that should satisfy most use cases. However, you may here as well provide your own channel creator or decorate existing ones to communicate with RPC servers in any way you see fit. In the example below, we supply a custom `WebSocket` channel creator and use it to create a new RPC Subscriptions client via the `createSolanaRpcSubscriptionsFromTransport` function.

```ts
import { createDefaultRpcSubscriptionsTransport, createSolanaRpcSubscriptionsFromTransport } from '@solana/web3.js';

// Create a WebSocket transport or any custom transport of your choice.
const transport = createDefaultRpcSubscriptionsTransport({ url: 'ws://127.0.0.1:8900' });
// Create a transport with a custom channel creator of your choice.
const transport = createDefaultRpcSubscriptionsTransport({
createChannel({ abortSignal }) {
return createWebSocketChannel({
maxSubscriptionsPerChannel: 100,
minChannels: 25,
sendBufferHighWatermark: 32_768,
signal: abortSignal,
url: 'ws://127.0.0.1:8900',
});
},
});

// Create an RPC client using that transport.
const rpcSubscriptions = createSolanaRpcSubscriptionsFromTransport(transport);
Expand Down Expand Up @@ -661,16 +671,22 @@ If your app needs access to [unstable RPC Subscriptions](https://docs.solana.com

```ts
import {
createDefaultRpcSubscriptionsChannelCreator,
createDefaultRpcSubscriptionsTransport,
createSolanaRpcSubscriptions_UNSTABLE,
createSolanaRpcSubscriptionsFromTransport_UNSTABLE,
} from '@solana/web3.js';

// Using the default WebSocket transport.
// Using the default WebSocket channel.
const rpcSubscriptions = createSolanaRpcSubscriptions_UNSTABLE('ws://127.0.0.1:8900');
// ^? RpcSubscriptions<SolanaRpcSubscriptionsApi & SolanaRpcSubscriptionsApiUnstable>

// Using a custom transport.
const transport = createDefaultRpcSubscriptionsTransport({ url: 'ws://127.0.0.1:8900' });
const transport = createDefaultRpcSubscriptionsTransport({
createChannel: createDefaultRpcSubscriptionsChannelCreator({
url: 'ws://127.0.0.1:8900',
}),
});
const rpcSubscriptions = createSolanaRpcSubscriptionsFromTransport_UNSTABLE(transport);
// ^? RpcSubscriptions<SolanaRpcSubscriptionsApi & SolanaRpcSubscriptionsApiUnstable>
```
Expand All @@ -696,6 +712,7 @@ Alternatively, you may explicitly create the RPC Subscriptions API using the `cr

```ts
import {
createDefaultRpcSubscriptionsChannelCreator,
createDefaultRpcSubscriptionsTransport,
createSubscriptionRpc,
createSolanaRpcSubscriptionsApi,
Expand All @@ -705,7 +722,11 @@ import {
} from '@solana/web3.js';

const api = createSolanaRpcSubscriptionsApi<AccountNotificationsApi & SlotNotificationsApi>(DEFAULT_RPC_CONFIG);
const transport = createDefaultRpcSubscriptionsTransport({ url: 'ws://127.0.0.1:8900' });
const transport = createDefaultRpcSubscriptionsTransport({
createChannel: createDefaultRpcSubscriptionsChannelCreator({
url: 'ws://127.0.0.1:8900',
}),
});
const rpcSubscriptions = createSubscriptionRpc({ api, transport });
```

Expand Down
4 changes: 2 additions & 2 deletions packages/errors/src/codes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ export const SOLANA_ERROR__RPC__API_PLAN_MISSING_FOR_RPC_METHOD = 8100003 as con

// RPC-Subscriptions-related errors.
// Reserve error codes in the range [8190000-8190999].
export const SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_REQUEST = 8190000 as const;
export const SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_PLAN = 8190000 as const;
export const SOLANA_ERROR__RPC_SUBSCRIPTIONS__EXPECTED_SERVER_SUBSCRIPTION_ID = 8190001 as const;
export const SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CLOSED_BEFORE_MESSAGE_BUFFERED = 8190002 as const;
export const SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CONNECTION_CLOSED = 8190003 as const;
Expand Down Expand Up @@ -459,7 +459,7 @@ export type SolanaErrorCode =
| typeof SOLANA_ERROR__RPC__INTEGER_OVERFLOW
| typeof SOLANA_ERROR__RPC__TRANSPORT_HTTP_ERROR
| typeof SOLANA_ERROR__RPC__TRANSPORT_HTTP_HEADER_FORBIDDEN
| typeof SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_REQUEST
| typeof SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_PLAN
| typeof SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CLOSED_BEFORE_MESSAGE_BUFFERED
| typeof SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CONNECTION_CLOSED
| typeof SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_FAILED_TO_CONNECT
Expand Down
4 changes: 2 additions & 2 deletions packages/errors/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ import {
SOLANA_ERROR__RPC__INTEGER_OVERFLOW,
SOLANA_ERROR__RPC__TRANSPORT_HTTP_ERROR,
SOLANA_ERROR__RPC__TRANSPORT_HTTP_HEADER_FORBIDDEN,
SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_REQUEST,
SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_PLAN,
SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_FAILED_TO_CONNECT,
SOLANA_ERROR__SIGNER__ADDRESS_CANNOT_HAVE_MULTIPLE_SIGNERS,
SOLANA_ERROR__SIGNER__EXPECTED_KEY_PAIR_SIGNER,
Expand Down Expand Up @@ -487,7 +487,7 @@ export type SolanaErrorContext = DefaultUnspecifiedErrorContextToUndefined<
[SOLANA_ERROR__NONCE_ACCOUNT_NOT_FOUND]: {
nonceAccountAddress: string;
};
[SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_REQUEST]: {
[SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_PLAN]: {
notificationName: string;
};
[SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_FAILED_TO_CONNECT]: {
Expand Down
9 changes: 4 additions & 5 deletions packages/errors/src/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ import {
SOLANA_ERROR__RPC__INTEGER_OVERFLOW,
SOLANA_ERROR__RPC__TRANSPORT_HTTP_ERROR,
SOLANA_ERROR__RPC__TRANSPORT_HTTP_HEADER_FORBIDDEN,
SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_REQUEST,
SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_PLAN,
SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CLOSED_BEFORE_MESSAGE_BUFFERED,
SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CONNECTION_CLOSED,
SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_FAILED_TO_CONNECT,
Expand Down Expand Up @@ -447,10 +447,9 @@ export const SolanaErrorMessages: Readonly<{
[SOLANA_ERROR__MALFORMED_BIGINT_STRING]: '`$value` cannot be parsed as a `BigInt`',
[SOLANA_ERROR__MALFORMED_NUMBER_STRING]: '`$value` cannot be parsed as a `Number`',
[SOLANA_ERROR__NONCE_ACCOUNT_NOT_FOUND]: 'No nonce account could be found at address `$nonceAccountAddress`',
[SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_REQUEST]:
"Either the notification name must end in 'Notifications' or the API must supply a " +
"subscription creator function for the notification '$notificationName' to map between " +
'the notification name and the subscribe/unsubscribe method names.',
[SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_PLAN]:
"The notification name must end in 'Notifications' and the API must supply a " +
"subscription plan creator function for the notification '$notificationName'.",
[SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CLOSED_BEFORE_MESSAGE_BUFFERED]:
'WebSocket was closed before payload could be added to the send buffer',
[SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CONNECTION_CLOSED]: 'WebSocket connection closed',
Expand Down
53 changes: 52 additions & 1 deletion packages/rpc-subscriptions-api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,55 @@

# @solana/rpc-subscriptions-api

TODO
This package contains types that describe the [methods](https://solana.com/docs/rpc/websocket) of the Solana JSON RPC Subscriptions API, and utilities for creating a `RpcSubscriptionsApi` implementation with sensible defaults. It can be used standalone, but it is also exported as part of the Solana JavaScript SDK [`@solana/web3.js@rc`](https://github.com/solana-labs/solana-web3.js/tree/master/packages/library).

Each RPC subscriptions method is described in terms of a TypeScript type of the following form:

```ts
type ExampleApi = {
thingNotifications(address: Address): Thing;
};
```

A `RpcSubscriptionsApi` that implements `ExampleApi` will ultimately expose its defined methods on any `RpcSubscriptions` that uses it.

```ts
const rpcSubscriptions: RpcSubscriptions<ExampleApi> = createExampleRpcSubscriptions(/* ... */);
const thingNotifications = await rpc
.thingNotifications(address('95DpK3y3GF7U8s1k4EvZ7xqyeCkhsHeZaE97iZpHUGMN'))
.subscribe({ abortSignal: AbortSignal.timeout(5_000) });
try {
for await (const thing of thingNotifications) {
console.log('Got a thing', thing);
}
} catch (e) {
console.error('Our subscription to `Thing` notifications has failed', e);
} finally {
console.log('We are done listening for `Thing` notifications');
}
```

## Types

### `SolanaRpcSubscriptionsApi{Devnet|Testnet|Mainnet}`

These types represent the RPC subscription methods available on a specific Solana cluster.

## Functions

### `createSolanaRpcSubscriptionsApi(config)`

Creates a `RpcSubscriptionsApi` implementation of the Solana JSON RPC Subscriptions API with some default behaviours.

The default behaviours include:

- A transform that converts `bigint` inputs to `number` for compatiblity with version 1.0 of the Solana JSON RPC.
- A transform that calls the config's `onIntegerOverflow` handler whenever a `bigint` input would overflow a JavaScript IEEE 754 number. See [this](https://github.com/solana-labs/solana-web3.js/issues/1116) GitHub issue for more information.
- A transform that applies a default commitment wherever not specified

#### Arguments

A config object with the following properties:

- `defaultCommitment`: An optional default `Commitment` value. Given an RPC method that takes `commitment` as a parameter, this value will be used when the caller does not supply one.
- `onIntegerOverflow(request, keyPath, value): void`: An optional function that will be called whenever a `bigint` input exceeds that which can be expressed using JavaScript numbers. This is used in the default `SolanaRpcSubscriptionsApi` to throw an exception rather than to allow truncated values to propagate through a program.
3 changes: 2 additions & 1 deletion packages/rpc-subscriptions-api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
],
"dependencies": {
"@solana/addresses": "workspace:*",
"@solana/fast-stable-stringify": "workspace:*",
"@solana/keys": "workspace:*",
"@solana/rpc-types": "workspace:*",
"@solana/rpc-subscriptions-spec": "workspace:*",
Expand All @@ -80,7 +81,7 @@
"@solana/transactions": "workspace:*"
},
"devDependencies": {
"@solana/rpc-subscriptions-transport-websocket": "workspace:*"
"@solana/rpc-subscriptions-channel-websocket": "workspace:*"
},
"peerDependencies": {
"typescript": ">=5"
Expand Down
36 changes: 30 additions & 6 deletions packages/rpc-subscriptions-api/src/__tests__/__setup__.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { createSubscriptionRpc, RpcSubscriptions } from '@solana/rpc-subscriptions-spec';
import { createWebSocketTransport } from '@solana/rpc-subscriptions-transport-websocket';
import { createWebSocketChannel } from '@solana/rpc-subscriptions-channel-websocket';
import { createSubscriptionRpc, RpcSubscriptions, RpcSubscriptionsChannel } from '@solana/rpc-subscriptions-spec';

import {
createSolanaRpcSubscriptionsApi_UNSTABLE,
Expand All @@ -12,9 +12,33 @@ export function createLocalhostSolanaRpcSubscriptions(): RpcSubscriptions<
> {
return createSubscriptionRpc({
api: createSolanaRpcSubscriptionsApi_UNSTABLE(),
transport: createWebSocketTransport({
sendBufferHighWatermark: Number.POSITIVE_INFINITY,
url: 'ws://127.0.0.1:8900',
}),
async transport({ executeSubscriptionPlan, signal }) {
const webSocketChannel = await createWebSocketChannel({
sendBufferHighWatermark: Number.POSITIVE_INFINITY,
signal,
url: 'ws://127.0.0.1:8900',
});
const channel = {
...webSocketChannel,
on(type, listener, options) {
if (type !== 'message') {
return webSocketChannel.on(type, listener, options);
}
return webSocketChannel.on(
'message',
function deserializingListener(message: string) {
const deserializedMessage = JSON.parse(message);
listener(deserializedMessage);
},
options,
);
},
send(message) {
const serializedMessage = JSON.stringify(message);
return webSocketChannel.send(serializedMessage);
},
} as RpcSubscriptionsChannel<unknown, unknown>;
return await executeSubscriptionPlan({ channel, signal });
},
});
}
33 changes: 26 additions & 7 deletions packages/rpc-subscriptions-api/src/__tests__/index-test.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,39 @@
import type { RpcSubscriptionsApi } from '@solana/rpc-subscriptions-spec';
import { executeRpcPubSubSubscriptionPlan, type RpcSubscriptionsApi } from '@solana/rpc-subscriptions-spec';

import { createSolanaRpcSubscriptionsApi } from '..';
import { createSolanaRpcSubscriptionsApi } from '../index';

type TestRpcSubscriptionNotifications = {
thingNotifications(...args: unknown[]): unknown;
};

describe('RpcSubscriptionsApi', () => {
jest.mock('@solana/rpc-subscriptions-spec', () => ({
...jest.requireActual('@solana/rpc-subscriptions-spec'),
executeRpcPubSubSubscriptionPlan: jest.fn().mockReturnValue(
new Promise(() => {
/* never resolves */
}),
),
}));

describe('createSolanaRpcSubscriptionsApi', () => {
let api: RpcSubscriptionsApi<TestRpcSubscriptionNotifications>;
beforeEach(() => {
api = createSolanaRpcSubscriptionsApi();
});
it('synthesizes subscribe/unsubscribe method names from the name of the notification', () => {
expect(api.thingNotifications()).toMatchObject({
subscribeMethodName: 'thingSubscribe',
unsubscribeMethodName: 'thingUnsubscribe',
it('creates a subscription plan that synthesizes the correct subscribe/unsubscribe method names from the name of the notification', () => {
const { executeSubscriptionPlan } = api.thingNotifications();
executeSubscriptionPlan({
channel: {
on: jest.fn(),
send: jest.fn(),
},
signal: new AbortController().signal,
});
expect(executeRpcPubSubSubscriptionPlan).toHaveBeenCalledWith(
expect.objectContaining({
subscribeMethodName: 'thingSubscribe',
unsubscribeMethodName: 'thingUnsubscribe',
}),
);
});
});
32 changes: 21 additions & 11 deletions packages/rpc-subscriptions-api/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import fastStableStringify from '@solana/fast-stable-stringify';
import {
createRpcSubscriptionsApi,
executeRpcPubSubSubscriptionPlan,
RpcSubscriptionsApi,
RpcSubscriptionsApiMethods,
} from '@solana/rpc-subscriptions-spec';
Expand Down Expand Up @@ -49,19 +51,27 @@ type Config = RequestTransformerConfig;
function createSolanaRpcSubscriptionsApi_INTERNAL<TApi extends RpcSubscriptionsApiMethods>(
config?: Config,
): RpcSubscriptionsApi<TApi> {
const responseTransformer = getDefaultResponseTransformerForSolanaRpcSubscriptions({
allowedNumericKeyPaths: getAllowedNumericKeypaths(),
});
// TODO(loris): Replace with request transformer.
const parametersTransformer = <T extends unknown[]>(notificationName: string, params?: T) => {
return getDefaultRequestTransformerForSolanaRpc(config)({ methodName: notificationName, params })
.params as unknown[];
};
return createRpcSubscriptionsApi<TApi>({
// TODO(loris): Replace with request transformer.
parametersTransformer: <T extends unknown[]>(params: T, notificationName: string) => {
return getDefaultRequestTransformerForSolanaRpc(config)({ methodName: notificationName, params })
.params as unknown[];
getSubscriptionConfigurationHash({ notificationName, params }) {
return fastStableStringify([notificationName, params]);
},
planExecutor({ notificationName, params, ...rest }) {
return executeRpcPubSubSubscriptionPlan({
...rest,
responseTransformer,
subscribeMethodName: notificationName.replace(/Notifications$/, 'Subscribe'),
subscribeParams: parametersTransformer(notificationName, params),
unsubscribeMethodName: notificationName.replace(/Notifications$/, 'Unsubscribe'),
});
},
responseTransformer: getDefaultResponseTransformerForSolanaRpcSubscriptions({
allowedNumericKeyPaths: getAllowedNumericKeypaths(),
}),
subscribeNotificationNameTransformer: (notificationName: string) =>
notificationName.replace(/Notifications$/, 'Subscribe'),
unsubscribeNotificationNameTransformer: (notificationName: string) =>
notificationName.replace(/Notifications$/, 'Unsubscribe'),
});
}

Expand Down
Loading

0 comments on commit 859d964

Please sign in to comment.