Skip to content

Commit

Permalink
fix worker proxies
Browse files Browse the repository at this point in the history
  • Loading branch information
stevensJourney committed Dec 3, 2024
1 parent 2b3b44d commit 446a93c
Show file tree
Hide file tree
Showing 10 changed files with 99 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { AppSchema } from '@/library/powersync/AppSchema';
import { SupabaseConnector } from '@/library/powersync/SupabaseConnector';
import { CircularProgress } from '@mui/material';
import { PowerSyncContext } from '@powersync/react';
import { PowerSyncDatabase, WASQLiteOpenFactory, WASQLiteVFS } from '@powersync/web';
import { PowerSyncDatabase } from '@powersync/web';
import Logger from 'js-logger';
import React, { Suspense } from 'react';
import { NavigationPanelContextProvider } from '../navigation/NavigationPanelContext';
Expand All @@ -13,14 +13,17 @@ export const useSupabase = () => React.useContext(SupabaseContext);

export const db = new PowerSyncDatabase({
schema: AppSchema,
// database: {
// dbFilename: 's.sqlite'
// }
database: new WASQLiteOpenFactory({
dbFilename: 'examplsw1se112.db',
// vfs: WASQLiteVFS.OPFSCoopSyncVFS
vfs: WASQLiteVFS.OPFSCoopSyncVFS //Out of memory errors on iOS Safari
})
database: {
dbFilename: 's.sqlite'
},
flags: {
enableMultiTabs: true
}
// database: new WASQLiteOpenFactory({
// dbFilename: 'examplsw1se112.db'
// // vfs: WASQLiteVFS.OPFSCoopSyncVFS
// // vfs: WASQLiteVFS.OPFSCoopSyncVFS //Out of memory errors on iOS Safari
// })
});

export const SystemProvider = ({ children }: { children: React.ReactNode }) => {
Expand Down
2 changes: 1 addition & 1 deletion packages/web/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
"@powersync/common": "workspace:*",
"async-mutex": "^0.4.0",
"bson": "^6.6.0",
"comlink": "^4.4.1",
"comlink": "^4.4.2",
"commander": "^12.1.0",
"js-logger": "^1.6.1"
},
Expand Down
2 changes: 1 addition & 1 deletion packages/web/src/db/PowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ export const DEFAULT_POWERSYNC_FLAGS: Required<WebPowerSyncFlags> = {
externallyUnload: false
};

