Skip to content

Commit

Permalink
feat(nbstore): new sync engine
Browse files Browse the repository at this point in the history
  • Loading branch information
EYHN committed Nov 25, 2024
1 parent b5fed7b commit c8cafcc
Show file tree
Hide file tree
Showing 23 changed files with 1,105 additions and 41 deletions.
6 changes: 5 additions & 1 deletion packages/common/nbstore/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,18 @@
"./idb/v1": "./src/impls/idb/v1/index.ts"
},
"dependencies": {
"@datastructures-js/binary-search-tree": "^5.3.2",
"@toeverything/infra": "workspace:*",
"eventemitter2": "^6.4.9",
"lodash-es": "^4.17.21",
"nanoid": "^5.0.7",
"rxjs": "^7.8.1",
"yjs": "patch:yjs@npm%3A13.6.18#~/.yarn/patches/yjs-npm-13.6.18-ad0d5f7c43.patch"
},
"devDependencies": {
"idb": "^8.0.0"
"fake-indexeddb": "^6.0.0",
"idb": "^8.0.0",
"vitest": "2.1.4"
},
"peerDependencies": {
"idb": "^8.0.0"
Expand Down
85 changes: 85 additions & 0 deletions packages/common/nbstore/src/__tests__/sync.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import 'fake-indexeddb/auto';

import { expect, test } from 'vitest';
import { Doc as YDoc, encodeStateAsUpdate } from 'yjs';

import { IndexedDBDocStorage, IndexedDBSyncStorage } from '../impls/idb';
import { SpaceStorage } from '../storage';
import { SyncEngine } from '../sync';

test('sync', async () => {
const doc = new YDoc();
doc.getMap('test').set('hello', 'world');
const update = encodeStateAsUpdate(doc);

const peerADoc = new IndexedDBDocStorage({
id: 'ws1',
peer: 'a',
type: 'workspace',
});

const peerASync = new IndexedDBSyncStorage({
id: 'ws1',
peer: 'a',
type: 'workspace',
});

const peerBDoc = new IndexedDBDocStorage({
id: 'ws1',
peer: 'b',
type: 'workspace',
});
const peerCDoc = new IndexedDBDocStorage({
id: 'ws1',
peer: 'c',
type: 'workspace',
});

const peerA = new SpaceStorage([peerADoc, peerASync]);
const peerB = new SpaceStorage([peerBDoc]);
const peerC = new SpaceStorage([peerCDoc]);

await peerA.connect();
await peerB.connect();
await peerC.connect();

await peerA.get('doc').pushDocUpdate({
docId: 'doc1',
bin: update,
});

const sync = new SyncEngine(peerA, [peerB, peerC]);
const abort = new AbortController();
sync.run(abort.signal);

await new Promise(resolve => setTimeout(resolve, 1000));

{
const b = await peerB.get('doc').getDoc('doc1');
expect(b).not.toBeNull();
expect(b?.bin).toEqual(update);

const c = await peerC.get('doc').getDoc('doc1');
expect(c).not.toBeNull();
expect(c?.bin).toEqual(update);
}

doc.getMap('test').set('foo', 'bar');
const update2 = encodeStateAsUpdate(doc);
await peerC.get('doc').pushDocUpdate({
docId: 'doc1',
bin: update2,
});

await new Promise(resolve => setTimeout(resolve, 1000));

{
const a = await peerA.get('doc').getDoc('doc1');
expect(a).not.toBeNull();
expect(a?.bin).toEqual(update2);

const c = await peerC.get('doc').getDoc('doc1');
expect(c).not.toBeNull();
expect(c?.bin).toEqual(update2);
}
});
19 changes: 19 additions & 0 deletions packages/common/nbstore/src/connection/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,25 @@ export abstract class Connection<T = any> {
});
}

waitForConnected(signal?: AbortSignal) {
return new Promise<void>((resolve, reject) => {
if (this.status === 'connected') {
resolve();
return;
}

this.onStatusChanged(status => {
if (status === 'connected') {
resolve();
}
});

signal?.addEventListener('abort', reason => {
reject(reason);
});
});
}

