Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(nbstore): new doc sync engine #8918

Merged
merged 1 commit into from
Dec 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion packages/common/nbstore/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,18 @@
"./idb/v1": "./src/impls/idb/v1/index.ts"
},
"dependencies": {
"@datastructures-js/binary-search-tree": "^5.3.2",
"@toeverything/infra": "workspace:*",
"eventemitter2": "^6.4.9",
"lodash-es": "^4.17.21",
"nanoid": "^5.0.7",
"rxjs": "^7.8.1",
"yjs": "patch:yjs@npm%3A13.6.18#~/.yarn/patches/yjs-npm-13.6.18-ad0d5f7c43.patch"
},
"devDependencies": {
"idb": "^8.0.0"
"fake-indexeddb": "^6.0.0",
"idb": "^8.0.0",
"vitest": "2.1.4"
},
"peerDependencies": {
"idb": "^8.0.0"
Expand Down
85 changes: 85 additions & 0 deletions packages/common/nbstore/src/__tests__/sync.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import 'fake-indexeddb/auto';

import { expect, test } from 'vitest';
import { Doc as YDoc, encodeStateAsUpdate } from 'yjs';

import { IndexedDBDocStorage, IndexedDBSyncStorage } from '../impls/idb';
import { SpaceStorage } from '../storage';
import { SyncEngine } from '../sync';

test('sync', async () => {
const doc = new YDoc();
doc.getMap('test').set('hello', 'world');
const update = encodeStateAsUpdate(doc);

const peerADoc = new IndexedDBDocStorage({
id: 'ws1',
peer: 'a',
type: 'workspace',
});

const peerASync = new IndexedDBSyncStorage({
id: 'ws1',
peer: 'a',
type: 'workspace',
});

const peerBDoc = new IndexedDBDocStorage({
id: 'ws1',
peer: 'b',
type: 'workspace',
});
const peerCDoc = new IndexedDBDocStorage({
id: 'ws1',
peer: 'c',
type: 'workspace',
});

const peerA = new SpaceStorage([peerADoc, peerASync]);
const peerB = new SpaceStorage([peerBDoc]);
const peerC = new SpaceStorage([peerCDoc]);

await peerA.connect();
await peerB.connect();
await peerC.connect();

await peerA.get('doc').pushDocUpdate({
docId: 'doc1',
bin: update,
});

const sync = new SyncEngine(peerA, [peerB, peerC]);
const abort = new AbortController();
sync.run(abort.signal);

await new Promise(resolve => setTimeout(resolve, 1000));

{
const b = await peerB.get('doc').getDoc('doc1');
expect(b).not.toBeNull();
expect(b?.bin).toEqual(update);

const c = await peerC.get('doc').getDoc('doc1');
expect(c).not.toBeNull();
expect(c?.bin).toEqual(update);
}

doc.getMap('test').set('foo', 'bar');
const update2 = encodeStateAsUpdate(doc);
await peerC.get('doc').pushDocUpdate({
docId: 'doc1',
bin: update2,
});

await new Promise(resolve => setTimeout(resolve, 1000));

{
const a = await peerA.get('doc').getDoc('doc1');
expect(a).not.toBeNull();
expect(a?.bin).toEqual(update2);

const c = await peerC.get('doc').getDoc('doc1');
expect(c).not.toBeNull();
expect(c?.bin).toEqual(update2);
}
});
19 changes: 19 additions & 0 deletions packages/common/nbstore/src/connection/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,25 @@ export abstract class Connection<T = any> {
});
}

waitForConnected(signal?: AbortSignal) {
return new Promise<void>((resolve, reject) => {
if (this.status === 'connected') {
resolve();
return;
}

this.onStatusChanged(status => {
if (status === 'connected') {
resolve();
}
});

signal?.addEventListener('abort', reason => {
reject(reason);
});
});
}
EYHN marked this conversation as resolved.
Show resolved Hide resolved

