From 0073358b3c4c10ed9813f784d2e2222919769679 Mon Sep 17 00:00:00 2001 From: EYHN Date: Wed, 11 Dec 2024 20:21:52 +0800 Subject: [PATCH] feat(nbstore): add awareness frontend --- packages/common/nbstore/package.json | 1 + .../nbstore/src/__tests__/frontend.spec.ts | 89 +++++++++- .../common/nbstore/src/__tests__/sync.spec.ts | 6 +- .../common/nbstore/src/frontend/awareness.ts | 70 ++++++++ packages/common/nbstore/src/frontend/blob.ts | 4 +- packages/common/nbstore/src/frontend/doc.ts | 4 +- .../src/impls/broadcast-channel/awareness.ts | 128 +++++++++++++ .../src/impls/broadcast-channel/channel.ts | 23 +++ .../nbstore/src/impls/cloud/awareness.ts | 148 +++++++++++++++ .../common/nbstore/src/impls/cloud/doc.ts | 5 +- .../common/nbstore/src/impls/cloud/index.ts | 1 + .../common/nbstore/src/impls/cloud/socket.ts | 26 +++ .../common/nbstore/src/impls/idb/awareness.ts | 43 ----- packages/common/nbstore/src/impls/idb/db.ts | 2 +- .../common/nbstore/src/storage/awareness.ts | 3 +- packages/common/nbstore/src/storage/index.ts | 9 +- .../nbstore/src/sync/awareness/index.ts | 30 ++++ .../common/nbstore/src/sync/blob/index.ts | 2 +- packages/common/nbstore/src/sync/doc/index.ts | 2 +- packages/common/nbstore/src/sync/index.ts | 14 +- yarn.lock | 168 ++++++++++++++++-- 21 files changed, 695 insertions(+), 83 deletions(-) create mode 100644 packages/common/nbstore/src/frontend/awareness.ts create mode 100644 packages/common/nbstore/src/impls/broadcast-channel/awareness.ts create mode 100644 packages/common/nbstore/src/impls/broadcast-channel/channel.ts create mode 100644 packages/common/nbstore/src/impls/cloud/awareness.ts delete mode 100644 packages/common/nbstore/src/impls/idb/awareness.ts create mode 100644 packages/common/nbstore/src/sync/awareness/index.ts diff --git a/packages/common/nbstore/package.json b/packages/common/nbstore/package.json index f40500c6d58bc..5a9c66757ce47 100644 --- a/packages/common/nbstore/package.json +++ b/packages/common/nbstore/package.json @@ -20,6 +20,7 @@ "lodash-es": "^4.17.21", "nanoid": "^5.0.9", "rxjs": "^7.8.1", + "y-protocols": "^1.0.6", "yjs": "patch:yjs@npm%3A13.6.18#~/.yarn/patches/yjs-npm-13.6.18-ad0d5f7c43.patch" }, "devDependencies": { diff --git a/packages/common/nbstore/src/__tests__/frontend.spec.ts b/packages/common/nbstore/src/__tests__/frontend.spec.ts index 941a13a4d8754..4a00fbae581f4 100644 --- a/packages/common/nbstore/src/__tests__/frontend.spec.ts +++ b/packages/common/nbstore/src/__tests__/frontend.spec.ts @@ -1,10 +1,14 @@ import 'fake-indexeddb/auto'; -import { test, vitest } from 'vitest'; +import { expect, test, vitest } from 'vitest'; +import { Awareness } from 'y-protocols/awareness.js'; import { Doc as YDoc } from 'yjs'; +import { AwarenessFrontend } from '../frontend/awareness'; import { DocFrontend } from '../frontend/doc'; +import { BroadcastChannelAwarenessStorage } from '../impls/broadcast-channel/awareness'; import { IndexedDBDocStorage } from '../impls/idb'; +import { AwarenessSync } from '../sync/awareness'; import { expectYjsEqual } from './utils'; test('doc', async () => { @@ -48,3 +52,86 @@ test('doc', async () => { }); }); }); + +test('awareness', async () => { + const storage1 = new BroadcastChannelAwarenessStorage({ + id: 'ws1', + peer: 'a', + type: 'workspace', + }); + + const storage2 = new BroadcastChannelAwarenessStorage({ + id: 'ws1', + peer: 'b', + type: 'workspace', + }); + + await storage1.connect(); + await storage2.connect(); + + // peer a + const docA = new YDoc({ guid: 'test-doc' }); + docA.clientID = 1; + const awarenessA = new Awareness(docA); + + // peer b + const docB = new YDoc({ guid: 'test-doc' }); + docB.clientID = 2; + const awarenessB = new Awareness(docB); + + // peer c + const docC = new YDoc({ guid: 'test-doc' }); + docC.clientID = 3; + const awarenessC = new Awareness(docC); + + { + const sync = new AwarenessSync(storage1, [storage2]); + const frontend = new AwarenessFrontend(sync); + frontend.connect(awarenessA); + frontend.connect(awarenessB); + } + { + const sync = new AwarenessSync(storage2, [storage1]); + const frontend = new AwarenessFrontend(sync); + frontend.connect(awarenessC); + } + + awarenessA.setLocalState({ + hello: 'world', + }); + + await vitest.waitFor(() => { + expect(awarenessB.getStates().get(1)).toEqual({ + hello: 'world', + }); + expect(awarenessC.getStates().get(1)).toEqual({ + hello: 'world', + }); + }); + + awarenessB.setLocalState({ + foo: 'bar', + }); + + await vitest.waitFor(() => { + expect(awarenessA.getStates().get(2)).toEqual({ + foo: 'bar', + }); + expect(awarenessC.getStates().get(2)).toEqual({ + foo: 'bar', + }); + }); + + awarenessC.setLocalState({ + baz: 'qux', + }); + + await vitest.waitFor(() => { + expect(awarenessA.getStates().get(3)).toEqual({ + baz: 'qux', + }); + expect(awarenessB.getStates().get(3)).toEqual({ + baz: 'qux', + }); + }); +}); diff --git a/packages/common/nbstore/src/__tests__/sync.spec.ts b/packages/common/nbstore/src/__tests__/sync.spec.ts index e6c70452f837f..95ae54032d367 100644 --- a/packages/common/nbstore/src/__tests__/sync.spec.ts +++ b/packages/common/nbstore/src/__tests__/sync.spec.ts @@ -9,7 +9,7 @@ import { IndexedDBSyncStorage, } from '../impls/idb'; import { SpaceStorage } from '../storage'; -import { SyncEngine } from '../sync'; +import { Sync } from '../sync'; import { expectYjsEqual } from './utils'; test('doc', async () => { @@ -53,7 +53,7 @@ test('doc', async () => { bin: update, }); - const sync = new SyncEngine(peerA, [peerB, peerC]); + const sync = new Sync(peerA, [peerB, peerC]); sync.start(); await new Promise(resolve => setTimeout(resolve, 1000)); @@ -143,7 +143,7 @@ test('blob', async () => { await peerB.connect(); await peerC.connect(); - const sync = new SyncEngine(peerA, [peerB, peerC]); + const sync = new Sync(peerA, [peerB, peerC]); sync.start(); await new Promise(resolve => setTimeout(resolve, 1000)); diff --git a/packages/common/nbstore/src/frontend/awareness.ts b/packages/common/nbstore/src/frontend/awareness.ts new file mode 100644 index 0000000000000..19a092789bc42 --- /dev/null +++ b/packages/common/nbstore/src/frontend/awareness.ts @@ -0,0 +1,70 @@ +import { nanoid } from 'nanoid'; +import { + applyAwarenessUpdate, + type Awareness, + encodeAwarenessUpdate, +} from 'y-protocols/awareness.js'; + +import type { AwarenessRecord } from '../storage/awareness'; +import type { AwarenessSync } from '../sync/awareness'; + +type AwarenessChanges = Record<'added' | 'updated' | 'removed', number[]>; + +export class AwarenessFrontend { + constructor(private readonly sync: AwarenessSync) {} + + connect(awareness: Awareness) { + const uniqueId = nanoid(); + const handleAwarenessUpdate = ( + changes: AwarenessChanges, + origin: string + ) => { + if (origin === uniqueId) { + return; + } + const changedClients = Object.values(changes).reduce((res, cur) => + res.concat(cur) + ); + + const update = encodeAwarenessUpdate(awareness, changedClients); + + this.sync + .update( + { + docId: awareness.doc.guid, + bin: update, + }, + uniqueId + ) + .catch(error => { + console.error('update awareness error', error); + }); + }; + + awareness.on('update', handleAwarenessUpdate); + const handleSyncUpdate = (update: AwarenessRecord, origin?: string) => { + if (origin === uniqueId) { + // skip self update + return; + } + + applyAwarenessUpdate(awareness, update.bin, origin); + }; + const handleSyncCollect = () => { + return { + docId: awareness.doc.guid, + bin: encodeAwarenessUpdate(awareness, [awareness.clientID]), + }; + }; + const unsubscribe = this.sync.subscribeUpdate( + awareness.doc.guid, + handleSyncUpdate, + handleSyncCollect + ); + + return () => { + awareness.off('update', handleAwarenessUpdate); + unsubscribe(); + }; + } +} diff --git a/packages/common/nbstore/src/frontend/blob.ts b/packages/common/nbstore/src/frontend/blob.ts index 6d4c1e319d1d2..40af3f7773f30 100644 --- a/packages/common/nbstore/src/frontend/blob.ts +++ b/packages/common/nbstore/src/frontend/blob.ts @@ -1,10 +1,10 @@ import type { BlobRecord, BlobStorage } from '../storage'; -import type { BlobSyncEngine } from '../sync/blob'; +import type { BlobSync } from '../sync/blob'; export class BlobFrontend { constructor( readonly storage: BlobStorage, - readonly sync?: BlobSyncEngine + readonly sync?: BlobSync ) {} get(blobId: string) { diff --git a/packages/common/nbstore/src/frontend/doc.ts b/packages/common/nbstore/src/frontend/doc.ts index 7a1bf7b06d9b1..c95aee1c18195 100644 --- a/packages/common/nbstore/src/frontend/doc.ts +++ b/packages/common/nbstore/src/frontend/doc.ts @@ -9,7 +9,7 @@ import { } from 'yjs'; import type { DocRecord, DocStorage } from '../storage'; -import type { DocSyncEngine } from '../sync/doc'; +import type { DocSync } from '../sync/doc'; import { AsyncPriorityQueue } from '../utils/async-priority-queue'; import { isEmptyUpdate } from '../utils/is-empty-update'; import { throwIfAborted } from '../utils/throw-if-aborted'; @@ -56,7 +56,7 @@ export class DocFrontend { constructor( private readonly storage: DocStorage, - private readonly sync: DocSyncEngine | null, + private readonly sync: DocSync | null, readonly options: DocFrontendOptions = {} ) {} diff --git a/packages/common/nbstore/src/impls/broadcast-channel/awareness.ts b/packages/common/nbstore/src/impls/broadcast-channel/awareness.ts new file mode 100644 index 0000000000000..837dc04784f91 --- /dev/null +++ b/packages/common/nbstore/src/impls/broadcast-channel/awareness.ts @@ -0,0 +1,128 @@ +import { nanoid } from 'nanoid'; + +import { + type AwarenessRecord, + AwarenessStorage, +} from '../../storage/awareness'; +import { BroadcastChannelConnection } from './channel'; + +type ChannelMessage = + | { + type: 'awareness-update'; + docId: string; + bin: Uint8Array; + origin?: string; + } + | { + type: 'awareness-collect'; + docId: string; + collectId: string; + } + | { + type: 'awareness-collect-fallback'; + docId: string; + bin: Uint8Array; + collectId: string; + }; + +export class BroadcastChannelAwarenessStorage extends AwarenessStorage { + override readonly storageType = 'awareness'; + override readonly connection = new BroadcastChannelConnection(this.options); + get channel() { + return this.connection.inner; + } + + private readonly subscriptions = new Map< + string, + Set<{ + onUpdate: (update: AwarenessRecord, origin?: string) => void; + onCollect: () => AwarenessRecord; + }> + >(); + + override update(record: AwarenessRecord, origin?: string): Promise { + const subscribers = this.subscriptions.get(record.docId); + if (subscribers) { + subscribers.forEach(subscriber => subscriber.onUpdate(record, origin)); + } + this.channel.postMessage({ + type: 'awareness-update', + docId: record.docId, + bin: record.bin, + origin, + } satisfies ChannelMessage); + return Promise.resolve(); + } + + override subscribeUpdate( + id: string, + onUpdate: (update: AwarenessRecord, origin?: string) => void, + onCollect: () => AwarenessRecord + ): () => void { + const subscribers = this.subscriptions.get(id) ?? new Set(); + subscribers.forEach(subscriber => { + const fallback = subscriber.onCollect(); + onUpdate(fallback); + }); + + const collectUniqueId = nanoid(); + + const onChannelMessage = (message: MessageEvent) => { + if ( + message.data.type === 'awareness-update' && + message.data.docId === id + ) { + onUpdate( + { + docId: message.data.docId, + bin: message.data.bin, + }, + message.data.origin + ); + } + if ( + message.data.type === 'awareness-collect' && + message.data.docId === id + ) { + const fallback = onCollect(); + if (fallback) { + this.channel.postMessage({ + type: 'awareness-collect-fallback', + docId: message.data.docId, + bin: fallback.bin, + collectId: collectUniqueId, + } satisfies ChannelMessage); + } + } + if ( + message.data.type === 'awareness-collect-fallback' && + message.data.docId === id && + message.data.collectId === collectUniqueId + ) { + onUpdate({ + docId: message.data.docId, + bin: message.data.bin, + }); + } + }; + + this.channel.addEventListener('message', onChannelMessage); + this.channel.postMessage({ + type: 'awareness-collect', + docId: id, + collectId: collectUniqueId, + } satisfies ChannelMessage); + + const subscriber = { + onUpdate, + onCollect, + }; + subscribers.add(subscriber); + this.subscriptions.set(id, subscribers); + + return () => { + subscribers.delete(subscriber); + this.channel.removeEventListener('message', onChannelMessage); + }; + } +} diff --git a/packages/common/nbstore/src/impls/broadcast-channel/channel.ts b/packages/common/nbstore/src/impls/broadcast-channel/channel.ts new file mode 100644 index 0000000000000..e3d7b1327bef2 --- /dev/null +++ b/packages/common/nbstore/src/impls/broadcast-channel/channel.ts @@ -0,0 +1,23 @@ +import { Connection } from '../../connection'; +import type { StorageOptions } from '../../storage'; + +export class BroadcastChannelConnection extends Connection { + readonly channelName = `channel:${this.opts.peer}:${this.opts.type}:${this.opts.id}`; + + constructor(private readonly opts: StorageOptions) { + super(); + } + + override async doConnect() { + return new BroadcastChannel(this.channelName); + } + + override async doDisconnect() { + this.close(); + } + + private close(error?: Error) { + this.maybeConnection?.close(); + this.setStatus('closed', error); + } +} diff --git a/packages/common/nbstore/src/impls/cloud/awareness.ts b/packages/common/nbstore/src/impls/cloud/awareness.ts new file mode 100644 index 0000000000000..f15b8916293b9 --- /dev/null +++ b/packages/common/nbstore/src/impls/cloud/awareness.ts @@ -0,0 +1,148 @@ +import type { SocketOptions } from 'socket.io-client'; + +import { share } from '../../connection'; +import { + type AwarenessRecord, + AwarenessStorage, + type AwarenessStorageOptions, +} from '../../storage/awareness'; +import { + base64ToUint8Array, + SocketConnection, + uint8ArrayToBase64, +} from './socket'; + +interface CloudAwarenessStorageOptions extends AwarenessStorageOptions { + socketOptions: SocketOptions; +} + +export class CloudAwarenessStorage extends AwarenessStorage { + connection = share( + new SocketConnection(this.peer, this.options.socketOptions) + ); + + private get socket() { + return this.connection.inner; + } + + override async connect(): Promise { + await super.connect(); + } + + override async update(record: AwarenessRecord): Promise { + const encodedUpdate = await uint8ArrayToBase64(record.bin); + this.socket.emit('space:update-awareness', { + spaceType: this.spaceType, + spaceId: this.spaceId, + docId: record.docId, + awarenessUpdate: encodedUpdate, + }); + } + + override subscribeUpdate( + id: string, + onUpdate: (update: AwarenessRecord, origin?: string) => void, + onCollect: () => AwarenessRecord + ): () => void { + // leave awareness + const leave = () => { + this.socket.emit('space:leave-awareness', { + spaceType: this.spaceType, + spaceId: this.spaceId, + docId: id, + }); + }; + + // join awareness, and collect awareness from others + const joinAndCollect = async () => { + await this.socket.emitWithAck('space:join-awareness', { + spaceType: this.spaceType, + spaceId: this.spaceId, + docId: id, + clientVersion: BUILD_CONFIG.appVersion, + }); + this.socket.emit('space:load-awarenesses', { + spaceType: this.spaceType, + spaceId: this.spaceId, + docId: id, + }); + }; + + joinAndCollect().catch(err => console.error('awareness join failed', err)); + + const unsubscribeConnectionStatusChanged = this.connection.onStatusChanged( + status => { + if (status === 'connected') { + joinAndCollect().catch(err => + console.error('awareness join failed', err) + ); + } + } + ); + + const handleCollectAwareness = ({ + spaceId, + spaceType, + docId, + }: { + spaceId: string; + spaceType: string; + docId: string; + }) => { + if ( + spaceId === this.spaceId && + spaceType === this.spaceType && + docId === id + ) { + (async () => { + const record = onCollect(); + const encodedUpdate = await uint8ArrayToBase64(record.bin); + this.socket.emit('space:update-awareness', { + spaceType: this.spaceType, + spaceId: this.spaceId, + docId: record.docId, + awarenessUpdate: encodedUpdate, + }); + })().catch(err => console.error('awareness upload failed', err)); + } + }; + + const handleBroadcastAwarenessUpdate = ({ + spaceType, + spaceId, + docId, + awarenessUpdate, + }: { + spaceType: string; + spaceId: string; + docId: string; + awarenessUpdate: string; + }) => { + if ( + spaceId === this.spaceId && + spaceType === this.spaceType && + docId === id + ) { + onUpdate({ + bin: base64ToUint8Array(awarenessUpdate), + docId: id, + }); + } + }; + + this.socket.on('space:collect-awareness', handleCollectAwareness); + this.socket.on( + 'space:broadcast-awareness-update', + handleBroadcastAwarenessUpdate + ); + return () => { + leave(); + this.socket.off('space:collect-awareness', handleCollectAwareness); + this.socket.off( + 'space:broadcast-awareness-update', + handleBroadcastAwarenessUpdate + ); + unsubscribeConnectionStatusChanged(); + }; + } +} diff --git a/packages/common/nbstore/src/impls/cloud/doc.ts b/packages/common/nbstore/src/impls/cloud/doc.ts index 327ac777fa90c..6dd855ae521fa 100644 --- a/packages/common/nbstore/src/impls/cloud/doc.ts +++ b/packages/common/nbstore/src/impls/cloud/doc.ts @@ -1,4 +1,3 @@ -import { noop } from 'lodash-es'; import type { SocketOptions } from 'socket.io-client'; import { share } from '../../connection'; @@ -33,7 +32,9 @@ export class CloudDocStorage extends DocStorage { await super.connect(); this.connection.onStatusChanged(status => { if (status === 'connected') { - this.join().catch(noop); + this.join().catch(err => { + console.error('doc storage join failed', err); + }); this.socket.on('space:broadcast-doc-update', this.onServerUpdate); } }); diff --git a/packages/common/nbstore/src/impls/cloud/index.ts b/packages/common/nbstore/src/impls/cloud/index.ts index d476ae6eb9b92..f4829dcdbfe8a 100644 --- a/packages/common/nbstore/src/impls/cloud/index.ts +++ b/packages/common/nbstore/src/impls/cloud/index.ts @@ -1,2 +1,3 @@ +export * from './awareness'; export * from './blob'; export * from './doc'; diff --git a/packages/common/nbstore/src/impls/cloud/socket.ts b/packages/common/nbstore/src/impls/cloud/socket.ts index 61f0a2403118b..de80a479912b1 100644 --- a/packages/common/nbstore/src/impls/cloud/socket.ts +++ b/packages/common/nbstore/src/impls/cloud/socket.ts @@ -29,6 +29,19 @@ interface ServerEvents { timestamp: number; editor: string; }; + + 'space:collect-awareness': { + spaceType: string; + spaceId: string; + docId: string; + }; + + 'space:broadcast-awareness-update': { + spaceType: string; + spaceId: string; + docId: string; + awarenessUpdate: string; + }; } interface ClientEvents { @@ -52,6 +65,19 @@ interface ClientEvents { docId: string; }; + 'space:update-awareness': { + spaceType: string; + spaceId: string; + docId: string; + awarenessUpdate: string; + }; + + 'space:load-awarenesses': { + spaceType: string; + spaceId: string; + docId: string; + }; + 'space:push-doc-update': [ { spaceType: string; spaceId: string; docId: string; updates: string }, { timestamp: number }, diff --git a/packages/common/nbstore/src/impls/idb/awareness.ts b/packages/common/nbstore/src/impls/idb/awareness.ts deleted file mode 100644 index 38da681a07944..0000000000000 --- a/packages/common/nbstore/src/impls/idb/awareness.ts +++ /dev/null @@ -1,43 +0,0 @@ -import { share } from '../../connection'; -import { - type AwarenessRecord, - AwarenessStorage, -} from '../../storage/awareness'; -import { IDBConnection } from './db'; - -export class IndexedDBAwarenessStorage extends AwarenessStorage { - override readonly storageType = 'awareness'; - override readonly connection = share(new IDBConnection(this.options)); - - private readonly subscriptions = new Map< - string, - Set<(update: AwarenessRecord, origin?: string) => void> - >(); - - private readonly cached = new Map(); - - override update(record: AwarenessRecord, origin?: string): Promise { - const subscribers = this.subscriptions.get(record.docId); - if (subscribers) { - subscribers.forEach(callback => callback(record, origin)); - } - this.cached.set(record.docId, record); - return Promise.resolve(); - } - - override subscribeUpdate( - id: string, - callback: (update: AwarenessRecord, origin?: string) => void - ): () => void { - const subscribers = this.subscriptions.get(id) ?? new Set(); - subscribers.add(callback); - this.subscriptions.set(id, subscribers); - const cached = this.cached.get(id); - if (cached) { - callback(cached); - } - return () => { - subscribers.delete(callback); - }; - } -} diff --git a/packages/common/nbstore/src/impls/idb/db.ts b/packages/common/nbstore/src/impls/idb/db.ts index 5397698ecdcfd..60129fe1515b5 100644 --- a/packages/common/nbstore/src/impls/idb/db.ts +++ b/packages/common/nbstore/src/impls/idb/db.ts @@ -34,7 +34,7 @@ export class IDBConnection extends Connection<{ this.setStatus('error', new Error('Blocked by other tabs.')); }, }), - channel: new BroadcastChannel(this.dbName), + channel: new BroadcastChannel('idb:' + this.dbName), }; } diff --git a/packages/common/nbstore/src/storage/awareness.ts b/packages/common/nbstore/src/storage/awareness.ts index 127b60682eb3d..5b47f3a450105 100644 --- a/packages/common/nbstore/src/storage/awareness.ts +++ b/packages/common/nbstore/src/storage/awareness.ts @@ -21,6 +21,7 @@ export abstract class AwarenessStorage< abstract subscribeUpdate( id: string, - callback: (update: AwarenessRecord, origin?: string) => void + onUpdate: (update: AwarenessRecord, origin?: string) => void, + onCollect: () => AwarenessRecord ): () => void; } diff --git a/packages/common/nbstore/src/storage/index.ts b/packages/common/nbstore/src/storage/index.ts index f5afcc874b732..c053c664f1b9d 100644 --- a/packages/common/nbstore/src/storage/index.ts +++ b/packages/common/nbstore/src/storage/index.ts @@ -1,11 +1,9 @@ import EventEmitter2 from 'eventemitter2'; import type { ConnectionStatus } from '../connection'; -import { type Storage, type StorageType } from '../storage'; -import type { AwarenessStorage } from './awareness'; import type { BlobStorage } from './blob'; import type { DocStorage } from './doc'; -import { type Storage, type StorageType } from './storage'; +import type { Storage, StorageType } from './storage'; import type { SyncStorage } from './sync'; type Storages = DocStorage | BlobStorage | SyncStorage; @@ -24,7 +22,10 @@ export class SpaceStorage { tryGet( type: T ): Extract | undefined { - return this.storages.get(type) as Extract; + return this.storages.get(type) as unknown as Extract< + Storages, + { storageType: T } + >; } get(type: T): Extract { diff --git a/packages/common/nbstore/src/sync/awareness/index.ts b/packages/common/nbstore/src/sync/awareness/index.ts new file mode 100644 index 0000000000000..cfdcbf9047e20 --- /dev/null +++ b/packages/common/nbstore/src/sync/awareness/index.ts @@ -0,0 +1,30 @@ +import type { + AwarenessRecord, + AwarenessStorage, +} from '../../storage/awareness'; + +export class AwarenessSync { + constructor( + readonly local: AwarenessStorage, + readonly remotes: AwarenessStorage[] + ) {} + + async update(record: AwarenessRecord, origin?: string) { + await Promise.all( + [this.local, ...this.remotes].map(peer => peer.update(record, origin)) + ); + } + + subscribeUpdate( + id: string, + onUpdate: (update: AwarenessRecord, origin?: string) => void, + onCollect: () => AwarenessRecord + ): () => void { + const unsubscribes = [this.local, ...this.remotes].map(peer => + peer.subscribeUpdate(id, onUpdate, onCollect) + ); + return () => { + unsubscribes.forEach(unsubscribe => unsubscribe()); + }; + } +} diff --git a/packages/common/nbstore/src/sync/blob/index.ts b/packages/common/nbstore/src/sync/blob/index.ts index a04a9920df22d..7a337b5bef7d6 100644 --- a/packages/common/nbstore/src/sync/blob/index.ts +++ b/packages/common/nbstore/src/sync/blob/index.ts @@ -3,7 +3,7 @@ import { difference } from 'lodash-es'; import type { BlobRecord, BlobStorage } from '../../storage'; import { MANUALLY_STOP, throwIfAborted } from '../../utils/throw-if-aborted'; -export class BlobSyncEngine { +export class BlobSync { private abort: AbortController | null = null; constructor( diff --git a/packages/common/nbstore/src/sync/doc/index.ts b/packages/common/nbstore/src/sync/doc/index.ts index 7b7fb63d4dc12..0585556b231af 100644 --- a/packages/common/nbstore/src/sync/doc/index.ts +++ b/packages/common/nbstore/src/sync/doc/index.ts @@ -1,7 +1,7 @@ import type { DocStorage, SyncStorage } from '../../storage'; import { DocSyncPeer } from './peer'; -export class DocSyncEngine { +export class DocSync { private readonly peers: DocSyncPeer[]; private abort: AbortController | null = null; diff --git a/packages/common/nbstore/src/sync/index.ts b/packages/common/nbstore/src/sync/index.ts index 22b640a6c12cd..58bfea2700273 100644 --- a/packages/common/nbstore/src/sync/index.ts +++ b/packages/common/nbstore/src/sync/index.ts @@ -1,10 +1,10 @@ import type { BlobStorage, DocStorage, SpaceStorage } from '../storage'; -import { BlobSyncEngine } from './blob'; -import { DocSyncEngine } from './doc'; +import { BlobSync } from './blob'; +import { DocSync } from './doc'; -export class SyncEngine { - private readonly doc: DocSyncEngine | null; - private readonly blob: BlobSyncEngine | null; +export class Sync { + private readonly doc: DocSync | null; + private readonly blob: BlobSync | null; constructor( readonly local: SpaceStorage, @@ -16,7 +16,7 @@ export class SyncEngine { this.doc = doc && sync - ? new DocSyncEngine( + ? new DocSync( doc, sync, peers @@ -25,7 +25,7 @@ export class SyncEngine { ) : null; this.blob = blob - ? new BlobSyncEngine( + ? new BlobSync( blob, peers .map(peer => peer.tryGet('blob')) diff --git a/yarn.lock b/yarn.lock index 3862e04288d0e..69318811c8bfe 100644 --- a/yarn.lock +++ b/yarn.lock @@ -747,6 +747,7 @@ __metadata: rxjs: "npm:^7.8.1" socket.io-client: "npm:^4.8.1" vitest: "npm:2.1.8" + y-protocols: "npm:^1.0.6" yjs: "patch:yjs@npm%3A13.6.18#~/.yarn/patches/yjs-npm-13.6.18-ad0d5f7c43.patch" peerDependencies: "@affine/electron-api": "workspace:*" @@ -1957,7 +1958,14 @@ __metadata: languageName: node linkType: hard -"@babel/compat-data@npm:^7.20.5, @babel/compat-data@npm:^7.22.6, @babel/compat-data@npm:^7.25.9, @babel/compat-data@npm:^7.26.0": +"@babel/compat-data@npm:^7.20.5, @babel/compat-data@npm:^7.25.9": + version: 7.26.2 + resolution: "@babel/compat-data@npm:7.26.2" + checksum: 10/ed9eed6b62ce803ef4a320b1dac76b0302abbb29c49dddf96f3e3207d9717eb34e299a8651bb1582e9c3346ead74b6d595ffced5b3dae718afa08b18741f8402 + languageName: node + linkType: hard + +"@babel/compat-data@npm:^7.22.6, @babel/compat-data@npm:^7.26.0": version: 7.26.3 resolution: "@babel/compat-data@npm:7.26.3" checksum: 10/0bf4e491680722aa0eac26f770f2fae059f92e2ac083900b241c90a2c10f0fc80e448b1feccc2b332687fab4c3e33e9f83dee9ef56badca1fb9f3f71266d9ebf @@ -13546,6 +13554,13 @@ __metadata: languageName: node linkType: hard +"@swc/core-darwin-arm64@npm:1.9.3": + version: 1.9.3 + resolution: "@swc/core-darwin-arm64@npm:1.9.3" + conditions: os=darwin & cpu=arm64 + languageName: node + linkType: hard + "@swc/core-darwin-x64@npm:1.10.1": version: 1.10.1 resolution: "@swc/core-darwin-x64@npm:1.10.1" @@ -13553,6 +13568,13 @@ __metadata: languageName: node linkType: hard +"@swc/core-darwin-x64@npm:1.9.3": + version: 1.9.3 + resolution: "@swc/core-darwin-x64@npm:1.9.3" + conditions: os=darwin & cpu=x64 + languageName: node + linkType: hard + "@swc/core-linux-arm-gnueabihf@npm:1.10.1": version: 1.10.1 resolution: "@swc/core-linux-arm-gnueabihf@npm:1.10.1" @@ -13560,6 +13582,13 @@ __metadata: languageName: node linkType: hard +"@swc/core-linux-arm-gnueabihf@npm:1.9.3": + version: 1.9.3 + resolution: "@swc/core-linux-arm-gnueabihf@npm:1.9.3" + conditions: os=linux & cpu=arm + languageName: node + linkType: hard + "@swc/core-linux-arm64-gnu@npm:1.10.1": version: 1.10.1 resolution: "@swc/core-linux-arm64-gnu@npm:1.10.1" @@ -13567,6 +13596,13 @@ __metadata: languageName: node linkType: hard +"@swc/core-linux-arm64-gnu@npm:1.9.3": + version: 1.9.3 + resolution: "@swc/core-linux-arm64-gnu@npm:1.9.3" + conditions: os=linux & cpu=arm64 & libc=glibc + languageName: node + linkType: hard + "@swc/core-linux-arm64-musl@npm:1.10.1": version: 1.10.1 resolution: "@swc/core-linux-arm64-musl@npm:1.10.1" @@ -13574,6 +13610,13 @@ __metadata: languageName: node linkType: hard +"@swc/core-linux-arm64-musl@npm:1.9.3": + version: 1.9.3 + resolution: "@swc/core-linux-arm64-musl@npm:1.9.3" + conditions: os=linux & cpu=arm64 & libc=musl + languageName: node + linkType: hard + "@swc/core-linux-x64-gnu@npm:1.10.1": version: 1.10.1 resolution: "@swc/core-linux-x64-gnu@npm:1.10.1" @@ -13581,6 +13624,13 @@ __metadata: languageName: node linkType: hard +"@swc/core-linux-x64-gnu@npm:1.9.3": + version: 1.9.3 + resolution: "@swc/core-linux-x64-gnu@npm:1.9.3" + conditions: os=linux & cpu=x64 & libc=glibc + languageName: node + linkType: hard + "@swc/core-linux-x64-musl@npm:1.10.1": version: 1.10.1 resolution: "@swc/core-linux-x64-musl@npm:1.10.1" @@ -13588,6 +13638,13 @@ __metadata: languageName: node linkType: hard +"@swc/core-linux-x64-musl@npm:1.9.3": + version: 1.9.3 + resolution: "@swc/core-linux-x64-musl@npm:1.9.3" + conditions: os=linux & cpu=x64 & libc=musl + languageName: node + linkType: hard + "@swc/core-win32-arm64-msvc@npm:1.10.1": version: 1.10.1 resolution: "@swc/core-win32-arm64-msvc@npm:1.10.1" @@ -13595,6 +13652,13 @@ __metadata: languageName: node linkType: hard +"@swc/core-win32-arm64-msvc@npm:1.9.3": + version: 1.9.3 + resolution: "@swc/core-win32-arm64-msvc@npm:1.9.3" + conditions: os=win32 & cpu=arm64 + languageName: node + linkType: hard + "@swc/core-win32-ia32-msvc@npm:1.10.1": version: 1.10.1 resolution: "@swc/core-win32-ia32-msvc@npm:1.10.1" @@ -13602,6 +13666,13 @@ __metadata: languageName: node linkType: hard +"@swc/core-win32-ia32-msvc@npm:1.9.3": + version: 1.9.3 + resolution: "@swc/core-win32-ia32-msvc@npm:1.9.3" + conditions: os=win32 & cpu=ia32 + languageName: node + linkType: hard + "@swc/core-win32-x64-msvc@npm:1.10.1": version: 1.10.1 resolution: "@swc/core-win32-x64-msvc@npm:1.10.1" @@ -13609,7 +13680,14 @@ __metadata: languageName: node linkType: hard -"@swc/core@npm:^1.10.1, @swc/core@npm:^1.7.26": +"@swc/core-win32-x64-msvc@npm:1.9.3": + version: 1.9.3 + resolution: "@swc/core-win32-x64-msvc@npm:1.9.3" + conditions: os=win32 & cpu=x64 + languageName: node + linkType: hard + +"@swc/core@npm:^1.10.1": version: 1.10.1 resolution: "@swc/core@npm:1.10.1" dependencies: @@ -13655,6 +13733,52 @@ __metadata: languageName: node linkType: hard +"@swc/core@npm:^1.7.26": + version: 1.9.3 + resolution: "@swc/core@npm:1.9.3" + dependencies: + "@swc/core-darwin-arm64": "npm:1.9.3" + "@swc/core-darwin-x64": "npm:1.9.3" + "@swc/core-linux-arm-gnueabihf": "npm:1.9.3" + "@swc/core-linux-arm64-gnu": "npm:1.9.3" + "@swc/core-linux-arm64-musl": "npm:1.9.3" + "@swc/core-linux-x64-gnu": "npm:1.9.3" + "@swc/core-linux-x64-musl": "npm:1.9.3" + "@swc/core-win32-arm64-msvc": "npm:1.9.3" + "@swc/core-win32-ia32-msvc": "npm:1.9.3" + "@swc/core-win32-x64-msvc": "npm:1.9.3" + "@swc/counter": "npm:^0.1.3" + "@swc/types": "npm:^0.1.17" + peerDependencies: + "@swc/helpers": "*" + dependenciesMeta: + "@swc/core-darwin-arm64": + optional: true + "@swc/core-darwin-x64": + optional: true + "@swc/core-linux-arm-gnueabihf": + optional: true + "@swc/core-linux-arm64-gnu": + optional: true + "@swc/core-linux-arm64-musl": + optional: true + "@swc/core-linux-x64-gnu": + optional: true + "@swc/core-linux-x64-musl": + optional: true + "@swc/core-win32-arm64-msvc": + optional: true + "@swc/core-win32-ia32-msvc": + optional: true + "@swc/core-win32-x64-msvc": + optional: true + peerDependenciesMeta: + "@swc/helpers": + optional: true + checksum: 10/0a95ce8a2d21370c82e2b0e744c30eacdbd709a7b470950786f3c25a6272c0aa079206a3543aaccc022ca98af87a2a5536387a0259b5377e94d34fac28143cd0 + languageName: node + linkType: hard + "@swc/counter@npm:^0.1.3": version: 0.1.3 resolution: "@swc/counter@npm:0.1.3" @@ -15053,9 +15177,9 @@ __metadata: linkType: hard "@ungap/structured-clone@npm:^1.0.0": - version: 1.2.1 - resolution: "@ungap/structured-clone@npm:1.2.1" - checksum: 10/6770f71e8183311b2871601ddb02d62a26373be7cf2950cb546a345a2305c75b502e36ce80166120aa2f5f1ea1562141684651ebbfcc711c58acd32035d3e545 + version: 1.2.0 + resolution: "@ungap/structured-clone@npm:1.2.0" + checksum: 10/c6fe89a505e513a7592e1438280db1c075764793a2397877ff1351721fe8792a966a5359769e30242b3cd023f2efb9e63ca2ca88019d73b564488cc20e3eab12 languageName: node linkType: hard @@ -18885,15 +19009,15 @@ __metadata: languageName: node linkType: hard -"debug@npm:4, debug@npm:^4, debug@npm:^4.0.0, debug@npm:^4.0.1, debug@npm:^4.1.0, debug@npm:^4.1.1, debug@npm:^4.3.1, debug@npm:^4.3.2, debug@npm:^4.3.3, debug@npm:^4.3.4, debug@npm:^4.3.5, debug@npm:^4.3.6, debug@npm:^4.3.7, debug@npm:^4.4.0, debug@npm:~4.4.0": - version: 4.4.0 - resolution: "debug@npm:4.4.0" +"debug@npm:4, debug@npm:^4, debug@npm:^4.0.0, debug@npm:^4.0.1, debug@npm:^4.1.0, debug@npm:^4.1.1, debug@npm:^4.3.1, debug@npm:^4.3.2, debug@npm:^4.3.3, debug@npm:^4.3.4, debug@npm:^4.3.5, debug@npm:^4.3.6, debug@npm:^4.3.7, debug@npm:~4.3.1, debug@npm:~4.3.2, debug@npm:~4.3.4": + version: 4.3.7 + resolution: "debug@npm:4.3.7" dependencies: ms: "npm:^2.1.3" peerDependenciesMeta: supports-color: optional: true - checksum: 10/1847944c2e3c2c732514b93d11886575625686056cd765336212dc15de2d2b29612b6cd80e1afba767bb8e1803b778caf9973e98169ef1a24a7a7009e1820367 + checksum: 10/71168908b9a78227ab29d5d25fe03c5867750e31ce24bf2c44a86efc5af041758bb56569b0a3d48a9b5344c00a24a777e6f4100ed6dfd9534a42c1dde285125a languageName: node linkType: hard @@ -18918,15 +19042,15 @@ __metadata: languageName: node linkType: hard -"debug@npm:~4.3.1, debug@npm:~4.3.2, debug@npm:~4.3.4": - version: 4.3.7 - resolution: "debug@npm:4.3.7" +"debug@npm:^4.4.0, debug@npm:~4.4.0": + version: 4.4.0 + resolution: "debug@npm:4.4.0" dependencies: ms: "npm:^2.1.3" peerDependenciesMeta: supports-color: optional: true - checksum: 10/71168908b9a78227ab29d5d25fe03c5867750e31ce24bf2c44a86efc5af041758bb56569b0a3d48a9b5344c00a24a777e6f4100ed6dfd9534a42c1dde285125a + checksum: 10/1847944c2e3c2c732514b93d11886575625686056cd765336212dc15de2d2b29612b6cd80e1afba767bb8e1803b778caf9973e98169ef1a24a7a7009e1820367 languageName: node linkType: hard @@ -19482,7 +19606,14 @@ __metadata: languageName: node linkType: hard -"dotenv@npm:^16.0.0, dotenv@npm:^16.3.1, dotenv@npm:^16.4.5, dotenv@npm:^16.4.7, dotenv@npm:~16.4.5": +"dotenv@npm:^16.0.0, dotenv@npm:^16.3.1, dotenv@npm:^16.4.5, dotenv@npm:~16.4.5": + version: 16.4.6 + resolution: "dotenv@npm:16.4.6" + checksum: 10/86bf758c47ec2585cf171ded83f0ff8b7327d865116ab8a327ff9ec84e35cc8f7cd3f68cef7ce6ec2907310228a4ec4f310cf701615973aba4a9b57bdbb65c59 + languageName: node + linkType: hard + +"dotenv@npm:^16.4.7": version: 16.4.7 resolution: "dotenv@npm:16.4.7" checksum: 10/f13bfe97db88f0df4ec505eeffb8925ec51f2d56a3d0b6d916964d8b4af494e6fb1633ba5d09089b552e77ab2a25de58d70259b2c5ed45ec148221835fc99a0c @@ -24490,7 +24621,14 @@ __metadata: languageName: node linkType: hard -"lilconfig@npm:^3.0.0, lilconfig@npm:^3.1.2, lilconfig@npm:^3.1.3, lilconfig@npm:~3.1.3": +"lilconfig@npm:^3.0.0, lilconfig@npm:^3.1.2": + version: 3.1.2 + resolution: "lilconfig@npm:3.1.2" + checksum: 10/8058403850cfad76d6041b23db23f730e52b6c17a8c28d87b90766639ca0ee40c748a3e85c2d7bd133d572efabff166c4b015e5d25e01fd666cb4b13cfada7f0 + languageName: node + linkType: hard + +"lilconfig@npm:^3.1.3, lilconfig@npm:~3.1.3": version: 3.1.3 resolution: "lilconfig@npm:3.1.3" checksum: 10/b932ce1af94985f0efbe8896e57b1f814a48c8dbd7fc0ef8469785c6303ed29d0090af3ccad7e36b626bfca3a4dc56cc262697e9a8dd867623cf09a39d54e4c3