Skip to content

Commit

Permalink
wip: share db worker ports
Browse files Browse the repository at this point in the history
  • Loading branch information
stevensJourney committed Nov 27, 2024
1 parent a265ca5 commit 18dbd7f
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { AppSchema } from '@/library/powersync/AppSchema';
import { SupabaseConnector } from '@/library/powersync/SupabaseConnector';
import { CircularProgress } from '@mui/material';
import { PowerSyncContext } from '@powersync/react';
import { PowerSyncDatabase, WASQLiteOpenFactory, WASQLiteVFS } from '@powersync/web';
import { PowerSyncDatabase } from '@powersync/web';
import Logger from 'js-logger';
import React, { Suspense } from 'react';
import { NavigationPanelContextProvider } from '../navigation/NavigationPanelContext';
Expand All @@ -13,15 +13,18 @@ export const useSupabase = () => React.useContext(SupabaseContext);

export const db = new PowerSyncDatabase({
schema: AppSchema,
database: new WASQLiteOpenFactory({
dbFilename: 'examplse.db',
vfs: WASQLiteVFS.OPFSCoopSyncVFS,
// Can't use a shared worker for OPFS
flags: { enableMultiTabs: false }
}),
flags: {
enableMultiTabs: false
database: {
dbFilename: 's.sqlite'
}
// database: new WASQLiteOpenFactory({
// dbFilename: 'examplse.db',
// vfs: WASQLiteVFS.OPFSCoopSyncVFS,
// // Can't use a shared worker for OPFS
// flags: { enableMultiTabs: false }
// }),
// flags: {
// enableMultiTabs: false
// }
});

export const SystemProvider = ({ children }: { children: React.ReactNode }) => {
Expand Down
29 changes: 28 additions & 1 deletion packages/web/src/db/adapters/wa-sqlite/WASQLiteConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,33 +12,57 @@ export enum WASQLiteVFS {
AccessHandlePoolVFS = 'AccessHandlePoolVFS'
}

/**
* @internal
*/
export type WASQLiteConnectionListener = {
tablesUpdated: (event: BatchedUpdateNotification) => void;
};

// FIXME there are no types for Module
/**
* @internal
*/
export type SQLiteModule = Parameters<typeof SQLite.Factory>[0];

/**
* @internal
*/
export type WASQLiteModuleFactoryOptions = { dbFileName: string };

/**
* @internal
*/
export type WASQLiteModuleFactory = (
options: WASQLiteModuleFactoryOptions
) => Promise<{ module: SQLiteModule; vfs: SQLiteVFS }>;

/**
* @internal
*/
export type WASQLiteOpenOptions = {
dbFileName: string;
vfs?: WASQLiteVFS;
};

/**
* @internal
*/
export const AsyncWASQLiteModuleFactory = async () => {
const { default: factory } = await import('@journeyapps/wa-sqlite/dist/wa-sqlite-async.mjs');
return factory();
};

/**
* @internal
*/
export const SyncWASQLiteModuleFactory = async () => {
const { default: factory } = await import('@journeyapps/wa-sqlite/dist/wa-sqlite.mjs');
return factory();
};

/**
* @internal
*/
export const DEFAULT_MODULE_FACTORIES = {
[WASQLiteVFS.IDBBatchAtomicVFS]: async (options: WASQLiteModuleFactoryOptions) => {
const module = await AsyncWASQLiteModuleFactory();
Expand Down Expand Up @@ -69,6 +93,9 @@ export const DEFAULT_MODULE_FACTORIES = {
}
};

/**
* @internal
*/
export class WASqliteConnection extends BaseObserver<WASQLiteConnectionListener> {
private _sqliteAPI: SQLiteAPI | null = null;
private _dbP: number | null = null;
Expand Down
21 changes: 15 additions & 6 deletions packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import {
SharedSyncClientEvent,
SharedSyncImplementation
} from '../../worker/sync/SharedSyncImplementation';
import { resolveWebSQLFlags } from '../adapters/web-sql-flags';
import {
WebStreamingSyncImplementation,
WebStreamingSyncImplementationOptions
} from './WebStreamingSyncImplementation';
import { resolveWebSQLFlags } from '../adapters/web-sql-flags';

/**
* The shared worker will trigger methods on this side of the message port
Expand All @@ -20,11 +20,16 @@ import { resolveWebSQLFlags } from '../adapters/web-sql-flags';
class SharedSyncClientProvider extends AbstractSharedSyncClientProvider {
constructor(
protected options: WebStreamingSyncImplementationOptions,
public statusChanged: (status: SyncStatusOptions) => void
public statusChanged: (status: SyncStatusOptions) => void,
protected dbWorkerPort: MessagePort
) {
super();
}

async getDBWorkerPort(): Promise<MessagePort> {
return Comlink.transfer(this.dbWorkerPort, [this.dbWorkerPort]);
}

async fetchCredentials(): Promise<PowerSyncCredentials | null> {
const credentials = await this.options.remote.getCredentials();
if (credentials == null) {
Expand Down Expand Up @@ -142,7 +147,7 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem

const flags = { ...this.webOptions.flags, workers: undefined };

this.isInitialized = this.syncManager.init(Comlink.transfer(dbOpenerPort, [dbOpenerPort]), {
this.isInitialized = this.syncManager.setParams({
dbName: this.options.identifier!,
streamOptions: {
crudUploadThrottleMs,
Expand All @@ -155,9 +160,13 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem
/**
* Pass along any sync status updates to this listener
*/
this.clientProvider = new SharedSyncClientProvider(this.webOptions, (status) => {
this.iterateListeners((l) => this.updateSyncStatus(status));
});
this.clientProvider = new SharedSyncClientProvider(
this.webOptions,
(status) => {
this.iterateListeners((l) => this.updateSyncStatus(status));
},
dbOpenerPort
);

/**
* The sync worker will call this client provider when it needs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export abstract class AbstractSharedSyncClientProvider {
abstract fetchCredentials(): Promise<PowerSyncCredentials | null>;
abstract uploadCrud(): Promise<void>;
abstract statusChanged(status: SyncStatusOptions): void;
abstract getDBWorkerPort(): Promise<MessagePort>;

abstract trace(...x: any[]): void;
abstract debug(...x: any[]): void;
Expand Down
62 changes: 50 additions & 12 deletions packages/web/src/worker/sync/SharedSyncImplementation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import {
} from '../../db/sync/WebStreamingSyncImplementation';

import { WASQLiteDBAdapter } from '../../db/adapters/wa-sqlite/WASQLiteDBAdapter';
import { getNavigatorLocks } from '../../shared/navigator';
import { AbstractSharedSyncClientProvider } from './AbstractSharedSyncClientProvider';
import { BroadcastLogger } from './BroadcastLogger';
import { getNavigatorLocks } from '../../shared/navigator';

/**
* Manual message events for shared sync clients
Expand All @@ -46,13 +46,20 @@ export type SharedSyncInitOptions = {
streamOptions: Omit<WebStreamingSyncImplementationOptions, 'adapter' | 'uploadCrud' | 'remote'>;
};

type TrackedClientDB = {
client: AbstractSharedSyncClientProvider;
db: DBAdapter;
port: MessagePort;
};

export interface SharedSyncImplementationListener extends StreamingSyncImplementationListener {
initialized: () => void;
}

export type WrappedSyncPort = {
port: MessagePort;
clientProvider: Comlink.Remote<AbstractSharedSyncClientProvider>;
db?: DBAdapter;
};

export type RemoteOperationAbortController = {
Expand All @@ -79,6 +86,7 @@ export class SharedSyncImplementation
protected dbAdapter: DBAdapter | null;
protected syncParams: SharedSyncInitOptions | null;
protected logger: ILogger;
protected lastConnectOptions: PowerSyncConnectionOptions | undefined;

syncStatus: SyncStatus;
broadCastLogger: ILogger;
Expand All @@ -90,6 +98,7 @@ export class SharedSyncImplementation
this.syncParams = null;
this.syncStreamClient = null;
this.logger = Logger.get('shared-sync');
this.lastConnectOptions = undefined;

this.isInitialized = new Promise((resolve) => {
const callback = this.registerListener({
Expand Down Expand Up @@ -124,19 +133,12 @@ export class SharedSyncImplementation
/**
* Configures the DBAdapter connection and a streaming sync client.
*/
async init(dbWorkerPort: MessagePort, params: SharedSyncInitOptions) {
if (this.dbAdapter) {
async setParams(params: SharedSyncInitOptions) {
if (this.syncParams) {
// Cannot modify already existing sync implementation
return;
}

this.dbAdapter = new WASQLiteDBAdapter({
dbFilename: params.dbName,
workerPort: dbWorkerPort,
flags: { enableMultiTabs: true, useWebWorker: true },
logger: this.logger
});

this.syncParams = params;

if (params.streamOptions?.flags?.broadcastLogs) {
Expand All @@ -148,6 +150,19 @@ export class SharedSyncImplementation
this.logger.error('Uncaught exception in PowerSync shared sync worker', event);
};

// Ask for a new DB worker port handler
// We can only ask once per client provider since the port
// can only be transferred once
// TODO share logic here
const lastClient = this.ports[this.ports.length - 1];
const workerPort = await lastClient.clientProvider.getDBWorkerPort();
this.dbAdapter = lastClient.db = new WASQLiteDBAdapter({
dbFilename: this.syncParams?.dbName!,
workerPort,
flags: { enableMultiTabs: true, useWebWorker: true },
logger: this.logger
});

this.iterateListeners((l) => l.initialized?.());
}

Expand All @@ -168,7 +183,7 @@ export class SharedSyncImplementation
// This effectively queues connect and disconnect calls. Ensuring multiple tabs' requests are synchronized
return getNavigatorLocks().request('shared-sync-connect', async () => {
this.syncStreamClient = this.generateStreamingImplementation();

this.lastConnectOptions = options;
this.syncStreamClient.registerListener({
statusChanged: (status) => {
this.updateAllStatuses(status.toJSON());
Expand Down Expand Up @@ -210,14 +225,18 @@ export class SharedSyncImplementation
* Removes a message port client from this manager's managed
* clients.
*/
removePort(port: MessagePort) {
async removePort(port: MessagePort) {
const index = this.ports.findIndex((p) => p.port == port);
if (index < 0) {
console.warn(`Could not remove port ${port} since it is not present in active ports.`);
return;
}

const trackedPort = this.ports[index];
if (trackedPort.db) {
trackedPort.db.close();
}

// Release proxy
trackedPort.clientProvider[Comlink.releaseProxy]();
this.ports.splice(index, 1);
Expand All @@ -231,6 +250,25 @@ export class SharedSyncImplementation
abortController!.controller.abort(new AbortOperation('Closing pending requests after client port is removed'));
}
});

if (this.dbAdapter == trackedPort.db && this.syncStreamClient) {
// The db adapter belonged to a client which has closed. We need to reconnect
// FIXME better closing
// this.dbAdapter!.close();

await this.disconnect();
// Ask for a new DB worker port handler
const lastClient = this.ports[this.ports.length - 1];
const workerPort = await lastClient.clientProvider.getDBWorkerPort();

this.dbAdapter = lastClient.db = new WASQLiteDBAdapter({
dbFilename: this.syncParams?.dbName!,
workerPort,
flags: { enableMultiTabs: true, useWebWorker: true },
logger: this.logger
});
await this.connect(this.lastConnectOptions);
}
}

triggerCrudUpload() {
Expand Down

0 comments on commit 18dbd7f

Please sign in to comment.