Skip to content

Commit

Permalink
feat(nbstore): better doc sync logic
Browse files Browse the repository at this point in the history
  • Loading branch information
EYHN committed Dec 9, 2024
1 parent 8fe188e commit 24492df
Show file tree
Hide file tree
Showing 10 changed files with 305 additions and 99 deletions.
2 changes: 1 addition & 1 deletion packages/common/nbstore/src/impls/idb/blob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export class IndexedDBBlobStorage extends BlobStorage {
readonly connection = share(new IDBConnection(this.options));

get db() {
return this.connection.inner;
return this.connection.inner.db;
}

override async get(key: string) {
Expand Down
41 changes: 24 additions & 17 deletions packages/common/nbstore/src/impls/idb/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ import { Connection } from '../../connection';
import type { StorageOptions } from '../../storage';
import { type DocStorageSchema, migrator } from './schema';

export class IDBConnection extends Connection<IDBPDatabase<DocStorageSchema>> {
private readonly dbName = `${this.opts.peer}:${this.opts.type}:${this.opts.id}`;
export class IDBConnection extends Connection<{
db: IDBPDatabase<DocStorageSchema>;
channel: BroadcastChannel;
}> {
readonly dbName = `${this.opts.peer}:${this.opts.type}:${this.opts.id}`;

override get shareId() {
return `idb(${migrator.version}):${this.dbName}`;
Expand All @@ -16,28 +19,32 @@ export class IDBConnection extends Connection<IDBPDatabase<DocStorageSchema>> {
}

override async doConnect() {
return openDB<DocStorageSchema>(this.dbName, migrator.version, {
upgrade: migrator.migrate,
blocking: () => {
// if, for example, an tab with newer version is opened, this function will be called.
// we should close current connection to allow the new version to upgrade the db.
this.close(
new Error('Blocking a new version. Closing the connection.')
);
},
blocked: () => {
// fallback to retry auto retry
this.setStatus('error', new Error('Blocked by other tabs.'));
},
});
return {
db: await openDB<DocStorageSchema>(this.dbName, migrator.version, {
upgrade: migrator.migrate,
blocking: () => {
// if, for example, an tab with newer version is opened, this function will be called.
// we should close current connection to allow the new version to upgrade the db.
this.close(
new Error('Blocking a new version. Closing the connection.')
);
},
blocked: () => {
// fallback to retry auto retry
this.setStatus('error', new Error('Blocked by other tabs.'));
},
}),
channel: new BroadcastChannel(this.dbName),
};
}

override async doDisconnect() {
this.close();
}

private close(error?: Error) {
this.maybeConnection?.close();
this.maybeConnection?.channel.close();
this.maybeConnection?.db.close();
this.setStatus('closed', error);
}
}
58 changes: 57 additions & 1 deletion packages/common/nbstore/src/impls/idb/doc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,37 @@ import {
type DocClocks,
type DocRecord,
DocStorage,
type DocStorageOptions,
type DocUpdate,
} from '../../storage';
import { IDBConnection } from './db';
import { IndexedDBLocker } from './lock';

interface ChannelMessage {
type: 'update';
update: DocRecord;
origin?: string;
}

export class IndexedDBDocStorage extends DocStorage {
readonly connection = share(new IDBConnection(this.options));

get db() {
return this.connection.inner;
return this.connection.inner.db;
}

get channel() {
return this.connection.inner.channel;
}

override locker = new IndexedDBLocker(this.connection);

private _lastTimestamp = new Date(0);

constructor(options: DocStorageOptions) {
super(options);
}

private generateTimestamp() {
const timestamp = new Date();
if (timestamp.getTime() <= this._lastTimestamp.getTime()) {
Expand Down Expand Up @@ -47,6 +65,17 @@ export class IndexedDBDocStorage extends DocStorage {
origin
);

this.channel.postMessage({
type: 'update',
update: {
docId: update.docId,
bin: update.bin,
timestamp,
editor: update.editor,
},
origin,
} satisfies ChannelMessage);

return { docId: update.docId, timestamp };
}

Expand Down Expand Up @@ -144,4 +173,31 @@ export class IndexedDBDocStorage extends DocStorage {
trx.commit();
return updates.length;
}

private docUpdateListener = 0;

override subscribeDocUpdate(
callback: (update: DocRecord, origin?: string) => void
): () => void {
if (this.docUpdateListener === 0) {
this.channel.addEventListener('message', this.handleChannelMessage);
}
this.docUpdateListener++;

const dispose = super.subscribeDocUpdate(callback);

return () => {
dispose();
this.docUpdateListener--;
if (this.docUpdateListener === 0) {
this.channel.removeEventListener('message', this.handleChannelMessage);
}
};
}

handleChannelMessage(event: MessageEvent<ChannelMessage>) {
if (event.data.type === 'update') {
this.emit('update', event.data.update, event.data.origin);
}
}
}
83 changes: 83 additions & 0 deletions packages/common/nbstore/src/impls/idb/lock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import EventEmitter2 from 'eventemitter2';

import { type Locker } from '../../storage/lock';
import type { IDBConnection } from './db';

interface ChannelMessage {
type: 'unlock';
key: string;
}

export class IndexedDBLocker implements Locker {
get db() {
return this.dbConnection.inner.db;
}
private readonly eventEmitter = new EventEmitter2();

get channel() {
return this.dbConnection.inner.channel;
}

constructor(private readonly dbConnection: IDBConnection) {}

async lock(domain: string, resource: string) {
const key = `${domain}:${resource}`;

// eslint-disable-next-line no-constant-condition
while (true) {
const trx = this.db.transaction('locks', 'readwrite');
const record = await trx.store.get(key);
const lockTimestamp = record?.lock.getTime();

if (
lockTimestamp &&
lockTimestamp > Date.now() - 30000 /* lock timeout 3s */
) {
trx.commit();

await new Promise<void>(resolve => {
const cleanup = () => {
this.channel.removeEventListener('message', channelListener);
this.eventEmitter.off('unlock', eventListener);
clearTimeout(timer);
};
const channelListener = (event: MessageEvent<ChannelMessage>) => {
if (event.data.type === 'unlock' && event.data.key === key) {
cleanup();
resolve();
}
};
const eventListener = (unlockKey: string) => {
if (unlockKey === key) {
cleanup();
resolve();
}
};
this.channel.addEventListener('message', channelListener); // add listener
this.eventEmitter.on('unlock', eventListener);

const timer = setTimeout(() => {
cleanup();
resolve();
}, 3000);
// timeout to avoid dead lock
});
continue;
} else {
await trx.store.put({ key, lock: new Date() });
trx.commit();
break;
}
}

return {
[Symbol.asyncDispose]: async () => {
const trx = this.db.transaction('locks', 'readwrite');
await trx.store.delete(key);
trx.commit();
this.channel.postMessage({ type: 'unlock', key });
this.eventEmitter.emit('unlock', key);
},
};
}
}
12 changes: 12 additions & 0 deletions packages/common/nbstore/src/impls/idb/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ export interface DocStorageSchema extends DBSchema {
peer: string;
};
};
locks: {
key: string;
value: {
key: string;
lock: Date;
};
};
}

const migrate: OpenDBCallbacks<DocStorageSchema>['upgrade'] = (
Expand Down Expand Up @@ -162,6 +169,11 @@ const init: Migrate = db => {
keyPath: 'key',
autoIncrement: false,
});

db.createObjectStore('locks', {
keyPath: 'key',
autoIncrement: false,
});
};
// END REGION

Expand Down
47 changes: 46 additions & 1 deletion packages/common/nbstore/src/impls/idb/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,24 @@ export class IndexedDBSyncStorage extends SyncStorage {
readonly connection = share(new IDBConnection(this.options));

get db() {
return this.connection.inner;
return this.connection.inner.db;
}

override async getPeerRemoteClock(
peer: string,
docId: string
): Promise<DocClock | null> {
const trx = this.db.transaction('peerClocks', 'readonly');

const record = await trx.store.get([peer, docId]);

return record
? {
docId: record.docId,
timestamp: record.clock,
}
: null;
}
override async getPeerRemoteClocks(peer: string) {
const trx = this.db.transaction('peerClocks', 'readonly');

Expand All @@ -34,6 +49,21 @@ export class IndexedDBSyncStorage extends SyncStorage {
}
}

override async getPeerPulledRemoteClock(
peer: string,
docId: string
): Promise<DocClock | null> {
const trx = this.db.transaction('peerClocks', 'readonly');

const record = await trx.store.get([peer, docId]);

return record
? {
docId: record.docId,
timestamp: record.pulledClock,
}
: null;
}
override async getPeerPulledRemoteClocks(peer: string) {
const trx = this.db.transaction('peerClocks', 'readonly');

Expand All @@ -59,6 +89,21 @@ export class IndexedDBSyncStorage extends SyncStorage {
}
}

override async getPeerPushedClock(
peer: string,
docId: string
): Promise<DocClock | null> {
const trx = this.db.transaction('peerClocks', 'readonly');

const record = await trx.store.get([peer, docId]);

return record
? {
docId: record.docId,
timestamp: record.pushedClock,
}
: null;
}
override async getPeerPushedClocks(peer: string) {
const trx = this.db.transaction('peerClocks', 'readonly');

Expand Down
6 changes: 3 additions & 3 deletions packages/common/nbstore/src/storage/doc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import EventEmitter2 from 'eventemitter2';
import { diffUpdate, encodeStateVectorFromUpdate, mergeUpdates } from 'yjs';

import { isEmptyUpdate } from '../utils/is-empty-update';
import type { Lock } from './lock';
import type { Locker } from './lock';
import { SingletonLocker } from './lock';
import { Storage, type StorageOptions } from './storage';

Expand Down Expand Up @@ -42,7 +42,7 @@ export abstract class DocStorage<
> extends Storage<Opts> {
private readonly event = new EventEmitter2();
override readonly storageType = 'doc';
private readonly locker = new SingletonLocker();
protected readonly locker: Locker = new SingletonLocker();

// REGION: open apis by Op system
/**
Expand Down Expand Up @@ -243,7 +243,7 @@ export abstract class DocStorage<
return merge(updates.filter(bin => !isEmptyUpdate(bin)));
}

protected async lockDocForUpdate(docId: string): Promise<Lock> {
protected async lockDocForUpdate(docId: string): Promise<AsyncDisposable> {
return this.locker.lock(`workspace:${this.spaceId}:update`, docId);
}
}
2 changes: 1 addition & 1 deletion packages/common/nbstore/src/storage/lock.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
export interface Locker {
lock(domain: string, resource: string): Promise<Lock>;
lock(domain: string, resource: string): Promise<AsyncDisposable>;
}

export class SingletonLocker implements Locker {
Expand Down
Loading

0 comments on commit 24492df

Please sign in to comment.