onStatusChanged(
cb: (status: ConnectionStatus, error?: Error) => void
): () => void {
Expand Down
33 changes: 31 additions & 2 deletions packages/common/nbstore/src/impls/idb/doc.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { share } from '../../connection';
import {
type DocClock,
type DocClocks,
type DocRecord,
DocStorage,
Expand All @@ -14,16 +15,38 @@ export class IndexedDBDocStorage extends DocStorage {
return this.connection.inner;
}

override async pushDocUpdate(update: DocUpdate) {
const trx = this.db.transaction(['updates', 'clocks'], 'readwrite');
private _lastTimestamp = new Date(0);

private generateTimestamp() {
const timestamp = new Date();
if (timestamp.getTime() <= this._lastTimestamp.getTime()) {
timestamp.setTime(this._lastTimestamp.getTime() + 1);
}
this._lastTimestamp = timestamp;
return timestamp;
}

override async pushDocUpdate(update: DocUpdate, origin?: string) {
const trx = this.db.transaction(['updates', 'clocks'], 'readwrite');
const timestamp = this.generateTimestamp();
await trx.objectStore('updates').add({
...update,
createdAt: timestamp,
});

await trx.objectStore('clocks').put({ docId: update.docId, timestamp });

this.emit(
'update',
{
docId: update.docId,
bin: update.bin,
timestamp,
editor: update.editor,
},
origin
);

return { docId: update.docId, timestamp };
}

Expand Down Expand Up @@ -72,6 +95,12 @@ export class IndexedDBDocStorage extends DocStorage {
}, {} as DocClocks);
}

override async getDocTimestamp(docId: string): Promise<DocClock | null> {
const trx = this.db.transaction('clocks', 'readonly');

return (await trx.store.get(docId)) ?? null;
}

protected override async setDocSnapshot(
snapshot: DocRecord
): Promise<boolean> {
Expand Down
1 change: 1 addition & 0 deletions packages/common/nbstore/src/impls/idb/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ export interface DocStorageSchema extends DBSchema {
peer: string;
docId: string;
clock: Date;
pulledClock: Date;
EYHN marked this conversation as resolved.
Show resolved Hide resolved
pushedClock: Date;
};
indexes: {
Expand Down
31 changes: 29 additions & 2 deletions packages/common/nbstore/src/impls/idb/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export class IndexedDBSyncStorage extends SyncStorage {
return this.connection.inner;
}

override async getPeerClocks(peer: string) {
override async getPeerRemoteClocks(peer: string) {
const trx = this.db.transaction('peerClocks', 'readonly');

const records = await trx.store.index('peer').getAll(peer);
Expand All @@ -19,7 +19,7 @@ export class IndexedDBSyncStorage extends SyncStorage {
}, {} as DocClocks);
}

override async setPeerClock(peer: string, clock: DocClock) {
override async setPeerRemoteClock(peer: string, clock: DocClock) {
const trx = this.db.transaction('peerClocks', 'readwrite');
const record = await trx.store.get([peer, clock.docId]);

Expand All @@ -28,6 +28,32 @@ export class IndexedDBSyncStorage extends SyncStorage {
peer,
docId: clock.docId,
clock: clock.timestamp,
pulledClock: record?.pulledClock ?? new Date(0),
pushedClock: record?.pushedClock ?? new Date(0),
});
}
}

override async getPeerPulledRemoteClocks(peer: string) {
const trx = this.db.transaction('peerClocks', 'readonly');

const records = await trx.store.index('peer').getAll(peer);

return records.reduce((clocks, { docId, pulledClock }) => {
clocks[docId] = pulledClock;
return clocks;
}, {} as DocClocks);
}
override async setPeerPulledRemoteClock(peer: string, clock: DocClock) {
const trx = this.db.transaction('peerClocks', 'readwrite');
const record = await trx.store.get([peer, clock.docId]);

if (!record || record.pulledClock < clock.timestamp) {
await trx.store.put({
peer,
docId: clock.docId,
clock: record?.clock ?? new Date(0),
pulledClock: clock.timestamp,
pushedClock: record?.pushedClock ?? new Date(0),
});
}
Expand All @@ -54,6 +80,7 @@ export class IndexedDBSyncStorage extends SyncStorage {
docId: clock.docId,
clock: record?.clock ?? new Date(0),
pushedClock: clock.timestamp,
pulledClock: record?.pulledClock ?? new Date(0),
});
}
}
Expand Down
4 changes: 4 additions & 0 deletions packages/common/nbstore/src/impls/idb/v1/doc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ export class IndexedDBV1DocStorage extends DocStorage {
return {};
}

override async getDocTimestamp(_docId: string) {
return null;
}

protected override async setDocSnapshot(): Promise<boolean> {
return false;
}
Expand Down
15 changes: 7 additions & 8 deletions packages/common/nbstore/src/op/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,9 @@ export class SpaceStorageConsumer extends SpaceStorage {
this.consumer.register('getDocDiff', ({ docId, state }) => {
return storage.getDocDiff(docId, state);
});
this.consumer.register(
'pushDocUpdate',
storage.pushDocUpdate.bind(storage)
);
this.consumer.register('pushDocUpdate', ({ update, origin }) => {
return storage.pushDocUpdate(update, origin);
});
this.consumer.register(
'getDocTimestamps',
storage.getDocTimestamps.bind(storage)
Expand All @@ -81,8 +80,8 @@ export class SpaceStorageConsumer extends SpaceStorage {
this.consumer.register('subscribeDocUpdate', () => {
return new Observable(subscriber => {
subscriber.add(
storage.subscribeDocUpdate(update => {
subscriber.next(update);
storage.subscribeDocUpdate((update, origin) => {
subscriber.next({ update, origin });
})
);
});
Expand Down Expand Up @@ -117,10 +116,10 @@ export class SpaceStorageConsumer extends SpaceStorage {
private registerSyncHandlers(storage: SyncStorage) {
this.consumer.register(
'getPeerClocks',
storage.getPeerClocks.bind(storage)
storage.getPeerRemoteClocks.bind(storage)
);
this.consumer.register('setPeerClock', ({ peer, ...clock }) => {
return storage.setPeerClock(peer, clock);
return storage.setPeerRemoteClock(peer, clock);
});
this.consumer.register(
'getPeerPushedClocks',
Expand Down
4 changes: 2 additions & 2 deletions packages/common/nbstore/src/op/ops.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ export interface SpaceStorageOps extends OpSchema {
// doc
getDoc: [string, DocRecord | null];
getDocDiff: [{ docId: string; state?: Uint8Array }, DocDiff | null];
pushDocUpdate: [DocUpdate, DocClock];
pushDocUpdate: [{ update: DocUpdate; origin?: string }, DocClock];
getDocTimestamps: [Date, DocClocks];
deleteDoc: [string, void];
subscribeDocUpdate: [void, DocRecord];
subscribeDocUpdate: [void, { update: DocRecord; origin?: string }];

// history
listHistory: [{ docId: string; filter?: HistoryFilter }, ListedHistory[]];
Expand Down
Loading
Loading