Skip to content

Commit

Permalink
feat(nbstore): add blob sync
Browse files Browse the repository at this point in the history
  • Loading branch information
EYHN committed Dec 5, 2024
1 parent 03abb75 commit aa67016
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 17 deletions.
74 changes: 72 additions & 2 deletions packages/common/nbstore/src/__tests__/sync.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@ import 'fake-indexeddb/auto';
import { expect, test } from 'vitest';
import { Doc as YDoc, encodeStateAsUpdate } from 'yjs';

import { IndexedDBDocStorage, IndexedDBSyncStorage } from '../impls/idb';
import {
IndexedDBBlobStorage,
IndexedDBDocStorage,
IndexedDBSyncStorage,
} from '../impls/idb';
import { SpaceStorage } from '../storage';
import { SyncEngine } from '../sync';

test('sync', async () => {
test('doc', async () => {
const doc = new YDoc();
doc.getMap('test').set('hello', 'world');
const update = encodeStateAsUpdate(doc);
Expand Down Expand Up @@ -83,3 +87,69 @@ test('sync', async () => {
expect(c?.bin).toEqual(update2);
}
});

test('blob', async () => {
const a = new IndexedDBBlobStorage({
id: 'ws1',
peer: 'a',
type: 'workspace',
});

const b = new IndexedDBBlobStorage({
id: 'ws1',
peer: 'b',
type: 'workspace',
});

const c = new IndexedDBBlobStorage({
id: 'ws1',
peer: 'c',
type: 'workspace',
});

await a.set({
key: 'test',
data: new Uint8Array([1, 2, 3, 4]),
mime: 'text/plain',
createdAt: new Date(100),
});

await c.set({
key: 'test2',
data: new Uint8Array([4, 3, 2, 1]),
mime: 'text/plain',
createdAt: new Date(100),
});

const peerA = new SpaceStorage([a]);
const peerB = new SpaceStorage([b]);
const peerC = new SpaceStorage([c]);

await peerA.connect();
await peerB.connect();
await peerC.connect();

const sync = new SyncEngine(peerA, [peerB, peerC]);
const abort = new AbortController();
sync.run(abort.signal);

await new Promise(resolve => setTimeout(resolve, 1000));

{
const a = await peerA.get('blob').get('test');
expect(a).not.toBeNull();
expect(a?.data).toEqual(new Uint8Array([1, 2, 3, 4]));
}

{
const b = await peerB.get('blob').get('test');
expect(b).not.toBeNull();
expect(b?.data).toEqual(new Uint8Array([1, 2, 3, 4]));
}

{
const c = await peerC.get('blob').get('test2');
expect(c).not.toBeNull();
expect(c?.data).toEqual(new Uint8Array([4, 3, 2, 1]));
}
});
14 changes: 9 additions & 5 deletions packages/common/nbstore/src/storage/blob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@ export abstract class BlobStorage<
> extends Storage<Options> {
override readonly storageType = 'blob';

abstract get(key: string): Promise<BlobRecord | null>;
abstract set(blob: BlobRecord): Promise<void>;
abstract delete(key: string, permanently: boolean): Promise<void>;
abstract release(): Promise<void>;
abstract list(): Promise<ListedBlobRecord[]>;
abstract get(key: string, signal?: AbortSignal): Promise<BlobRecord | null>;
abstract set(blob: BlobRecord, signal?: AbortSignal): Promise<void>;
abstract delete(
key: string,
permanently: boolean,
signal?: AbortSignal
): Promise<void>;
abstract release(signal?: AbortSignal): Promise<void>;
abstract list(signal?: AbortSignal): Promise<ListedBlobRecord[]>;
}
89 changes: 89 additions & 0 deletions packages/common/nbstore/src/sync/blob/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import { difference } from 'lodash-es';

import type { BlobStorage } from '../../storage';
import { MANUALLY_STOP, throwIfAborted } from '../../utils/throw-if-aborted';

export class BlobSyncEngine {
constructor(
readonly local: BlobStorage,
readonly remotes: BlobStorage[]
) {}

private async sync(signal?: AbortSignal) {
throwIfAborted(signal);

for (const remote of this.remotes) {
let localList: string[] = [];
let remoteList: string[] = [];

try {
localList = (await this.local.list(signal)).map(b => b.key);
throwIfAborted(signal);
remoteList = (await remote.list(signal)).map(b => b.key);
throwIfAborted(signal);
} catch (err) {
if (err === MANUALLY_STOP) {
throw err;
}
console.error(`error when sync`, err);
continue;
}

const needUpload = difference(localList, remoteList);
for (const key of needUpload) {
try {
const data = await this.local.get(key, signal);
throwIfAborted(signal);
if (data) {
await remote.set(data, signal);
throwIfAborted(signal);
}
} catch (err) {
if (err === MANUALLY_STOP) {
throw err;
}
console.error(
`error when sync ${key} from [${this.local.peer}] to [${remote.peer}]`,
err
);
}
}

const needDownload = difference(remoteList, localList);

for (const key of needDownload) {
try {
const data = await remote.get(key, signal);
throwIfAborted(signal);
if (data) {
await this.local.set(data, signal);
throwIfAborted(signal);
}
} catch (err) {
if (err === MANUALLY_STOP) {
throw err;
}
console.error(
`error when sync ${key} from [${remote.peer}] to [${this.local.peer}]`,
err
);
}
}
}
}

async run(signal?: AbortSignal) {
if (signal?.aborted) {
return;
}

try {
await this.sync(signal);
} catch (error) {
if (error === MANUALLY_STOP) {
return;
}
console.error('sync blob error', error);
}
}
}
14 changes: 13 additions & 1 deletion packages/common/nbstore/src/sync/doc/peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,19 @@ export class DocSyncPeer {

setPriority(docId: string, priority: number) {
this.prioritySettings.set(docId, priority);
this.status.jobDocQueue.updatePriority(docId, priority);
return this.status.jobDocQueue.setPriority(docId, priority);
}

addPriority(id: string, priority: number) {
const oldPriority = this.prioritySettings.get(id) ?? 0;
this.prioritySettings.set(id, priority);
this.status.jobDocQueue.setPriority(id, oldPriority + priority);

return () => {
const currentPriority = this.prioritySettings.get(id) ?? 0;
this.prioritySettings.set(id, currentPriority - priority);
this.status.jobDocQueue.setPriority(id, currentPriority - priority);
};
}

protected mergeUpdates(updates: Uint8Array[]) {
Expand Down
32 changes: 24 additions & 8 deletions packages/common/nbstore/src/sync/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { DocStorage, SpaceStorage } from '../storage';
import type { BlobStorage, DocStorage, SpaceStorage } from '../storage';
import { BlobSyncEngine } from './blob';
import { DocSyncEngine } from './doc';

export class SyncEngine {
Expand All @@ -9,15 +10,30 @@ export class SyncEngine {

async run(signal?: AbortSignal) {
const doc = this.local.tryGet('doc');
const blob = this.local.tryGet('blob');
const sync = this.local.tryGet('sync');

if (doc && sync) {
const peerDocs = this.peers
.map(peer => peer.tryGet('doc'))
.filter((v): v is DocStorage => !!v);
await Promise.allSettled([
(async () => {
if (doc && sync) {
const peerDocs = this.peers
.map(peer => peer.tryGet('doc'))
.filter((v): v is DocStorage => !!v);

const engine = new DocSyncEngine(doc, sync, peerDocs);
await engine.run(signal);
}
const engine = new DocSyncEngine(doc, sync, peerDocs);
await engine.run(signal);
}
})(),
(async () => {
if (blob) {
const peerBlobs = this.peers
.map(peer => peer.tryGet('blob'))
.filter((v): v is BlobStorage => !!v);

const engine = new BlobSyncEngine(blob, peerBlobs);
await engine.run(signal);
}
})(),
]);
}
}
2 changes: 1 addition & 1 deletion packages/common/nbstore/src/utils/priority-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ export class PriorityQueue {
this.priorityMap.clear();
}

updatePriority(id: string, priority: number) {
setPriority(id: string, priority: number) {
if (this.remove(id)) {
this.push(id, priority);
}
Expand Down

0 comments on commit aa67016

Please sign in to comment.