Skip to content

Commit

Permalink
feat(nbstore): add awareness storage&sync&frontend (#9016)
Browse files Browse the repository at this point in the history
  • Loading branch information
EYHN committed Dec 17, 2024
1 parent 36ac793 commit ffa0231
Show file tree
Hide file tree
Showing 21 changed files with 572 additions and 26 deletions.
1 change: 1 addition & 0 deletions packages/common/nbstore/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"lodash-es": "^4.17.21",
"nanoid": "^5.0.9",
"rxjs": "^7.8.1",
"y-protocols": "^1.0.6",
"yjs": "patch:yjs@npm%3A13.6.18#~/.yarn/patches/yjs-npm-13.6.18-ad0d5f7c43.patch"
},
"devDependencies": {
Expand Down
89 changes: 88 additions & 1 deletion packages/common/nbstore/src/__tests__/frontend.spec.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import 'fake-indexeddb/auto';

import { test, vitest } from 'vitest';
import { expect, test, vitest } from 'vitest';
import { Awareness } from 'y-protocols/awareness.js';
import { Doc as YDoc } from 'yjs';

import { AwarenessFrontend } from '../frontend/awareness';
import { DocFrontend } from '../frontend/doc';
import { BroadcastChannelAwarenessStorage } from '../impls/broadcast-channel/awareness';
import { IndexedDBDocStorage } from '../impls/idb';
import { AwarenessSync } from '../sync/awareness';
import { expectYjsEqual } from './utils';

test('doc', async () => {
Expand Down Expand Up @@ -48,3 +52,86 @@ test('doc', async () => {
});
});
});

test('awareness', async () => {
const storage1 = new BroadcastChannelAwarenessStorage({
id: 'ws1',
peer: 'a',
type: 'workspace',
});

const storage2 = new BroadcastChannelAwarenessStorage({
id: 'ws1',
peer: 'b',
type: 'workspace',
});

await storage1.connect();
await storage2.connect();

// peer a
const docA = new YDoc({ guid: 'test-doc' });
docA.clientID = 1;
const awarenessA = new Awareness(docA);

// peer b
const docB = new YDoc({ guid: 'test-doc' });
docB.clientID = 2;
const awarenessB = new Awareness(docB);

// peer c
const docC = new YDoc({ guid: 'test-doc' });
docC.clientID = 3;
const awarenessC = new Awareness(docC);

{
const sync = new AwarenessSync(storage1, [storage2]);
const frontend = new AwarenessFrontend(sync);
frontend.connect(awarenessA);
frontend.connect(awarenessB);
}
{
const sync = new AwarenessSync(storage2, [storage1]);
const frontend = new AwarenessFrontend(sync);
frontend.connect(awarenessC);
}

awarenessA.setLocalState({
hello: 'world',
});

await vitest.waitFor(() => {
expect(awarenessB.getStates().get(1)).toEqual({
hello: 'world',
});
expect(awarenessC.getStates().get(1)).toEqual({
hello: 'world',
});
});

awarenessB.setLocalState({
foo: 'bar',
});

await vitest.waitFor(() => {
expect(awarenessA.getStates().get(2)).toEqual({
foo: 'bar',
});
expect(awarenessC.getStates().get(2)).toEqual({
foo: 'bar',
});
});

awarenessC.setLocalState({
baz: 'qux',
});

await vitest.waitFor(() => {
expect(awarenessA.getStates().get(3)).toEqual({
baz: 'qux',
});
expect(awarenessB.getStates().get(3)).toEqual({
baz: 'qux',
});
});
});
6 changes: 3 additions & 3 deletions packages/common/nbstore/src/__tests__/sync.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
IndexedDBSyncStorage,
} from '../impls/idb';
import { SpaceStorage } from '../storage';
import { SyncEngine } from '../sync';
import { Sync } from '../sync';
import { expectYjsEqual } from './utils';