onStatusChanged(
cb: (status: ConnectionStatus, error?: Error) => void
): () => void {
Expand Down
33 changes: 31 additions & 2 deletions packages/common/nbstore/src/impls/idb/doc.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { share } from '../../connection';
import {
type DocClock,
type DocClocks,
type DocRecord,
DocStorage,
Expand All @@ -14,16 +15,38 @@ export class IndexedDBDocStorage extends DocStorage {
return this.connection.inner;
}

override async pushDocUpdate(update: DocUpdate) {
const trx = this.db.transaction(['updates', 'clocks'], 'readwrite');
private _lastTimestamp = new Date(0);

private generateTimestamp() {
const timestamp = new Date();
if (timestamp.getTime() <= this._lastTimestamp.getTime()) {
timestamp.setTime(this._lastTimestamp.getTime() + 1);
}
this._lastTimestamp = timestamp;
return timestamp;
}

override async pushDocUpdate(update: DocUpdate, origin?: string) {
const trx = this.db.transaction(['updates', 'clocks'], 'readwrite');
const timestamp = this.generateTimestamp();
await trx.objectStore('updates').add({
...update,
createdAt: timestamp,
});

await trx.objectStore('clocks').put({ docId: update.docId, timestamp });

this.emit(
'update',
{
docId: update.docId,
bin: update.bin,
timestamp,
editor: update.editor,
},
origin
);

return { docId: update.docId, timestamp };
}

Expand Down Expand Up @@ -72,6 +95,12 @@ export class IndexedDBDocStorage extends DocStorage {
}, {} as DocClocks);
}

override async getDocTimestamp(docId: string): Promise<DocClock | null> {
const trx = this.db.transaction('clocks', 'readonly');

return (await trx.store.get(docId)) ?? null;
}

protected override async setDocSnapshot(
snapshot: DocRecord
): Promise<boolean> {
Expand Down
1 change: 1 addition & 0 deletions packages/common/nbstore/src/impls/idb/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ export interface DocStorageSchema extends DBSchema {
peer: string;
docId: string;
clock: Date;
pulledClock: Date;
pushedClock: Date;
};
indexes: {
Expand Down
31 changes: 29 additions & 2 deletions packages/common/nbstore/src/impls/idb/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export class IndexedDBSyncStorage extends SyncStorage {
return this.connection.inner;
}

override async getPeerClocks(peer: string) {
override async getPeerRemoteClocks(peer: string) {
const trx = this.db.transaction('peerClocks', 'readonly');

const records = await trx.store.index('peer').getAll(peer);
Expand All @@ -19,7 +19,7 @@ export class IndexedDBSyncStorage extends SyncStorage {
}, {} as DocClocks);
}

override async setPeerClock(peer: string, clock: DocClock) {
override async setPeerRemoteClock(peer: string, clock: DocClock) {
const trx = this.db.transaction('peerClocks', 'readwrite');
const record = await trx.store.get([peer, clock.docId]);

Expand All @@ -28,6 +28,32 @@ export class IndexedDBSyncStorage extends SyncStorage {
peer,
docId: clock.docId,
clock: clock.timestamp,
pulledClock: record?.pulledClock ?? new Date(0),
pushedClock: record?.pushedClock ?? new Date(0),
});
}
}

override async getPeerPulledRemoteClocks(peer: string) {
const trx = this.db.transaction('peerClocks', 'readonly');

const records = await trx.store.index('peer').getAll(peer);

return records.reduce((clocks, { docId, pulledClock }) => {
clocks[docId] = pulledClock;
return clocks;
}, {} as DocClocks);
}
override async setPeerPulledRemoteClock(peer: string, clock: DocClock) {
const trx = this.db.transaction('peerClocks', 'readwrite');
const record = await trx.store.get([peer, clock.docId]);

if (!record || record.pulledClock < clock.timestamp) {
await trx.store.put({
peer,
docId: clock.docId,
clock: record?.clock ?? new Date(0),
pulledClock: clock.timestamp,
pushedClock: record?.pushedClock ?? new Date(0),
});
}
Expand All @@ -54,6 +80,7 @@ export class IndexedDBSyncStorage extends SyncStorage {
docId: clock.docId,
clock: record?.clock ?? new Date(0),
pushedClock: clock.timestamp,
pulledClock: record?.pulledClock ?? new Date(0),
});
}
}
Expand Down
4 changes: 4 additions & 0 deletions packages/common/nbstore/src/impls/idb/v1/doc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ export class IndexedDBV1DocStorage extends DocStorage {
return {};
}

override async getDocTimestamp(_docId: string) {
return null;
}

protected override async setDocSnapshot(): Promise<boolean> {
return false;
}
Expand Down
15 changes: 7 additions & 8 deletions packages/common/nbstore/src/op/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,9 @@ export class SpaceStorageConsumer extends SpaceStorage {
this.consumer.register('getDocDiff', ({ docId, state }) => {
return storage.getDocDiff(docId, state);
});
this.consumer.register(
'pushDocUpdate',
storage.pushDocUpdate.bind(storage)
);
this.consumer.register('pushDocUpdate', ({ update, origin }) => {
return storage.pushDocUpdate(update, origin);
});
this.consumer.register(
'getDocTimestamps',
storage.getDocTimestamps.bind(storage)
Expand All @@ -81,8 +80,8 @@ export class SpaceStorageConsumer extends SpaceStorage {
this.consumer.register('subscribeDocUpdate', () => {
return new Observable(subscriber => {
subscriber.add(
storage.subscribeDocUpdate(update => {
subscriber.next(update);
storage.subscribeDocUpdate((update, origin) => {
subscriber.next({ update, origin });
})
);
});
Expand Down Expand Up @@ -117,10 +116,10 @@ export class SpaceStorageConsumer extends SpaceStorage {
private registerSyncHandlers(storage: SyncStorage) {
this.consumer.register(
'getPeerClocks',
storage.getPeerClocks.bind(storage)
storage.getPeerRemoteClocks.bind(storage)
);
this.consumer.register('setPeerClock', ({ peer, ...clock }) => {
return storage.setPeerClock(peer, clock);
return storage.setPeerRemoteClock(peer, clock);
});
this.consumer.register(
'getPeerPushedClocks',
Expand Down
4 changes: 2 additions & 2 deletions packages/common/nbstore/src/op/ops.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ export interface SpaceStorageOps extends OpSchema {
// doc
getDoc: [string, DocRecord | null];
getDocDiff: [{ docId: string; state?: Uint8Array }, DocDiff | null];
pushDocUpdate: [DocUpdate, DocClock];
pushDocUpdate: [{ update: DocUpdate; origin?: string }, DocClock];
getDocTimestamps: [Date, DocClocks];
deleteDoc: [string, void];
subscribeDocUpdate: [void, DocRecord];
subscribeDocUpdate: [void, { update: DocRecord; origin?: string }];

// history
listHistory: [{ docId: string; filter?: HistoryFilter }, ListedHistory[]];
Expand Down
Loading

0 comments on commit c8cafcc

Please sign in to comment.