Skip to content

Commit

Permalink
A channel augmenter that autopings the channel
Browse files Browse the repository at this point in the history
  • Loading branch information
steveluscher committed Oct 4, 2024
1 parent 6d545f9 commit 35dc1bd
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 166 deletions.
4 changes: 4 additions & 0 deletions packages/rpc-subscriptions/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,7 @@ Given a channel creator, will return a new channel creator with the following be
### `getRpcSubscriptionsChannelWithJSONSerialization(channel)`

Given an `RpcSubscriptionsChannel`, will return a new channel that parses data published to the `'message'` channel as JSON, and JSON-stringifies messages sent via the `send(message)` method.

### `getRpcSubscriptionsChannelWithAutoping(channel)`

Given an `RpcSubscriptionsChannel`, will return a new channel that sends a ping message to the inner channel if a message has not been sent or received in the last `intervalMs`. In web browsers, this implementation sends no ping when the network is down, and sends a ping immediately upon the network coming back up.
Original file line number Diff line number Diff line change
@@ -1,155 +1,148 @@
import { RpcSubscriptionsTransport } from '@solana/rpc-subscriptions-spec';
import { RpcSubscriptionsChannel } from '@solana/rpc-subscriptions-spec';

import { getWebSocketTransportWithAutoping } from '../rpc-subscriptions-autopinger';

jest.mock('@solana/rpc-subscriptions-transport-websocket');
import { getRpcSubscriptionsChannelWithAutoping } from '../rpc-subscriptions-autopinger';

const MOCK_INTERVAL_MS = 60_000;

