Skip to content

Commit

Permalink
Add BroadcastChannel for sharing table change notifications
Browse files Browse the repository at this point in the history
  • Loading branch information
stevensJourney committed Dec 2, 2024
1 parent a1d6d64 commit ff93d0d
Show file tree
Hide file tree
Showing 2 changed files with 350 additions and 414 deletions.
45 changes: 41 additions & 4 deletions packages/web/src/db/adapters/wa-sqlite/WASQLiteConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ export enum WASQLiteVFS {
AccessHandlePoolVFS = 'AccessHandlePoolVFS'
}

export type WASQLiteBroadCastTableUpdateEvent = {
changedTables: Set<string>;
connectionId: number;
};

/**
* @internal
*/
Expand Down Expand Up @@ -106,11 +111,19 @@ export class WASqliteConnection extends BaseObserver<WASQLiteConnectionListener>
protected updatedTables: Set<string>;
protected updateTimer: ReturnType<typeof setTimeout> | null;
protected statementMutex: Mutex;
protected broadcastChannel: BroadcastChannel | null;
/**
* Unique id for this specific connection. This is used to prevent broadcast table change
* notification loops.
*/
protected connectionId: number;

constructor(protected options: WASQLiteOpenOptions) {
super();
this.updatedTables = new Set();
this.updateTimer = null;
this.broadcastChannel = null;
this.connectionId = new Date().valueOf() + Math.random() * 1000;
this.statementMutex = new Mutex();
this._moduleFactory = DEFAULT_MODULE_FACTORIES[this.options.vfs ?? WASQLiteVFS.IDBBatchAtomicVFS];
}
Expand Down Expand Up @@ -146,24 +159,47 @@ export class WASqliteConnection extends BaseObserver<WASQLiteConnectionListener>
return sqlite3;
}

protected registerBroadcastListeners() {
this.broadcastChannel = new BroadcastChannel(`${this.options.dbFilename}-table-updates`);
this.broadcastChannel.addEventListener('message', (event) => {
const data: WASQLiteBroadCastTableUpdateEvent = event.data;
if (this.connectionId == data.connectionId) {
// Ignore messages from the same connection
return;
}
this.queueTableUpdate(data.changedTables);
});
}

protected queueTableUpdate(tableNames: Set<string>) {
tableNames.forEach((tableName) => this.updatedTables.add(tableName));
if (this.updateTimer == null) {
this.updateTimer = setTimeout(() => this.fireUpdates(), 0);
}
}

async init() {
this._sqliteAPI = await this.openSQLiteAPI();
await this.openDB();
this.registerBroadcastListeners();

this.sqliteAPI.update_hook(this.dbP, (updateType: number, dbName: string | null, tableName: string | null) => {
if (!tableName) {
return;
}
this.updatedTables.add(tableName);
if (this.updateTimer == null) {
this.updateTimer = setTimeout(() => this.fireUpdates(), 0);
}
const changedTables = new Set([tableName]);
this.queueTableUpdate(changedTables);
});
}

fireUpdates() {
this.updateTimer = null;
const event: BatchedUpdateNotification = { tables: [...this.updatedTables], groupedUpdates: {}, rawUpdates: [] };
// Share to other connections
this.broadcastChannel!.postMessage({
changedTables: this.updatedTables,
connectionId: this.connectionId
} satisfies WASQLiteBroadCastTableUpdateEvent);
this.updatedTables.clear();
this.iterateListeners((cb) => cb.tablesUpdated?.(event));
}
Expand Down Expand Up @@ -238,6 +274,7 @@ export class WASqliteConnection extends BaseObserver<WASQLiteConnectionListener>
}

async close() {
this.broadcastChannel?.close();
await this.sqliteAPI.close(this.dbP);
}

Expand Down
Loading

0 comments on commit ff93d0d

Please sign in to comment.