export const resolveWebPowerSyncFlags = (flags?: WebPowerSyncFlags): WebPowerSyncFlags => {
export const resolveWebPowerSyncFlags = (flags?: WebPowerSyncFlags): Required<WebPowerSyncFlags> => {
return {
...DEFAULT_POWERSYNC_FLAGS,
...flags,
Expand Down
5 changes: 3 additions & 2 deletions packages/web/src/db/adapters/AsyncDatabaseConnection.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { BatchedUpdateNotification, QueryResult, SQLOpenOptions } from '@powersync/common';
import { BatchedUpdateNotification, QueryResult } from '@powersync/common';
import { ResolvedWebSQLOpenOptions } from './web-sql-flags';

/**
* Proxied query result does not contain a function for accessing row values
Expand All @@ -24,6 +25,6 @@ export interface AsyncDatabaseConnection {
registerOnTableChange(callback: OnTableChangeCallback): Promise<() => void>;
}

export type OpenAsyncDatabaseConnection<Options extends SQLOpenOptions = SQLOpenOptions> = (
export type OpenAsyncDatabaseConnection<Options extends ResolvedWebSQLOpenOptions = ResolvedWebSQLOpenOptions> = (
options: Options
) => AsyncDatabaseConnection;
Original file line number Diff line number Diff line change
@@ -1,23 +1,34 @@
import * as Comlink from 'comlink';
import { AsyncDatabaseConnection, OnTableChangeCallback, ProxiedQueryResult } from './AsyncDatabaseConnection';
import {
AsyncDatabaseConnection,
OnTableChangeCallback,
OpenAsyncDatabaseConnection,
ProxiedQueryResult
} from './AsyncDatabaseConnection';
import { ResolvedWebSQLOpenOptions } from './web-sql-flags';

export type SharedConnectionWorker = {
identifier: string;
port: MessagePort;
};

export type WrappedWorkerConnectionOptions = {
export type WrappedWorkerConnectionOptions<Config extends ResolvedWebSQLOpenOptions = ResolvedWebSQLOpenOptions> = {
baseConnection: AsyncDatabaseConnection;
identifier: string;
worker: Worker | MessagePort;
/**
* Need a remote in order to keep a reference to the Proxied worker
*/
remote: Comlink.Remote<OpenAsyncDatabaseConnection<Config>>;
};

/**
* Wraps a provided instance of {@link AsyncDatabaseConnection}, providing necessary proxy
* functions for worker listeners.
*/
export class WorkerWrappedAsyncDatabaseConnection implements AsyncDatabaseConnection {
constructor(protected options: WrappedWorkerConnectionOptions) {}
export class WorkerWrappedAsyncDatabaseConnection<Config extends ResolvedWebSQLOpenOptions = ResolvedWebSQLOpenOptions>
implements AsyncDatabaseConnection
{
constructor(protected options: WrappedWorkerConnectionOptions<Config>) {}

protected get baseConnection() {
return this.options.baseConnection;
Expand All @@ -31,20 +42,10 @@ export class WorkerWrappedAsyncDatabaseConnection implements AsyncDatabaseConnec
* Get a MessagePort which can be used to share the internals of this connection.
*/
async shareConnection(): Promise<SharedConnectionWorker> {
const { identifier, worker } = this.options;
if (worker instanceof Worker) {
// We can't transfer a Worker instance, need a MessagePort
// Comlink provides a nice utility for exposing a MessagePort
// from a Worker
const temp = Comlink.wrap(worker);
const newPort = await temp[Comlink.createEndpoint]();
return { port: newPort, identifier };
}
const { identifier, remote } = this.options;

return {
identifier: identifier,
port: worker
};
const newPort = await remote[Comlink.createEndpoint]();
return { port: newPort, identifier };
}

/**
Expand All @@ -55,8 +56,9 @@ export class WorkerWrappedAsyncDatabaseConnection implements AsyncDatabaseConnec
return this.baseConnection.registerOnTableChange(Comlink.proxy(callback));
}

close(): Promise<void> {
return this.baseConnection.close();
async close(): Promise<void> {
await this.baseConnection.close();
this.options.remote[Comlink.releaseProxy]();
}

execute(sql: string, params?: any[]): Promise<ProxiedQueryResult> {
Expand Down
10 changes: 8 additions & 2 deletions packages/web/src/db/adapters/wa-sqlite/WASQLiteDBAdapter.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { type PowerSyncOpenFactoryOptions } from '@powersync/common';
import * as Comlink from 'comlink';
import { resolveWebPowerSyncFlags } from '../../PowerSyncDatabase';
import { OpenAsyncDatabaseConnection } from '../AsyncDatabaseConnection';
import { LockedAsyncDatabaseAdapter } from '../LockedAsyncDatabaseAdapter';
import { ResolvedWebSQLOpenOptions, WebSQLFlags } from '../web-sql-flags';
import { WorkerWrappedAsyncDatabaseConnection } from '../WorkerWrappedAsyncDatabaseConnection';
import { WASQLiteVFS } from './WASQLiteConnection';
import { WASQLiteOpenFactory } from './WASQLiteOpenFactory';

Expand Down Expand Up @@ -36,8 +38,12 @@ export class WASQLiteDBAdapter extends LockedAsyncDatabaseAdapter {
openConnection: async () => {
const { workerPort } = options;
if (workerPort) {
const wrapped = Comlink.wrap<OpenAsyncDatabaseConnection>(workerPort);
return wrapped(options);
const remote = Comlink.wrap<OpenAsyncDatabaseConnection>(workerPort);
return new WorkerWrappedAsyncDatabaseConnection({
remote,
identifier: options.dbFilename,
baseConnection: await remote({ ...options, flags: resolveWebPowerSyncFlags(options.flags) })
});
}
const openFactory = new WASQLiteOpenFactory({
dbFilename: options.dbFilename,
Expand Down
19 changes: 12 additions & 7 deletions packages/web/src/db/adapters/wa-sqlite/WASQLiteOpenFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@ import { openWorkerDatabasePort, resolveWorkerDatabasePortFactory } from '../../
import { AbstractWebSQLOpenFactory } from '../AbstractWebSQLOpenFactory';
import { AsyncDatabaseConnection, OpenAsyncDatabaseConnection } from '../AsyncDatabaseConnection';
import { LockedAsyncDatabaseAdapter } from '../LockedAsyncDatabaseAdapter';
import { WebSQLOpenFactoryOptions } from '../web-sql-flags';
import { ResolvedWebSQLOpenOptions, WebSQLOpenFactoryOptions } from '../web-sql-flags';
import { WorkerWrappedAsyncDatabaseConnection } from '../WorkerWrappedAsyncDatabaseConnection';
import { WASqliteConnection, WASQLiteOpenOptions, WASQLiteVFS } from './WASQLiteConnection';
import { WASqliteConnection, WASQLiteVFS } from './WASQLiteConnection';

export interface WASQLiteOpenFactoryOptions extends WebSQLOpenFactoryOptions {
vfs?: WASQLiteVFS;
}

export interface ResolvedWASQLiteOpenFactoryOptions extends ResolvedWebSQLOpenOptions {
vfs: WASQLiteVFS;
}
/**
* Opens a SQLite connection using WA-SQLite.
*/
Expand All @@ -36,6 +39,8 @@ export class WASQLiteOpenFactory extends AbstractWebSQLOpenFactory {

async openConnection(): Promise<AsyncDatabaseConnection> {
const { enableMultiTabs, useWebWorker } = this.resolvedFlags;
const { vfs = WASQLiteVFS.IDBBatchAtomicVFS } = this.waOptions;

if (!enableMultiTabs) {
this.logger.warn('Multiple tabs are not enabled in this browser');
}
Expand All @@ -53,24 +58,24 @@ export class WASQLiteOpenFactory extends AbstractWebSQLOpenFactory {
)
: openWorkerDatabasePort(this.options.dbFilename, enableMultiTabs, optionsDbWorker, this.waOptions.vfs);

const workerDBOpener = Comlink.wrap<OpenAsyncDatabaseConnection<WASQLiteOpenOptions>>(workerPort);
const workerDBOpener = Comlink.wrap<OpenAsyncDatabaseConnection<ResolvedWASQLiteOpenFactoryOptions>>(workerPort);

return new WorkerWrappedAsyncDatabaseConnection({
remote: workerDBOpener,
baseConnection: await workerDBOpener({
dbFilename: this.options.dbFilename,
vfs: this.waOptions.vfs,
vfs,
flags: this.resolvedFlags
}),
identifier: this.options.dbFilename,
worker: workerPort
identifier: this.options.dbFilename
});
} else {
// Don't use a web worker
return new WASqliteConnection({
dbFilename: this.options.dbFilename,
dbLocation: this.options.dbLocation,
debugMode: this.options.debugMode,
vfs: this.waOptions.vfs,
vfs,
flags: this.resolvedFlags
});
}
Expand Down
30 changes: 17 additions & 13 deletions packages/web/src/worker/db/WASQLiteDB.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ let nextClientId = 1;
const openWorkerConnection = async (options: WASQLiteOpenOptions): Promise<AsyncDatabaseConnection> => {
const connection = new WASqliteConnection(options);
return {
init: () => connection.init(),
close: () => connection.close(),
execute: async (sql: string, params?: any[]) => connection.execute(sql, params),
executeBatch: async (sql: string, params?: any[]) => connection.executeBatch(sql, params),
registerOnTableChange: async (callback) => {
init: Comlink.proxy(() => connection.init()),
close: Comlink.proxy(() => connection.close()),
execute: Comlink.proxy(async (sql: string, params?: any[]) => connection.execute(sql, params)),
executeBatch: Comlink.proxy(async (sql: string, params?: any[]) => connection.executeBatch(sql, params)),
registerOnTableChange: Comlink.proxy(async (callback) => {
// Proxy the callback remove function
return Comlink.proxy(await connection.registerOnTableChange(callback));
}
})
};
};

Expand All @@ -57,8 +57,12 @@ const openDBShared = async (options: WASQLiteOpenOptions): Promise<AsyncDatabase

const wrappedConnection = {
...db,
init: Comlink.proxy(() => {
// the init has been done automatically
}),
close: Comlink.proxy(() => {
const { clientIds } = dbEntry;
console.debug(`Close requested from client ${clientId} of ${[...clientIds]}`);
clientIds.delete(clientId);
if (clientIds.size == 0) {
console.debug(`Closing connection to ${dbFilename}.`);
Expand All @@ -82,14 +86,14 @@ if (typeof SharedWorkerGlobalScope !== 'undefined') {
console.debug('Exposing shared db on port', port);
Comlink.expose(openDBShared, port);
};

addEventListener('unload', () => {
Array.from(DBMap.values()).forEach(async (dbConnection) => {
const db = await dbConnection.db;
db.close?.();
});
});
} else {
// A dedicated worker can be shared externally
Comlink.expose(openDBShared);
}

addEventListener('unload', () => {
Array.from(DBMap.values()).forEach(async (dbConnection) => {
const { db } = dbConnection;
db.close?.();
});
});
37 changes: 18 additions & 19 deletions packages/web/src/worker/sync/SharedSyncImplementation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,12 @@ export class SharedSyncImplementation
// TODO share logic here
const lastClient = this.ports[this.ports.length - 1];
const workerPort = await lastClient.clientProvider.getDBWorkerPort();
const remote = Comlink.wrap<OpenAsyncDatabaseConnection<WASQLiteOpenOptions>>(workerPort);
const locked = new LockedAsyncDatabaseAdapter({
name: this.syncParams!.dbName,
openConnection: async () => {
const remote = Comlink.wrap<OpenAsyncDatabaseConnection<WASQLiteOpenOptions>>(workerPort);
return new WorkerWrappedAsyncDatabaseConnection({
remote,
baseConnection: await remote({
dbFilename: this.syncParams!.dbName,
// TODO improve
Expand All @@ -175,8 +176,7 @@ export class SharedSyncImplementation
ssrMode: false
}
}),
identifier: this.syncParams!.dbName,
worker: workerPort
identifier: this.syncParams!.dbName
});
},
logger: this.logger
Expand Down Expand Up @@ -273,30 +273,29 @@ export class SharedSyncImplementation
});

if (this.dbAdapter == trackedPort.db && this.syncStreamClient) {
this.dbAdapter!.close();

await this.disconnect();
// Ask for a new DB worker port handler
const lastClient = this.ports[this.ports.length - 1];
const workerPort = await lastClient.clientProvider.getDBWorkerPort();
const remote = Comlink.wrap<OpenAsyncDatabaseConnection<WASQLiteOpenOptions>>(workerPort);
const db = await remote({
dbFilename: this.syncParams!.dbName,
// TODO improve
flags: {
enableMultiTabs: true,
useWebWorker: true,
broadcastLogs: true,
disableSSRWarning: true,
ssrMode: false
}
});
const locked = new LockedAsyncDatabaseAdapter({
name: this.syncParams!.dbName,
openConnection: async () => {
const remote = Comlink.wrap<OpenAsyncDatabaseConnection<WASQLiteOpenOptions>>(workerPort);
return new WorkerWrappedAsyncDatabaseConnection({
baseConnection: await remote({
dbFilename: this.syncParams!.dbName,
// TODO improve
flags: {
enableMultiTabs: true,
useWebWorker: true,
broadcastLogs: true,
disableSSRWarning: true,
ssrMode: false
}
}),
identifier: this.syncParams!.dbName,
worker: workerPort
remote,
baseConnection: db,
identifier: this.syncParams!.dbName
});
},
logger: this.logger
Expand Down
10 changes: 5 additions & 5 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 446a93c

Please sign in to comment.