describe('getWebSocketTransportWithAutoping', () => {
let killConnection: () => void;
let mockInnerTransport: jest.Mock;
let receiveMessage: (value: unknown) => void;
let returnFromConnection: () => void;
let send: jest.Mock;
let transport: RpcSubscriptionsTransport;
describe('getRpcSubscriptionsChannelWithAutoping', () => {
let mockChannel: RpcSubscriptionsChannel<unknown, unknown>;
let mockOn: jest.Mock;
let mockSend: jest.Mock;
function receiveError(error?: unknown) {
mockOn.mock.calls.filter(([type]) => type === 'error').forEach(([_, listener]) => listener(error));
}
function receiveMessage(message: unknown) {
mockOn.mock.calls.filter(([type]) => type === 'message').forEach(([_, listener]) => listener(message));
}
beforeEach(() => {
jest.useFakeTimers();
send = jest.fn();
let resultPromise;
mockInnerTransport = jest.fn(() => ({
// eslint-disable-next-line @typescript-eslint/require-await
async *[Symbol.asyncIterator]() {
try {
while (true) {
yield (resultPromise ||= new Promise((resolve, reject) => {
killConnection = () => {
reject('error');
};
receiveMessage = resolve;
returnFromConnection = () => {
reject();
};
}));
resultPromise = null;
}
} catch (e) {
if (e === 'error') {
throw e;
}
return;
}
},
send_DO_NOT_USE_OR_YOU_WILL_BE_FIRED: send,
}));
transport = getWebSocketTransportWithAutoping({
mockOn = jest.fn().mockReturnValue(() => {});
mockSend = jest.fn();
mockChannel = {
on: mockOn,
send: mockSend,
};
});
it('sends a ping message to the channel at the specified interval', () => {
getRpcSubscriptionsChannelWithAutoping({
abortSignal: new AbortController().signal,
channel: mockChannel,
intervalMs: MOCK_INTERVAL_MS,
transport: mockInnerTransport,
});
});
it('sends a ping message to the active connection at the specified interval', async () => {
expect.assertions(4);
await transport({ payload: 'hi', signal: new AbortController().signal });
// First ping.
jest.advanceTimersByTime(MOCK_INTERVAL_MS - 1);
expect(send).not.toHaveBeenCalled();
expect(mockSend).not.toHaveBeenCalled();
jest.advanceTimersByTime(1);
expect(send).toHaveBeenCalledWith(
expect(mockSend).toHaveBeenCalledWith(
expect.objectContaining({
jsonrpc: '2.0',
method: 'ping',
}),
);
// Second ping.
send.mockClear();
mockSend.mockClear();
jest.advanceTimersByTime(MOCK_INTERVAL_MS - 1);
expect(send).not.toHaveBeenCalled();
expect(mockSend).not.toHaveBeenCalled();
jest.advanceTimersByTime(1);
expect(send).toHaveBeenCalledWith(
expect(mockSend).toHaveBeenCalledWith(
expect.objectContaining({
jsonrpc: '2.0',
method: 'ping',
}),
);
});
it('does not send a ping until interval milliseconds after the last sent message', async () => {
expect.assertions(3);
const connection = await transport({ payload: 'hi', signal: new AbortController().signal });
it('does not send a ping until interval milliseconds after the last sent message', () => {
const autopingChannel = getRpcSubscriptionsChannelWithAutoping({
abortSignal: new AbortController().signal,
channel: mockChannel,
intervalMs: MOCK_INTERVAL_MS,
});
autopingChannel.send('hi');
mockSend.mockReset();
jest.advanceTimersByTime(500);
expect(send).not.toHaveBeenCalled();
connection.send_DO_NOT_USE_OR_YOU_WILL_BE_FIRED('hi');
send.mockClear();
expect(mockSend).not.toHaveBeenCalled();
autopingChannel.send('hi');
mockSend.mockClear();
jest.advanceTimersByTime(MOCK_INTERVAL_MS - 1);
expect(send).not.toHaveBeenCalled();
expect(mockSend).not.toHaveBeenCalled();
jest.advanceTimersByTime(1);
expect(send).toHaveBeenCalledWith(
expect(mockSend).toHaveBeenCalledWith(
expect.objectContaining({
jsonrpc: '2.0',
method: 'ping',
}),
);
});
it('does not send a ping until interval milliseconds after the last received message', async () => {
expect.assertions(3);
await transport({ payload: 'hi', signal: new AbortController().signal });
it('does not send a ping until interval milliseconds after the last received message', () => {
getRpcSubscriptionsChannelWithAutoping({
abortSignal: new AbortController().signal,
channel: mockChannel,
intervalMs: MOCK_INTERVAL_MS,
});
jest.advanceTimersByTime(500);
expect(send).not.toHaveBeenCalled();
expect(mockSend).not.toHaveBeenCalled();
receiveMessage('hi');
await Promise.resolve(); // Flush Promise queue.
await Promise.resolve(); // Flush Promise queue.
jest.advanceTimersByTime(MOCK_INTERVAL_MS - 1);
expect(send).not.toHaveBeenCalled();
expect(mockSend).not.toHaveBeenCalled();
jest.advanceTimersByTime(1);
expect(send).toHaveBeenCalledWith(
expect(mockSend).toHaveBeenCalledWith(
expect.objectContaining({
jsonrpc: '2.0',
method: 'ping',
}),
);
});
it('does not send a ping after the connection throws', async () => {
expect.assertions(2);
await transport({ payload: 'hi', signal: new AbortController().signal });
it('does not send a ping after a channel error', () => {
getRpcSubscriptionsChannelWithAutoping({
abortSignal: new AbortController().signal,
channel: mockChannel,
intervalMs: MOCK_INTERVAL_MS,
});
// First ping.
jest.advanceTimersByTime(MOCK_INTERVAL_MS);
expect(send).toHaveBeenCalledWith(
expect(mockSend).toHaveBeenCalledWith(
expect.objectContaining({
jsonrpc: '2.0',
method: 'ping',
}),
);
killConnection();
await jest.runAllTimersAsync();
receiveError('o no');
// No more pings.
send.mockClear();
mockSend.mockClear();
jest.advanceTimersByTime(MOCK_INTERVAL_MS);
expect(send).not.toHaveBeenCalled();
expect(mockSend).not.toHaveBeenCalled();
});
it('does not send a ping after the connection returns', async () => {
expect.assertions(2);
await transport({ payload: 'hi', signal: new AbortController().signal });
it('does not send a ping after the abort signal fires', () => {
const abortController = new AbortController();
getRpcSubscriptionsChannelWithAutoping({
abortSignal: abortController.signal,
channel: mockChannel,
intervalMs: MOCK_INTERVAL_MS,
});
// First ping.
jest.advanceTimersByTime(MOCK_INTERVAL_MS);
expect(send).toHaveBeenCalledWith(
expect(mockSend).toHaveBeenCalledWith(
expect.objectContaining({
jsonrpc: '2.0',
method: 'ping',
}),
);
returnFromConnection();
await jest.runAllTimersAsync();
abortController.abort();
// No more pings.
send.mockClear();
mockSend.mockClear();
jest.advanceTimersByTime(MOCK_INTERVAL_MS);
expect(send).not.toHaveBeenCalled();
expect(mockSend).not.toHaveBeenCalled();
});
if (__BROWSER__) {
it('stops pinging the connection when it goes offline', async () => {
expect.assertions(1);
await transport({ payload: 'hi', signal: new AbortController().signal });
it('stops pinging the connection when it goes offline', () => {
getRpcSubscriptionsChannelWithAutoping({
abortSignal: new AbortController().signal,
channel: mockChannel,
intervalMs: MOCK_INTERVAL_MS,
});
globalThis.window.dispatchEvent(new Event('offline'));
jest.advanceTimersByTime(MOCK_INTERVAL_MS);
expect(send).not.toHaveBeenCalled();
expect(mockSend).not.toHaveBeenCalled();
});
describe('when the network connection is offline to start', () => {
beforeEach(() => {
Expand All @@ -159,35 +152,44 @@ describe('getWebSocketTransportWithAutoping', () => {
onLine: false,
}));
});
it('does not ping the connection', async () => {
expect.assertions(1);
await transport({ payload: 'hi', signal: new AbortController().signal });
it('does not ping the connection', () => {
getRpcSubscriptionsChannelWithAutoping({
abortSignal: new AbortController().signal,
channel: mockChannel,
intervalMs: MOCK_INTERVAL_MS,
});
jest.advanceTimersByTime(MOCK_INTERVAL_MS);
expect(send).not.toHaveBeenCalled();
expect(mockSend).not.toHaveBeenCalled();
});
it('pings the connection immediately when the connection comes back online', async () => {
expect.assertions(1);
await transport({ payload: 'hi', signal: new AbortController().signal });
it('pings the connection immediately when the connection comes back online', () => {
getRpcSubscriptionsChannelWithAutoping({
abortSignal: new AbortController().signal,
channel: mockChannel,
intervalMs: MOCK_INTERVAL_MS,
});
jest.advanceTimersByTime(500);
globalThis.window.dispatchEvent(new Event('online'));
expect(send).toHaveBeenCalledWith(
expect(mockSend).toHaveBeenCalledWith(
expect.objectContaining({
jsonrpc: '2.0',
method: 'ping',
}),
);
});
it('pings the connection interval milliseconds after the connection comes back online', async () => {
expect.assertions(3);
await transport({ payload: 'hi', signal: new AbortController().signal });
it('pings the connection interval milliseconds after the connection comes back online', () => {
getRpcSubscriptionsChannelWithAutoping({
abortSignal: new AbortController().signal,
channel: mockChannel,
intervalMs: MOCK_INTERVAL_MS,
});
jest.advanceTimersByTime(500);
globalThis.window.dispatchEvent(new Event('online'));
send.mockClear();
expect(send).not.toHaveBeenCalled();
mockSend.mockClear();
expect(mockSend).not.toHaveBeenCalled();
jest.advanceTimersByTime(MOCK_INTERVAL_MS - 1);
expect(send).not.toHaveBeenCalled();
expect(mockSend).not.toHaveBeenCalled();
jest.advanceTimersByTime(1);
expect(send).toHaveBeenCalledWith(
expect(mockSend).toHaveBeenCalledWith(
expect.objectContaining({
jsonrpc: '2.0',
method: 'ping',
Expand Down
Loading

0 comments on commit 35dc1bd

Please sign in to comment.