From c8cafcce07a01417d1180cfc865f8e133df67db9 Mon Sep 17 00:00:00 2001 From: EYHN Date: Mon, 25 Nov 2024 11:21:11 +0800 Subject: [PATCH] feat(nbstore): new sync engine --- packages/common/nbstore/package.json | 6 +- .../common/nbstore/src/__tests__/sync.spec.ts | 85 +++ .../nbstore/src/connection/connection.ts | 19 + packages/common/nbstore/src/impls/idb/doc.ts | 33 +- .../common/nbstore/src/impls/idb/schema.ts | 1 + packages/common/nbstore/src/impls/idb/sync.ts | 31 +- .../common/nbstore/src/impls/idb/v1/doc.ts | 4 + packages/common/nbstore/src/op/consumer.ts | 15 +- packages/common/nbstore/src/op/ops.ts | 4 +- packages/common/nbstore/src/storage/doc.ts | 35 +- .../common/nbstore/src/storage/history.ts | 2 +- packages/common/nbstore/src/storage/index.ts | 12 +- packages/common/nbstore/src/storage/sync.ts | 10 +- packages/common/nbstore/src/sync/doc/index.ts | 18 + packages/common/nbstore/src/sync/doc/peer.ts | 631 ++++++++++++++++++ packages/common/nbstore/src/sync/index.ts | 23 + .../utils/__tests__/priority-queue.spec.ts | 41 ++ .../nbstore/src/utils/async-priority-queue.ts | 43 ++ packages/common/nbstore/src/utils/clock.ts | 38 ++ .../nbstore/src/utils/is-empty-update.ts | 13 + .../nbstore/src/utils/priority-queue.ts | 69 ++ .../nbstore/src/utils/throw-if-aborted.ts | 9 + yarn.lock | 4 + 23 files changed, 1105 insertions(+), 41 deletions(-) create mode 100644 packages/common/nbstore/src/__tests__/sync.spec.ts create mode 100644 packages/common/nbstore/src/sync/doc/index.ts create mode 100644 packages/common/nbstore/src/sync/doc/peer.ts create mode 100644 packages/common/nbstore/src/sync/index.ts create mode 100644 packages/common/nbstore/src/utils/__tests__/priority-queue.spec.ts create mode 100644 packages/common/nbstore/src/utils/async-priority-queue.ts create mode 100644 packages/common/nbstore/src/utils/clock.ts create mode 100644 packages/common/nbstore/src/utils/is-empty-update.ts create mode 100644 packages/common/nbstore/src/utils/priority-queue.ts create mode 100644 packages/common/nbstore/src/utils/throw-if-aborted.ts diff --git a/packages/common/nbstore/package.json b/packages/common/nbstore/package.json index 5fc2f1083e002..8d39c3491e3c8 100644 --- a/packages/common/nbstore/package.json +++ b/packages/common/nbstore/package.json @@ -11,14 +11,18 @@ "./idb/v1": "./src/impls/idb/v1/index.ts" }, "dependencies": { + "@datastructures-js/binary-search-tree": "^5.3.2", "@toeverything/infra": "workspace:*", "eventemitter2": "^6.4.9", "lodash-es": "^4.17.21", + "nanoid": "^5.0.7", "rxjs": "^7.8.1", "yjs": "patch:yjs@npm%3A13.6.18#~/.yarn/patches/yjs-npm-13.6.18-ad0d5f7c43.patch" }, "devDependencies": { - "idb": "^8.0.0" + "fake-indexeddb": "^6.0.0", + "idb": "^8.0.0", + "vitest": "2.1.4" }, "peerDependencies": { "idb": "^8.0.0" diff --git a/packages/common/nbstore/src/__tests__/sync.spec.ts b/packages/common/nbstore/src/__tests__/sync.spec.ts new file mode 100644 index 0000000000000..82bc69989e441 --- /dev/null +++ b/packages/common/nbstore/src/__tests__/sync.spec.ts @@ -0,0 +1,85 @@ +import 'fake-indexeddb/auto'; + +import { expect, test } from 'vitest'; +import { Doc as YDoc, encodeStateAsUpdate } from 'yjs'; + +import { IndexedDBDocStorage, IndexedDBSyncStorage } from '../impls/idb'; +import { SpaceStorage } from '../storage'; +import { SyncEngine } from '../sync'; + +test('sync', async () => { + const doc = new YDoc(); + doc.getMap('test').set('hello', 'world'); + const update = encodeStateAsUpdate(doc); + + const peerADoc = new IndexedDBDocStorage({ + id: 'ws1', + peer: 'a', + type: 'workspace', + }); + + const peerASync = new IndexedDBSyncStorage({ + id: 'ws1', + peer: 'a', + type: 'workspace', + }); + + const peerBDoc = new IndexedDBDocStorage({ + id: 'ws1', + peer: 'b', + type: 'workspace', + }); + const peerCDoc = new IndexedDBDocStorage({ + id: 'ws1', + peer: 'c', + type: 'workspace', + }); + + const peerA = new SpaceStorage([peerADoc, peerASync]); + const peerB = new SpaceStorage([peerBDoc]); + const peerC = new SpaceStorage([peerCDoc]); + + await peerA.connect(); + await peerB.connect(); + await peerC.connect(); + + await peerA.get('doc').pushDocUpdate({ + docId: 'doc1', + bin: update, + }); + + const sync = new SyncEngine(peerA, [peerB, peerC]); + const abort = new AbortController(); + sync.run(abort.signal); + + await new Promise(resolve => setTimeout(resolve, 1000)); + + { + const b = await peerB.get('doc').getDoc('doc1'); + expect(b).not.toBeNull(); + expect(b?.bin).toEqual(update); + + const c = await peerC.get('doc').getDoc('doc1'); + expect(c).not.toBeNull(); + expect(c?.bin).toEqual(update); + } + + doc.getMap('test').set('foo', 'bar'); + const update2 = encodeStateAsUpdate(doc); + await peerC.get('doc').pushDocUpdate({ + docId: 'doc1', + bin: update2, + }); + + await new Promise(resolve => setTimeout(resolve, 1000)); + + { + const a = await peerA.get('doc').getDoc('doc1'); + expect(a).not.toBeNull(); + expect(a?.bin).toEqual(update2); + + const c = await peerC.get('doc').getDoc('doc1'); + expect(c).not.toBeNull(); + expect(c?.bin).toEqual(update2); + } +}); diff --git a/packages/common/nbstore/src/connection/connection.ts b/packages/common/nbstore/src/connection/connection.ts index 68e045eca93b9..e5f808214b7d2 100644 --- a/packages/common/nbstore/src/connection/connection.ts +++ b/packages/common/nbstore/src/connection/connection.ts @@ -104,6 +104,25 @@ export abstract class Connection { }); } + waitForConnected(signal?: AbortSignal) { + return new Promise((resolve, reject) => { + if (this.status === 'connected') { + resolve(); + return; + } + + this.onStatusChanged(status => { + if (status === 'connected') { + resolve(); + } + }); + + signal?.addEventListener('abort', reason => { + reject(reason); + }); + }); + } + onStatusChanged( cb: (status: ConnectionStatus, error?: Error) => void ): () => void { diff --git a/packages/common/nbstore/src/impls/idb/doc.ts b/packages/common/nbstore/src/impls/idb/doc.ts index d977b83ec284f..d4ac6709c20b0 100644 --- a/packages/common/nbstore/src/impls/idb/doc.ts +++ b/packages/common/nbstore/src/impls/idb/doc.ts @@ -1,5 +1,6 @@ import { share } from '../../connection'; import { + type DocClock, type DocClocks, type DocRecord, DocStorage, @@ -14,9 +15,20 @@ export class IndexedDBDocStorage extends DocStorage { return this.connection.inner; } - override async pushDocUpdate(update: DocUpdate) { - const trx = this.db.transaction(['updates', 'clocks'], 'readwrite'); + private _lastTimestamp = new Date(0); + + private generateTimestamp() { const timestamp = new Date(); + if (timestamp.getTime() <= this._lastTimestamp.getTime()) { + timestamp.setTime(this._lastTimestamp.getTime() + 1); + } + this._lastTimestamp = timestamp; + return timestamp; + } + + override async pushDocUpdate(update: DocUpdate, origin?: string) { + const trx = this.db.transaction(['updates', 'clocks'], 'readwrite'); + const timestamp = this.generateTimestamp(); await trx.objectStore('updates').add({ ...update, createdAt: timestamp, @@ -24,6 +36,17 @@ export class IndexedDBDocStorage extends DocStorage { await trx.objectStore('clocks').put({ docId: update.docId, timestamp }); + this.emit( + 'update', + { + docId: update.docId, + bin: update.bin, + timestamp, + editor: update.editor, + }, + origin + ); + return { docId: update.docId, timestamp }; } @@ -72,6 +95,12 @@ export class IndexedDBDocStorage extends DocStorage { }, {} as DocClocks); } + override async getDocTimestamp(docId: string): Promise { + const trx = this.db.transaction('clocks', 'readonly'); + + return (await trx.store.get(docId)) ?? null; + } + protected override async setDocSnapshot( snapshot: DocRecord ): Promise { diff --git a/packages/common/nbstore/src/impls/idb/schema.ts b/packages/common/nbstore/src/impls/idb/schema.ts index 953e9f395dff8..ec65f1228f65f 100644 --- a/packages/common/nbstore/src/impls/idb/schema.ts +++ b/packages/common/nbstore/src/impls/idb/schema.ts @@ -94,6 +94,7 @@ export interface DocStorageSchema extends DBSchema { peer: string; docId: string; clock: Date; + pulledClock: Date; pushedClock: Date; }; indexes: { diff --git a/packages/common/nbstore/src/impls/idb/sync.ts b/packages/common/nbstore/src/impls/idb/sync.ts index 9eea7febac76a..ef99a479b1a1e 100644 --- a/packages/common/nbstore/src/impls/idb/sync.ts +++ b/packages/common/nbstore/src/impls/idb/sync.ts @@ -8,7 +8,7 @@ export class IndexedDBSyncStorage extends SyncStorage { return this.connection.inner; } - override async getPeerClocks(peer: string) { + override async getPeerRemoteClocks(peer: string) { const trx = this.db.transaction('peerClocks', 'readonly'); const records = await trx.store.index('peer').getAll(peer); @@ -19,7 +19,7 @@ export class IndexedDBSyncStorage extends SyncStorage { }, {} as DocClocks); } - override async setPeerClock(peer: string, clock: DocClock) { + override async setPeerRemoteClock(peer: string, clock: DocClock) { const trx = this.db.transaction('peerClocks', 'readwrite'); const record = await trx.store.get([peer, clock.docId]); @@ -28,6 +28,32 @@ export class IndexedDBSyncStorage extends SyncStorage { peer, docId: clock.docId, clock: clock.timestamp, + pulledClock: record?.pulledClock ?? new Date(0), + pushedClock: record?.pushedClock ?? new Date(0), + }); + } + } + + override async getPeerPulledRemoteClocks(peer: string) { + const trx = this.db.transaction('peerClocks', 'readonly'); + + const records = await trx.store.index('peer').getAll(peer); + + return records.reduce((clocks, { docId, pulledClock }) => { + clocks[docId] = pulledClock; + return clocks; + }, {} as DocClocks); + } + override async setPeerPulledRemoteClock(peer: string, clock: DocClock) { + const trx = this.db.transaction('peerClocks', 'readwrite'); + const record = await trx.store.get([peer, clock.docId]); + + if (!record || record.pulledClock < clock.timestamp) { + await trx.store.put({ + peer, + docId: clock.docId, + clock: record?.clock ?? new Date(0), + pulledClock: clock.timestamp, pushedClock: record?.pushedClock ?? new Date(0), }); } @@ -54,6 +80,7 @@ export class IndexedDBSyncStorage extends SyncStorage { docId: clock.docId, clock: record?.clock ?? new Date(0), pushedClock: clock.timestamp, + pulledClock: record?.pulledClock ?? new Date(0), }); } } diff --git a/packages/common/nbstore/src/impls/idb/v1/doc.ts b/packages/common/nbstore/src/impls/idb/v1/doc.ts index 2660c2d608df1..7dc538830b8c7 100644 --- a/packages/common/nbstore/src/impls/idb/v1/doc.ts +++ b/packages/common/nbstore/src/impls/idb/v1/doc.ts @@ -57,6 +57,10 @@ export class IndexedDBV1DocStorage extends DocStorage { return {}; } + override async getDocTimestamp(_docId: string) { + return null; + } + protected override async setDocSnapshot(): Promise { return false; } diff --git a/packages/common/nbstore/src/op/consumer.ts b/packages/common/nbstore/src/op/consumer.ts index 80764e855c696..f1f61c3e103b5 100644 --- a/packages/common/nbstore/src/op/consumer.ts +++ b/packages/common/nbstore/src/op/consumer.ts @@ -69,10 +69,9 @@ export class SpaceStorageConsumer extends SpaceStorage { this.consumer.register('getDocDiff', ({ docId, state }) => { return storage.getDocDiff(docId, state); }); - this.consumer.register( - 'pushDocUpdate', - storage.pushDocUpdate.bind(storage) - ); + this.consumer.register('pushDocUpdate', ({ update, origin }) => { + return storage.pushDocUpdate(update, origin); + }); this.consumer.register( 'getDocTimestamps', storage.getDocTimestamps.bind(storage) @@ -81,8 +80,8 @@ export class SpaceStorageConsumer extends SpaceStorage { this.consumer.register('subscribeDocUpdate', () => { return new Observable(subscriber => { subscriber.add( - storage.subscribeDocUpdate(update => { - subscriber.next(update); + storage.subscribeDocUpdate((update, origin) => { + subscriber.next({ update, origin }); }) ); }); @@ -117,10 +116,10 @@ export class SpaceStorageConsumer extends SpaceStorage { private registerSyncHandlers(storage: SyncStorage) { this.consumer.register( 'getPeerClocks', - storage.getPeerClocks.bind(storage) + storage.getPeerRemoteClocks.bind(storage) ); this.consumer.register('setPeerClock', ({ peer, ...clock }) => { - return storage.setPeerClock(peer, clock); + return storage.setPeerRemoteClock(peer, clock); }); this.consumer.register( 'getPeerPushedClocks', diff --git a/packages/common/nbstore/src/op/ops.ts b/packages/common/nbstore/src/op/ops.ts index 10ac262774009..6509acc5f474a 100644 --- a/packages/common/nbstore/src/op/ops.ts +++ b/packages/common/nbstore/src/op/ops.ts @@ -31,10 +31,10 @@ export interface SpaceStorageOps extends OpSchema { // doc getDoc: [string, DocRecord | null]; getDocDiff: [{ docId: string; state?: Uint8Array }, DocDiff | null]; - pushDocUpdate: [DocUpdate, DocClock]; + pushDocUpdate: [{ update: DocUpdate; origin?: string }, DocClock]; getDocTimestamps: [Date, DocClocks]; deleteDoc: [string, void]; - subscribeDocUpdate: [void, DocRecord]; + subscribeDocUpdate: [void, { update: DocRecord; origin?: string }]; // history listHistory: [{ docId: string; filter?: HistoryFilter }, ListedHistory[]]; diff --git a/packages/common/nbstore/src/storage/doc.ts b/packages/common/nbstore/src/storage/doc.ts index 876ffad53dffc..a264bd346e9c7 100644 --- a/packages/common/nbstore/src/storage/doc.ts +++ b/packages/common/nbstore/src/storage/doc.ts @@ -1,6 +1,7 @@ import EventEmitter2 from 'eventemitter2'; import { diffUpdate, encodeStateVectorFromUpdate, mergeUpdates } from 'yjs'; +import { isEmptyUpdate } from '../utils/is-empty-update'; import type { Lock } from './lock'; import { SingletonLocker } from './lock'; import { Storage, type StorageOptions } from './storage'; @@ -43,23 +44,6 @@ export abstract class DocStorage< override readonly storageType = 'doc'; private readonly locker = new SingletonLocker(); - /** - * Tell a binary is empty yjs binary or not. - * - * NOTE: - * `[0, 0]` is empty yjs update binary - * `[0]` is empty yjs state vector binary - */ - isEmptyBin(bin: Uint8Array): boolean { - return ( - bin.length === 0 || - // 0x0 for state vector - (bin.length === 1 && bin[0] === 0) || - // 0x00 for update - (bin.length === 2 && bin[0] === 0 && bin[1] === 0) - ); - } - // REGION: open apis by Op system /** * Get a doc record with latest binary. @@ -114,8 +98,15 @@ export abstract class DocStorage< /** * Push updates into storage + * + * @param origin - Internal identifier to recognize the source in the "update" event. Will not be stored or transferred. + */ + abstract pushDocUpdate(update: DocUpdate, origin?: string): Promise; + + /** + * Get the timestamp of the latest update of a doc. */ - abstract pushDocUpdate(update: DocUpdate): Promise; + abstract getDocTimestamp(docId: string): Promise; /** * Get all docs timestamps info. especially for useful in sync process. @@ -140,7 +131,7 @@ export abstract class DocStorage< * But for Cloud storage, there will be updates broadcasted from other clients, * so the storage will emit updates to notify the client to integrate them. */ - subscribeDocUpdate(callback: (update: DocRecord) => void) { + subscribeDocUpdate(callback: (update: DocRecord, origin?: string) => void) { this.event.on('update', callback); return () => { @@ -152,7 +143,7 @@ export abstract class DocStorage< // REGION: api for internal usage protected on( event: 'update', - callback: (update: DocRecord) => void + callback: (update: DocRecord, origin: string) => void ): () => void; protected on( event: 'snapshot', @@ -165,7 +156,7 @@ export abstract class DocStorage< }; } - protected emit(event: 'update', update: DocRecord): void; + protected emit(event: 'update', update: DocRecord, origin?: string): void; protected emit( event: 'snapshot', snapshot: DocRecord, @@ -249,7 +240,7 @@ export abstract class DocStorage< protected mergeUpdates(updates: Uint8Array[]) { const merge = this.options?.mergeUpdates ?? mergeUpdates; - return merge(updates.filter(bin => !this.isEmptyBin(bin))); + return merge(updates.filter(bin => !isEmptyUpdate(bin))); } protected async lockDocForUpdate(docId: string): Promise { diff --git a/packages/common/nbstore/src/storage/history.ts b/packages/common/nbstore/src/storage/history.ts index 4f7d52c4f3571..f112b3c83be3e 100644 --- a/packages/common/nbstore/src/storage/history.ts +++ b/packages/common/nbstore/src/storage/history.ts @@ -86,7 +86,7 @@ export abstract class HistoricalDocStorage< } const change = this.generateRevertUpdate(fromSnapshot.bin, toSnapshot.bin); - await this.pushDocUpdate({ docId, bin: change, editor }); + await this.pushDocUpdate({ docId, bin: change, editor }, 'rollback'); // force create a new history record after rollback await this.createHistory(docId, fromSnapshot); } diff --git a/packages/common/nbstore/src/storage/index.ts b/packages/common/nbstore/src/storage/index.ts index 220af70e754a1..33d13fcd28f51 100644 --- a/packages/common/nbstore/src/storage/index.ts +++ b/packages/common/nbstore/src/storage/index.ts @@ -2,6 +2,9 @@ import EventEmitter2 from 'eventemitter2'; import type { ConnectionStatus } from '../connection'; import { type Storage, type StorageType } from '../storage'; +import type { BlobStorage } from './blob'; +import type { DocStorage } from './doc'; +import type { SyncStorage } from './sync'; export class SpaceStorage { protected readonly storages: Map = new Map(); @@ -14,12 +17,18 @@ export class SpaceStorage { ); } + tryGet(type: 'blob'): BlobStorage | undefined; + tryGet(type: 'sync'): SyncStorage | undefined; + tryGet(type: 'doc'): DocStorage | undefined; tryGet(type: StorageType) { return this.storages.get(type); } + get(type: 'blob'): BlobStorage; + get(type: 'sync'): SyncStorage; + get(type: 'doc'): DocStorage; get(type: StorageType) { - const storage = this.tryGet(type); + const storage = this.storages.get(type); if (!storage) { throw new Error(`Storage ${type} not registered.`); @@ -31,6 +40,7 @@ export class SpaceStorage { async connect() { await Promise.allSettled( Array.from(this.storages.values()).map(async storage => { + // FIXME: multiple calls will register multiple listeners this.disposables.add( storage.connection.onStatusChanged((status, error) => { this.event.emit('connection', { diff --git a/packages/common/nbstore/src/storage/sync.ts b/packages/common/nbstore/src/storage/sync.ts index b2ad2744335de..605f1ac92f4ca 100644 --- a/packages/common/nbstore/src/storage/sync.ts +++ b/packages/common/nbstore/src/storage/sync.ts @@ -8,8 +8,14 @@ export abstract class SyncStorage< > extends Storage { override readonly storageType = 'sync'; - abstract getPeerClocks(peer: string): Promise; - abstract setPeerClock(peer: string, clock: DocClock): Promise; + abstract getPeerRemoteClocks(peer: string): Promise; + abstract setPeerRemoteClock(peer: string, clock: DocClock): Promise; + abstract getPeerPulledRemoteClocks(peer: string): Promise; + + abstract setPeerPulledRemoteClock( + peer: string, + clock: DocClock + ): Promise; abstract getPeerPushedClocks(peer: string): Promise; abstract setPeerPushedClock(peer: string, clock: DocClock): Promise; abstract clearClocks(): Promise; diff --git a/packages/common/nbstore/src/sync/doc/index.ts b/packages/common/nbstore/src/sync/doc/index.ts new file mode 100644 index 0000000000000..1b46fc0cafb3a --- /dev/null +++ b/packages/common/nbstore/src/sync/doc/index.ts @@ -0,0 +1,18 @@ +import type { DocStorage, SyncStorage } from '../../storage'; +import { DocSyncPeer } from './peer'; + +export class DocSyncEngine { + constructor( + readonly local: DocStorage, + readonly sync: SyncStorage, + readonly peers: DocStorage[] + ) {} + + async run(signal?: AbortSignal) { + await Promise.all( + this.peers.map(peer => + new DocSyncPeer(this.local, this.sync, peer).mainLoop(signal) + ) + ); + } +} diff --git a/packages/common/nbstore/src/sync/doc/peer.ts b/packages/common/nbstore/src/sync/doc/peer.ts new file mode 100644 index 0000000000000..4c689068be4e9 --- /dev/null +++ b/packages/common/nbstore/src/sync/doc/peer.ts @@ -0,0 +1,631 @@ +import { remove } from 'lodash-es'; +import { nanoid } from 'nanoid'; +import { Subject } from 'rxjs'; +import { diffUpdate, encodeStateVectorFromUpdate, mergeUpdates } from 'yjs'; + +import type { DocStorage, SyncStorage } from '../../storage'; +import { AsyncPriorityQueue } from '../../utils/async-priority-queue'; +import { ClockMap } from '../../utils/clock'; +import { isEmptyUpdate } from '../../utils/is-empty-update'; +import { throwIfAborted } from '../../utils/throw-if-aborted'; + +type Job = + | { + type: 'connect'; + docId: string; + } + | { + type: 'push'; + docId: string; + update: Uint8Array; + clock: Date; + } + | { + type: 'pull'; + docId: string; + } + | { + type: 'pullAndPush'; + docId: string; + } + | { + type: 'save'; + docId: string; + update?: Uint8Array; + serverClock: Date; + }; + +interface Status { + docs: Set; + connectedDocs: Set; + jobDocQueue: AsyncPriorityQueue; + jobMap: Map; + remoteClocks: ClockMap; + pulledRemoteClocks: ClockMap; + pushedClocks: ClockMap; + syncing: boolean; + retrying: boolean; + errorMessage: string | null; +} + +interface DocSyncPeerOptions { + mergeUpdates?: (updates: Uint8Array[]) => Promise | Uint8Array; +} + +function createJobErrorCatcher< + Jobs extends Record Promise>, +>(jobs: Jobs): Jobs { + return Object.fromEntries( + Object.entries(jobs).map(([k, fn]) => { + return [ + k, + async (docId, ...args) => { + try { + await fn(docId, ...args); + } catch (err) { + if (err instanceof Error) { + throw new Error( + `Error in job "${k}": ${err.stack || err.message}` + ); + } else { + throw err; + } + } + }, + ]; + }) + ) as Jobs; +} + +export class DocSyncPeer { + /** + * random unique id for recognize self in "update" event + */ + private readonly uniqueId = nanoid(); + private readonly prioritySettings = new Map(); + + constructor( + readonly local: DocStorage, + readonly syncMetadata: SyncStorage, + readonly remote: DocStorage, + readonly options: DocSyncPeerOptions = {} + ) {} + + private status: Status = { + docs: new Set(), + connectedDocs: new Set(), + jobDocQueue: new AsyncPriorityQueue(), + jobMap: new Map(), + remoteClocks: new ClockMap(new Map()), + pulledRemoteClocks: new ClockMap(new Map()), + pushedClocks: new ClockMap(new Map()), + syncing: false, + retrying: false, + errorMessage: null, + }; + private readonly statusUpdatedSubject$ = new Subject(); + + private readonly jobs = createJobErrorCatcher({ + connect: async (docId: string, signal?: AbortSignal) => { + const pushedClock = this.status.pushedClocks.get(docId); + const clock = await this.local.getDocTimestamp(docId); + + throwIfAborted(signal); + if (pushedClock === null || pushedClock !== clock?.timestamp) { + await this.jobs.pullAndPush(docId, signal); + } else { + const pulled = this.status.pulledRemoteClocks.get(docId); + if (pulled === null || pulled !== this.status.remoteClocks.get(docId)) { + await this.jobs.pull(docId, signal); + } + } + + this.status.connectedDocs.add(docId); + this.statusUpdatedSubject$.next(docId); + }, + push: async ( + docId: string, + jobs: (Job & { type: 'push' })[], + signal?: AbortSignal + ) => { + if (this.status.connectedDocs.has(docId)) { + const maxClock = jobs.reduce( + (a, b) => (a.getTime() > b.clock.getTime() ? a : b.clock), + new Date(0) + ); + const merged = await this.mergeUpdates( + jobs.map(j => j.update).filter(update => !isEmptyUpdate(update)) + ); + if (!isEmptyUpdate(merged)) { + const { timestamp } = await this.remote.pushDocUpdate( + { + docId, + bin: merged, + }, + this.uniqueId + ); + this.schedule({ + type: 'save', + docId, + serverClock: timestamp, + }); + } + throwIfAborted(signal); + await this.actions.updatePushedClock(docId, maxClock); + } + }, + pullAndPush: async (docId: string, signal?: AbortSignal) => { + const docRecord = await this.local.getDoc(docId); + + const stateVector = + docRecord && !isEmptyUpdate(docRecord.bin) + ? encodeStateVectorFromUpdate(docRecord.bin) + : new Uint8Array(); + const remoteDocRecord = await this.remote.getDocDiff(docId, stateVector); + + if (remoteDocRecord) { + const { + missing: newData, + state: serverStateVector, + timestamp: serverClock, + } = remoteDocRecord; + this.schedule({ + type: 'save', + docId, + serverClock, + }); + throwIfAborted(signal); + const { timestamp: localClock } = await this.local.pushDocUpdate( + { + bin: newData, + docId, + }, + this.uniqueId + ); + throwIfAborted(signal); + await this.actions.updatePulledRemoteClock(docId, serverClock); + const diff = + docRecord && serverStateVector && serverStateVector.length > 0 + ? diffUpdate(docRecord.bin, serverStateVector) + : docRecord?.bin; + if (diff && !isEmptyUpdate(diff)) { + throwIfAborted(signal); + const { timestamp: serverClock } = await this.remote.pushDocUpdate( + { + bin: diff, + docId, + }, + this.uniqueId + ); + this.schedule({ + type: 'save', + docId, + serverClock, + }); + } + throwIfAborted(signal); + await this.actions.updatePushedClock(docId, localClock); + } else { + if (docRecord) { + if (!isEmptyUpdate(docRecord.bin)) { + throwIfAborted(signal); + const { timestamp: serverClock } = await this.remote.pushDocUpdate( + { + bin: docRecord.bin, + docId, + }, + this.uniqueId + ); + this.schedule({ + type: 'save', + docId, + serverClock, + }); + } + await this.actions.updatePushedClock(docId, docRecord.timestamp); + } + } + }, + pull: async (docId: string, signal?: AbortSignal) => { + const docRecord = await this.local.getDoc(docId); + + const stateVector = + docRecord && !isEmptyUpdate(docRecord.bin) + ? encodeStateVectorFromUpdate(docRecord.bin) + : new Uint8Array(); + const serverDoc = await this.remote.getDocDiff(docId, stateVector); + if (!serverDoc) { + return; + } + const { missing: newData, timestamp: serverClock } = serverDoc; + throwIfAborted(signal); + await this.local.pushDocUpdate( + { + docId, + bin: newData, + }, + this.uniqueId + ); + throwIfAborted(signal); + await this.actions.updatePulledRemoteClock(docId, serverClock); + this.schedule({ + type: 'save', + docId, + serverClock, + }); + }, + save: async ( + docId: string, + jobs: (Job & { type: 'save' })[], + signal?: AbortSignal + ) => { + const serverClock = jobs.reduce( + (a, b) => (a.getTime() > b.serverClock.getTime() ? a : b.serverClock), + new Date(0) + ); + if (this.status.connectedDocs.has(docId)) { + const data = jobs + .map(j => j.update) + .filter((update): update is Uint8Array => + update ? !isEmptyUpdate(update) : false + ); + const update = + data.length > 0 ? await this.mergeUpdates(data) : new Uint8Array(); + + throwIfAborted(signal); + await this.local.pushDocUpdate( + { + docId, + bin: update, + }, + this.uniqueId + ); + throwIfAborted(signal); + + await this.actions.updatePulledRemoteClock(docId, serverClock); + } + }, + }); + + private readonly actions = { + updateRemoteClock: async (docId: string, remoteClock: Date) => { + const updated = this.status.remoteClocks.setIfBigger(docId, remoteClock); + if (updated) { + await this.syncMetadata.setPeerRemoteClock(this.remote.peer, { + docId, + timestamp: remoteClock, + }); + this.statusUpdatedSubject$.next(docId); + } + }, + updatePushedClock: async (docId: string, pushedClock: Date) => { + const updated = this.status.pushedClocks.setIfBigger(docId, pushedClock); + if (updated) { + await this.syncMetadata.setPeerPushedClock(this.remote.peer, { + docId, + timestamp: pushedClock, + }); + this.statusUpdatedSubject$.next(docId); + } + }, + updatePulledRemoteClock: async (docId: string, pulledClock: Date) => { + const updated = this.status.pulledRemoteClocks.setIfBigger( + docId, + pulledClock + ); + if (updated) { + await this.syncMetadata.setPeerRemoteClock(this.remote.peer, { + docId, + timestamp: pulledClock, + }); + this.statusUpdatedSubject$.next(docId); + } + }, + addDoc: (docId: string) => { + if (!this.status.docs.has(docId)) { + this.status.docs.add(docId); + this.statusUpdatedSubject$.next(docId); + this.schedule({ + type: 'connect', + docId, + }); + } + }, + }; + + readonly events = { + localUpdated: ({ + docId, + update, + clock, + }: { + docId: string; + update: Uint8Array; + clock: Date; + }) => { + // try add doc for new doc + this.actions.addDoc(docId); + + // schedule push job + this.schedule({ + type: 'push', + docId, + clock, + update, + }); + }, + remoteUpdated: ({ + docId, + update, + remoteClock, + }: { + docId: string; + update: Uint8Array; + remoteClock: Date; + }) => { + // try add doc for new doc + this.actions.addDoc(docId); + + // schedule push job + this.schedule({ + type: 'save', + docId, + serverClock: remoteClock, + update, + }); + }, + }; + + async mainLoop(signal?: AbortSignal) { + // eslint-disable-next-line no-constant-condition + while (true) { + try { + await this.retryLoop(signal); + } catch (err) { + if (signal?.aborted) { + return; + } + console.warn('Sync error, retry in 5s', err); + this.status.errorMessage = + err instanceof Error ? err.message : `${err}`; + this.statusUpdatedSubject$.next(true); + } finally { + // reset all status + this.status = { + docs: new Set(), + connectedDocs: new Set(), + jobDocQueue: new AsyncPriorityQueue(), + jobMap: new Map(), + pulledRemoteClocks: new ClockMap(new Map()), + pushedClocks: new ClockMap(new Map()), + remoteClocks: new ClockMap(new Map()), + syncing: false, + // tell ui to show retrying status + retrying: true, + // error message from last retry + errorMessage: this.status.errorMessage, + }; + this.statusUpdatedSubject$.next(true); + } + // wait for 1s before next retry + await Promise.race([ + new Promise(resolve => { + setTimeout(resolve, 1000); + }), + new Promise((_, reject) => { + // exit if manually stopped + if (signal?.aborted) { + reject(signal.reason); + } + signal?.addEventListener('abort', () => { + reject(signal.reason); + }); + }), + ]); + } + } + + private async retryLoop(signal?: AbortSignal) { + throwIfAborted(signal); + const abort = new AbortController(); + + signal?.addEventListener('abort', reason => { + abort.abort(reason); + }); + + signal = abort.signal; + + const disposes: (() => void)[] = []; + + try { + console.info('Remote sync started'); + this.status.syncing = true; + this.statusUpdatedSubject$.next(true); + + // wait for all storages to connect, timeout after 30s + await Promise.race([ + Promise.all([ + this.local.connection.waitForConnected(signal), + this.remote.connection.waitForConnected(signal), + this.syncMetadata.connection.waitForConnected(signal), + ]), + new Promise((_, reject) => { + setTimeout(() => { + reject(new Error('Connect to remote timeout')); + }, 1000 * 30); + }), + new Promise((_, reject) => { + signal?.addEventListener('abort', reason => { + reject(reason); + }); + }), + ]); + + // throw error if failed to connect + for (const storage of [this.remote, this.local, this.syncMetadata]) { + // abort if disconnected + disposes.push( + storage.connection.onStatusChanged((_status, error) => { + abort.abort('Storage disconnected:' + error); + }) + ); + } + + // reset retrying flag after connected with server + this.status.retrying = false; + this.statusUpdatedSubject$.next(true); + + // subscribe local doc updates + disposes.push( + this.local.subscribeDocUpdate((update, origin) => { + if (origin === this.uniqueId) { + return; + } + this.events.localUpdated({ + docId: update.docId, + clock: update.timestamp, + update: update.bin, + }); + }) + ); + // subscribe remote doc updates + disposes.push( + this.remote.subscribeDocUpdate(({ bin, docId, timestamp }, origin) => { + if (origin === this.uniqueId) { + return; + } + this.events.remoteUpdated({ + docId, + update: bin, + remoteClock: timestamp, + }); + }) + ); + + // add all docs from local + const localDocs = Object.keys(await this.local.getDocTimestamps()); + throwIfAborted(signal); + for (const docId of localDocs) { + this.actions.addDoc(docId); + } + + // get cached clocks from metadata + const cachedClocks = await this.syncMetadata.getPeerRemoteClocks( + this.remote.peer + ); + throwIfAborted(signal); + for (const [id, v] of Object.entries(cachedClocks)) { + this.status.remoteClocks.set(id, v); + } + const pulledClocks = await this.syncMetadata.getPeerPulledRemoteClocks( + this.remote.peer + ); + for (const [id, v] of Object.entries(pulledClocks)) { + this.status.pulledRemoteClocks.set(id, v); + } + const pushedClocks = await this.syncMetadata.getPeerPushedClocks( + this.remote.peer + ); + throwIfAborted(signal); + for (const [id, v] of Object.entries(pushedClocks)) { + this.status.pushedClocks.set(id, v); + } + this.statusUpdatedSubject$.next(true); + + // get new clocks from server + const maxClockValue = this.status.remoteClocks.max; + const newClocks = await this.remote.getDocTimestamps(maxClockValue); + for (const [id, v] of Object.entries(newClocks)) { + await this.actions.updateRemoteClock(id, v); + } + + // add all docs from remote + for (const docId of this.status.remoteClocks.keys()) { + this.actions.addDoc(docId); + } + + // begin to process jobs + // eslint-disable-next-line no-constant-condition + while (true) { + throwIfAborted(signal); + + const docId = await this.status.jobDocQueue.asyncPop(signal); + // eslint-disable-next-line no-constant-condition + while (true) { + // batch process jobs for the same doc + const jobs = this.status.jobMap.get(docId); + if (!jobs || jobs.length === 0) { + this.status.jobMap.delete(docId); + this.statusUpdatedSubject$.next(docId); + break; + } + + const connect = remove(jobs, j => j.type === 'connect'); + if (connect && connect.length > 0) { + await this.jobs.connect(docId, signal); + continue; + } + + const pullAndPush = remove(jobs, j => j.type === 'pullAndPush'); + if (pullAndPush && pullAndPush.length > 0) { + await this.jobs.pullAndPush(docId, signal); + continue; + } + + const pull = remove(jobs, j => j.type === 'pull'); + if (pull && pull.length > 0) { + await this.jobs.pull(docId, signal); + continue; + } + + const push = remove(jobs, j => j.type === 'push'); + if (push && push.length > 0) { + await this.jobs.push( + docId, + push as (Job & { type: 'push' })[], + signal + ); + continue; + } + + const save = remove(jobs, j => j.type === 'save'); + if (save && save.length > 0) { + await this.jobs.save( + docId, + save as (Job & { type: 'save' })[], + signal + ); + continue; + } + } + } + } finally { + for (const dispose of disposes) { + dispose(); + } + this.status.syncing = false; + console.info('Remote sync ended'); + } + } + + private schedule(job: Job) { + const priority = this.prioritySettings.get(job.docId) ?? 0; + this.status.jobDocQueue.push(job.docId, priority); + + const existingJobs = this.status.jobMap.get(job.docId) ?? []; + existingJobs.push(job); + this.status.jobMap.set(job.docId, existingJobs); + this.statusUpdatedSubject$.next(job.docId); + } + + setPriority(docId: string, priority: number) { + this.prioritySettings.set(docId, priority); + this.status.jobDocQueue.updatePriority(docId, priority); + } + + protected mergeUpdates(updates: Uint8Array[]) { + const merge = this.options?.mergeUpdates ?? mergeUpdates; + + return merge(updates.filter(bin => !isEmptyUpdate(bin))); + } +} diff --git a/packages/common/nbstore/src/sync/index.ts b/packages/common/nbstore/src/sync/index.ts new file mode 100644 index 0000000000000..857fd62a6c566 --- /dev/null +++ b/packages/common/nbstore/src/sync/index.ts @@ -0,0 +1,23 @@ +import type { DocStorage, SpaceStorage } from '../storage'; +import { DocSyncEngine } from './doc'; + +export class SyncEngine { + constructor( + readonly local: SpaceStorage, + readonly peers: SpaceStorage[] + ) {} + + async run(signal?: AbortSignal) { + const doc = this.local.tryGet('doc'); + const sync = this.local.tryGet('sync'); + + if (doc && sync) { + const peerDocs = this.peers + .map(peer => peer.tryGet('doc')) + .filter((v): v is DocStorage => !!v); + + const engine = new DocSyncEngine(doc, sync, peerDocs); + await engine.run(signal); + } + } +} diff --git a/packages/common/nbstore/src/utils/__tests__/priority-queue.spec.ts b/packages/common/nbstore/src/utils/__tests__/priority-queue.spec.ts new file mode 100644 index 0000000000000..f840c7d65dacb --- /dev/null +++ b/packages/common/nbstore/src/utils/__tests__/priority-queue.spec.ts @@ -0,0 +1,41 @@ +import { describe, expect, test } from 'vitest'; + +import { PriorityQueue } from '../priority-queue'; + +describe('Priority Queue', () => { + test('priority', () => { + const queue = new PriorityQueue(); + + queue.push('foo', 1); + queue.push('bar', 2); + queue.push('baz', 0); + + expect(queue.pop()).toBe('bar'); + expect(queue.pop()).toBe('foo'); + expect(queue.pop()).toBe('baz'); + expect(queue.pop()).toBe(null); + + queue.push('B', 1); + queue.push('A', 1); + + // if priority same then follow id binary order + expect(queue.pop()).toBe('B'); + expect(queue.pop()).toBe('A'); + expect(queue.pop()).toBe(null); + + queue.push('A', 1); + queue.push('B', 2); + queue.push('A', 3); // same id but different priority, update the priority + + expect(queue.pop()).toBe('A'); + expect(queue.pop()).toBe('B'); + expect(queue.pop()).toBe(null); + + queue.push('A', 1); + queue.push('B', 2); + queue.remove('B'); + + expect(queue.pop()).toBe('A'); + expect(queue.pop()).toBe(null); + }); +}); diff --git a/packages/common/nbstore/src/utils/async-priority-queue.ts b/packages/common/nbstore/src/utils/async-priority-queue.ts new file mode 100644 index 0000000000000..14ed54c997f21 --- /dev/null +++ b/packages/common/nbstore/src/utils/async-priority-queue.ts @@ -0,0 +1,43 @@ +import { PriorityQueue } from './priority-queue'; + +export class AsyncPriorityQueue extends PriorityQueue { + private _resolveUpdate: (() => void) | null = null; + private _waitForUpdate: Promise | null = null; + + async asyncPop(abort?: AbortSignal): Promise { + const update = this.pop(); + if (update) { + return update; + } else { + if (!this._waitForUpdate) { + this._waitForUpdate = new Promise(resolve => { + this._resolveUpdate = resolve; + }); + } + + await Promise.race([ + this._waitForUpdate, + new Promise((_, reject) => { + if (abort?.aborted) { + reject(abort?.reason); + } + abort?.addEventListener('abort', () => { + reject(abort.reason); + }); + }), + ]); + + return this.asyncPop(abort); + } + } + + override push(id: string, priority: number = 0) { + super.push(id, priority); + if (this._resolveUpdate) { + const resolve = this._resolveUpdate; + this._resolveUpdate = null; + this._waitForUpdate = null; + resolve(); + } + } +} diff --git a/packages/common/nbstore/src/utils/clock.ts b/packages/common/nbstore/src/utils/clock.ts new file mode 100644 index 0000000000000..849f69f31d730 --- /dev/null +++ b/packages/common/nbstore/src/utils/clock.ts @@ -0,0 +1,38 @@ +export class ClockMap { + max: Date = new Date(0); + constructor(private readonly map: Map) { + for (const value of map.values()) { + if (value.getTime() > this.max.getTime()) { + this.max = value; + } + } + } + + get(id: string): Date { + return this.map.get(id) ?? new Date(0); + } + + set(id: string, value: Date) { + this.map.set(id, value); + if (value.getTime() > this.max.getTime()) { + this.max = value; + } + } + + setIfBigger(id: string, value: Date) { + if (value.getTime() > this.get(id).getTime()) { + this.set(id, value); + return true; + } + return false; + } + + clear() { + this.map.clear(); + this.max = new Date(0); + } + + keys() { + return Array.from(this.map.keys()); + } +} diff --git a/packages/common/nbstore/src/utils/is-empty-update.ts b/packages/common/nbstore/src/utils/is-empty-update.ts new file mode 100644 index 0000000000000..7bd5826281ed0 --- /dev/null +++ b/packages/common/nbstore/src/utils/is-empty-update.ts @@ -0,0 +1,13 @@ +/** + * Tell a binary is empty yjs binary or not. + * + * NOTE: + * `[0, 0]` is empty yjs update binary + * `[0]` is empty yjs state vector binary + */ +export function isEmptyUpdate(binary: Uint8Array) { + return ( + binary.byteLength === 0 || + (binary.byteLength === 2 && binary[0] === 0 && binary[1] === 0) + ); +} diff --git a/packages/common/nbstore/src/utils/priority-queue.ts b/packages/common/nbstore/src/utils/priority-queue.ts new file mode 100644 index 0000000000000..0c38fca444ab9 --- /dev/null +++ b/packages/common/nbstore/src/utils/priority-queue.ts @@ -0,0 +1,69 @@ +import { BinarySearchTree } from '@datastructures-js/binary-search-tree'; + +export class PriorityQueue { + tree = new BinarySearchTree<{ id: string; priority: number }>((a, b) => { + return a.priority === b.priority + ? a.id === b.id + ? 0 + : a.id > b.id + ? 1 + : -1 + : a.priority - b.priority; + }); + priorityMap = new Map(); + + push(id: string, priority: number = 0) { + const oldPriority = this.priorityMap.get(id); + if (oldPriority === priority) { + return; + } + if (oldPriority !== undefined) { + this.remove(id); + } + this.tree.insert({ id, priority }); + this.priorityMap.set(id, priority); + } + + pop() { + const node = this.tree.max(); + + if (!node) { + return null; + } + + this.tree.removeNode(node); + + const { id } = node.getValue(); + this.priorityMap.delete(id); + + return id; + } + + remove(id: string, priority?: number) { + priority ??= this.priorityMap.get(id); + if (priority === undefined) { + return false; + } + const removed = this.tree.remove({ id, priority }); + if (removed) { + this.priorityMap.delete(id); + } + + return removed; + } + + clear() { + this.tree.clear(); + this.priorityMap.clear(); + } + + updatePriority(id: string, priority: number) { + if (this.remove(id)) { + this.push(id, priority); + } + } + + get length() { + return this.tree.count; + } +} diff --git a/packages/common/nbstore/src/utils/throw-if-aborted.ts b/packages/common/nbstore/src/utils/throw-if-aborted.ts new file mode 100644 index 0000000000000..54e2c81ac9c0f --- /dev/null +++ b/packages/common/nbstore/src/utils/throw-if-aborted.ts @@ -0,0 +1,9 @@ +// because AbortSignal.throwIfAborted is not available in abortcontroller-polyfill +export function throwIfAborted(abort?: AbortSignal) { + if (abort?.aborted) { + throw new Error(abort.reason); + } + return true; +} + +export const MANUALLY_STOP = 'manually-stop'; diff --git a/yarn.lock b/yarn.lock index f8fe4106d112b..89c2bb1d7492b 100644 --- a/yarn.lock +++ b/yarn.lock @@ -727,11 +727,15 @@ __metadata: version: 0.0.0-use.local resolution: "@affine/nbstore@workspace:packages/common/nbstore" dependencies: + "@datastructures-js/binary-search-tree": "npm:^5.3.2" "@toeverything/infra": "workspace:*" eventemitter2: "npm:^6.4.9" + fake-indexeddb: "npm:^6.0.0" idb: "npm:^8.0.0" lodash-es: "npm:^4.17.21" + nanoid: "npm:^5.0.7" rxjs: "npm:^7.8.1" + vitest: "npm:2.1.4" yjs: "patch:yjs@npm%3A13.6.18#~/.yarn/patches/yjs-npm-13.6.18-ad0d5f7c43.patch" peerDependencies: idb: ^8.0.0