diff --git a/.changeset/neat-cows-shop.md b/.changeset/neat-cows-shop.md
new file mode 100644
index 00000000000..b781c5dd7c1
--- /dev/null
+++ b/.changeset/neat-cows-shop.md
@@ -0,0 +1,5 @@
+---
+'@solana/subscribable': patch
+---
+
+Creates a package for working with subscribable data sources like event targets. From an `EventTarget` or object which conforms to the `EventEmitter` interface you can now create a more ergonomic `DataPublisher` (object with an `on` method that vends an unsubscribe function) or an abortable `AsyncIterable`.
diff --git a/packages/subscribable/.gitignore b/packages/subscribable/.gitignore
new file mode 100644
index 00000000000..849ddff3b7e
--- /dev/null
+++ b/packages/subscribable/.gitignore
@@ -0,0 +1 @@
+dist/
diff --git a/packages/subscribable/.npmrc b/packages/subscribable/.npmrc
new file mode 100644
index 00000000000..b6f27f13595
--- /dev/null
+++ b/packages/subscribable/.npmrc
@@ -0,0 +1 @@
+engine-strict=true
diff --git a/packages/subscribable/.prettierignore b/packages/subscribable/.prettierignore
new file mode 100644
index 00000000000..849ddff3b7e
--- /dev/null
+++ b/packages/subscribable/.prettierignore
@@ -0,0 +1 @@
+dist/
diff --git a/packages/subscribable/CHANGELOG.md b/packages/subscribable/CHANGELOG.md
new file mode 100644
index 00000000000..daa5b32d53c
--- /dev/null
+++ b/packages/subscribable/CHANGELOG.md
@@ -0,0 +1 @@
+# @solana/subscribable
diff --git a/packages/subscribable/LICENSE b/packages/subscribable/LICENSE
new file mode 100644
index 00000000000..ec09953d3c2
--- /dev/null
+++ b/packages/subscribable/LICENSE
@@ -0,0 +1,20 @@
+Copyright (c) 2023 Solana Labs, Inc
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/packages/subscribable/README.md b/packages/subscribable/README.md
new file mode 100644
index 00000000000..7012460688d
--- /dev/null
+++ b/packages/subscribable/README.md
@@ -0,0 +1,99 @@
+[![npm][npm-image]][npm-url]
+[![npm-downloads][npm-downloads-image]][npm-url]
+[![semantic-release][semantic-release-image]][semantic-release-url]
+
+[![code-style-prettier][code-style-prettier-image]][code-style-prettier-url]
+
+[code-style-prettier-image]: https://img.shields.io/badge/code_style-prettier-ff69b4.svg?style=flat-square
+[code-style-prettier-url]: https://github.com/prettier/prettier
+[npm-downloads-image]: https://img.shields.io/npm/dm/@solana/subscribable/rc.svg?style=flat
+[npm-image]: https://img.shields.io/npm/v/@solana/subscribable/rc.svg?style=flat
+[npm-url]: https://www.npmjs.com/package/@solana/subscribable/v/rc
+[semantic-release-image]: https://img.shields.io/badge/%20%20%F0%9F%93%A6%F0%9F%9A%80-semantic--release-e10079.svg
+[semantic-release-url]: https://github.com/semantic-release/semantic-release
+
+# @solana/subscribable
+
+This package contains utilities for creating subscription-based events targets. These differ from the `EventTarget` interface in that the method you use to add a listener returns an unsubscribe function.
+
+## Types
+
+### `DataPublisher`
+
+This type represents an object with an `on` function that you can call to subscribe to certain data over a named channel.
+
+```ts
+let dataPublisher: DataPublisher<{ error: SolanaError }>;
+dataPublisher.on('data', handleData); // ERROR. `data` is not a known channel name.
+dataPublisher.on('error', e => {
+ console.error(e);
+}); // OK.
+```
+
+### `TypedEventEmitter`
+
+This type allows you to type `addEventListener` and `removeEventListener` so that the call signature of the listener matches the event type given.
+
+```ts
+const emitter: TypedEventEmitter<{ message: MessageEvent }> = new WebSocket('wss://api.devnet.solana.com');
+emitter.addEventListener('data', handleData); // ERROR. `data` is not a known event type.
+emitter.addEventListener('message', message => {
+ console.log(message.origin); // OK. `message` is a `MessageEvent` so it has an `origin` property.
+});
+```
+
+### `TypedEventTarget`
+
+This type is a superset of `TypedEventEmitter` that allows you to constrain calls to `dispatchEvent`.
+
+```ts
+const target: TypedEventTarget<{ candyVended: CustomEvent<{ flavour: string }> }> = new EventTarget();
+target.dispatchEvent(new CustomEvent('candyVended', { detail: { flavour: 'raspberry' } })); // OK.
+target.dispatchEvent(new CustomEvent('candyVended', { detail: { flavor: 'raspberry' } })); // ERROR. Misspelling in detail.
+```
+
+## Functions
+
+### `createAsyncIterableFromDataPublisher({ abortSignal, dataChannelName, dataPublisher, errorChannelName })`
+
+Returns an `AsyncIterable` given a data publisher. The iterable will produce iterators that vend messages published to `dataChannelName` and will throw the first time a message is published to `errorChannelName`. Triggering the abort signal will cause all iterators spawned from this iterator to return once they have published all queued messages.
+
+```ts
+const iterable = createAsyncIterableFromDataPublisher({
+ abortSignal: AbortSignal.timeout(10_000),
+ dataChannelName: 'message',
+ dataPublisher,
+ errorChannelName: 'error',
+});
+try {
+ for await (const message of iterable) {
+ console.log('Got message', message);
+ }
+} catch (e) {
+ console.error('An error was published to the error channel', e);
+} finally {
+ console.log("It's been 10 seconds; that's enough for now.");
+}
+```
+
+Things to note:
+
+- If a message is published over a channel before the `AsyncIterator` attached to it has polled for the next result, the message will be queued in memory.
+- Messages only begin to be queued after the first time an iterator begins to poll. Channel messages published before that time will be dropped.
+- If there are messages in the queue and an error occurs, all queued messages will be vended to the iterator before the error is thrown.
+- If there are messages in the queue and the abort signal fires, all queued messages will be vended to the iterator after which it will return.
+- Any new iterators created after the first error is encountered will reject with that error when polled.
+
+### `getDataPublisherFromEventEmitter(emitter)`
+
+Returns an object with an `on` function that you can call to subscribe to certain data over a named channel. The `on` function returns an unsubscribe function.
+
+```ts
+const socketDataPublisher = getDataPublisherFromEventEmitter(new WebSocket('wss://api.devnet.solana.com'));
+const unsubscribe = socketDataPublisher.on('message', message => {
+ if (JSON.parse(message.data).id === 42) {
+ console.log('Got response 42');
+ unsubscribe();
+ }
+});
+```
diff --git a/packages/subscribable/package.json b/packages/subscribable/package.json
new file mode 100644
index 00000000000..78f7a4e967a
--- /dev/null
+++ b/packages/subscribable/package.json
@@ -0,0 +1,90 @@
+{
+ "name": "@solana/subscribable",
+ "version": "2.0.0-rc.1",
+ "description": "Helpers for creating subscription-based event emitters",
+ "exports": {
+ "edge-light": {
+ "import": "./dist/index.node.mjs",
+ "require": "./dist/index.node.cjs"
+ },
+ "workerd": {
+ "import": "./dist/index.node.mjs",
+ "require": "./dist/index.node.cjs"
+ },
+ "browser": {
+ "import": "./dist/index.browser.mjs",
+ "require": "./dist/index.browser.cjs"
+ },
+ "node": {
+ "import": "./dist/index.node.mjs",
+ "require": "./dist/index.node.cjs"
+ },
+ "react-native": "./dist/index.native.mjs",
+ "types": "./dist/types/index.d.ts"
+ },
+ "browser": {
+ "./dist/index.node.cjs": "./dist/index.browser.cjs",
+ "./dist/index.node.mjs": "./dist/index.browser.mjs"
+ },
+ "main": "./dist/index.node.cjs",
+ "module": "./dist/index.node.mjs",
+ "react-native": "./dist/index.native.mjs",
+ "types": "./dist/types/index.d.ts",
+ "type": "commonjs",
+ "files": [
+ "./dist/"
+ ],
+ "sideEffects": false,
+ "keywords": [
+ "blockchain",
+ "solana",
+ "web3"
+ ],
+ "scripts": {
+ "compile:js": "tsup --config build-scripts/tsup.config.package.ts",
+ "compile:typedefs": "tsc -p ./tsconfig.declarations.json",
+ "dev": "jest -c ../../node_modules/@solana/test-config/jest-dev.config.ts --rootDir . --watch",
+ "prepublishOnly": "pnpm pkg delete devDependencies",
+ "publish-impl": "npm view $npm_package_name@$npm_package_version > /dev/null 2>&1 || pnpm publish --tag ${PUBLISH_TAG:-canary} --access public --no-git-checks",
+ "publish-packages": "pnpm prepublishOnly && pnpm publish-impl",
+ "style:fix": "pnpm eslint --fix src && pnpm prettier --log-level warn --ignore-unknown --write ./*",
+ "test:lint": "TERM_OVERRIDE=\"${TURBO_HASH:+dumb}\" TERM=${TERM_OVERRIDE:-$TERM} jest -c ../../node_modules/@solana/test-config/jest-lint.config.ts --rootDir . --silent",
+ "test:prettier": "TERM_OVERRIDE=\"${TURBO_HASH:+dumb}\" TERM=${TERM_OVERRIDE:-$TERM} jest -c ../../node_modules/@solana/test-config/jest-prettier.config.ts --rootDir . --silent",
+ "test:treeshakability:browser": "agadoo dist/index.browser.mjs",
+ "test:treeshakability:native": "agadoo dist/index.native.mjs",
+ "test:treeshakability:node": "agadoo dist/index.node.mjs",
+ "test:typecheck": "tsc --noEmit",
+ "test:unit:browser": "TERM_OVERRIDE=\"${TURBO_HASH:+dumb}\" TERM=${TERM_OVERRIDE:-$TERM} jest -c ../../node_modules/@solana/test-config/jest-unit.config.browser.ts --rootDir . --silent",
+ "test:unit:node": "TERM_OVERRIDE=\"${TURBO_HASH:+dumb}\" TERM=${TERM_OVERRIDE:-$TERM} jest -c ../../node_modules/@solana/test-config/jest-unit.config.node.ts --rootDir . --silent"
+ },
+ "author": "Solana Labs Maintainers ",
+ "license": "MIT",
+ "repository": {
+ "type": "git",
+ "url": "https://github.com/solana-labs/solana-web3.js"
+ },
+ "bugs": {
+ "url": "http://github.com/solana-labs/solana-web3.js/issues"
+ },
+ "browserslist": [
+ "supports bigint and not dead",
+ "maintained node versions"
+ ],
+ "engine": {
+ "node": ">=17.4"
+ },
+ "dependencies": {
+ "@solana/errors": "workspace:*"
+ },
+ "peerDependencies": {
+ "typescript": ">=5"
+ },
+ "bundlewatch": {
+ "defaultCompression": "gzip",
+ "files": [
+ {
+ "path": "./dist/index*.js"
+ }
+ ]
+ }
+}
diff --git a/packages/subscribable/src/__tests__/async-iterable-test.ts b/packages/subscribable/src/__tests__/async-iterable-test.ts
new file mode 100644
index 00000000000..b03639d4522
--- /dev/null
+++ b/packages/subscribable/src/__tests__/async-iterable-test.ts
@@ -0,0 +1,220 @@
+import { createAsyncIterableFromDataPublisher } from '../async-iterable';
+import { DataPublisher } from '../data-publisher';
+
+describe('createAsyncIterableFromDataPublisher', () => {
+ let mockDataPublisher: DataPublisher;
+ let mockOn: jest.Mock;
+ function publish(type: string, payload: unknown) {
+ mockOn.mock.calls.filter(([actualType]) => actualType === type).forEach(([_, listener]) => listener(payload));
+ }
+ beforeEach(() => {
+ mockOn = jest.fn().mockReturnValue(function unsubscribe() {});
+ mockDataPublisher = {
+ on: mockOn,
+ };
+ });
+ it('returns from the iterator when the abort signal starts aborted', async () => {
+ expect.assertions(1);
+ const abortController = new AbortController();
+ abortController.abort();
+ const iterable = createAsyncIterableFromDataPublisher({
+ abortSignal: abortController.signal,
+ dataChannelName: 'data',
+ dataPublisher: mockDataPublisher,
+ errorChannelName: 'error',
+ });
+ const iterator = iterable[Symbol.asyncIterator]();
+ const nextDataPromise = iterator.next();
+ await expect(nextDataPromise).resolves.toMatchObject({
+ done: true,
+ value: undefined,
+ });
+ });
+ it('returns from the iterator when the abort signal fires', async () => {
+ expect.assertions(1);
+ const abortController = new AbortController();
+ const iterable = createAsyncIterableFromDataPublisher({
+ abortSignal: abortController.signal,
+ dataChannelName: 'data',
+ dataPublisher: mockDataPublisher,
+ errorChannelName: 'error',
+ });
+ const iterator = iterable[Symbol.asyncIterator]();
+ const nextDataPromise = iterator.next();
+ abortController.abort();
+ await expect(nextDataPromise).resolves.toMatchObject({
+ done: true,
+ value: undefined,
+ });
+ });
+ it('throws the first published error through newly created iterators', async () => {
+ expect.assertions(1);
+ const iterable = createAsyncIterableFromDataPublisher({
+ abortSignal: new AbortController().signal,
+ dataChannelName: 'data',
+ dataPublisher: mockDataPublisher,
+ errorChannelName: 'error',
+ });
+ publish('error', new Error('o no'));
+ publish('error', new Error('also o no'));
+ const iterator = iterable[Symbol.asyncIterator]();
+ const nextDataPromise = iterator.next();
+ await expect(nextDataPromise).rejects.toThrow(new Error('o no'));
+ });
+ it('returns from the iterator on the next poll after an error', async () => {
+ expect.assertions(1);
+ const iterable = createAsyncIterableFromDataPublisher({
+ abortSignal: new AbortController().signal,
+ dataChannelName: 'data',
+ dataPublisher: mockDataPublisher,
+ errorChannelName: 'error',
+ });
+ publish('error', new Error('o no'));
+ const iterator = iterable[Symbol.asyncIterator]();
+ iterator.next().catch(() => {});
+ const dataPromiseAfterError = iterator.next();
+ await expect(dataPromiseAfterError).resolves.toStrictEqual({
+ done: true,
+ value: undefined,
+ });
+ });
+ describe('given that no iterator has yet been polled', () => {
+ let abortController: AbortController;
+ let iterable: AsyncIterable;
+ let iterator: AsyncIterator;
+ beforeEach(() => {
+ abortController = new AbortController();
+ iterable = createAsyncIterableFromDataPublisher({
+ abortSignal: abortController.signal,
+ dataChannelName: 'data',
+ dataPublisher: mockDataPublisher,
+ errorChannelName: 'error',
+ });
+ iterator = iterable[Symbol.asyncIterator]();
+ });
+ it('drops data published before polling begins', async () => {
+ expect.assertions(1);
+ publish('data', 'lost message');
+ const nextDataPromise = iterator.next();
+ publish('data', 'hi');
+ await expect(nextDataPromise).resolves.toStrictEqual({
+ done: false,
+ value: 'hi',
+ });
+ });
+ it('vends the first error received when the iterator is eventually polled', async () => {
+ expect.assertions(1);
+ publish('error', new Error('o no'));
+ publish('error', new Error('also o no'));
+ const nextDataPromise = iterator.next();
+ await expect(nextDataPromise).rejects.toThrow(new Error('o no'));
+ });
+ it('returns when the iterator is eventually polled, the iterator having already been aborted', async () => {
+ expect.assertions(1);
+ abortController.abort();
+ const nextDataPromise = iterator.next();
+ await expect(nextDataPromise).resolves.toStrictEqual({
+ done: true,
+ value: undefined,
+ });
+ });
+ });
+ describe('given that multiple consumers have begun to poll', () => {
+ let abortController: AbortController;
+ let iteratorA: AsyncIterator;
+ let iteratorB: AsyncIterator;
+ let nextDataPromiseA: Promise;
+ let nextDataPromiseB: Promise;
+ beforeEach(() => {
+ abortController = new AbortController();
+ const iterable = createAsyncIterableFromDataPublisher({
+ abortSignal: abortController.signal,
+ dataChannelName: 'data',
+ dataPublisher: mockDataPublisher,
+ errorChannelName: 'error',
+ });
+ iteratorA = iterable[Symbol.asyncIterator]();
+ iteratorB = iterable[Symbol.asyncIterator]();
+ nextDataPromiseA = iteratorA.next();
+ nextDataPromiseB = iteratorB.next();
+ });
+ it('throws from all iterators when an error is published', async () => {
+ expect.assertions(2);
+ publish('error', new Error('o no'));
+ await expect(nextDataPromiseA).rejects.toThrow(new Error('o no'));
+ await expect(nextDataPromiseB).rejects.toThrow(new Error('o no'));
+ });
+ it('vends a message to all iterators who have already polled for a result', async () => {
+ expect.assertions(2);
+ publish('data', 'hi');
+ await expect(nextDataPromiseA).resolves.toStrictEqual({
+ done: false,
+ value: 'hi',
+ });
+ await expect(nextDataPromiseB).resolves.toStrictEqual({
+ done: false,
+ value: 'hi',
+ });
+ });
+ it('queues messages for all iterators', async () => {
+ expect.assertions(4);
+ publish('data', 'consumed message');
+ publish('data', 'queued message 1');
+ publish('data', 'queued message 2');
+ await expect(iteratorA.next()).resolves.toStrictEqual({
+ done: false,
+ value: 'queued message 1',
+ });
+ await expect(iteratorB.next()).resolves.toStrictEqual({
+ done: false,
+ value: 'queued message 1',
+ });
+ await expect(iteratorA.next()).resolves.toStrictEqual({
+ done: false,
+ value: 'queued message 2',
+ });
+ await expect(iteratorB.next()).resolves.toStrictEqual({
+ done: false,
+ value: 'queued message 2',
+ });
+ });
+ it('flushes the queue before vending errors', async () => {
+ expect.assertions(4);
+ publish('data', 'consumed message');
+ publish('data', 'queued message 1');
+ publish('error', new Error('o no'));
+ await expect(iteratorA.next()).resolves.toStrictEqual({
+ done: false,
+ value: 'queued message 1',
+ });
+ await expect(iteratorB.next()).resolves.toStrictEqual({
+ done: false,
+ value: 'queued message 1',
+ });
+ await expect(iteratorA.next()).rejects.toThrow(new Error('o no'));
+ await expect(iteratorB.next()).rejects.toThrow(new Error('o no'));
+ });
+ it('flushes the queue before an abort finalizes it', async () => {
+ expect.assertions(4);
+ publish('data', 'consumed message');
+ publish('data', 'queued message 1');
+ abortController.abort();
+ await expect(iteratorA.next()).resolves.toStrictEqual({
+ done: false,
+ value: 'queued message 1',
+ });
+ await expect(iteratorB.next()).resolves.toStrictEqual({
+ done: false,
+ value: 'queued message 1',
+ });
+ await expect(iteratorA.next()).resolves.toStrictEqual({
+ done: true,
+ value: undefined,
+ });
+ await expect(iteratorB.next()).resolves.toStrictEqual({
+ done: true,
+ value: undefined,
+ });
+ });
+ });
+});
diff --git a/packages/subscribable/src/__tests__/data-publisher-test.ts b/packages/subscribable/src/__tests__/data-publisher-test.ts
new file mode 100644
index 00000000000..0331d8a866c
--- /dev/null
+++ b/packages/subscribable/src/__tests__/data-publisher-test.ts
@@ -0,0 +1,76 @@
+import { DataPublisher, getDataPublisherFromEventEmitter } from '../data-publisher';
+
+describe('a data publisher', () => {
+ let dataPublisher: DataPublisher;
+ let eventTarget: EventTarget;
+ beforeEach(() => {
+ eventTarget = new EventTarget();
+ dataPublisher = getDataPublisherFromEventEmitter(eventTarget);
+ });
+ it('calls a subscriber with no arguments when the event is an `Event`', () => {
+ const subscriber = jest.fn();
+ dataPublisher.on('someEvent', subscriber);
+ eventTarget.dispatchEvent(new Event('someEvent'));
+ expect(subscriber).toHaveBeenCalledWith();
+ });
+ it('calls a subscriber with `null` when the event is a `CustomEvent` with no `detail`', () => {
+ const subscriber = jest.fn();
+ dataPublisher.on('someEvent', subscriber);
+ eventTarget.dispatchEvent(new CustomEvent('someEvent'));
+ expect(subscriber).toHaveBeenCalledWith(null);
+ });
+ it('calls a subscriber with the `detail` of a `CustomEvent`', () => {
+ const subscriber = jest.fn();
+ dataPublisher.on('someEvent', subscriber);
+ eventTarget.dispatchEvent(new CustomEvent('someEvent', { detail: 123 }));
+ expect(subscriber).toHaveBeenCalledWith(123);
+ });
+ it('does not call a subscriber after the unsubscribe function is called', () => {
+ const subscriber = jest.fn();
+ const unsubscribe = dataPublisher.on('someEvent', subscriber);
+ unsubscribe();
+ eventTarget.dispatchEvent(new Event('someEvent'));
+ expect(subscriber).not.toHaveBeenCalled();
+ });
+ it('does not call a subscriber after its abort signal fires', () => {
+ const subscriber = jest.fn();
+ const abortController = new AbortController();
+ dataPublisher.on('someEvent', subscriber, { signal: abortController.signal });
+ abortController.abort();
+ eventTarget.dispatchEvent(new Event('someEvent'));
+ expect(subscriber).not.toHaveBeenCalled();
+ });
+ it('does not fatal when the unsubscribe method is called more than once', () => {
+ const subscriber = jest.fn();
+ const unsubscribe = dataPublisher.on('someEvent', subscriber);
+ unsubscribe();
+ expect(() => {
+ unsubscribe();
+ }).not.toThrow();
+ });
+ it('keeps other subscribers subscribed when unsubcribing from others', () => {
+ const subscriberA = jest.fn();
+ const subscriberB = jest.fn();
+ dataPublisher.on('someEvent', subscriberA);
+ const unsubscribeB = dataPublisher.on('someEvent', subscriberB);
+ unsubscribeB();
+ eventTarget.dispatchEvent(new Event('someEvent'));
+ expect(subscriberA).toHaveBeenCalled();
+ });
+ it('keeps other subscribers subscribed when the abort signal of another fires', () => {
+ const subscriberA = jest.fn();
+ const subscriberB = jest.fn();
+ const abortController = new AbortController();
+ dataPublisher.on('someEvent', subscriberA);
+ dataPublisher.on('someEvent', subscriberB, { signal: abortController.signal });
+ abortController.abort();
+ eventTarget.dispatchEvent(new Event('someEvent'));
+ expect(subscriberA).toHaveBeenCalled();
+ });
+ it('does not notify a subscriber about an event with a type different than the one it is interested in', () => {
+ const subscriber = jest.fn();
+ dataPublisher.on('someEvent', subscriber);
+ eventTarget.dispatchEvent(new Event('someOtherEvent'));
+ expect(subscriber).not.toHaveBeenCalled();
+ });
+});
diff --git a/packages/subscribable/src/__typetests__/data-publisher-typetest.ts b/packages/subscribable/src/__typetests__/data-publisher-typetest.ts
new file mode 100644
index 00000000000..e3e8e2f44a9
--- /dev/null
+++ b/packages/subscribable/src/__typetests__/data-publisher-typetest.ts
@@ -0,0 +1,60 @@
+/* eslint-disable @typescript-eslint/ban-ts-comment */
+
+import { DataPublisher, getDataPublisherFromEventEmitter } from '../data-publisher';
+import { TypedEventEmitter } from '../event-emitter';
+
+type ChannelMap = {
+ fall: null;
+ jump: { height: number };
+ run: { velocity: number };
+};
+const publisher = null as unknown as DataPublisher;
+
+// [DESCRIBE] getDataPublisherFromEventEmitter
+{
+ // It materializes listener signatures based on the events of the emitter
+ {
+ const eventEmitter = null as unknown as TypedEventEmitter<{ foo: CustomEvent<'bar'> }>;
+ const publisher = getDataPublisherFromEventEmitter(eventEmitter);
+ publisher.on('foo', data => {
+ data satisfies 'bar';
+ });
+ }
+}
+
+// [DESCRIBE] DataPublisher
+{
+ // It adds listeners for known types
+ {
+ publisher.on('fall', () => {});
+ publisher.on('jump', () => {});
+ publisher.on('run', () => {});
+ }
+ // It rejects adding listeners for unknown types
+ {
+ publisher.on(
+ // @ts-expect-error
+ 'roll',
+ () => {},
+ );
+ }
+ // It accepts adding listeners with the appropriate signature
+ {
+ publisher.on('fall', data => {
+ data satisfies null;
+ });
+ publisher.on('jump', data => {
+ data satisfies { height: number };
+ });
+ publisher.on('run', data => {
+ data satisfies { velocity: number };
+ });
+ }
+ // It rejects adding listeners with inappropriate signatures
+ {
+ publisher.on('fall', data => {
+ // @ts-expect-error
+ data satisfies { style: string };
+ });
+ }
+}
diff --git a/packages/subscribable/src/__typetests__/event-emitter-typetest.ts b/packages/subscribable/src/__typetests__/event-emitter-typetest.ts
new file mode 100644
index 00000000000..41af842f395
--- /dev/null
+++ b/packages/subscribable/src/__typetests__/event-emitter-typetest.ts
@@ -0,0 +1,89 @@
+/* eslint-disable @typescript-eslint/ban-ts-comment */
+
+import { TypedEventEmitter, TypedEventTarget } from '../event-emitter';
+
+type EventMap = {
+ fall: Event;
+ jump: CustomEvent<{ height: number }>;
+ run: CustomEvent<{ velocity: number }>;
+};
+const emitter = null as unknown as TypedEventEmitter;
+
+// [DESCRIBE] TypedEventEmitter
+{
+ // It adds listeners for known types
+ {
+ emitter.addEventListener('fall', () => {});
+ emitter.addEventListener('jump', () => {});
+ emitter.addEventListener('run', () => {});
+ }
+ // It removes listeners for known types
+ {
+ emitter.removeEventListener('fall', () => {});
+ emitter.removeEventListener('jump', () => {});
+ emitter.removeEventListener('run', () => {});
+ }
+ // It rejects adding listeners for unknown types
+ {
+ emitter.addEventListener(
+ // @ts-expect-error
+ 'roll',
+ () => {},
+ );
+ }
+ // It rejects removing listeners for unknown types
+ {
+ emitter.removeEventListener(
+ // @ts-expect-error
+ 'roll',
+ () => {},
+ );
+ }
+ // It accepts adding listeners with the appropriate signature
+ {
+ emitter.addEventListener('fall', ev => {
+ ev satisfies Event;
+ });
+ emitter.addEventListener('jump', ev => {
+ ev satisfies CustomEvent<{ height: number }>;
+ });
+ emitter.addEventListener('run', ev => {
+ ev satisfies CustomEvent<{ velocity: number }>;
+ });
+ }
+ // It accepts removing listeners with the appropriate signature
+ {
+ emitter.removeEventListener('fall', ev => {
+ ev satisfies Event;
+ });
+ emitter.removeEventListener('jump', ev => {
+ ev satisfies CustomEvent<{ height: number }>;
+ });
+ emitter.removeEventListener('run', ev => {
+ ev satisfies CustomEvent<{ velocity: number }>;
+ });
+ }
+ // It rejects adding listeners with inappropriate signatures
+ {
+ emitter.addEventListener('fall', ev => {
+ // @ts-expect-error
+ ev satisfies CustomEvent<{ style: string }>;
+ });
+ }
+ // It rejects removing listeners with inappropriate signatures
+ {
+ emitter.removeEventListener('fall', ev => {
+ // @ts-expect-error
+ ev satisfies CustomEvent<{ style: string }>;
+ });
+ }
+}
+
+// [DESCRIBE] TypedEventTarget
+{
+ // It is a TypedEventEmitter
+ {
+ const eventTarget = null as unknown as TypedEventTarget<{ foo: CustomEvent<'bar'> }>;
+ eventTarget satisfies TypedEventEmitter<{ foo: CustomEvent<'bar'> }>;
+ }
+}
diff --git a/packages/subscribable/src/async-iterable.ts b/packages/subscribable/src/async-iterable.ts
new file mode 100644
index 00000000000..262c5c2644c
--- /dev/null
+++ b/packages/subscribable/src/async-iterable.ts
@@ -0,0 +1,172 @@
+import {
+ SOLANA_ERROR__INVARIANT_VIOLATION__SUBSCRIPTION_ITERATOR_MUST_NOT_POLL_BEFORE_RESOLVING_EXISTING_MESSAGE_PROMISE,
+ SOLANA_ERROR__INVARIANT_VIOLATION__SUBSCRIPTION_ITERATOR_STATE_MISSING,
+ SolanaError,
+} from '@solana/errors';
+
+import { DataPublisher } from './data-publisher';
+
+type Config = Readonly<{
+ abortSignal: AbortSignal;
+ dataChannelName: string;
+ // FIXME: It would be nice to be able to constrain the type of `dataPublisher` to one that
+ // definitely supports the `dataChannelName` and `errorChannelName` channels, and
+ // furthermore publishes `TData` on the `dataChannelName` channel. This is more difficult
+ // than it should be: https://tsplay.dev/NlZelW
+ dataPublisher: DataPublisher;
+ errorChannelName: string;
+}>;
+
+const enum PublishType {
+ DATA,
+ ERROR,
+}
+
+type IteratorKey = symbol;
+type IteratorState =
+ | {
+ __hasPolled: false;
+ publishQueue: (
+ | {
+ __type: PublishType.DATA;
+ data: TData;
+ }
+ | {
+ __type: PublishType.ERROR;
+ err: unknown;
+ }
+ )[];
+ }
+ | {
+ __hasPolled: true;
+ onData: (data: TData) => void;
+ onError: Parameters[0]>[1];
+ };
+
+let EXPLICIT_ABORT_TOKEN: symbol;
+function createExplicitAbortToken() {
+ // This function is an annoying workaround to prevent `process.env.NODE_ENV` from appearing at
+ // the top level of this module and thwarting an optimizing compiler's attempt to tree-shake.
+ return Symbol(
+ __DEV__
+ ? "This symbol is thrown from a socket's iterator when the connection is explicitly " +
+ 'aborted by the user'
+ : undefined,
+ );
+}
+
+const UNINITIALIZED = Symbol();
+
+export function createAsyncIterableFromDataPublisher({
+ abortSignal,
+ dataChannelName,
+ dataPublisher,
+ errorChannelName,
+}: Config): AsyncIterable {
+ const iteratorState: Map> = new Map();
+ function publishErrorToAllIterators(reason: unknown) {
+ for (const [iteratorKey, state] of iteratorState.entries()) {
+ if (state.__hasPolled) {
+ iteratorState.delete(iteratorKey);
+ state.onError(reason);
+ } else {
+ state.publishQueue.push({
+ __type: PublishType.ERROR,
+ err: reason,
+ });
+ }
+ }
+ }
+ const abortController = new AbortController();
+ abortSignal.addEventListener('abort', () => {
+ abortController.abort();
+ publishErrorToAllIterators((EXPLICIT_ABORT_TOKEN ||= createExplicitAbortToken()));
+ });
+ const options = { signal: abortController.signal } as const;
+ let firstError: unknown | typeof UNINITIALIZED = UNINITIALIZED;
+ dataPublisher.on(
+ errorChannelName,
+ err => {
+ if (firstError === UNINITIALIZED) {
+ firstError = err;
+ abortController.abort();
+ publishErrorToAllIterators(err);
+ }
+ },
+ options,
+ );
+ dataPublisher.on(
+ dataChannelName,
+ data => {
+ iteratorState.forEach((state, iteratorKey) => {
+ if (state.__hasPolled) {
+ const { onData } = state;
+ iteratorState.set(iteratorKey, { __hasPolled: false, publishQueue: [] });
+ onData(data as TData);
+ } else {
+ state.publishQueue.push({
+ __type: PublishType.DATA,
+ data: data as TData,
+ });
+ }
+ });
+ },
+ options,
+ );
+ return {
+ async *[Symbol.asyncIterator]() {
+ if (abortSignal.aborted) {
+ return;
+ }
+ if (firstError !== UNINITIALIZED) {
+ throw firstError;
+ }
+ const iteratorKey = Symbol();
+ iteratorState.set(iteratorKey, { __hasPolled: false, publishQueue: [] });
+ try {
+ while (true) {
+ const state = iteratorState.get(iteratorKey);
+ if (!state) {
+ // There should always be state by now.
+ throw new SolanaError(SOLANA_ERROR__INVARIANT_VIOLATION__SUBSCRIPTION_ITERATOR_STATE_MISSING);
+ }
+ if (state.__hasPolled) {
+ // You should never be able to poll twice in a row.
+ throw new SolanaError(
+ SOLANA_ERROR__INVARIANT_VIOLATION__SUBSCRIPTION_ITERATOR_MUST_NOT_POLL_BEFORE_RESOLVING_EXISTING_MESSAGE_PROMISE,
+ );
+ }
+ const publishQueue = state.publishQueue;
+ try {
+ if (publishQueue.length) {
+ state.publishQueue = [];
+ for (const item of publishQueue) {
+ if (item.__type === PublishType.DATA) {
+ yield item.data;
+ } else {
+ throw item.err;
+ }
+ }
+ } else {
+ yield await new Promise((resolve, reject) => {
+ iteratorState.set(iteratorKey, {
+ __hasPolled: true,
+ onData: resolve,
+ onError: reject,
+ });
+ });
+ }
+ } catch (e) {
+ if (e === (EXPLICIT_ABORT_TOKEN ||= createExplicitAbortToken())) {
+ return;
+ } else {
+ throw e;
+ }
+ }
+ }
+ } finally {
+ iteratorState.delete(iteratorKey);
+ }
+ },
+ };
+}
diff --git a/packages/subscribable/src/data-publisher.ts b/packages/subscribable/src/data-publisher.ts
new file mode 100644
index 00000000000..a33cd0d3455
--- /dev/null
+++ b/packages/subscribable/src/data-publisher.ts
@@ -0,0 +1,34 @@
+import { TypedEventEmitter } from './event-emitter';
+
+type UnsubscribeFn = () => void;
+
+export interface DataPublisher = Record> {
+ on(
+ channelName: TChannelName,
+ subscriber: (data: TDataByChannelName[TChannelName]) => void,
+ options?: { signal: AbortSignal },
+ ): UnsubscribeFn;
+}
+
+export function getDataPublisherFromEventEmitter>(
+ eventEmitter: TypedEventEmitter,
+): DataPublisher<{
+ [TEventType in keyof TEventMap]: TEventMap[TEventType] extends CustomEvent ? TEventMap[TEventType]['detail'] : null;
+}> {
+ return {
+ on(channelName, subscriber, options) {
+ function innerListener(ev: Event) {
+ if (ev instanceof CustomEvent) {
+ const data = ev.detail;
+ subscriber(data);
+ } else {
+ (subscriber as () => void)();
+ }
+ }
+ eventEmitter.addEventListener(channelName, innerListener, options);
+ return () => {
+ eventEmitter.removeEventListener(channelName, innerListener);
+ };
+ },
+ };
+}
diff --git a/packages/subscribable/src/event-emitter.ts b/packages/subscribable/src/event-emitter.ts
new file mode 100644
index 00000000000..77839a5ea76
--- /dev/null
+++ b/packages/subscribable/src/event-emitter.ts
@@ -0,0 +1,33 @@
+type EventMap = Record;
+type Listener = ((evt: TEvent) => void) | { handleEvent(object: TEvent): void };
+
+export interface TypedEventEmitter {
+ addEventListener(
+ type: TEventType,
+ listener: Listener,
+ options?: AddEventListenerOptions | boolean,
+ ): void;
+ removeEventListener(
+ type: TEventType,
+ listener: Listener,
+ options?: EventListenerOptions | boolean,
+ ): void;
+}
+
+/**
+ * Why not just extend the interface above, rather than to copy/paste it?
+ * See https://github.com/microsoft/TypeScript/issues/60008
+ */
+export interface TypedEventTarget {
+ addEventListener(
+ type: TEventType,
+ listener: Listener,
+ options?: AddEventListenerOptions | boolean,
+ ): void;
+ dispatchEvent(ev: TEventMap[TEventType]): void;
+ removeEventListener(
+ type: TEventType,
+ listener: Listener,
+ options?: EventListenerOptions | boolean,
+ ): void;
+}
diff --git a/packages/subscribable/src/index.ts b/packages/subscribable/src/index.ts
new file mode 100644
index 00000000000..6113e209c81
--- /dev/null
+++ b/packages/subscribable/src/index.ts
@@ -0,0 +1,3 @@
+export * from './async-iterable';
+export * from './data-publisher';
+export * from './event-emitter';
diff --git a/packages/subscribable/src/types/global.d.ts b/packages/subscribable/src/types/global.d.ts
new file mode 100644
index 00000000000..2d0fcf69889
--- /dev/null
+++ b/packages/subscribable/src/types/global.d.ts
@@ -0,0 +1,4 @@
+declare const __BROWSER__: boolean;
+declare const __DEV__: boolean;
+declare const __NODEJS__: boolean;
+declare const __REACTNATIVE__: boolean;
diff --git a/packages/subscribable/tsconfig.declarations.json b/packages/subscribable/tsconfig.declarations.json
new file mode 100644
index 00000000000..dc2d27bb09f
--- /dev/null
+++ b/packages/subscribable/tsconfig.declarations.json
@@ -0,0 +1,10 @@
+{
+ "compilerOptions": {
+ "declaration": true,
+ "declarationMap": true,
+ "emitDeclarationOnly": true,
+ "outDir": "./dist/types"
+ },
+ "extends": "./tsconfig.json",
+ "include": ["src/index.ts", "src/types"]
+}
diff --git a/packages/subscribable/tsconfig.json b/packages/subscribable/tsconfig.json
new file mode 100644
index 00000000000..092f699a18c
--- /dev/null
+++ b/packages/subscribable/tsconfig.json
@@ -0,0 +1,9 @@
+{
+ "$schema": "https://json.schemastore.org/tsconfig",
+ "display": "@solana/subscribable",
+ "extends": "../tsconfig/base.json",
+ "include": ["src"],
+ "compilerOptions": {
+ "lib": ["DOM", "ES2015"]
+ }
+}
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index 0309f82857e..b6f3f6c355d 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -1051,6 +1051,15 @@ importers:
specifier: workspace:*
version: link:../text-encoding-impl
+ packages/subscribable:
+ dependencies:
+ '@solana/errors':
+ specifier: workspace:*
+ version: link:../errors
+ typescript:
+ specifier: '>=5'
+ version: 5.6.2
+
packages/sysvars:
dependencies:
'@solana/accounts':