Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(nbstore): better doc sync logic #9037

Merged
merged 1 commit into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/common/nbstore/src/impls/idb/blob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export class IndexedDBBlobStorage extends BlobStorage {
readonly connection = share(new IDBConnection(this.options));

get db() {
return this.connection.inner;
return this.connection.inner.db;
}

override async get(key: string) {
Expand Down
41 changes: 24 additions & 17 deletions packages/common/nbstore/src/impls/idb/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ import { Connection } from '../../connection';
import type { StorageOptions } from '../../storage';
import { type DocStorageSchema, migrator } from './schema';

export class IDBConnection extends Connection<IDBPDatabase<DocStorageSchema>> {
private readonly dbName = `${this.opts.peer}:${this.opts.type}:${this.opts.id}`;
export class IDBConnection extends Connection<{
db: IDBPDatabase<DocStorageSchema>;
channel: BroadcastChannel;
}> {
readonly dbName = `${this.opts.peer}:${this.opts.type}:${this.opts.id}`;

override get shareId() {
return `idb(${migrator.version}):${this.dbName}`;
Expand All @@ -16,28 +19,32 @@ export class IDBConnection extends Connection<IDBPDatabase<DocStorageSchema>> {
}

override async doConnect() {
return openDB<DocStorageSchema>(this.dbName, migrator.version, {
upgrade: migrator.migrate,
blocking: () => {
// if, for example, an tab with newer version is opened, this function will be called.
// we should close current connection to allow the new version to upgrade the db.
this.close(
new Error('Blocking a new version. Closing the connection.')
);
},
blocked: () => {
// fallback to retry auto retry
this.setStatus('error', new Error('Blocked by other tabs.'));
},
});
return {
db: await openDB<DocStorageSchema>(this.dbName, migrator.version, {
upgrade: migrator.migrate,
blocking: () => {
// if, for example, an tab with newer version is opened, this function will be called.
// we should close current connection to allow the new version to upgrade the db.
this.close(
new Error('Blocking a new version. Closing the connection.')
);
},
blocked: () => {
// fallback to retry auto retry
this.setStatus('error', new Error('Blocked by other tabs.'));
},
}),
channel: new BroadcastChannel(this.dbName),
};
}

override async doDisconnect() {
this.close();
}

private close(error?: Error) {
this.maybeConnection?.close();
this.maybeConnection?.channel.close();
this.maybeConnection?.db.close();
this.setStatus('closed', error);
}
}
58 changes: 57 additions & 1 deletion packages/common/nbstore/src/impls/idb/doc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,37 @@ import {
type DocClocks,
type DocRecord,
DocStorage,
type DocStorageOptions,
type DocUpdate,
} from '../../storage';
import { IDBConnection } from './db';
import { IndexedDBLocker } from './lock';

interface ChannelMessage {
type: 'update';
update: DocRecord;
origin?: string;
}

export class IndexedDBDocStorage extends DocStorage {
readonly connection = share(new IDBConnection(this.options));

get db() {
return this.connection.inner;
return this.connection.inner.db;
}

get channel() {
return this.connection.inner.channel;
}

override locker = new IndexedDBLocker(this.connection);

private _lastTimestamp = new Date(0);

constructor(options: DocStorageOptions) {
super(options);
}

private generateTimestamp() {
const timestamp = new Date();
if (timestamp.getTime() <= this._lastTimestamp.getTime()) {
Expand Down Expand Up @@ -47,6 +65,17 @@ export class IndexedDBDocStorage extends DocStorage {
origin
);

this.channel.postMessage({
type: 'update',
update: {
docId: update.docId,
bin: update.bin,
timestamp,
editor: update.editor,
},
origin,
} satisfies ChannelMessage);

return { docId: update.docId, timestamp };
}

Expand Down Expand Up @@ -144,4 +173,31 @@ export class IndexedDBDocStorage extends DocStorage {
trx.commit();
return updates.length;
}

private docUpdateListener = 0;

override subscribeDocUpdate(
callback: (update: DocRecord, origin?: string) => void
): () => void {
if (this.docUpdateListener === 0) {
this.channel.addEventListener('message', this.handleChannelMessage);
}
this.docUpdateListener++;

const dispose = super.subscribeDocUpdate(callback);

return () => {
dispose();
this.docUpdateListener--;
if (this.docUpdateListener === 0) {
this.channel.removeEventListener('message', this.handleChannelMessage);
}
};
}

handleChannelMessage(event: MessageEvent<ChannelMessage>) {
if (event.data.type === 'update') {
this.emit('update', event.data.update, event.data.origin);
}
}
}
83 changes: 83 additions & 0 deletions packages/common/nbstore/src/impls/idb/lock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import EventEmitter2 from 'eventemitter2';

import { type Locker } from '../../storage/lock';
import type { IDBConnection } from './db';

interface ChannelMessage {
type: 'unlock';
key: string;
}

export class IndexedDBLocker implements Locker {
get db() {
return this.dbConnection.inner.db;
}
private readonly eventEmitter = new EventEmitter2();

get channel() {
return this.dbConnection.inner.channel;
}

constructor(private readonly dbConnection: IDBConnection) {}

async lock(domain: string, resource: string) {
const key = `${domain}:${resource}`;

// eslint-disable-next-line no-constant-condition
while (true) {
const trx = this.db.transaction('locks', 'readwrite');
const record = await trx.store.get(key);
const lockTimestamp = record?.lock.getTime();

if (
lockTimestamp &&
lockTimestamp > Date.now() - 30000 /* lock timeout 3s */
) {
trx.commit();

await new Promise<void>(resolve => {
const cleanup = () => {
this.channel.removeEventListener('message', channelListener);
this.eventEmitter.off('unlock', eventListener);
clearTimeout(timer);
};
const channelListener = (event: MessageEvent<ChannelMessage>) => {
if (event.data.type === 'unlock' && event.data.key === key) {
cleanup();
resolve();
}
};
const eventListener = (unlockKey: string) => {
if (unlockKey === key) {
cleanup();
resolve();
}
};
this.channel.addEventListener('message', channelListener); // add listener
this.eventEmitter.on('unlock', eventListener);

const timer = setTimeout(() => {
cleanup();
resolve();
}, 3000);
// timeout to avoid dead lock
});
continue;
} else {
await trx.store.put({ key, lock: new Date() });
trx.commit();
break;
}
}

return {
[Symbol.asyncDispose]: async () => {
const trx = this.db.transaction('locks', 'readwrite');
await trx.store.delete(key);
trx.commit();
this.channel.postMessage({ type: 'unlock', key });
this.eventEmitter.emit('unlock', key);
},
};
}
}
12 changes: 12 additions & 0 deletions packages/common/nbstore/src/impls/idb/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ export interface DocStorageSchema extends DBSchema {
peer: string;
};
};
locks: {
key: string;
value: {
key: string;
lock: Date;
};
};
}

const migrate: OpenDBCallbacks<DocStorageSchema>['upgrade'] = (
Expand Down Expand Up @@ -162,6 +169,11 @@ const init: Migrate = db => {
keyPath: 'key',
autoIncrement: false,
});

db.createObjectStore('locks', {
keyPath: 'key',
EYHN marked this conversation as resolved.
Show resolved Hide resolved
autoIncrement: false,
});
};
// END REGION

Expand Down
47 changes: 46 additions & 1 deletion packages/common/nbstore/src/impls/idb/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,24 @@ export class IndexedDBSyncStorage extends SyncStorage {
readonly connection = share(new IDBConnection(this.options));

get db() {
return this.connection.inner;
return this.connection.inner.db;
}

override async getPeerRemoteClock(
peer: string,
docId: string
): Promise<DocClock | null> {
const trx = this.db.transaction('peerClocks', 'readonly');

const record = await trx.store.get([peer, docId]);

return record
? {
docId: record.docId,
timestamp: record.clock,
}
: null;
}
override async getPeerRemoteClocks(peer: string) {
const trx = this.db.transaction('peerClocks', 'readonly');

Expand All @@ -34,6 +49,21 @@ export class IndexedDBSyncStorage extends SyncStorage {
}
}

override async getPeerPulledRemoteClock(
peer: string,
docId: string
): Promise<DocClock | null> {
const trx = this.db.transaction('peerClocks', 'readonly');

const record = await trx.store.get([peer, docId]);

return record
? {
docId: record.docId,
timestamp: record.pulledClock,
}
: null;
}
override async getPeerPulledRemoteClocks(peer: string) {
const trx = this.db.transaction('peerClocks', 'readonly');

Expand All @@ -59,6 +89,21 @@ export class IndexedDBSyncStorage extends SyncStorage {
}
}

override async getPeerPushedClock(
peer: string,
docId: string
): Promise<DocClock | null> {
const trx = this.db.transaction('peerClocks', 'readonly');

const record = await trx.store.get([peer, docId]);

return record
? {
docId: record.docId,
timestamp: record.pushedClock,
}
: null;
}
override async getPeerPushedClocks(peer: string) {
const trx = this.db.transaction('peerClocks', 'readonly');

Expand Down
6 changes: 3 additions & 3 deletions packages/common/nbstore/src/storage/doc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import EventEmitter2 from 'eventemitter2';
import { diffUpdate, encodeStateVectorFromUpdate, mergeUpdates } from 'yjs';

import { isEmptyUpdate } from '../utils/is-empty-update';
import type { Lock } from './lock';
import type { Locker } from './lock';
import { SingletonLocker } from './lock';
import { Storage, type StorageOptions } from './storage';

Expand Down Expand Up @@ -42,7 +42,7 @@ export abstract class DocStorage<
> extends Storage<Opts> {
private readonly event = new EventEmitter2();
override readonly storageType = 'doc';
private readonly locker = new SingletonLocker();
protected readonly locker: Locker = new SingletonLocker();

// REGION: open apis by Op system
/**
Expand Down Expand Up @@ -243,7 +243,7 @@ export abstract class DocStorage<
return merge(updates.filter(bin => !isEmptyUpdate(bin)));
}

protected async lockDocForUpdate(docId: string): Promise<Lock> {
protected async lockDocForUpdate(docId: string): Promise<AsyncDisposable> {
return this.locker.lock(`workspace:${this.spaceId}:update`, docId);
}
}
2 changes: 1 addition & 1 deletion packages/common/nbstore/src/storage/lock.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
export interface Locker {
lock(domain: string, resource: string): Promise<Lock>;
lock(domain: string, resource: string): Promise<AsyncDisposable>;
}

export class SingletonLocker implements Locker {
Expand Down
Loading
Loading