diff --git a/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx b/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx index 6e1e5e0f..8bebaf08 100644 --- a/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx +++ b/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx @@ -3,7 +3,7 @@ import { AppSchema } from '@/library/powersync/AppSchema'; import { SupabaseConnector } from '@/library/powersync/SupabaseConnector'; import { CircularProgress } from '@mui/material'; import { PowerSyncContext } from '@powersync/react'; -import { PowerSyncDatabase, WASQLiteOpenFactory, WASQLiteVFS } from '@powersync/web'; +import { PowerSyncDatabase } from '@powersync/web'; import Logger from 'js-logger'; import React, { Suspense } from 'react'; import { NavigationPanelContextProvider } from '../navigation/NavigationPanelContext'; @@ -13,14 +13,17 @@ export const useSupabase = () => React.useContext(SupabaseContext); export const db = new PowerSyncDatabase({ schema: AppSchema, - // database: { - // dbFilename: 's.sqlite' - // } - database: new WASQLiteOpenFactory({ - dbFilename: 'examplsw1se112.db', - // vfs: WASQLiteVFS.OPFSCoopSyncVFS - vfs: WASQLiteVFS.OPFSCoopSyncVFS //Out of memory errors on iOS Safari - }) + database: { + dbFilename: 's.sqlite' + }, + flags: { + enableMultiTabs: true + } + // database: new WASQLiteOpenFactory({ + // dbFilename: 'examplsw1se112.db' + // // vfs: WASQLiteVFS.OPFSCoopSyncVFS + // // vfs: WASQLiteVFS.OPFSCoopSyncVFS //Out of memory errors on iOS Safari + // }) }); export const SystemProvider = ({ children }: { children: React.ReactNode }) => { diff --git a/packages/web/package.json b/packages/web/package.json index 4bcb9684..e2aa6cd9 100644 --- a/packages/web/package.json +++ b/packages/web/package.json @@ -67,7 +67,7 @@ "@powersync/common": "workspace:*", "async-mutex": "^0.4.0", "bson": "^6.6.0", - "comlink": "^4.4.1", + "comlink": "^4.4.2", "commander": "^12.1.0", "js-logger": "^1.6.1" }, diff --git a/packages/web/src/db/PowerSyncDatabase.ts b/packages/web/src/db/PowerSyncDatabase.ts index 1d95fa6b..109fdb08 100644 --- a/packages/web/src/db/PowerSyncDatabase.ts +++ b/packages/web/src/db/PowerSyncDatabase.ts @@ -73,7 +73,7 @@ export const DEFAULT_POWERSYNC_FLAGS: Required = { externallyUnload: false }; -export const resolveWebPowerSyncFlags = (flags?: WebPowerSyncFlags): WebPowerSyncFlags => { +export const resolveWebPowerSyncFlags = (flags?: WebPowerSyncFlags): Required => { return { ...DEFAULT_POWERSYNC_FLAGS, ...flags, diff --git a/packages/web/src/db/adapters/AsyncDatabaseConnection.ts b/packages/web/src/db/adapters/AsyncDatabaseConnection.ts index 6c2f0265..c09b9120 100644 --- a/packages/web/src/db/adapters/AsyncDatabaseConnection.ts +++ b/packages/web/src/db/adapters/AsyncDatabaseConnection.ts @@ -1,4 +1,5 @@ -import { BatchedUpdateNotification, QueryResult, SQLOpenOptions } from '@powersync/common'; +import { BatchedUpdateNotification, QueryResult } from '@powersync/common'; +import { ResolvedWebSQLOpenOptions } from './web-sql-flags'; /** * Proxied query result does not contain a function for accessing row values @@ -24,6 +25,6 @@ export interface AsyncDatabaseConnection { registerOnTableChange(callback: OnTableChangeCallback): Promise<() => void>; } -export type OpenAsyncDatabaseConnection = ( +export type OpenAsyncDatabaseConnection = ( options: Options ) => AsyncDatabaseConnection; diff --git a/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts b/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts index 831671e4..c945f60d 100644 --- a/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts +++ b/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts @@ -1,23 +1,34 @@ import * as Comlink from 'comlink'; -import { AsyncDatabaseConnection, OnTableChangeCallback, ProxiedQueryResult } from './AsyncDatabaseConnection'; +import { + AsyncDatabaseConnection, + OnTableChangeCallback, + OpenAsyncDatabaseConnection, + ProxiedQueryResult +} from './AsyncDatabaseConnection'; +import { ResolvedWebSQLOpenOptions } from './web-sql-flags'; export type SharedConnectionWorker = { identifier: string; port: MessagePort; }; -export type WrappedWorkerConnectionOptions = { +export type WrappedWorkerConnectionOptions = { baseConnection: AsyncDatabaseConnection; identifier: string; - worker: Worker | MessagePort; + /** + * Need a remote in order to keep a reference to the Proxied worker + */ + remote: Comlink.Remote>; }; /** * Wraps a provided instance of {@link AsyncDatabaseConnection}, providing necessary proxy * functions for worker listeners. */ -export class WorkerWrappedAsyncDatabaseConnection implements AsyncDatabaseConnection { - constructor(protected options: WrappedWorkerConnectionOptions) {} +export class WorkerWrappedAsyncDatabaseConnection + implements AsyncDatabaseConnection +{ + constructor(protected options: WrappedWorkerConnectionOptions) {} protected get baseConnection() { return this.options.baseConnection; @@ -31,20 +42,10 @@ export class WorkerWrappedAsyncDatabaseConnection implements AsyncDatabaseConnec * Get a MessagePort which can be used to share the internals of this connection. */ async shareConnection(): Promise { - const { identifier, worker } = this.options; - if (worker instanceof Worker) { - // We can't transfer a Worker instance, need a MessagePort - // Comlink provides a nice utility for exposing a MessagePort - // from a Worker - const temp = Comlink.wrap(worker); - const newPort = await temp[Comlink.createEndpoint](); - return { port: newPort, identifier }; - } + const { identifier, remote } = this.options; - return { - identifier: identifier, - port: worker - }; + const newPort = await remote[Comlink.createEndpoint](); + return { port: newPort, identifier }; } /** @@ -55,8 +56,9 @@ export class WorkerWrappedAsyncDatabaseConnection implements AsyncDatabaseConnec return this.baseConnection.registerOnTableChange(Comlink.proxy(callback)); } - close(): Promise { - return this.baseConnection.close(); + async close(): Promise { + await this.baseConnection.close(); + this.options.remote[Comlink.releaseProxy](); } execute(sql: string, params?: any[]): Promise { diff --git a/packages/web/src/db/adapters/wa-sqlite/WASQLiteDBAdapter.ts b/packages/web/src/db/adapters/wa-sqlite/WASQLiteDBAdapter.ts index 818f2ff6..7160ef11 100644 --- a/packages/web/src/db/adapters/wa-sqlite/WASQLiteDBAdapter.ts +++ b/packages/web/src/db/adapters/wa-sqlite/WASQLiteDBAdapter.ts @@ -1,8 +1,10 @@ import { type PowerSyncOpenFactoryOptions } from '@powersync/common'; import * as Comlink from 'comlink'; +import { resolveWebPowerSyncFlags } from '../../PowerSyncDatabase'; import { OpenAsyncDatabaseConnection } from '../AsyncDatabaseConnection'; import { LockedAsyncDatabaseAdapter } from '../LockedAsyncDatabaseAdapter'; import { ResolvedWebSQLOpenOptions, WebSQLFlags } from '../web-sql-flags'; +import { WorkerWrappedAsyncDatabaseConnection } from '../WorkerWrappedAsyncDatabaseConnection'; import { WASQLiteVFS } from './WASQLiteConnection'; import { WASQLiteOpenFactory } from './WASQLiteOpenFactory'; @@ -36,8 +38,12 @@ export class WASQLiteDBAdapter extends LockedAsyncDatabaseAdapter { openConnection: async () => { const { workerPort } = options; if (workerPort) { - const wrapped = Comlink.wrap(workerPort); - return wrapped(options); + const remote = Comlink.wrap(workerPort); + return new WorkerWrappedAsyncDatabaseConnection({ + remote, + identifier: options.dbFilename, + baseConnection: await remote({ ...options, flags: resolveWebPowerSyncFlags(options.flags) }) + }); } const openFactory = new WASQLiteOpenFactory({ dbFilename: options.dbFilename, diff --git a/packages/web/src/db/adapters/wa-sqlite/WASQLiteOpenFactory.ts b/packages/web/src/db/adapters/wa-sqlite/WASQLiteOpenFactory.ts index aceabd46..00d31118 100644 --- a/packages/web/src/db/adapters/wa-sqlite/WASQLiteOpenFactory.ts +++ b/packages/web/src/db/adapters/wa-sqlite/WASQLiteOpenFactory.ts @@ -4,14 +4,17 @@ import { openWorkerDatabasePort, resolveWorkerDatabasePortFactory } from '../../ import { AbstractWebSQLOpenFactory } from '../AbstractWebSQLOpenFactory'; import { AsyncDatabaseConnection, OpenAsyncDatabaseConnection } from '../AsyncDatabaseConnection'; import { LockedAsyncDatabaseAdapter } from '../LockedAsyncDatabaseAdapter'; -import { WebSQLOpenFactoryOptions } from '../web-sql-flags'; +import { ResolvedWebSQLOpenOptions, WebSQLOpenFactoryOptions } from '../web-sql-flags'; import { WorkerWrappedAsyncDatabaseConnection } from '../WorkerWrappedAsyncDatabaseConnection'; -import { WASqliteConnection, WASQLiteOpenOptions, WASQLiteVFS } from './WASQLiteConnection'; +import { WASqliteConnection, WASQLiteVFS } from './WASQLiteConnection'; export interface WASQLiteOpenFactoryOptions extends WebSQLOpenFactoryOptions { vfs?: WASQLiteVFS; } +export interface ResolvedWASQLiteOpenFactoryOptions extends ResolvedWebSQLOpenOptions { + vfs: WASQLiteVFS; +} /** * Opens a SQLite connection using WA-SQLite. */ @@ -36,6 +39,8 @@ export class WASQLiteOpenFactory extends AbstractWebSQLOpenFactory { async openConnection(): Promise { const { enableMultiTabs, useWebWorker } = this.resolvedFlags; + const { vfs = WASQLiteVFS.IDBBatchAtomicVFS } = this.waOptions; + if (!enableMultiTabs) { this.logger.warn('Multiple tabs are not enabled in this browser'); } @@ -53,16 +58,16 @@ export class WASQLiteOpenFactory extends AbstractWebSQLOpenFactory { ) : openWorkerDatabasePort(this.options.dbFilename, enableMultiTabs, optionsDbWorker, this.waOptions.vfs); - const workerDBOpener = Comlink.wrap>(workerPort); + const workerDBOpener = Comlink.wrap>(workerPort); return new WorkerWrappedAsyncDatabaseConnection({ + remote: workerDBOpener, baseConnection: await workerDBOpener({ dbFilename: this.options.dbFilename, - vfs: this.waOptions.vfs, + vfs, flags: this.resolvedFlags }), - identifier: this.options.dbFilename, - worker: workerPort + identifier: this.options.dbFilename }); } else { // Don't use a web worker @@ -70,7 +75,7 @@ export class WASQLiteOpenFactory extends AbstractWebSQLOpenFactory { dbFilename: this.options.dbFilename, dbLocation: this.options.dbLocation, debugMode: this.options.debugMode, - vfs: this.waOptions.vfs, + vfs, flags: this.resolvedFlags }); } diff --git a/packages/web/src/worker/db/WASQLiteDB.worker.ts b/packages/web/src/worker/db/WASQLiteDB.worker.ts index d9848708..f1f1db0a 100644 --- a/packages/web/src/worker/db/WASQLiteDB.worker.ts +++ b/packages/web/src/worker/db/WASQLiteDB.worker.ts @@ -25,14 +25,14 @@ let nextClientId = 1; const openWorkerConnection = async (options: WASQLiteOpenOptions): Promise => { const connection = new WASqliteConnection(options); return { - init: () => connection.init(), - close: () => connection.close(), - execute: async (sql: string, params?: any[]) => connection.execute(sql, params), - executeBatch: async (sql: string, params?: any[]) => connection.executeBatch(sql, params), - registerOnTableChange: async (callback) => { + init: Comlink.proxy(() => connection.init()), + close: Comlink.proxy(() => connection.close()), + execute: Comlink.proxy(async (sql: string, params?: any[]) => connection.execute(sql, params)), + executeBatch: Comlink.proxy(async (sql: string, params?: any[]) => connection.executeBatch(sql, params)), + registerOnTableChange: Comlink.proxy(async (callback) => { // Proxy the callback remove function return Comlink.proxy(await connection.registerOnTableChange(callback)); - } + }) }; }; @@ -57,8 +57,12 @@ const openDBShared = async (options: WASQLiteOpenOptions): Promise { + // the init has been done automatically + }), close: Comlink.proxy(() => { const { clientIds } = dbEntry; + console.debug(`Close requested from client ${clientId} of ${[...clientIds]}`); clientIds.delete(clientId); if (clientIds.size == 0) { console.debug(`Closing connection to ${dbFilename}.`); @@ -82,14 +86,14 @@ if (typeof SharedWorkerGlobalScope !== 'undefined') { console.debug('Exposing shared db on port', port); Comlink.expose(openDBShared, port); }; - - addEventListener('unload', () => { - Array.from(DBMap.values()).forEach(async (dbConnection) => { - const db = await dbConnection.db; - db.close?.(); - }); - }); } else { // A dedicated worker can be shared externally Comlink.expose(openDBShared); } + +addEventListener('unload', () => { + Array.from(DBMap.values()).forEach(async (dbConnection) => { + const { db } = dbConnection; + db.close?.(); + }); +}); diff --git a/packages/web/src/worker/sync/SharedSyncImplementation.ts b/packages/web/src/worker/sync/SharedSyncImplementation.ts index ee80e309..d4cf9461 100644 --- a/packages/web/src/worker/sync/SharedSyncImplementation.ts +++ b/packages/web/src/worker/sync/SharedSyncImplementation.ts @@ -159,11 +159,12 @@ export class SharedSyncImplementation // TODO share logic here const lastClient = this.ports[this.ports.length - 1]; const workerPort = await lastClient.clientProvider.getDBWorkerPort(); + const remote = Comlink.wrap>(workerPort); const locked = new LockedAsyncDatabaseAdapter({ name: this.syncParams!.dbName, openConnection: async () => { - const remote = Comlink.wrap>(workerPort); return new WorkerWrappedAsyncDatabaseConnection({ + remote, baseConnection: await remote({ dbFilename: this.syncParams!.dbName, // TODO improve @@ -175,8 +176,7 @@ export class SharedSyncImplementation ssrMode: false } }), - identifier: this.syncParams!.dbName, - worker: workerPort + identifier: this.syncParams!.dbName }); }, logger: this.logger @@ -273,30 +273,29 @@ export class SharedSyncImplementation }); if (this.dbAdapter == trackedPort.db && this.syncStreamClient) { - this.dbAdapter!.close(); - await this.disconnect(); // Ask for a new DB worker port handler const lastClient = this.ports[this.ports.length - 1]; const workerPort = await lastClient.clientProvider.getDBWorkerPort(); + const remote = Comlink.wrap>(workerPort); + const db = await remote({ + dbFilename: this.syncParams!.dbName, + // TODO improve + flags: { + enableMultiTabs: true, + useWebWorker: true, + broadcastLogs: true, + disableSSRWarning: true, + ssrMode: false + } + }); const locked = new LockedAsyncDatabaseAdapter({ name: this.syncParams!.dbName, openConnection: async () => { - const remote = Comlink.wrap>(workerPort); return new WorkerWrappedAsyncDatabaseConnection({ - baseConnection: await remote({ - dbFilename: this.syncParams!.dbName, - // TODO improve - flags: { - enableMultiTabs: true, - useWebWorker: true, - broadcastLogs: true, - disableSSRWarning: true, - ssrMode: false - } - }), - identifier: this.syncParams!.dbName, - worker: workerPort + remote, + baseConnection: db, + identifier: this.syncParams!.dbName }); }, logger: this.logger diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 52c13421..d3991451 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1890,8 +1890,8 @@ importers: specifier: ^6.6.0 version: 6.8.0 comlink: - specifier: ^4.4.1 - version: 4.4.1 + specifier: ^4.4.2 + version: 4.4.2 commander: specifier: ^12.1.0 version: 12.1.0 @@ -9233,8 +9233,8 @@ packages: resolution: {integrity: sha512-FQN4MRfuJeHf7cBbBMJFXhKSDq+2kAArBlmRBvcvFE5BB1HZKXtSFASDhdlz9zOYwxh8lDdnvmMOe/+5cdoEdg==} engines: {node: '>= 0.8'} - comlink@4.4.1: - resolution: {integrity: sha512-+1dlx0aY5Jo1vHy/tSsIGpSkN4tS9rZSW8FIhG0JH/crs9wwweswIo/POr451r7bZww3hFbPAKnTpimzL/mm4Q==} + comlink@4.4.2: + resolution: {integrity: sha512-OxGdvBmJuNKSCMO4NTl1L47VRp6xn2wG4F/2hYzB6tiCb709otOxtEYCSvK80PtjODfXXZu8ds+Nw5kVCjqd2g==} comma-separated-tokens@2.0.3: resolution: {integrity: sha512-Fu4hJdvzeylCfQPp9SGWidpzrMs7tTrlu6Vb8XGaRGck8QSNZJJp538Wrb60Lax4fPwR64ViY468OIUTbRlGZg==} @@ -31674,7 +31674,7 @@ snapshots: dependencies: delayed-stream: 1.0.0 - comlink@4.4.1: {} + comlink@4.4.2: {} comma-separated-tokens@2.0.3: {}