diff --git a/packages/rpc-subscriptions/README.md b/packages/rpc-subscriptions/README.md index f6b682dbdf7..924b7465a94 100644 --- a/packages/rpc-subscriptions/README.md +++ b/packages/rpc-subscriptions/README.md @@ -56,3 +56,7 @@ Given a channel creator, will return a new channel creator with the following be ### `getRpcSubscriptionsChannelWithJSONSerialization(channel)` Given an `RpcSubscriptionsChannel`, will return a new channel that parses data published to the `'message'` channel as JSON, and JSON-stringifies messages sent via the `send(message)` method. + +### `getRpcSubscriptionsChannelWithAutoping(channel)` + +Given an `RpcSubscriptionsChannel`, will return a new channel that sends a ping message to the inner channel if a message has not been sent or received in the last `intervalMs`. In web browsers, this implementation sends no ping when the network is down, and sends a ping immediately upon the network coming back up. diff --git a/packages/rpc-subscriptions/src/__tests__/rpc-subscriptions-autopinger-test.ts b/packages/rpc-subscriptions/src/__tests__/rpc-subscriptions-autopinger-test.ts index bd3488cb8e8..bfe92bb66d3 100644 --- a/packages/rpc-subscriptions/src/__tests__/rpc-subscriptions-autopinger-test.ts +++ b/packages/rpc-subscriptions/src/__tests__/rpc-subscriptions-autopinger-test.ts @@ -1,155 +1,148 @@ -import { RpcSubscriptionsTransport } from '@solana/rpc-subscriptions-spec'; +import { RpcSubscriptionsChannel } from '@solana/rpc-subscriptions-spec'; -import { getWebSocketTransportWithAutoping } from '../rpc-subscriptions-autopinger'; - -jest.mock('@solana/rpc-subscriptions-transport-websocket'); +import { getRpcSubscriptionsChannelWithAutoping } from '../rpc-subscriptions-autopinger'; const MOCK_INTERVAL_MS = 60_000; -describe('getWebSocketTransportWithAutoping', () => { - let killConnection: () => void; - let mockInnerTransport: jest.Mock; - let receiveMessage: (value: unknown) => void; - let returnFromConnection: () => void; - let send: jest.Mock; - let transport: RpcSubscriptionsTransport; +describe('getRpcSubscriptionsChannelWithAutoping', () => { + let mockChannel: RpcSubscriptionsChannel; + let mockOn: jest.Mock; + let mockSend: jest.Mock; + function receiveError(error?: unknown) { + mockOn.mock.calls.filter(([type]) => type === 'error').forEach(([_, listener]) => listener(error)); + } + function receiveMessage(message: unknown) { + mockOn.mock.calls.filter(([type]) => type === 'message').forEach(([_, listener]) => listener(message)); + } beforeEach(() => { jest.useFakeTimers(); - send = jest.fn(); - let resultPromise; - mockInnerTransport = jest.fn(() => ({ - // eslint-disable-next-line @typescript-eslint/require-await - async *[Symbol.asyncIterator]() { - try { - while (true) { - yield (resultPromise ||= new Promise((resolve, reject) => { - killConnection = () => { - reject('error'); - }; - receiveMessage = resolve; - returnFromConnection = () => { - reject(); - }; - })); - resultPromise = null; - } - } catch (e) { - if (e === 'error') { - throw e; - } - return; - } - }, - send_DO_NOT_USE_OR_YOU_WILL_BE_FIRED: send, - })); - transport = getWebSocketTransportWithAutoping({ + mockOn = jest.fn().mockReturnValue(() => {}); + mockSend = jest.fn(); + mockChannel = { + on: mockOn, + send: mockSend, + }; + }); + it('sends a ping message to the channel at the specified interval', () => { + getRpcSubscriptionsChannelWithAutoping({ + abortSignal: new AbortController().signal, + channel: mockChannel, intervalMs: MOCK_INTERVAL_MS, - transport: mockInnerTransport, }); - }); - it('sends a ping message to the active connection at the specified interval', async () => { - expect.assertions(4); - await transport({ payload: 'hi', signal: new AbortController().signal }); // First ping. jest.advanceTimersByTime(MOCK_INTERVAL_MS - 1); - expect(send).not.toHaveBeenCalled(); + expect(mockSend).not.toHaveBeenCalled(); jest.advanceTimersByTime(1); - expect(send).toHaveBeenCalledWith( + expect(mockSend).toHaveBeenCalledWith( expect.objectContaining({ jsonrpc: '2.0', method: 'ping', }), ); // Second ping. - send.mockClear(); + mockSend.mockClear(); jest.advanceTimersByTime(MOCK_INTERVAL_MS - 1); - expect(send).not.toHaveBeenCalled(); + expect(mockSend).not.toHaveBeenCalled(); jest.advanceTimersByTime(1); - expect(send).toHaveBeenCalledWith( + expect(mockSend).toHaveBeenCalledWith( expect.objectContaining({ jsonrpc: '2.0', method: 'ping', }), ); }); - it('does not send a ping until interval milliseconds after the last sent message', async () => { - expect.assertions(3); - const connection = await transport({ payload: 'hi', signal: new AbortController().signal }); + it('does not send a ping until interval milliseconds after the last sent message', () => { + const autopingChannel = getRpcSubscriptionsChannelWithAutoping({ + abortSignal: new AbortController().signal, + channel: mockChannel, + intervalMs: MOCK_INTERVAL_MS, + }); + autopingChannel.send('hi'); + mockSend.mockReset(); jest.advanceTimersByTime(500); - expect(send).not.toHaveBeenCalled(); - connection.send_DO_NOT_USE_OR_YOU_WILL_BE_FIRED('hi'); - send.mockClear(); + expect(mockSend).not.toHaveBeenCalled(); + autopingChannel.send('hi'); + mockSend.mockClear(); jest.advanceTimersByTime(MOCK_INTERVAL_MS - 1); - expect(send).not.toHaveBeenCalled(); + expect(mockSend).not.toHaveBeenCalled(); jest.advanceTimersByTime(1); - expect(send).toHaveBeenCalledWith( + expect(mockSend).toHaveBeenCalledWith( expect.objectContaining({ jsonrpc: '2.0', method: 'ping', }), ); }); - it('does not send a ping until interval milliseconds after the last received message', async () => { - expect.assertions(3); - await transport({ payload: 'hi', signal: new AbortController().signal }); + it('does not send a ping until interval milliseconds after the last received message', () => { + getRpcSubscriptionsChannelWithAutoping({ + abortSignal: new AbortController().signal, + channel: mockChannel, + intervalMs: MOCK_INTERVAL_MS, + }); jest.advanceTimersByTime(500); - expect(send).not.toHaveBeenCalled(); + expect(mockSend).not.toHaveBeenCalled(); receiveMessage('hi'); - await Promise.resolve(); // Flush Promise queue. - await Promise.resolve(); // Flush Promise queue. jest.advanceTimersByTime(MOCK_INTERVAL_MS - 1); - expect(send).not.toHaveBeenCalled(); + expect(mockSend).not.toHaveBeenCalled(); jest.advanceTimersByTime(1); - expect(send).toHaveBeenCalledWith( + expect(mockSend).toHaveBeenCalledWith( expect.objectContaining({ jsonrpc: '2.0', method: 'ping', }), ); }); - it('does not send a ping after the connection throws', async () => { - expect.assertions(2); - await transport({ payload: 'hi', signal: new AbortController().signal }); + it('does not send a ping after a channel error', () => { + getRpcSubscriptionsChannelWithAutoping({ + abortSignal: new AbortController().signal, + channel: mockChannel, + intervalMs: MOCK_INTERVAL_MS, + }); // First ping. jest.advanceTimersByTime(MOCK_INTERVAL_MS); - expect(send).toHaveBeenCalledWith( + expect(mockSend).toHaveBeenCalledWith( expect.objectContaining({ jsonrpc: '2.0', method: 'ping', }), ); - killConnection(); - await jest.runAllTimersAsync(); + receiveError('o no'); // No more pings. - send.mockClear(); + mockSend.mockClear(); jest.advanceTimersByTime(MOCK_INTERVAL_MS); - expect(send).not.toHaveBeenCalled(); + expect(mockSend).not.toHaveBeenCalled(); }); - it('does not send a ping after the connection returns', async () => { - expect.assertions(2); - await transport({ payload: 'hi', signal: new AbortController().signal }); + it('does not send a ping after the abort signal fires', () => { + const abortController = new AbortController(); + getRpcSubscriptionsChannelWithAutoping({ + abortSignal: abortController.signal, + channel: mockChannel, + intervalMs: MOCK_INTERVAL_MS, + }); // First ping. jest.advanceTimersByTime(MOCK_INTERVAL_MS); - expect(send).toHaveBeenCalledWith( + expect(mockSend).toHaveBeenCalledWith( expect.objectContaining({ jsonrpc: '2.0', method: 'ping', }), ); - returnFromConnection(); - await jest.runAllTimersAsync(); + abortController.abort(); // No more pings. - send.mockClear(); + mockSend.mockClear(); jest.advanceTimersByTime(MOCK_INTERVAL_MS); - expect(send).not.toHaveBeenCalled(); + expect(mockSend).not.toHaveBeenCalled(); }); if (__BROWSER__) { - it('stops pinging the connection when it goes offline', async () => { - expect.assertions(1); - await transport({ payload: 'hi', signal: new AbortController().signal }); + it('stops pinging the connection when it goes offline', () => { + getRpcSubscriptionsChannelWithAutoping({ + abortSignal: new AbortController().signal, + channel: mockChannel, + intervalMs: MOCK_INTERVAL_MS, + }); globalThis.window.dispatchEvent(new Event('offline')); jest.advanceTimersByTime(MOCK_INTERVAL_MS); - expect(send).not.toHaveBeenCalled(); + expect(mockSend).not.toHaveBeenCalled(); }); describe('when the network connection is offline to start', () => { beforeEach(() => { @@ -159,35 +152,44 @@ describe('getWebSocketTransportWithAutoping', () => { onLine: false, })); }); - it('does not ping the connection', async () => { - expect.assertions(1); - await transport({ payload: 'hi', signal: new AbortController().signal }); + it('does not ping the connection', () => { + getRpcSubscriptionsChannelWithAutoping({ + abortSignal: new AbortController().signal, + channel: mockChannel, + intervalMs: MOCK_INTERVAL_MS, + }); jest.advanceTimersByTime(MOCK_INTERVAL_MS); - expect(send).not.toHaveBeenCalled(); + expect(mockSend).not.toHaveBeenCalled(); }); - it('pings the connection immediately when the connection comes back online', async () => { - expect.assertions(1); - await transport({ payload: 'hi', signal: new AbortController().signal }); + it('pings the connection immediately when the connection comes back online', () => { + getRpcSubscriptionsChannelWithAutoping({ + abortSignal: new AbortController().signal, + channel: mockChannel, + intervalMs: MOCK_INTERVAL_MS, + }); jest.advanceTimersByTime(500); globalThis.window.dispatchEvent(new Event('online')); - expect(send).toHaveBeenCalledWith( + expect(mockSend).toHaveBeenCalledWith( expect.objectContaining({ jsonrpc: '2.0', method: 'ping', }), ); }); - it('pings the connection interval milliseconds after the connection comes back online', async () => { - expect.assertions(3); - await transport({ payload: 'hi', signal: new AbortController().signal }); + it('pings the connection interval milliseconds after the connection comes back online', () => { + getRpcSubscriptionsChannelWithAutoping({ + abortSignal: new AbortController().signal, + channel: mockChannel, + intervalMs: MOCK_INTERVAL_MS, + }); jest.advanceTimersByTime(500); globalThis.window.dispatchEvent(new Event('online')); - send.mockClear(); - expect(send).not.toHaveBeenCalled(); + mockSend.mockClear(); + expect(mockSend).not.toHaveBeenCalled(); jest.advanceTimersByTime(MOCK_INTERVAL_MS - 1); - expect(send).not.toHaveBeenCalled(); + expect(mockSend).not.toHaveBeenCalled(); jest.advanceTimersByTime(1); - expect(send).toHaveBeenCalledWith( + expect(mockSend).toHaveBeenCalledWith( expect.objectContaining({ jsonrpc: '2.0', method: 'ping', diff --git a/packages/rpc-subscriptions/src/rpc-subscriptions-autopinger.ts b/packages/rpc-subscriptions/src/rpc-subscriptions-autopinger.ts index a213f7327f2..f9b6dcf74b0 100644 --- a/packages/rpc-subscriptions/src/rpc-subscriptions-autopinger.ts +++ b/packages/rpc-subscriptions/src/rpc-subscriptions-autopinger.ts @@ -1,8 +1,9 @@ -import type { RpcSubscriptionsTransport } from '@solana/rpc-subscriptions-spec'; +import type { RpcSubscriptionsChannel } from '@solana/rpc-subscriptions-spec'; -type Config = Readonly<{ +type Config> = Readonly<{ + abortSignal: AbortSignal; + channel: TChannel; intervalMs: number; - transport: TTransport; }>; const PING_PAYLOAD = { @@ -10,70 +11,61 @@ const PING_PAYLOAD = { method: 'ping', } as const; -export function getWebSocketTransportWithAutoping({ +export function getRpcSubscriptionsChannelWithAutoping>({ + abortSignal: callerAbortSignal, + channel, intervalMs, - transport, -}: Config): TTransport { - const pingableConnections = new Map< - Awaited>, - Awaited> - >(); - return (async (...args) => { - const connection = await transport(...args); - let intervalId: ReturnType | undefined; - function sendPing() { - connection.send_DO_NOT_USE_OR_YOU_WILL_BE_FIRED(PING_PAYLOAD); - } - function restartPingTimer() { - clearInterval(intervalId); - intervalId = setInterval(sendPing, intervalMs); - } - if (pingableConnections.has(connection) === false) { - pingableConnections.set(connection, { - [Symbol.asyncIterator]: connection[Symbol.asyncIterator].bind(connection), - send_DO_NOT_USE_OR_YOU_WILL_BE_FIRED: ( - ...args: Parameters - ) => { - restartPingTimer(); - return connection.send_DO_NOT_USE_OR_YOU_WILL_BE_FIRED(...args); - }, - }); - (async () => { - try { - // eslint-disable-next-line @typescript-eslint/no-unused-vars - for await (const _ of connection) { - restartPingTimer(); - } - } catch { - /* empty */ - } finally { - pingableConnections.delete(connection); - clearInterval(intervalId); - if (handleOffline) { - globalThis.window.removeEventListener('offline', handleOffline); - } - if (handleOnline) { - globalThis.window.removeEventListener('online', handleOnline); - } - } - })(); - if (!__BROWSER__ || globalThis.navigator.onLine) { +}: Config): TChannel { + let intervalId: ReturnType | undefined; + function sendPing() { + channel.send(PING_PAYLOAD); + } + function restartPingTimer() { + clearInterval(intervalId); + intervalId = setInterval(sendPing, intervalMs); + } + const pingerAbortController = new AbortController(); + pingerAbortController.signal.addEventListener('abort', () => { + clearInterval(intervalId); + }); + callerAbortSignal.addEventListener('abort', () => { + pingerAbortController.abort(); + }); + channel.on( + 'error', + () => { + pingerAbortController.abort(); + }, + { signal: pingerAbortController.signal }, + ); + channel.on('message', restartPingTimer, { signal: pingerAbortController.signal }); + if (!__BROWSER__ || globalThis.navigator.onLine) { + restartPingTimer(); + } + if (__BROWSER__) { + globalThis.window.addEventListener( + 'offline', + function handleOffline() { + clearInterval(intervalId); + }, + { signal: pingerAbortController.signal }, + ); + globalThis.window.addEventListener( + 'online', + function handleOnline() { + sendPing(); + restartPingTimer(); + }, + { signal: pingerAbortController.signal }, + ); + } + return { + ...channel, + send(...args) { + if (!pingerAbortController.signal.aborted) { restartPingTimer(); } - let handleOffline; - let handleOnline; - if (__BROWSER__) { - handleOffline = () => { - clearInterval(intervalId); - }; - handleOnline = () => { - sendPing(); - restartPingTimer(); - }; - globalThis.window.addEventListener('offline', handleOffline); - globalThis.window.addEventListener('online', handleOnline); - } - } - return pingableConnections.get(connection)!; - }) as TTransport; + return channel.send(...args); + }, + }; }