Skip to content

Commit

Permalink
A channel message demultiplexer
Browse files Browse the repository at this point in the history
  • Loading branch information
steveluscher committed Oct 4, 2024
1 parent 1b2a7ce commit d64199d
Show file tree
Hide file tree
Showing 3 changed files with 265 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
import { demultiplexDataPublisher } from '../rpc-subscriptions-pubsub-demultiplex';

describe('demultiplexDataPublisher', () => {
let mockDataPublisher: { on: jest.Mock };
function publishMessage(channelName: string, message: unknown) {
mockDataPublisher.on.mock.calls
.filter(([actualChannelName]) => actualChannelName === channelName)
.forEach(([_, listener]) => listener(message));
}
beforeEach(() => {
mockDataPublisher = {
on: jest.fn(),
};
});
it('does not listen to the publisher when there are no subscribers', () => {
demultiplexDataPublisher(mockDataPublisher, 'channelName', jest.fn() /* messageTransformer */);
expect(mockDataPublisher.on).not.toHaveBeenCalled();
});
it('starts to listen to the publisher when a subscriber appears', () => {
const demuxedDataPublisher = demultiplexDataPublisher(
mockDataPublisher,
'channelName',
jest.fn() /* messageTransformer */,
);
demuxedDataPublisher.on('someChannelName', () => {});
expect(mockDataPublisher.on).toHaveBeenCalledTimes(1);
});
it('only listens to the publisher once despite multiple subscriptions', () => {
const demuxedDataPublisher = demultiplexDataPublisher(
mockDataPublisher,
'channelName',
jest.fn() /* messageTransformer */,
);
demuxedDataPublisher.on('someChannelName', () => {});
demuxedDataPublisher.on('someOtherChannelName', () => {});
expect(mockDataPublisher.on).toHaveBeenCalledTimes(1);
});
it('unsubscribes from the publisher once the last subscriber unsubscribes', () => {
const demuxedDataPublisher = demultiplexDataPublisher(
mockDataPublisher,
'channelName',
jest.fn() /* messageTransformer */,
);
const mockUnsubscribe = jest.fn();
mockDataPublisher.on.mockReturnValue(mockUnsubscribe);
const unsubscribe = demuxedDataPublisher.on('someChannelName', () => {});
unsubscribe();
expect(mockUnsubscribe).toHaveBeenCalledTimes(1);
});
it('does not unsubscribe from the publisher if there are still subscribers after some having unsubscribed', () => {
const demuxedDataPublisher = demultiplexDataPublisher(
mockDataPublisher,
'channelName',
jest.fn() /* messageTransformer */,
);
const mockUnsubscribe = jest.fn();
mockDataPublisher.on.mockReturnValue(mockUnsubscribe);
const unsubscribe = demuxedDataPublisher.on('someChannelName', () => {});
demuxedDataPublisher.on('someChannelName', () => {});
unsubscribe();
expect(mockUnsubscribe).not.toHaveBeenCalled();
});
it("does not unsubscribe from the publisher when one subscriber's unsubscribe function is called as many times as there are subscriptions", () => {
const demuxedDataPublisher = demultiplexDataPublisher(
mockDataPublisher,
'channelName',
jest.fn() /* messageTransformer */,
);
const mockUnsubscribe = jest.fn();
mockDataPublisher.on.mockReturnValue(mockUnsubscribe);
const unsubscribeA = demuxedDataPublisher.on('someChannelName', () => {});
demuxedDataPublisher.on('someOtherChannelName', () => {});
// No matter how many times the unsubscribe function is called, it only decrements the
// subscriber count once, for its own subscription.
unsubscribeA();
unsubscribeA();
expect(mockUnsubscribe).not.toHaveBeenCalled();
});
it('unsubscribes from the publisher once the last subscriber aborts', () => {
const demuxedDataPublisher = demultiplexDataPublisher(
mockDataPublisher,
'channelName',
jest.fn() /* messageTransformer */,
);
const mockUnsubscribe = jest.fn();
mockDataPublisher.on.mockReturnValue(mockUnsubscribe);
const abortController = new AbortController();
demuxedDataPublisher.on('someChannelName', () => {}, { signal: abortController.signal });
abortController.abort();
expect(mockUnsubscribe).toHaveBeenCalledTimes(1);
});
it('does not unsubscribe from the publisher if there are still subscribers after some having aborted', () => {
const demuxedDataPublisher = demultiplexDataPublisher(
mockDataPublisher,
'channelName',
jest.fn() /* messageTransformer */,
);
const mockUnsubscribe = jest.fn();
mockDataPublisher.on.mockReturnValue(mockUnsubscribe);
const abortController = new AbortController();
demuxedDataPublisher.on('someChannelName', () => {}, { signal: abortController.signal });
demuxedDataPublisher.on('someChannelName', () => {});
abortController.abort();
expect(mockUnsubscribe).not.toHaveBeenCalled();
});
it("does not unsubscribe from the publisher when one subscriber's abort signal is fired as many times as there are subscriptions", () => {
const demuxedDataPublisher = demultiplexDataPublisher(
mockDataPublisher,
'channelName',
jest.fn() /* messageTransformer */,
);
const mockUnsubscribe = jest.fn();
mockDataPublisher.on.mockReturnValue(mockUnsubscribe);
const abortController = new AbortController();
demuxedDataPublisher.on('someChannelName', () => {}, { signal: abortController.signal });
demuxedDataPublisher.on('someOtherChannelName', () => {});
// No matter how many times the abort signal is fired, it only decrements the subscriber
// count once, for its own subscription.
abortController.abort();
abortController.abort();
expect(mockUnsubscribe).not.toHaveBeenCalled();
});
it("does not unsubscribe from the publisher when one subscriber's unsubscribe function is called and its abort signal fires for a total of as many cancellations as there are subscriptions", () => {
const demuxedDataPublisher = demultiplexDataPublisher(
mockDataPublisher,
'channelName',
jest.fn() /* messageTransformer */,
);
const mockUnsubscribe = jest.fn();
mockDataPublisher.on.mockReturnValue(mockUnsubscribe);
const abortControllerA = new AbortController();
const unsubscribeA = demuxedDataPublisher.on('someChannelName', () => {}, { signal: abortControllerA.signal });
demuxedDataPublisher.on('someOtherChannelName', () => {});
// No matter how many times the unsubscribe function is called, it only decrements the
// subscriber count once, for its own subscription.
unsubscribeA();
abortControllerA.abort();
expect(mockUnsubscribe).not.toHaveBeenCalled();
});
it('does not call the transform function when there are no subscribers yet', () => {
const mockMessageTransformer = jest.fn().mockReturnValue([]);
demultiplexDataPublisher(mockDataPublisher, 'channelName', mockMessageTransformer);
publishMessage('channelName', 'hi');
expect(mockMessageTransformer).not.toHaveBeenCalled();
});
it('calls the transform function for every event that matches the source channel name when there is at least one subscriber', () => {
const mockMessageTransformer = jest.fn().mockReturnValue([]);
const demuxedDataPublisher = demultiplexDataPublisher(mockDataPublisher, 'channelName', mockMessageTransformer);
demuxedDataPublisher.on('channelName', () => {});
publishMessage('channelName', 'hi');
expect(mockMessageTransformer).toHaveBeenCalledWith('hi');
});
it('does not call the transform function when the event does not match the source channel name', () => {
const mockMessageTransformer = jest.fn().mockReturnValue([]);
demultiplexDataPublisher(mockDataPublisher, 'channelName', mockMessageTransformer);
publishMessage('otherChannelName', 'o no');
expect(mockMessageTransformer).not.toHaveBeenCalled();
});
it('publishes a message on the demuxed channel with the name returned by the transformer', () => {
const demuxedDataPublisher = demultiplexDataPublisher(mockDataPublisher, 'channelName', () => [
'transformedChannelName',
'HI',
]);
const transformedChannelListener = jest.fn();
demuxedDataPublisher.on('transformedChannelName', transformedChannelListener);
publishMessage('channelName', 'hi');
expect(transformedChannelListener).toHaveBeenCalledWith('HI');
});
it('publishes no message on the demuxed channel if the transformer returns `undefined`', () => {
const demuxedDataPublisher = demultiplexDataPublisher(mockDataPublisher, 'channelName', () => {});
const transformedChannelListener = jest.fn();
demuxedDataPublisher.on('transformedChannelName', transformedChannelListener);
publishMessage('channelName', 'hi');
expect(transformedChannelListener).not.toHaveBeenCalled();
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { DataPublisher, getDataPublisherFromEventEmitter } from '@solana/subscribable';

export function demultiplexDataPublisher<
TDataPublisher extends DataPublisher,
const TChannelName extends Parameters<TDataPublisher['on']>[0],
>(
publisher: TDataPublisher,
sourceChannelName: TChannelName,
messageTransformer: (
// FIXME: Deriving the type of the message from `TDataPublisher` and `TChannelName` would
// help callers to constrain their transform functions.
message: unknown,
) => [destinationChannelName: string, message: unknown] | void,
): DataPublisher {
let innerPublisherState:
| {
readonly dispose: () => void;
numSubscribers: number;
}
| undefined;
const eventTarget = new EventTarget();
const demultiplexedDataPublisher = getDataPublisherFromEventEmitter(eventTarget);
return {
...demultiplexedDataPublisher,
on(channelName, subscriber, options) {
if (!innerPublisherState) {
const innerPublisherUnsubscribe = publisher.on(sourceChannelName, sourceMessage => {
const transformResult = messageTransformer(sourceMessage);
if (!transformResult) {
return;
}
const [destinationChannelName, message] = transformResult;
eventTarget.dispatchEvent(
new CustomEvent(destinationChannelName, {
detail: message,
}),
);
});
innerPublisherState = {
dispose: innerPublisherUnsubscribe,
numSubscribers: 0,
};
}
innerPublisherState.numSubscribers++;
const unsubscribe = demultiplexedDataPublisher.on(channelName, subscriber, options);
let isActive = true;
function handleUnsubscribe() {
if (!isActive) {
return;
}
isActive = false;
options?.signal.removeEventListener('abort', handleUnsubscribe);
innerPublisherState!.numSubscribers--;
if (innerPublisherState!.numSubscribers === 0) {
innerPublisherState!.dispose();
innerPublisherState = undefined;
}
unsubscribe();
}
options?.signal.addEventListener('abort', handleUnsubscribe);
return handleUnsubscribe;
},
};
}
25 changes: 25 additions & 0 deletions packages/rpc-subscriptions/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,31 @@ This package contains types that implement RPC subscriptions as required by the

## Functions

### `demultiplexDataPublisher(publisher, sourceChannelName, messageTransformer)`

Given a channel that carries messages for multiple subscribers on a single channel name, this function returns a new `DataPublisher` that splits them into multiple channel names.

Imagine a channel that carries multiple notifications whose destination is contained within the message itself.

```ts
const demuxedDataPublisher = demultiplexDataPublisher(channel, 'message', message => {
const destinationChannelName = `notification-for:${message.subscriberId}`;
return [destinationChannelName, message];
});
```

Now you can subscribe to _only_ the messages you are interested in, without having to subscribe to the entire `'message'` channel and filter out the messages that are not for you.

```ts
demuxedDataPublisher.on(
'notification-for:123',
message => {
console.log('Got a message for subscriber 123', message);
},
{ signal: AbortSignal.timeout(5_000) },
);
```

### `getChannelPoolingChannelCreator({ createChannel, maxSubscriptionsPerChannel, minChannels })`

Given a channel creator, will return a new channel creator with the following behavior.
Expand Down

0 comments on commit d64199d

Please sign in to comment.