From 234ead5ab98a17b9a6b81eeeb1223132a59375f5 Mon Sep 17 00:00:00 2001 From: EYHN Date: Fri, 6 Dec 2024 09:45:01 +0900 Subject: [PATCH] feat(nbstore): better doc sync logic --- packages/common/nbstore/src/impls/idb/blob.ts | 2 +- packages/common/nbstore/src/impls/idb/db.ts | 41 ++--- packages/common/nbstore/src/impls/idb/doc.ts | 58 +++++++- packages/common/nbstore/src/impls/idb/lock.ts | 83 +++++++++++ .../common/nbstore/src/impls/idb/schema.ts | 11 ++ packages/common/nbstore/src/impls/idb/sync.ts | 47 +++++- packages/common/nbstore/src/storage/doc.ts | 6 +- packages/common/nbstore/src/storage/lock.ts | 2 +- packages/common/nbstore/src/storage/sync.ts | 13 +- packages/common/nbstore/src/sync/doc/peer.ts | 140 +++++++++--------- 10 files changed, 304 insertions(+), 99 deletions(-) create mode 100644 packages/common/nbstore/src/impls/idb/lock.ts diff --git a/packages/common/nbstore/src/impls/idb/blob.ts b/packages/common/nbstore/src/impls/idb/blob.ts index 2cc5a945c0991..02c67267d5a37 100644 --- a/packages/common/nbstore/src/impls/idb/blob.ts +++ b/packages/common/nbstore/src/impls/idb/blob.ts @@ -10,7 +10,7 @@ export class IndexedDBBlobStorage extends BlobStorage { readonly connection = share(new IDBConnection(this.options)); get db() { - return this.connection.inner; + return this.connection.inner.db; } override async get(key: string) { diff --git a/packages/common/nbstore/src/impls/idb/db.ts b/packages/common/nbstore/src/impls/idb/db.ts index 65facaa4e9a29..5397698ecdcfd 100644 --- a/packages/common/nbstore/src/impls/idb/db.ts +++ b/packages/common/nbstore/src/impls/idb/db.ts @@ -4,8 +4,11 @@ import { Connection } from '../../connection'; import type { StorageOptions } from '../../storage'; import { type DocStorageSchema, migrator } from './schema'; -export class IDBConnection extends Connection> { - private readonly dbName = `${this.opts.peer}:${this.opts.type}:${this.opts.id}`; +export class IDBConnection extends Connection<{ + db: IDBPDatabase; + channel: BroadcastChannel; +}> { + readonly dbName = `${this.opts.peer}:${this.opts.type}:${this.opts.id}`; override get shareId() { return `idb(${migrator.version}):${this.dbName}`; @@ -16,20 +19,23 @@ export class IDBConnection extends Connection> { } override async doConnect() { - return openDB(this.dbName, migrator.version, { - upgrade: migrator.migrate, - blocking: () => { - // if, for example, an tab with newer version is opened, this function will be called. - // we should close current connection to allow the new version to upgrade the db. - this.close( - new Error('Blocking a new version. Closing the connection.') - ); - }, - blocked: () => { - // fallback to retry auto retry - this.setStatus('error', new Error('Blocked by other tabs.')); - }, - }); + return { + db: await openDB(this.dbName, migrator.version, { + upgrade: migrator.migrate, + blocking: () => { + // if, for example, an tab with newer version is opened, this function will be called. + // we should close current connection to allow the new version to upgrade the db. + this.close( + new Error('Blocking a new version. Closing the connection.') + ); + }, + blocked: () => { + // fallback to retry auto retry + this.setStatus('error', new Error('Blocked by other tabs.')); + }, + }), + channel: new BroadcastChannel(this.dbName), + }; } override async doDisconnect() { @@ -37,7 +43,8 @@ export class IDBConnection extends Connection> { } private close(error?: Error) { - this.maybeConnection?.close(); + this.maybeConnection?.channel.close(); + this.maybeConnection?.db.close(); this.setStatus('closed', error); } } diff --git a/packages/common/nbstore/src/impls/idb/doc.ts b/packages/common/nbstore/src/impls/idb/doc.ts index d4ac6709c20b0..8451c831d941f 100644 --- a/packages/common/nbstore/src/impls/idb/doc.ts +++ b/packages/common/nbstore/src/impls/idb/doc.ts @@ -4,19 +4,37 @@ import { type DocClocks, type DocRecord, DocStorage, + type DocStorageOptions, type DocUpdate, } from '../../storage'; import { IDBConnection } from './db'; +import { IndexedDBLocker } from './lock'; + +interface ChannelMessage { + type: 'update'; + update: DocRecord; + origin?: string; +} export class IndexedDBDocStorage extends DocStorage { readonly connection = share(new IDBConnection(this.options)); get db() { - return this.connection.inner; + return this.connection.inner.db; + } + + get channel() { + return this.connection.inner.channel; } + override locker = new IndexedDBLocker(this.connection); + private _lastTimestamp = new Date(0); + constructor(options: DocStorageOptions) { + super(options); + } + private generateTimestamp() { const timestamp = new Date(); if (timestamp.getTime() <= this._lastTimestamp.getTime()) { @@ -47,6 +65,17 @@ export class IndexedDBDocStorage extends DocStorage { origin ); + this.channel.postMessage({ + type: 'update', + update: { + docId: update.docId, + bin: update.bin, + timestamp, + editor: update.editor, + }, + origin, + } satisfies ChannelMessage); + return { docId: update.docId, timestamp }; } @@ -144,4 +173,31 @@ export class IndexedDBDocStorage extends DocStorage { trx.commit(); return updates.length; } + + private docUpdateListener = 0; + + override subscribeDocUpdate( + callback: (update: DocRecord, origin?: string) => void + ): () => void { + if (this.docUpdateListener === 0) { + this.channel.addEventListener('message', this.handleChannelMessage); + } + this.docUpdateListener++; + + const dispose = super.subscribeDocUpdate(callback); + + return () => { + dispose(); + this.docUpdateListener--; + if (this.docUpdateListener === 0) { + this.channel.removeEventListener('message', this.handleChannelMessage); + } + }; + } + + handleChannelMessage(event: MessageEvent) { + if (event.data.type === 'update') { + this.emit('update', event.data.update, event.data.origin); + } + } } diff --git a/packages/common/nbstore/src/impls/idb/lock.ts b/packages/common/nbstore/src/impls/idb/lock.ts new file mode 100644 index 0000000000000..b07a0b6597e40 --- /dev/null +++ b/packages/common/nbstore/src/impls/idb/lock.ts @@ -0,0 +1,83 @@ +import EventEmitter2 from 'eventemitter2'; + +import { type Locker } from '../../storage/lock'; +import type { IDBConnection } from './db'; + +interface ChannelMessage { + type: 'unlock'; + key: string; +} + +export class IndexedDBLocker implements Locker { + get db() { + return this.dbConnection.inner.db; + } + private readonly eventEmitter = new EventEmitter2(); + + get channel() { + return this.dbConnection.inner.channel; + } + + constructor(private readonly dbConnection: IDBConnection) {} + + async lock(domain: string, resource: string) { + const key = `${domain}:${resource}`; + + // eslint-disable-next-line no-constant-condition + while (true) { + const trx = this.db.transaction('locks', 'readwrite'); + const record = await trx.store.get(key); + const lockTimestamp = record?.lock.getTime(); + + if ( + lockTimestamp && + lockTimestamp > Date.now() - 30000 /* lock timeout 3s */ + ) { + trx.commit(); + + await new Promise(resolve => { + const cleanup = () => { + this.channel.removeEventListener('message', channelListener); + this.eventEmitter.off('unlock', eventListener); + clearTimeout(timer); + }; + const channelListener = (event: MessageEvent) => { + if (event.data.type === 'unlock' && event.data.key === key) { + cleanup(); + resolve(); + } + }; + const eventListener = (unlockKey: string) => { + if (unlockKey === key) { + cleanup(); + resolve(); + } + }; + this.channel.addEventListener('message', channelListener); // add listener + this.eventEmitter.on('unlock', eventListener); + + const timer = setTimeout(() => { + cleanup(); + resolve(); + }, 3000); + // timeout to avoid dead lock + }); + continue; + } else { + await trx.store.put({ key, lock: new Date() }); + trx.commit(); + break; + } + } + + return { + [Symbol.asyncDispose]: async () => { + const trx = this.db.transaction('locks', 'readwrite'); + await trx.store.delete(key); + trx.commit(); + this.channel.postMessage({ type: 'unlock', key }); + this.eventEmitter.emit('unlock', key); + }, + }; + } +} diff --git a/packages/common/nbstore/src/impls/idb/schema.ts b/packages/common/nbstore/src/impls/idb/schema.ts index ec65f1228f65f..ce8d0263f581e 100644 --- a/packages/common/nbstore/src/impls/idb/schema.ts +++ b/packages/common/nbstore/src/impls/idb/schema.ts @@ -101,6 +101,13 @@ export interface DocStorageSchema extends DBSchema { peer: string; }; }; + locks: { + key: string; + value: { + key: string; + lock: Date; + }; + }; } const migrate: OpenDBCallbacks['upgrade'] = ( @@ -162,6 +169,10 @@ const init: Migrate = db => { keyPath: 'key', autoIncrement: false, }); + + db.createObjectStore('locks', { + keyPath: 'key', + }); }; // END REGION diff --git a/packages/common/nbstore/src/impls/idb/sync.ts b/packages/common/nbstore/src/impls/idb/sync.ts index ef99a479b1a1e..b359a1554db54 100644 --- a/packages/common/nbstore/src/impls/idb/sync.ts +++ b/packages/common/nbstore/src/impls/idb/sync.ts @@ -5,9 +5,24 @@ export class IndexedDBSyncStorage extends SyncStorage { readonly connection = share(new IDBConnection(this.options)); get db() { - return this.connection.inner; + return this.connection.inner.db; } + override async getPeerRemoteClock( + peer: string, + docId: string + ): Promise { + const trx = this.db.transaction('peerClocks', 'readonly'); + + const record = await trx.store.get([peer, docId]); + + return record + ? { + docId: record.docId, + timestamp: record.clock, + } + : null; + } override async getPeerRemoteClocks(peer: string) { const trx = this.db.transaction('peerClocks', 'readonly'); @@ -34,6 +49,21 @@ export class IndexedDBSyncStorage extends SyncStorage { } } + override async getPeerPulledRemoteClock( + peer: string, + docId: string + ): Promise { + const trx = this.db.transaction('peerClocks', 'readonly'); + + const record = await trx.store.get([peer, docId]); + + return record + ? { + docId: record.docId, + timestamp: record.pulledClock, + } + : null; + } override async getPeerPulledRemoteClocks(peer: string) { const trx = this.db.transaction('peerClocks', 'readonly'); @@ -59,6 +89,21 @@ export class IndexedDBSyncStorage extends SyncStorage { } } + override async getPeerPushedClock( + peer: string, + docId: string + ): Promise { + const trx = this.db.transaction('peerClocks', 'readonly'); + + const record = await trx.store.get([peer, docId]); + + return record + ? { + docId: record.docId, + timestamp: record.pushedClock, + } + : null; + } override async getPeerPushedClocks(peer: string) { const trx = this.db.transaction('peerClocks', 'readonly'); diff --git a/packages/common/nbstore/src/storage/doc.ts b/packages/common/nbstore/src/storage/doc.ts index a264bd346e9c7..3f5bfabe070bc 100644 --- a/packages/common/nbstore/src/storage/doc.ts +++ b/packages/common/nbstore/src/storage/doc.ts @@ -2,7 +2,7 @@ import EventEmitter2 from 'eventemitter2'; import { diffUpdate, encodeStateVectorFromUpdate, mergeUpdates } from 'yjs'; import { isEmptyUpdate } from '../utils/is-empty-update'; -import type { Lock } from './lock'; +import type { Locker } from './lock'; import { SingletonLocker } from './lock'; import { Storage, type StorageOptions } from './storage'; @@ -42,7 +42,7 @@ export abstract class DocStorage< > extends Storage { private readonly event = new EventEmitter2(); override readonly storageType = 'doc'; - private readonly locker = new SingletonLocker(); + protected readonly locker: Locker = new SingletonLocker(); // REGION: open apis by Op system /** @@ -243,7 +243,7 @@ export abstract class DocStorage< return merge(updates.filter(bin => !isEmptyUpdate(bin))); } - protected async lockDocForUpdate(docId: string): Promise { + protected async lockDocForUpdate(docId: string): Promise { return this.locker.lock(`workspace:${this.spaceId}:update`, docId); } } diff --git a/packages/common/nbstore/src/storage/lock.ts b/packages/common/nbstore/src/storage/lock.ts index d83c2dc03ce64..fe485e0a15cae 100644 --- a/packages/common/nbstore/src/storage/lock.ts +++ b/packages/common/nbstore/src/storage/lock.ts @@ -1,5 +1,5 @@ export interface Locker { - lock(domain: string, resource: string): Promise; + lock(domain: string, resource: string): Promise; } export class SingletonLocker implements Locker { diff --git a/packages/common/nbstore/src/storage/sync.ts b/packages/common/nbstore/src/storage/sync.ts index 605f1ac92f4ca..cc10c3cf41420 100644 --- a/packages/common/nbstore/src/storage/sync.ts +++ b/packages/common/nbstore/src/storage/sync.ts @@ -8,14 +8,25 @@ export abstract class SyncStorage< > extends Storage { override readonly storageType = 'sync'; + abstract getPeerRemoteClock( + peer: string, + docId: string + ): Promise; abstract getPeerRemoteClocks(peer: string): Promise; abstract setPeerRemoteClock(peer: string, clock: DocClock): Promise; + abstract getPeerPulledRemoteClock( + peer: string, + docId: string + ): Promise; abstract getPeerPulledRemoteClocks(peer: string): Promise; - abstract setPeerPulledRemoteClock( peer: string, clock: DocClock ): Promise; + abstract getPeerPushedClock( + peer: string, + docId: string + ): Promise; abstract getPeerPushedClocks(peer: string): Promise; abstract setPeerPushedClock(peer: string, clock: DocClock): Promise; abstract clearClocks(): Promise; diff --git a/packages/common/nbstore/src/sync/doc/peer.ts b/packages/common/nbstore/src/sync/doc/peer.ts index 01c26082c60b5..6526c52342210 100644 --- a/packages/common/nbstore/src/sync/doc/peer.ts +++ b/packages/common/nbstore/src/sync/doc/peer.ts @@ -32,7 +32,7 @@ type Job = type: 'save'; docId: string; update?: Uint8Array; - serverClock: Date; + remoteClock: Date; }; interface Status { @@ -41,8 +41,6 @@ interface Status { jobDocQueue: AsyncPriorityQueue; jobMap: Map; remoteClocks: ClockMap; - pulledRemoteClocks: ClockMap; - pushedClocks: ClockMap; syncing: boolean; retrying: boolean; errorMessage: string | null; @@ -81,7 +79,7 @@ export class DocSyncPeer { /** * random unique id for recognize self in "update" event */ - private readonly uniqueId = nanoid(); + private readonly uniqueId = `sync:${this.local.peer}:${this.remote.peer}:${nanoid()}`; private readonly prioritySettings = new Map(); constructor( @@ -97,8 +95,6 @@ export class DocSyncPeer { jobDocQueue: new AsyncPriorityQueue(), jobMap: new Map(), remoteClocks: new ClockMap(new Map()), - pulledRemoteClocks: new ClockMap(new Map()), - pushedClocks: new ClockMap(new Map()), syncing: false, retrying: false, errorMessage: null, @@ -107,14 +103,23 @@ export class DocSyncPeer { private readonly jobs = createJobErrorCatcher({ connect: async (docId: string, signal?: AbortSignal) => { - const pushedClock = this.status.pushedClocks.get(docId); + const pushedClock = + (await this.syncMetadata.getPeerPushedClock(this.remote.peer, docId)) + ?.timestamp ?? null; const clock = await this.local.getDocTimestamp(docId); throwIfAborted(signal); if (pushedClock === null || pushedClock !== clock?.timestamp) { await this.jobs.pullAndPush(docId, signal); } else { - const pulled = this.status.pulledRemoteClocks.get(docId); + // no need to push + const pulled = + ( + await this.syncMetadata.getPeerPulledRemoteClock( + this.remote.peer, + docId + ) + )?.timestamp ?? null; if (pulled === null || pulled !== this.status.remoteClocks.get(docId)) { await this.jobs.pull(docId, signal); } @@ -133,6 +138,7 @@ export class DocSyncPeer { (a, b) => (a.getTime() > b.clock.getTime() ? a : b.clock), new Date(0) ); + const merged = await this.mergeUpdates( jobs.map(j => j.update).filter(update => !isEmptyUpdate(update)) ); @@ -147,19 +153,22 @@ export class DocSyncPeer { this.schedule({ type: 'save', docId, - serverClock: timestamp, + remoteClock: timestamp, }); } throwIfAborted(signal); - await this.actions.updatePushedClock(docId, maxClock); + await this.syncMetadata.setPeerPushedClock(this.remote.peer, { + docId, + timestamp: maxClock, + }); } }, pullAndPush: async (docId: string, signal?: AbortSignal) => { - const docRecord = await this.local.getDoc(docId); + const localDocRecord = await this.local.getDoc(docId); const stateVector = - docRecord && !isEmptyUpdate(docRecord.bin) - ? encodeStateVectorFromUpdate(docRecord.bin) + localDocRecord && !isEmptyUpdate(localDocRecord.bin) + ? encodeStateVectorFromUpdate(localDocRecord.bin) : new Uint8Array(); const remoteDocRecord = await this.remote.getDocDiff(docId, stateVector); @@ -167,12 +176,12 @@ export class DocSyncPeer { const { missing: newData, state: serverStateVector, - timestamp: serverClock, + timestamp: remoteClock, } = remoteDocRecord; this.schedule({ type: 'save', docId, - serverClock, + remoteClock, }); throwIfAborted(signal); const { timestamp: localClock } = await this.local.pushDocUpdate( @@ -183,14 +192,17 @@ export class DocSyncPeer { this.uniqueId ); throwIfAborted(signal); - await this.actions.updatePulledRemoteClock(docId, serverClock); + await this.syncMetadata.setPeerPulledRemoteClock(this.remote.peer, { + docId, + timestamp: remoteClock, + }); const diff = - docRecord && serverStateVector && serverStateVector.length > 0 - ? diffUpdate(docRecord.bin, serverStateVector) - : docRecord?.bin; + localDocRecord && serverStateVector && serverStateVector.length > 0 + ? diffUpdate(localDocRecord.bin, serverStateVector) + : localDocRecord?.bin; if (diff && !isEmptyUpdate(diff)) { throwIfAborted(signal); - const { timestamp: serverClock } = await this.remote.pushDocUpdate( + const { timestamp: remoteClock } = await this.remote.pushDocUpdate( { bin: diff, docId, @@ -200,18 +212,21 @@ export class DocSyncPeer { this.schedule({ type: 'save', docId, - serverClock, + remoteClock, }); } throwIfAborted(signal); - await this.actions.updatePushedClock(docId, localClock); + await this.syncMetadata.setPeerPushedClock(this.remote.peer, { + docId, + timestamp: localClock, + }); } else { - if (docRecord) { - if (!isEmptyUpdate(docRecord.bin)) { + if (localDocRecord) { + if (!isEmptyUpdate(localDocRecord.bin)) { throwIfAborted(signal); - const { timestamp: serverClock } = await this.remote.pushDocUpdate( + const { timestamp: remoteClock } = await this.remote.pushDocUpdate( { - bin: docRecord.bin, + bin: localDocRecord.bin, docId, }, this.uniqueId @@ -219,10 +234,13 @@ export class DocSyncPeer { this.schedule({ type: 'save', docId, - serverClock, + remoteClock, }); } - await this.actions.updatePushedClock(docId, docRecord.timestamp); + await this.syncMetadata.setPeerPushedClock(this.remote.peer, { + docId, + timestamp: localDocRecord.timestamp, + }); } } }, @@ -237,7 +255,7 @@ export class DocSyncPeer { if (!serverDoc) { return; } - const { missing: newData, timestamp: serverClock } = serverDoc; + const { missing: newData, timestamp: remoteClock } = serverDoc; throwIfAborted(signal); await this.local.pushDocUpdate( { @@ -247,11 +265,14 @@ export class DocSyncPeer { this.uniqueId ); throwIfAborted(signal); - await this.actions.updatePulledRemoteClock(docId, serverClock); + await this.syncMetadata.setPeerPulledRemoteClock(this.remote.peer, { + docId, + timestamp: remoteClock, + }); this.schedule({ type: 'save', docId, - serverClock, + remoteClock: remoteClock, }); }, save: async ( @@ -259,8 +280,8 @@ export class DocSyncPeer { jobs: (Job & { type: 'save' })[], signal?: AbortSignal ) => { - const serverClock = jobs.reduce( - (a, b) => (a.getTime() > b.serverClock.getTime() ? a : b.serverClock), + const remoteClock = jobs.reduce( + (a, b) => (a.getTime() > b.remoteClock.getTime() ? a : b.remoteClock), new Date(0) ); if (this.status.connectedDocs.has(docId)) { @@ -282,7 +303,10 @@ export class DocSyncPeer { ); throwIfAborted(signal); - await this.actions.updatePulledRemoteClock(docId, serverClock); + await this.syncMetadata.setPeerPulledRemoteClock(this.remote.peer, { + docId, + timestamp: remoteClock, + }); } }, }); @@ -298,29 +322,6 @@ export class DocSyncPeer { this.statusUpdatedSubject$.next(docId); } }, - updatePushedClock: async (docId: string, pushedClock: Date) => { - const updated = this.status.pushedClocks.setIfBigger(docId, pushedClock); - if (updated) { - await this.syncMetadata.setPeerPushedClock(this.remote.peer, { - docId, - timestamp: pushedClock, - }); - this.statusUpdatedSubject$.next(docId); - } - }, - updatePulledRemoteClock: async (docId: string, pulledClock: Date) => { - const updated = this.status.pulledRemoteClocks.setIfBigger( - docId, - pulledClock - ); - if (updated) { - await this.syncMetadata.setPeerPulledRemoteClock(this.remote.peer, { - docId, - timestamp: pulledClock, - }); - this.statusUpdatedSubject$.next(docId); - } - }, addDoc: (docId: string) => { if (!this.status.docs.has(docId)) { this.status.docs.add(docId); @@ -370,7 +371,7 @@ export class DocSyncPeer { this.schedule({ type: 'save', docId, - serverClock: remoteClock, + remoteClock: remoteClock, update, }); }, @@ -396,8 +397,6 @@ export class DocSyncPeer { connectedDocs: new Set(), jobDocQueue: new AsyncPriorityQueue(), jobMap: new Map(), - pulledRemoteClocks: new ClockMap(new Map()), - pushedClocks: new ClockMap(new Map()), remoteClocks: new ClockMap(new Map()), syncing: false, // tell ui to show retrying status @@ -478,7 +477,13 @@ export class DocSyncPeer { // subscribe local doc updates disposes.push( this.local.subscribeDocUpdate((update, origin) => { - if (origin === this.uniqueId) { + if ( + origin === this.uniqueId || + origin?.startsWith( + `sync:${this.local.peer}:${this.remote.peer}:` + // skip if local and remote is same + ) + ) { return; } this.events.localUpdated({ @@ -517,19 +522,6 @@ export class DocSyncPeer { for (const [id, v] of Object.entries(cachedClocks)) { this.status.remoteClocks.set(id, v); } - const pulledClocks = await this.syncMetadata.getPeerPulledRemoteClocks( - this.remote.peer - ); - for (const [id, v] of Object.entries(pulledClocks)) { - this.status.pulledRemoteClocks.set(id, v); - } - const pushedClocks = await this.syncMetadata.getPeerPushedClocks( - this.remote.peer - ); - throwIfAborted(signal); - for (const [id, v] of Object.entries(pushedClocks)) { - this.status.pushedClocks.set(id, v); - } this.statusUpdatedSubject$.next(true); // get new clocks from server