From fe9f2521c0040141c29c93634822d1d3187c842a Mon Sep 17 00:00:00 2001 From: EYHN Date: Tue, 3 Dec 2024 16:32:16 +0900 Subject: [PATCH] feat(nbstore): add blob sync --- .../common/nbstore/src/__tests__/sync.spec.ts | 74 ++++++++++++++- packages/common/nbstore/src/op/consumer.ts | 8 +- packages/common/nbstore/src/storage/blob.ts | 14 +-- .../common/nbstore/src/sync/blob/index.ts | 89 +++++++++++++++++++ packages/common/nbstore/src/sync/doc/peer.ts | 14 ++- packages/common/nbstore/src/sync/index.ts | 32 +++++-- .../nbstore/src/utils/priority-queue.ts | 2 +- 7 files changed, 214 insertions(+), 19 deletions(-) create mode 100644 packages/common/nbstore/src/sync/blob/index.ts diff --git a/packages/common/nbstore/src/__tests__/sync.spec.ts b/packages/common/nbstore/src/__tests__/sync.spec.ts index 82bc69989e441..bbff7c3054b3c 100644 --- a/packages/common/nbstore/src/__tests__/sync.spec.ts +++ b/packages/common/nbstore/src/__tests__/sync.spec.ts @@ -3,11 +3,15 @@ import 'fake-indexeddb/auto'; import { expect, test } from 'vitest'; import { Doc as YDoc, encodeStateAsUpdate } from 'yjs'; -import { IndexedDBDocStorage, IndexedDBSyncStorage } from '../impls/idb'; +import { + IndexedDBBlobStorage, + IndexedDBDocStorage, + IndexedDBSyncStorage, +} from '../impls/idb'; import { SpaceStorage } from '../storage'; import { SyncEngine } from '../sync'; -test('sync', async () => { +test('doc', async () => { const doc = new YDoc(); doc.getMap('test').set('hello', 'world'); const update = encodeStateAsUpdate(doc); @@ -83,3 +87,69 @@ test('sync', async () => { expect(c?.bin).toEqual(update2); } }); + +test('blob', async () => { + const a = new IndexedDBBlobStorage({ + id: 'ws1', + peer: 'a', + type: 'workspace', + }); + + const b = new IndexedDBBlobStorage({ + id: 'ws1', + peer: 'b', + type: 'workspace', + }); + + const c = new IndexedDBBlobStorage({ + id: 'ws1', + peer: 'c', + type: 'workspace', + }); + + await a.set({ + key: 'test', + data: new Uint8Array([1, 2, 3, 4]), + mime: 'text/plain', + createdAt: new Date(100), + }); + + await c.set({ + key: 'test2', + data: new Uint8Array([4, 3, 2, 1]), + mime: 'text/plain', + createdAt: new Date(100), + }); + + const peerA = new SpaceStorage([a]); + const peerB = new SpaceStorage([b]); + const peerC = new SpaceStorage([c]); + + await peerA.connect(); + await peerB.connect(); + await peerC.connect(); + + const sync = new SyncEngine(peerA, [peerB, peerC]); + const abort = new AbortController(); + sync.run(abort.signal); + + await new Promise(resolve => setTimeout(resolve, 1000)); + + { + const a = await peerA.get('blob').get('test'); + expect(a).not.toBeNull(); + expect(a?.data).toEqual(new Uint8Array([1, 2, 3, 4])); + } + + { + const b = await peerB.get('blob').get('test'); + expect(b).not.toBeNull(); + expect(b?.data).toEqual(new Uint8Array([1, 2, 3, 4])); + } + + { + const c = await peerC.get('blob').get('test2'); + expect(c).not.toBeNull(); + expect(c?.data).toEqual(new Uint8Array([4, 3, 2, 1])); + } +}); diff --git a/packages/common/nbstore/src/op/consumer.ts b/packages/common/nbstore/src/op/consumer.ts index f1f61c3e103b5..67b45ff901fd0 100644 --- a/packages/common/nbstore/src/op/consumer.ts +++ b/packages/common/nbstore/src/op/consumer.ts @@ -104,8 +104,12 @@ export class SpaceStorageConsumer extends SpaceStorage { } private registerBlobHandlers(storage: BlobStorage) { - this.consumer.register('getBlob', storage.get.bind(storage)); - this.consumer.register('setBlob', storage.set.bind(storage)); + this.consumer.register('getBlob', key => { + return storage.get(key); + }); + this.consumer.register('setBlob', blob => { + return storage.set(blob); + }); this.consumer.register('deleteBlob', ({ key, permanently }) => { return storage.delete(key, permanently); }); diff --git a/packages/common/nbstore/src/storage/blob.ts b/packages/common/nbstore/src/storage/blob.ts index 6cd877a2e0b47..6926468aa918d 100644 --- a/packages/common/nbstore/src/storage/blob.ts +++ b/packages/common/nbstore/src/storage/blob.ts @@ -21,9 +21,13 @@ export abstract class BlobStorage< > extends Storage { override readonly storageType = 'blob'; - abstract get(key: string): Promise; - abstract set(blob: BlobRecord): Promise; - abstract delete(key: string, permanently: boolean): Promise; - abstract release(): Promise; - abstract list(): Promise; + abstract get(key: string, signal?: AbortSignal): Promise; + abstract set(blob: BlobRecord, signal?: AbortSignal): Promise; + abstract delete( + key: string, + permanently: boolean, + signal?: AbortSignal + ): Promise; + abstract release(signal?: AbortSignal): Promise; + abstract list(signal?: AbortSignal): Promise; } diff --git a/packages/common/nbstore/src/sync/blob/index.ts b/packages/common/nbstore/src/sync/blob/index.ts new file mode 100644 index 0000000000000..6e21d046c673d --- /dev/null +++ b/packages/common/nbstore/src/sync/blob/index.ts @@ -0,0 +1,89 @@ +import { difference } from 'lodash-es'; + +import type { BlobStorage } from '../../storage'; +import { MANUALLY_STOP, throwIfAborted } from '../../utils/throw-if-aborted'; + +export class BlobSyncEngine { + constructor( + readonly local: BlobStorage, + readonly remotes: BlobStorage[] + ) {} + + private async sync(signal?: AbortSignal) { + throwIfAborted(signal); + + for (const remote of this.remotes) { + let localList: string[] = []; + let remoteList: string[] = []; + + try { + localList = (await this.local.list(signal)).map(b => b.key); + throwIfAborted(signal); + remoteList = (await remote.list(signal)).map(b => b.key); + throwIfAborted(signal); + } catch (err) { + if (err === MANUALLY_STOP) { + throw err; + } + console.error(`error when sync`, err); + continue; + } + + const needUpload = difference(localList, remoteList); + for (const key of needUpload) { + try { + const data = await this.local.get(key, signal); + throwIfAborted(signal); + if (data) { + await remote.set(data, signal); + throwIfAborted(signal); + } + } catch (err) { + if (err === MANUALLY_STOP) { + throw err; + } + console.error( + `error when sync ${key} from [${this.local.peer}] to [${remote.peer}]`, + err + ); + } + } + + const needDownload = difference(remoteList, localList); + + for (const key of needDownload) { + try { + const data = await remote.get(key, signal); + throwIfAborted(signal); + if (data) { + await this.local.set(data, signal); + throwIfAborted(signal); + } + } catch (err) { + if (err === MANUALLY_STOP) { + throw err; + } + console.error( + `error when sync ${key} from [${remote.peer}] to [${this.local.peer}]`, + err + ); + } + } + } + } + + async run(signal?: AbortSignal) { + if (signal?.aborted) { + return; + } + + try { + await this.sync(signal); + } catch (error) { + if (error === MANUALLY_STOP) { + return; + } + console.error('sync blob error', error); + } + } +} diff --git a/packages/common/nbstore/src/sync/doc/peer.ts b/packages/common/nbstore/src/sync/doc/peer.ts index 2834d6d93bd28..01c26082c60b5 100644 --- a/packages/common/nbstore/src/sync/doc/peer.ts +++ b/packages/common/nbstore/src/sync/doc/peer.ts @@ -620,7 +620,19 @@ export class DocSyncPeer { setPriority(docId: string, priority: number) { this.prioritySettings.set(docId, priority); - this.status.jobDocQueue.updatePriority(docId, priority); + return this.status.jobDocQueue.setPriority(docId, priority); + } + + addPriority(id: string, priority: number) { + const oldPriority = this.prioritySettings.get(id) ?? 0; + this.prioritySettings.set(id, priority); + this.status.jobDocQueue.setPriority(id, oldPriority + priority); + + return () => { + const currentPriority = this.prioritySettings.get(id) ?? 0; + this.prioritySettings.set(id, currentPriority - priority); + this.status.jobDocQueue.setPriority(id, currentPriority - priority); + }; } protected mergeUpdates(updates: Uint8Array[]) { diff --git a/packages/common/nbstore/src/sync/index.ts b/packages/common/nbstore/src/sync/index.ts index 857fd62a6c566..ca2de2e58b421 100644 --- a/packages/common/nbstore/src/sync/index.ts +++ b/packages/common/nbstore/src/sync/index.ts @@ -1,4 +1,5 @@ -import type { DocStorage, SpaceStorage } from '../storage'; +import type { BlobStorage, DocStorage, SpaceStorage } from '../storage'; +import { BlobSyncEngine } from './blob'; import { DocSyncEngine } from './doc'; export class SyncEngine { @@ -9,15 +10,30 @@ export class SyncEngine { async run(signal?: AbortSignal) { const doc = this.local.tryGet('doc'); + const blob = this.local.tryGet('blob'); const sync = this.local.tryGet('sync'); - if (doc && sync) { - const peerDocs = this.peers - .map(peer => peer.tryGet('doc')) - .filter((v): v is DocStorage => !!v); + await Promise.allSettled([ + (async () => { + if (doc && sync) { + const peerDocs = this.peers + .map(peer => peer.tryGet('doc')) + .filter((v): v is DocStorage => !!v); - const engine = new DocSyncEngine(doc, sync, peerDocs); - await engine.run(signal); - } + const engine = new DocSyncEngine(doc, sync, peerDocs); + await engine.run(signal); + } + })(), + (async () => { + if (blob) { + const peerBlobs = this.peers + .map(peer => peer.tryGet('blob')) + .filter((v): v is BlobStorage => !!v); + + const engine = new BlobSyncEngine(blob, peerBlobs); + await engine.run(signal); + } + })(), + ]); } } diff --git a/packages/common/nbstore/src/utils/priority-queue.ts b/packages/common/nbstore/src/utils/priority-queue.ts index 0c38fca444ab9..3af09acc2921c 100644 --- a/packages/common/nbstore/src/utils/priority-queue.ts +++ b/packages/common/nbstore/src/utils/priority-queue.ts @@ -57,7 +57,7 @@ export class PriorityQueue { this.priorityMap.clear(); } - updatePriority(id: string, priority: number) { + setPriority(id: string, priority: number) { if (this.remove(id)) { this.push(id, priority); }