test('doc', async () => {
Expand Down Expand Up @@ -53,7 +53,7 @@ test('doc', async () => {
bin: update,
});

const sync = new SyncEngine(peerA, [peerB, peerC]);
const sync = new Sync(peerA, [peerB, peerC]);
sync.start();

await new Promise(resolve => setTimeout(resolve, 1000));
Expand Down Expand Up @@ -143,7 +143,7 @@ test('blob', async () => {
await peerB.connect();
await peerC.connect();

const sync = new SyncEngine(peerA, [peerB, peerC]);
const sync = new Sync(peerA, [peerB, peerC]);
sync.start();

await new Promise(resolve => setTimeout(resolve, 1000));
Expand Down
70 changes: 70 additions & 0 deletions packages/common/nbstore/src/frontend/awareness.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { nanoid } from 'nanoid';
import {
applyAwarenessUpdate,
type Awareness,
encodeAwarenessUpdate,
} from 'y-protocols/awareness.js';

import type { AwarenessRecord } from '../storage/awareness';
import type { AwarenessSync } from '../sync/awareness';

type AwarenessChanges = Record<'added' | 'updated' | 'removed', number[]>;

export class AwarenessFrontend {
constructor(private readonly sync: AwarenessSync) {}

connect(awareness: Awareness) {
const uniqueId = nanoid();
const handleAwarenessUpdate = (
changes: AwarenessChanges,
origin: string
) => {
if (origin === uniqueId) {
return;
}
const changedClients = Object.values(changes).reduce((res, cur) =>
res.concat(cur)
);

const update = encodeAwarenessUpdate(awareness, changedClients);

this.sync
.update(
{
docId: awareness.doc.guid,
bin: update,
},
uniqueId
)
.catch(error => {
console.error('update awareness error', error);
});
};

awareness.on('update', handleAwarenessUpdate);
const handleSyncUpdate = (update: AwarenessRecord, origin?: string) => {
if (origin === uniqueId) {
// skip self update
return;
}

applyAwarenessUpdate(awareness, update.bin, origin);
};
const handleSyncCollect = () => {
return {
docId: awareness.doc.guid,
bin: encodeAwarenessUpdate(awareness, [awareness.clientID]),
};
};
const unsubscribe = this.sync.subscribeUpdate(
awareness.doc.guid,
handleSyncUpdate,
handleSyncCollect
);

return () => {
awareness.off('update', handleAwarenessUpdate);
unsubscribe();
};
}
}
4 changes: 2 additions & 2 deletions packages/common/nbstore/src/frontend/blob.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import type { BlobRecord, BlobStorage } from '../storage';
import type { BlobSyncEngine } from '../sync/blob';
import type { BlobSync } from '../sync/blob';

export class BlobFrontend {
constructor(
readonly storage: BlobStorage,
readonly sync?: BlobSyncEngine
readonly sync?: BlobSync
) {}

get(blobId: string) {
Expand Down
4 changes: 2 additions & 2 deletions packages/common/nbstore/src/frontend/doc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
} from 'yjs';

import type { DocRecord, DocStorage } from '../storage';
import type { DocSyncEngine } from '../sync/doc';
import type { DocSync } from '../sync/doc';
import { AsyncPriorityQueue } from '../utils/async-priority-queue';
import { isEmptyUpdate } from '../utils/is-empty-update';
import { throwIfAborted } from '../utils/throw-if-aborted';
Expand Down Expand Up @@ -56,7 +56,7 @@ export class DocFrontend {

constructor(
private readonly storage: DocStorage,
private readonly sync: DocSyncEngine | null,
private readonly sync: DocSync | null,
readonly options: DocFrontendOptions = {}
) {}

Expand Down
128 changes: 128 additions & 0 deletions packages/common/nbstore/src/impls/broadcast-channel/awareness.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import { nanoid } from 'nanoid';

import {
type AwarenessRecord,
AwarenessStorage,
} from '../../storage/awareness';
import { BroadcastChannelConnection } from './channel';

type ChannelMessage =
| {
type: 'awareness-update';
docId: string;
bin: Uint8Array;
origin?: string;
}
| {
type: 'awareness-collect';
docId: string;
collectId: string;
}
| {
type: 'awareness-collect-fallback';
docId: string;
bin: Uint8Array;
collectId: string;
};

export class BroadcastChannelAwarenessStorage extends AwarenessStorage {
override readonly storageType = 'awareness';
override readonly connection = new BroadcastChannelConnection(this.options);
get channel() {
return this.connection.inner;
}

private readonly subscriptions = new Map<
string,
Set<{
onUpdate: (update: AwarenessRecord, origin?: string) => void;
onCollect: () => AwarenessRecord;
}>
>();

override update(record: AwarenessRecord, origin?: string): Promise<void> {
const subscribers = this.subscriptions.get(record.docId);
if (subscribers) {
subscribers.forEach(subscriber => subscriber.onUpdate(record, origin));
}
this.channel.postMessage({
type: 'awareness-update',
docId: record.docId,
bin: record.bin,
origin,
} satisfies ChannelMessage);
return Promise.resolve();
}

override subscribeUpdate(
id: string,
onUpdate: (update: AwarenessRecord, origin?: string) => void,
onCollect: () => AwarenessRecord
): () => void {
const subscribers = this.subscriptions.get(id) ?? new Set();
subscribers.forEach(subscriber => {
const fallback = subscriber.onCollect();
onUpdate(fallback);
});

const collectUniqueId = nanoid();

const onChannelMessage = (message: MessageEvent<ChannelMessage>) => {
if (
message.data.type === 'awareness-update' &&
message.data.docId === id
) {
onUpdate(
{
docId: message.data.docId,
bin: message.data.bin,
},
message.data.origin
);
}
if (
message.data.type === 'awareness-collect' &&
message.data.docId === id
) {
const fallback = onCollect();
if (fallback) {
this.channel.postMessage({
type: 'awareness-collect-fallback',
docId: message.data.docId,
bin: fallback.bin,
collectId: collectUniqueId,
} satisfies ChannelMessage);
}
}
if (
message.data.type === 'awareness-collect-fallback' &&
message.data.docId === id &&
message.data.collectId === collectUniqueId
) {
onUpdate({
docId: message.data.docId,
bin: message.data.bin,
});
}
};

this.channel.addEventListener('message', onChannelMessage);
this.channel.postMessage({
type: 'awareness-collect',
docId: id,
collectId: collectUniqueId,
} satisfies ChannelMessage);

const subscriber = {
onUpdate,
onCollect,
};
subscribers.add(subscriber);
this.subscriptions.set(id, subscribers);

return () => {
subscribers.delete(subscriber);
this.channel.removeEventListener('message', onChannelMessage);
};
}
}
23 changes: 23 additions & 0 deletions packages/common/nbstore/src/impls/broadcast-channel/channel.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { Connection } from '../../connection';
import type { StorageOptions } from '../../storage';

export class BroadcastChannelConnection extends Connection<BroadcastChannel> {
readonly channelName = `channel:${this.opts.peer}:${this.opts.type}:${this.opts.id}`;

constructor(private readonly opts: StorageOptions) {
super();
}

override async doConnect() {
return new BroadcastChannel(this.channelName);
}

override async doDisconnect() {
this.close();
}

private close(error?: Error) {
this.maybeConnection?.close();
this.setStatus('closed', error);
}
}
Loading

0 comments on commit ffa0231

Please sign in to comment.