Skip to content

Commit

Permalink
Introducing channels
Browse files Browse the repository at this point in the history
  • Loading branch information
steveluscher committed Oct 4, 2024
1 parent ad092cf commit 7e8d6ca
Show file tree
Hide file tree
Showing 10 changed files with 102 additions and 29 deletions.
12 changes: 6 additions & 6 deletions packages/errors/src/codes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -286,9 +286,9 @@ export const SOLANA_ERROR__RPC__API_PLAN_MISSING_FOR_RPC_METHOD = 8100003 as con
// Reserve error codes in the range [8190000-8190999].
export const SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_REQUEST = 8190000 as const;
export const SOLANA_ERROR__RPC_SUBSCRIPTIONS__EXPECTED_SERVER_SUBSCRIPTION_ID = 8190001 as const;
export const SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_CLOSED_BEFORE_MESSAGE_BUFFERED = 8190002 as const;
export const SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_CONNECTION_CLOSED = 8190003 as const;
export const SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_FAILED_TO_CONNECT = 8190004 as const;
export const SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CLOSED_BEFORE_MESSAGE_BUFFERED = 8190002 as const;
export const SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CONNECTION_CLOSED = 8190003 as const;
export const SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_FAILED_TO_CONNECT = 8190004 as const;

// Invariant violation errors.
// Reserve error codes in the range [9900000-9900999].
Expand Down Expand Up @@ -460,10 +460,10 @@ export type SolanaErrorCode =
| typeof SOLANA_ERROR__RPC__TRANSPORT_HTTP_ERROR
| typeof SOLANA_ERROR__RPC__TRANSPORT_HTTP_HEADER_FORBIDDEN
| typeof SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_REQUEST
| typeof SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CLOSED_BEFORE_MESSAGE_BUFFERED
| typeof SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CONNECTION_CLOSED
| typeof SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_FAILED_TO_CONNECT
| typeof SOLANA_ERROR__RPC_SUBSCRIPTIONS__EXPECTED_SERVER_SUBSCRIPTION_ID
| typeof SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_CLOSED_BEFORE_MESSAGE_BUFFERED
| typeof SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_CONNECTION_CLOSED
| typeof SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_FAILED_TO_CONNECT
| typeof SOLANA_ERROR__SIGNER__ADDRESS_CANNOT_HAVE_MULTIPLE_SIGNERS
| typeof SOLANA_ERROR__SIGNER__EXPECTED_KEY_PAIR_SIGNER
| typeof SOLANA_ERROR__SIGNER__EXPECTED_MESSAGE_MODIFYING_SIGNER
Expand Down
4 changes: 2 additions & 2 deletions packages/errors/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ import {
SOLANA_ERROR__RPC__TRANSPORT_HTTP_ERROR,
SOLANA_ERROR__RPC__TRANSPORT_HTTP_HEADER_FORBIDDEN,
SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_REQUEST,
SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_FAILED_TO_CONNECT,
SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_FAILED_TO_CONNECT,
SOLANA_ERROR__SIGNER__ADDRESS_CANNOT_HAVE_MULTIPLE_SIGNERS,
SOLANA_ERROR__SIGNER__EXPECTED_KEY_PAIR_SIGNER,
SOLANA_ERROR__SIGNER__EXPECTED_MESSAGE_MODIFYING_SIGNER,
Expand Down Expand Up @@ -490,7 +490,7 @@ export type SolanaErrorContext = DefaultUnspecifiedErrorContextToUndefined<
[SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_REQUEST]: {
notificationName: string;
};
[SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_FAILED_TO_CONNECT]: {
[SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_FAILED_TO_CONNECT]: {
errorEvent: Event;
};
[SOLANA_ERROR__RPC__API_PLAN_MISSING_FOR_RPC_METHOD]: {
Expand Down
14 changes: 7 additions & 7 deletions packages/errors/src/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,10 @@ import {
SOLANA_ERROR__RPC__TRANSPORT_HTTP_ERROR,
SOLANA_ERROR__RPC__TRANSPORT_HTTP_HEADER_FORBIDDEN,
SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_REQUEST,
SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CLOSED_BEFORE_MESSAGE_BUFFERED,
SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CONNECTION_CLOSED,
SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_FAILED_TO_CONNECT,
SOLANA_ERROR__RPC_SUBSCRIPTIONS__EXPECTED_SERVER_SUBSCRIPTION_ID,
SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_CLOSED_BEFORE_MESSAGE_BUFFERED,
SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_CONNECTION_CLOSED,
SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_FAILED_TO_CONNECT,
SOLANA_ERROR__SIGNER__ADDRESS_CANNOT_HAVE_MULTIPLE_SIGNERS,
SOLANA_ERROR__SIGNER__EXPECTED_KEY_PAIR_SIGNER,
SOLANA_ERROR__SIGNER__EXPECTED_MESSAGE_MODIFYING_SIGNER,
Expand Down Expand Up @@ -451,12 +451,12 @@ export const SolanaErrorMessages: Readonly<{
"Either the notification name must end in 'Notifications' or the API must supply a " +
"subscription creator function for the notification '$notificationName' to map between " +
'the notification name and the subscribe/unsubscribe method names.',
[SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CLOSED_BEFORE_MESSAGE_BUFFERED]:
'WebSocket was closed before payload could be added to the send buffer',
[SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CONNECTION_CLOSED]: 'WebSocket connection closed',
[SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_FAILED_TO_CONNECT]: 'WebSocket failed to connect',
[SOLANA_ERROR__RPC_SUBSCRIPTIONS__EXPECTED_SERVER_SUBSCRIPTION_ID]:
'Failed to obtain a subscription id from the server',
[SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_CLOSED_BEFORE_MESSAGE_BUFFERED]:
'WebSocket was closed before payload could be added to the send buffer',
[SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_CONNECTION_CLOSED]: 'WebSocket connection closed',
[SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_FAILED_TO_CONNECT]: 'WebSocket failed to connect',
[SOLANA_ERROR__RPC__API_PLAN_MISSING_FOR_RPC_METHOD]: 'Could not find an API plan for RPC method: `$method`',
[SOLANA_ERROR__RPC__INTEGER_OVERFLOW]:
'The $argumentLabel argument to the `$methodName` RPC method$optionalPathLabel was ' +
Expand Down
42 changes: 41 additions & 1 deletion packages/rpc-subscriptions-spec/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,44 @@

# @solana/rpc-subscriptions-spec

TODO
This package contains types that describe the implementation of the JSON RPC Subscriptions API, as well as methods to create one. It can be used standalone, but it is also exported as part of the Solana JavaScript SDK [`@solana/web3.js@rc`](https://github.com/solana-labs/solana-web3.js/tree/master/packages/library).

This API is designed to be used as follows:

```ts
const rpcSubscriptions =
// Step 1 - Create an `RpcSubscriptions` instance. This may be stateful.
createSolanaRpcSubscriptions(mainnet('wss://api.mainnet-beta.solana.com'));
const response = await rpcSubscriptions
// Step 2 - Call supported methods on it to produce `PendingRpcSubscriptionsRequest` objects.
.slotNotifications({ commitment: 'confirmed' })
// Step 3 - Call the `subscribe()` method on those pending requests to trigger them.
.subscribe({ abortSignal: AbortSignal.timeout(10_000) });
// Step 4 - Iterate over the result.
try {
for await (const slotNotification of slotNotifications) {
console.log('Got a slot notification', slotNotification);
}
} catch (e) {
console.error('The subscription closed unexpectedly', e);
} finally {
console.log('We have stopped listening for notifications');
}
```

## Types

### `RpcSubscriptionsChannel<TOutboundMessage, TInboundMessage>`

A channel is a `DataPublisher` that you can subscribe to events of type `RpcSubscriptionChannelEvents<TInboundMessage>`. Additionally, you can use it to send messages of type `TOutboundMessage` back to the remote end by calling the `send(message)` method.

### `RpcSubscriptionsChannelCreator<TOutboundMessage, TInboundMessage>`

A channel creator is a function that accepts an `AbortSignal`, returns a new `RpcSubscriptionsChannel`, and tears down the channel when the abort signal fires.

### `RpcSubscriptionChannelEvents<TInboundMessage>`

Subscription channels publish events on two channel names:

- `error`: Fires when the channel closes unexpectedly
- `message`: Fires on every message received from the remote end
3 changes: 2 additions & 1 deletion packages/rpc-subscriptions-spec/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@
],
"dependencies": {
"@solana/errors": "workspace:*",
"@solana/rpc-spec-types": "workspace:*"
"@solana/rpc-spec-types": "workspace:*",
"@solana/subscribable": "workspace:*"
},
"peerDependencies": {
"typescript": ">=5"
Expand Down
1 change: 1 addition & 0 deletions packages/rpc-subscriptions-spec/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export * from './rpc-subscriptions-request';
export * from './rpc-subscriptions';
export * from './rpc-subscriptions-api';
export * from './rpc-subscriptions-channel';
export * from './rpc-subscriptions-transport';
28 changes: 28 additions & 0 deletions packages/rpc-subscriptions-spec/src/rpc-subscriptions-channel.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import {
SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CLOSED_BEFORE_MESSAGE_BUFFERED,
SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CONNECTION_CLOSED,
SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_FAILED_TO_CONNECT,
SolanaError,
} from '@solana/errors';
import { DataPublisher } from '@solana/subscribable';

type RpcSubscriptionsChannelSolanaErrorCode =
| typeof SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CLOSED_BEFORE_MESSAGE_BUFFERED
| typeof SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CONNECTION_CLOSED
| typeof SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_FAILED_TO_CONNECT;

export type RpcSubscriptionChannelEvents<TInboundMessage> = {
error: SolanaError<RpcSubscriptionsChannelSolanaErrorCode>;
message: TInboundMessage;
};

export interface RpcSubscriptionsChannel<TOutboundMessage, TInboundMessage>
extends DataPublisher<RpcSubscriptionChannelEvents<TInboundMessage>> {
send(message: TOutboundMessage): Promise<void>;
}

export type RpcSubscriptionsChannelCreator<TOutboundMessage, TInboundMessage> = (
config: Readonly<{
abortSignal: AbortSignal;
}>,
) => Promise<RpcSubscriptionsChannel<TOutboundMessage, TInboundMessage>>;
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {
SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_CLOSED_BEFORE_MESSAGE_BUFFERED,
SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_FAILED_TO_CONNECT,
SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CLOSED_BEFORE_MESSAGE_BUFFERED,
SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_FAILED_TO_CONNECT,
SolanaError,
} from '@solana/errors';
import WS from 'jest-websocket-mock';
Expand Down Expand Up @@ -44,7 +44,7 @@ describe('createWebSocketConnection', () => {
url: 'ws://fake', // Wrong URL!
});
await expect(connectionPromise).rejects.toThrow(
new SolanaError(SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_FAILED_TO_CONNECT, {
new SolanaError(SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_FAILED_TO_CONNECT, {
errorEvent: {} as Event,
}),
);
Expand All @@ -61,7 +61,7 @@ describe('createWebSocketConnection', () => {
expect(client).toHaveProperty('readyState', WebSocket.CONNECTING);
abortController.abort();
await expect(connectionPromise).rejects.toThrow(
new SolanaError(SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_FAILED_TO_CONNECT, {
new SolanaError(SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_FAILED_TO_CONNECT, {
errorEvent: {} as Event,
}),
);
Expand Down Expand Up @@ -231,7 +231,7 @@ describe('RpcWebSocketConnection', () => {
const sendPromise = connection.send({ some: 'message' });
abortController.abort();
await expect(sendPromise).rejects.toThrow(
new SolanaError(SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_CLOSED_BEFORE_MESSAGE_BUFFERED),
new SolanaError(SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CLOSED_BEFORE_MESSAGE_BUFFERED),
);
});
it('fatals when the connection encounters an error while a message is queued', async () => {
Expand All @@ -243,7 +243,7 @@ describe('RpcWebSocketConnection', () => {
wasClean: false,
});
await expect(sendPromise).rejects.toThrow(
new SolanaError(SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_CLOSED_BEFORE_MESSAGE_BUFFERED),
new SolanaError(SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CLOSED_BEFORE_MESSAGE_BUFFERED),
);
});
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import {
SOLANA_ERROR__INVARIANT_VIOLATION__SUBSCRIPTION_ITERATOR_MUST_NOT_POLL_BEFORE_RESOLVING_EXISTING_MESSAGE_PROMISE,
SOLANA_ERROR__INVARIANT_VIOLATION__SUBSCRIPTION_ITERATOR_STATE_MISSING,
SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_CLOSED_BEFORE_MESSAGE_BUFFERED,
SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_CONNECTION_CLOSED,
SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_FAILED_TO_CONNECT,
SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CLOSED_BEFORE_MESSAGE_BUFFERED,
SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CONNECTION_CLOSED,
SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_FAILED_TO_CONNECT,
SolanaError,
} from '@solana/errors';
import WebSocket from '@solana/ws-impl';
Expand Down Expand Up @@ -78,7 +78,7 @@ export async function createWebSocketConnection({
function handleError(ev: Event) {
if (!hasConnected) {
reject(
new SolanaError(SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_FAILED_TO_CONNECT, {
new SolanaError(SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_FAILED_TO_CONNECT, {
errorEvent: ev,
}),
);
Expand Down Expand Up @@ -113,7 +113,7 @@ export async function createWebSocketConnection({
clearInterval(intervalId);
reject(
new SolanaError(
SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_CLOSED_BEFORE_MESSAGE_BUFFERED,
SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CLOSED_BEFORE_MESSAGE_BUFFERED,
),
);
};
Expand Down Expand Up @@ -166,7 +166,7 @@ export async function createWebSocketConnection({
return;
} else {
throw new SolanaError(
SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_CONNECTION_CLOSED,
SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CONNECTION_CLOSED,
{
cause: e,
},
Expand Down
3 changes: 3 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 7e8d6ca

Please sign in to comment.