From e4260af4e90bfc84ad94c573296943658c386d47 Mon Sep 17 00:00:00 2001 From: Steven Luscher Date: Wed, 2 Oct 2024 21:05:35 +0000 Subject: [PATCH] Unhook all of the subscription augmenters so that we can refactor and build them back up, one by one --- packages/rpc-subscriptions/package.json | 2 - ...-subscriptions-connection-sharding-test.ts | 95 ------------------- .../rpc-subscriptions-connection-sharding.ts | 37 -------- .../src/rpc-subscriptions-transport.ts | 37 ++------ .../src/rpc-subscriptions.ts | 18 +--- pnpm-lock.yaml | 6 -- 6 files changed, 12 insertions(+), 183 deletions(-) delete mode 100644 packages/rpc-subscriptions/src/__tests__/rpc-subscriptions-connection-sharding-test.ts delete mode 100644 packages/rpc-subscriptions/src/rpc-subscriptions-connection-sharding.ts diff --git a/packages/rpc-subscriptions/package.json b/packages/rpc-subscriptions/package.json index 9c765b758a5..886dbcd5c0b 100644 --- a/packages/rpc-subscriptions/package.json +++ b/packages/rpc-subscriptions/package.json @@ -72,8 +72,6 @@ ], "dependencies": { "@solana/errors": "workspace:*", - "@solana/fast-stable-stringify": "workspace:*", - "@solana/functional": "workspace:*", "@solana/promises": "workspace:*", "@solana/rpc-subscriptions-api": "workspace:*", "@solana/rpc-subscriptions-spec": "workspace:*", diff --git a/packages/rpc-subscriptions/src/__tests__/rpc-subscriptions-connection-sharding-test.ts b/packages/rpc-subscriptions/src/__tests__/rpc-subscriptions-connection-sharding-test.ts deleted file mode 100644 index fbea287a12a..00000000000 --- a/packages/rpc-subscriptions/src/__tests__/rpc-subscriptions-connection-sharding-test.ts +++ /dev/null @@ -1,95 +0,0 @@ -import { RpcSubscriptionsTransport } from '@solana/rpc-subscriptions-spec'; - -import { getWebSocketTransportWithConnectionSharding } from '../rpc-subscriptions-connection-sharding'; - -describe('getWebSocketTransportWithConnectionSharding', () => { - let getShard: jest.Mock; - let mockInnerTransport: jest.MockedFn; - let send: jest.Mock<(payload: unknown) => Promise>; - let transport: RpcSubscriptionsTransport; - beforeEach(() => { - jest.useFakeTimers(); - getShard = jest.fn(); - send = jest.fn().mockResolvedValue(undefined); - send = jest.fn().mockResolvedValue(undefined); - mockInnerTransport = jest.fn().mockResolvedValue({ - async *[Symbol.asyncIterator]() { - yield await new Promise(() => { - /* never resolve */ - }); - }, - send_DO_NOT_USE_OR_YOU_WILL_BE_FIRED: send, - }); - transport = getWebSocketTransportWithConnectionSharding({ - getShard, - transport: mockInnerTransport, - }); - }); - describe('given payloads that shard to the same key', () => { - beforeEach(() => { - getShard.mockReturnValue('shard-key'); - }); - it('sends the initial message when constructing a new connection', async () => { - expect.assertions(1); - await Promise.all([ - transport({ payload: 'hello', signal: new AbortController().signal }), - transport({ payload: 'world', signal: new AbortController().signal }), - ]); - expect(mockInnerTransport).toHaveBeenCalledWith( - expect.objectContaining({ - payload: 'hello', - }), - ); - }); - it('sends subsequent messages over the cached connection in the same runloop', async () => { - expect.assertions(2); - await Promise.all([ - transport({ payload: 'hello', signal: new AbortController().signal }), - transport({ payload: 'world', signal: new AbortController().signal }), - ]); - expect(mockInnerTransport).toHaveBeenCalledTimes(1); - expect(send).toHaveBeenCalledWith('world'); - }); - it('sends subsequent messages over the cached connection in different runloops', async () => { - expect.assertions(2); - await transport({ payload: 'hello', signal: new AbortController().signal }); - await transport({ payload: 'world', signal: new AbortController().signal }); - expect(mockInnerTransport).toHaveBeenCalledTimes(1); - expect(send).toHaveBeenCalledWith('world'); - }); - it('reuses the same connection for all payloads in the same runloop', async () => { - expect.assertions(1); - await Promise.all([ - transport({ payload: 'hello', signal: new AbortController().signal }), - transport({ payload: 'world', signal: new AbortController().signal }), - ]); - expect(mockInnerTransport).toHaveBeenCalledTimes(1); - }); - it('reuses the same connection for all payloads in different runloops', async () => { - expect.assertions(1); - await transport({ payload: 'hello', signal: new AbortController().signal }); - await transport({ payload: 'world', signal: new AbortController().signal }); - expect(mockInnerTransport).toHaveBeenCalledTimes(1); - }); - }); - describe('given payloads that shard to different keys', () => { - beforeEach(() => { - let shardKey = 0; - getShard.mockImplementation(() => `${++shardKey}`); - }); - it('creates a connection for each payload in the same runloop', async () => { - expect.assertions(1); - await Promise.all([ - transport({ payload: 'hello', signal: new AbortController().signal }), - transport({ payload: 'world', signal: new AbortController().signal }), - ]); - expect(mockInnerTransport).toHaveBeenCalledTimes(2); - }); - it('creates a connection for each payload in different runloops', async () => { - expect.assertions(1); - await transport({ payload: 'hello', signal: new AbortController().signal }); - await transport({ payload: 'world', signal: new AbortController().signal }); - expect(mockInnerTransport).toHaveBeenCalledTimes(2); - }); - }); -}); diff --git a/packages/rpc-subscriptions/src/rpc-subscriptions-connection-sharding.ts b/packages/rpc-subscriptions/src/rpc-subscriptions-connection-sharding.ts deleted file mode 100644 index e408c3f0aea..00000000000 --- a/packages/rpc-subscriptions/src/rpc-subscriptions-connection-sharding.ts +++ /dev/null @@ -1,37 +0,0 @@ -import type { RpcSubscriptionsTransport } from '@solana/rpc-subscriptions-spec'; - -import { getCachedAbortableIterableFactory } from './cached-abortable-iterable'; - -type Config = Readonly<{ - /** - * You might like to open more subscriptions per connection than your RPC provider allows for. - * Using the initial payload as input, return a shard key from this method to assign - * subscriptions to separate connections. One socket will be opened per shard key. - */ - getShard?: (payload: unknown) => string | symbol; - transport: TTransport; -}>; - -let NULL_SHARD_CACHE_KEY: symbol; -function createNullShardCacheKey() { - // This function is an annoying workaround to prevent `process.env.NODE_ENV` from appearing at - // the top level of this module and thwarting an optimizing compiler's attempt to tree-shake. - return Symbol(__DEV__ ? 'Cache key to use when there is no connection sharding strategy' : undefined); -} - -export function getWebSocketTransportWithConnectionSharding({ - getShard, - transport, -}: Config): TTransport { - return getCachedAbortableIterableFactory({ - getAbortSignalFromInputArgs: ({ signal }) => signal, - getCacheKeyFromInputArgs: ({ payload }) => - getShard ? getShard(payload) : (NULL_SHARD_CACHE_KEY ||= createNullShardCacheKey()), - onCacheHit: (connection, { payload }) => connection.send_DO_NOT_USE_OR_YOU_WILL_BE_FIRED(payload), - onCreateIterable: (abortSignal, config) => - transport({ - ...config, - signal: abortSignal, - }), - }) as TTransport; -} diff --git a/packages/rpc-subscriptions/src/rpc-subscriptions-transport.ts b/packages/rpc-subscriptions/src/rpc-subscriptions-transport.ts index fb505f757d8..e22d03694ed 100644 --- a/packages/rpc-subscriptions/src/rpc-subscriptions-transport.ts +++ b/packages/rpc-subscriptions/src/rpc-subscriptions-transport.ts @@ -1,18 +1,9 @@ -import { pipe } from '@solana/functional'; import { createWebSocketTransport } from '@solana/rpc-subscriptions-transport-websocket'; import type { ClusterUrl } from '@solana/rpc-types'; -import { getWebSocketTransportWithAutoping } from './rpc-subscriptions-autopinger'; import { RpcSubscriptionsTransportFromClusterUrl } from './rpc-subscriptions-clusters'; -import { getWebSocketTransportWithConnectionSharding } from './rpc-subscriptions-connection-sharding'; export type DefaultRpcSubscriptionsTransportConfig = Readonly<{ - /** - * You might like to open more subscriptions per connection than your RPC provider allows - * for. Using the initial payload as input, return a shard key from this method to assign - * subscriptions to separate connections. One socket will be opened per shard key. - */ - getShard?: (payload: unknown) => string; intervalMs?: number; sendBufferHighWatermark?: number; url: TClusterUrl; @@ -21,24 +12,12 @@ export type DefaultRpcSubscriptionsTransportConfig( config: DefaultRpcSubscriptionsTransportConfig, ): RpcSubscriptionsTransportFromClusterUrl { - const { getShard, intervalMs, ...rest } = config; - return pipe( - createWebSocketTransport({ - ...rest, - sendBufferHighWatermark: - config.sendBufferHighWatermark ?? - // Let 128KB of data into the WebSocket buffer before buffering it in the app. - 131_072, - }) as RpcSubscriptionsTransportFromClusterUrl, - transport => - getWebSocketTransportWithAutoping({ - intervalMs: intervalMs ?? 5_000, - transport, - }), - transport => - getWebSocketTransportWithConnectionSharding({ - getShard, - transport, - }), - ); + const { /* `intervalMs` will make a comeback; stay tuned */ ...rest } = config; + return createWebSocketTransport({ + ...rest, + sendBufferHighWatermark: + config.sendBufferHighWatermark ?? + // Let 128KB of data into the WebSocket buffer before buffering it in the app. + 131_072, + }) as RpcSubscriptionsTransportFromClusterUrl; } diff --git a/packages/rpc-subscriptions/src/rpc-subscriptions.ts b/packages/rpc-subscriptions/src/rpc-subscriptions.ts index 0b4023a5f8e..2b75b0e37d8 100644 --- a/packages/rpc-subscriptions/src/rpc-subscriptions.ts +++ b/packages/rpc-subscriptions/src/rpc-subscriptions.ts @@ -1,5 +1,3 @@ -import fastStableStringify from '@solana/fast-stable-stringify'; -import { pipe } from '@solana/functional'; import type { SolanaRpcSubscriptionsApi, SolanaRpcSubscriptionsApiUnstable } from '@solana/rpc-subscriptions-api'; import { createSolanaRpcSubscriptionsApi } from '@solana/rpc-subscriptions-api'; import { @@ -11,7 +9,6 @@ import { ClusterUrl } from '@solana/rpc-types'; import { DEFAULT_RPC_SUBSCRIPTIONS_CONFIG } from './rpc-default-config'; import type { RpcSubscriptionsFromTransport } from './rpc-subscriptions-clusters'; -import { getRpcSubscriptionsWithSubscriptionCoalescing } from './rpc-subscriptions-coalescer'; import { createDefaultRpcSubscriptionsTransport, DefaultRpcSubscriptionsTransportConfig, @@ -39,17 +36,10 @@ export function createSolanaRpcSubscriptionsFromTransport< TTransport extends RpcSubscriptionsTransport, TApi extends RpcSubscriptionsApiMethods = SolanaRpcSubscriptionsApi, >(transport: TTransport) { - return pipe( - createSubscriptionRpc({ - api: createSolanaRpcSubscriptionsApi(DEFAULT_RPC_SUBSCRIPTIONS_CONFIG), - transport, - }), - rpcSubscriptions => - getRpcSubscriptionsWithSubscriptionCoalescing({ - getDeduplicationKey: (...args) => fastStableStringify(args), - rpcSubscriptions, - }), - ) as RpcSubscriptionsFromTransport; + return createSubscriptionRpc({ + api: createSolanaRpcSubscriptionsApi(DEFAULT_RPC_SUBSCRIPTIONS_CONFIG), + transport, + }) as RpcSubscriptionsFromTransport; } export function createSolanaRpcSubscriptionsFromTransport_UNSTABLE( diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index f76c2c981d0..89221db3920 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -857,12 +857,6 @@ importers: '@solana/errors': specifier: workspace:* version: link:../errors - '@solana/fast-stable-stringify': - specifier: workspace:* - version: link:../fast-stable-stringify - '@solana/functional': - specifier: workspace:* - version: link:../functional '@solana/promises': specifier: workspace:* version: link:../promises