From 18dbd7f0ae6ed0182ad48dcd114985be2a0af1df Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Wed, 27 Nov 2024 18:42:25 +0200 Subject: [PATCH] wip: share db worker ports --- .../components/providers/SystemProvider.tsx | 21 ++++--- .../adapters/wa-sqlite/WASQLiteConnection.ts | 29 ++++++++- .../SharedWebStreamingSyncImplementation.ts | 21 +++++-- .../sync/AbstractSharedSyncClientProvider.ts | 1 + .../worker/sync/SharedSyncImplementation.ts | 62 +++++++++++++++---- 5 files changed, 106 insertions(+), 28 deletions(-) diff --git a/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx b/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx index 2a25434c..110e34ed 100644 --- a/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx +++ b/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx @@ -3,7 +3,7 @@ import { AppSchema } from '@/library/powersync/AppSchema'; import { SupabaseConnector } from '@/library/powersync/SupabaseConnector'; import { CircularProgress } from '@mui/material'; import { PowerSyncContext } from '@powersync/react'; -import { PowerSyncDatabase, WASQLiteOpenFactory, WASQLiteVFS } from '@powersync/web'; +import { PowerSyncDatabase } from '@powersync/web'; import Logger from 'js-logger'; import React, { Suspense } from 'react'; import { NavigationPanelContextProvider } from '../navigation/NavigationPanelContext'; @@ -13,15 +13,18 @@ export const useSupabase = () => React.useContext(SupabaseContext); export const db = new PowerSyncDatabase({ schema: AppSchema, - database: new WASQLiteOpenFactory({ - dbFilename: 'examplse.db', - vfs: WASQLiteVFS.OPFSCoopSyncVFS, - // Can't use a shared worker for OPFS - flags: { enableMultiTabs: false } - }), - flags: { - enableMultiTabs: false + database: { + dbFilename: 's.sqlite' } + // database: new WASQLiteOpenFactory({ + // dbFilename: 'examplse.db', + // vfs: WASQLiteVFS.OPFSCoopSyncVFS, + // // Can't use a shared worker for OPFS + // flags: { enableMultiTabs: false } + // }), + // flags: { + // enableMultiTabs: false + // } }); export const SystemProvider = ({ children }: { children: React.ReactNode }) => { diff --git a/packages/web/src/db/adapters/wa-sqlite/WASQLiteConnection.ts b/packages/web/src/db/adapters/wa-sqlite/WASQLiteConnection.ts index 3f06a966..72f0b9f1 100644 --- a/packages/web/src/db/adapters/wa-sqlite/WASQLiteConnection.ts +++ b/packages/web/src/db/adapters/wa-sqlite/WASQLiteConnection.ts @@ -12,33 +12,57 @@ export enum WASQLiteVFS { AccessHandlePoolVFS = 'AccessHandlePoolVFS' } +/** + * @internal + */ export type WASQLiteConnectionListener = { tablesUpdated: (event: BatchedUpdateNotification) => void; }; -// FIXME there are no types for Module +/** + * @internal + */ export type SQLiteModule = Parameters[0]; + +/** + * @internal + */ export type WASQLiteModuleFactoryOptions = { dbFileName: string }; +/** + * @internal + */ export type WASQLiteModuleFactory = ( options: WASQLiteModuleFactoryOptions ) => Promise<{ module: SQLiteModule; vfs: SQLiteVFS }>; +/** + * @internal + */ export type WASQLiteOpenOptions = { dbFileName: string; vfs?: WASQLiteVFS; }; +/** + * @internal + */ export const AsyncWASQLiteModuleFactory = async () => { const { default: factory } = await import('@journeyapps/wa-sqlite/dist/wa-sqlite-async.mjs'); return factory(); }; +/** + * @internal + */ export const SyncWASQLiteModuleFactory = async () => { const { default: factory } = await import('@journeyapps/wa-sqlite/dist/wa-sqlite.mjs'); return factory(); }; +/** + * @internal + */ export const DEFAULT_MODULE_FACTORIES = { [WASQLiteVFS.IDBBatchAtomicVFS]: async (options: WASQLiteModuleFactoryOptions) => { const module = await AsyncWASQLiteModuleFactory(); @@ -69,6 +93,9 @@ export const DEFAULT_MODULE_FACTORIES = { } }; +/** + * @internal + */ export class WASqliteConnection extends BaseObserver { private _sqliteAPI: SQLiteAPI | null = null; private _dbP: number | null = null; diff --git a/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts b/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts index 934cfa58..cffe6b38 100644 --- a/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts +++ b/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts @@ -7,11 +7,11 @@ import { SharedSyncClientEvent, SharedSyncImplementation } from '../../worker/sync/SharedSyncImplementation'; +import { resolveWebSQLFlags } from '../adapters/web-sql-flags'; import { WebStreamingSyncImplementation, WebStreamingSyncImplementationOptions } from './WebStreamingSyncImplementation'; -import { resolveWebSQLFlags } from '../adapters/web-sql-flags'; /** * The shared worker will trigger methods on this side of the message port @@ -20,11 +20,16 @@ import { resolveWebSQLFlags } from '../adapters/web-sql-flags'; class SharedSyncClientProvider extends AbstractSharedSyncClientProvider { constructor( protected options: WebStreamingSyncImplementationOptions, - public statusChanged: (status: SyncStatusOptions) => void + public statusChanged: (status: SyncStatusOptions) => void, + protected dbWorkerPort: MessagePort ) { super(); } + async getDBWorkerPort(): Promise { + return Comlink.transfer(this.dbWorkerPort, [this.dbWorkerPort]); + } + async fetchCredentials(): Promise { const credentials = await this.options.remote.getCredentials(); if (credentials == null) { @@ -142,7 +147,7 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem const flags = { ...this.webOptions.flags, workers: undefined }; - this.isInitialized = this.syncManager.init(Comlink.transfer(dbOpenerPort, [dbOpenerPort]), { + this.isInitialized = this.syncManager.setParams({ dbName: this.options.identifier!, streamOptions: { crudUploadThrottleMs, @@ -155,9 +160,13 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem /** * Pass along any sync status updates to this listener */ - this.clientProvider = new SharedSyncClientProvider(this.webOptions, (status) => { - this.iterateListeners((l) => this.updateSyncStatus(status)); - }); + this.clientProvider = new SharedSyncClientProvider( + this.webOptions, + (status) => { + this.iterateListeners((l) => this.updateSyncStatus(status)); + }, + dbOpenerPort + ); /** * The sync worker will call this client provider when it needs diff --git a/packages/web/src/worker/sync/AbstractSharedSyncClientProvider.ts b/packages/web/src/worker/sync/AbstractSharedSyncClientProvider.ts index d909119c..2713f13f 100644 --- a/packages/web/src/worker/sync/AbstractSharedSyncClientProvider.ts +++ b/packages/web/src/worker/sync/AbstractSharedSyncClientProvider.ts @@ -7,6 +7,7 @@ export abstract class AbstractSharedSyncClientProvider { abstract fetchCredentials(): Promise; abstract uploadCrud(): Promise; abstract statusChanged(status: SyncStatusOptions): void; + abstract getDBWorkerPort(): Promise; abstract trace(...x: any[]): void; abstract debug(...x: any[]): void; diff --git a/packages/web/src/worker/sync/SharedSyncImplementation.ts b/packages/web/src/worker/sync/SharedSyncImplementation.ts index effbe477..92a50cf8 100644 --- a/packages/web/src/worker/sync/SharedSyncImplementation.ts +++ b/packages/web/src/worker/sync/SharedSyncImplementation.ts @@ -21,9 +21,9 @@ import { } from '../../db/sync/WebStreamingSyncImplementation'; import { WASQLiteDBAdapter } from '../../db/adapters/wa-sqlite/WASQLiteDBAdapter'; +import { getNavigatorLocks } from '../../shared/navigator'; import { AbstractSharedSyncClientProvider } from './AbstractSharedSyncClientProvider'; import { BroadcastLogger } from './BroadcastLogger'; -import { getNavigatorLocks } from '../../shared/navigator'; /** * Manual message events for shared sync clients @@ -46,6 +46,12 @@ export type SharedSyncInitOptions = { streamOptions: Omit; }; +type TrackedClientDB = { + client: AbstractSharedSyncClientProvider; + db: DBAdapter; + port: MessagePort; +}; + export interface SharedSyncImplementationListener extends StreamingSyncImplementationListener { initialized: () => void; } @@ -53,6 +59,7 @@ export interface SharedSyncImplementationListener extends StreamingSyncImplement export type WrappedSyncPort = { port: MessagePort; clientProvider: Comlink.Remote; + db?: DBAdapter; }; export type RemoteOperationAbortController = { @@ -79,6 +86,7 @@ export class SharedSyncImplementation protected dbAdapter: DBAdapter | null; protected syncParams: SharedSyncInitOptions | null; protected logger: ILogger; + protected lastConnectOptions: PowerSyncConnectionOptions | undefined; syncStatus: SyncStatus; broadCastLogger: ILogger; @@ -90,6 +98,7 @@ export class SharedSyncImplementation this.syncParams = null; this.syncStreamClient = null; this.logger = Logger.get('shared-sync'); + this.lastConnectOptions = undefined; this.isInitialized = new Promise((resolve) => { const callback = this.registerListener({ @@ -124,19 +133,12 @@ export class SharedSyncImplementation /** * Configures the DBAdapter connection and a streaming sync client. */ - async init(dbWorkerPort: MessagePort, params: SharedSyncInitOptions) { - if (this.dbAdapter) { + async setParams(params: SharedSyncInitOptions) { + if (this.syncParams) { // Cannot modify already existing sync implementation return; } - this.dbAdapter = new WASQLiteDBAdapter({ - dbFilename: params.dbName, - workerPort: dbWorkerPort, - flags: { enableMultiTabs: true, useWebWorker: true }, - logger: this.logger - }); - this.syncParams = params; if (params.streamOptions?.flags?.broadcastLogs) { @@ -148,6 +150,19 @@ export class SharedSyncImplementation this.logger.error('Uncaught exception in PowerSync shared sync worker', event); }; + // Ask for a new DB worker port handler + // We can only ask once per client provider since the port + // can only be transferred once + // TODO share logic here + const lastClient = this.ports[this.ports.length - 1]; + const workerPort = await lastClient.clientProvider.getDBWorkerPort(); + this.dbAdapter = lastClient.db = new WASQLiteDBAdapter({ + dbFilename: this.syncParams?.dbName!, + workerPort, + flags: { enableMultiTabs: true, useWebWorker: true }, + logger: this.logger + }); + this.iterateListeners((l) => l.initialized?.()); } @@ -168,7 +183,7 @@ export class SharedSyncImplementation // This effectively queues connect and disconnect calls. Ensuring multiple tabs' requests are synchronized return getNavigatorLocks().request('shared-sync-connect', async () => { this.syncStreamClient = this.generateStreamingImplementation(); - + this.lastConnectOptions = options; this.syncStreamClient.registerListener({ statusChanged: (status) => { this.updateAllStatuses(status.toJSON()); @@ -210,7 +225,7 @@ export class SharedSyncImplementation * Removes a message port client from this manager's managed * clients. */ - removePort(port: MessagePort) { + async removePort(port: MessagePort) { const index = this.ports.findIndex((p) => p.port == port); if (index < 0) { console.warn(`Could not remove port ${port} since it is not present in active ports.`); @@ -218,6 +233,10 @@ export class SharedSyncImplementation } const trackedPort = this.ports[index]; + if (trackedPort.db) { + trackedPort.db.close(); + } + // Release proxy trackedPort.clientProvider[Comlink.releaseProxy](); this.ports.splice(index, 1); @@ -231,6 +250,25 @@ export class SharedSyncImplementation abortController!.controller.abort(new AbortOperation('Closing pending requests after client port is removed')); } }); + + if (this.dbAdapter == trackedPort.db && this.syncStreamClient) { + // The db adapter belonged to a client which has closed. We need to reconnect + // FIXME better closing + // this.dbAdapter!.close(); + + await this.disconnect(); + // Ask for a new DB worker port handler + const lastClient = this.ports[this.ports.length - 1]; + const workerPort = await lastClient.clientProvider.getDBWorkerPort(); + + this.dbAdapter = lastClient.db = new WASQLiteDBAdapter({ + dbFilename: this.syncParams?.dbName!, + workerPort, + flags: { enableMultiTabs: true, useWebWorker: true }, + logger: this.logger + }); + await this.connect(this.lastConnectOptions); + } } triggerCrudUpload() {