Skip to content

Commit

Permalink
imrove worker connections
Browse files Browse the repository at this point in the history
  • Loading branch information
stevensJourney committed Dec 3, 2024
1 parent 1f87766 commit 2b3b44d
Show file tree
Hide file tree
Showing 12 changed files with 199 additions and 226 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export const db = new PowerSyncDatabase({
// dbFilename: 's.sqlite'
// }
database: new WASQLiteOpenFactory({
dbFilename: 'examplsw1se11.db',
dbFilename: 'examplsw1se112.db',
// vfs: WASQLiteVFS.OPFSCoopSyncVFS
vfs: WASQLiteVFS.OPFSCoopSyncVFS //Out of memory errors on iOS Safari
})
Expand Down
4 changes: 2 additions & 2 deletions packages/web/src/db/PowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {
resolveWebSQLFlags,
WebSQLFlags
} from './adapters/web-sql-flags';
import { WorkerDBAdapter } from './adapters/WorkerDBAdapter';
import { WebDBAdapter } from './adapters/WebDBAdapter';
import { SharedWebStreamingSyncImplementation } from './sync/SharedWebStreamingSyncImplementation';
import { SSRStreamingSyncImplementation } from './sync/SSRWebStreamingSyncImplementation';
import { WebRemote } from './sync/WebRemote';
Expand Down Expand Up @@ -194,7 +194,7 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
}
return new SharedWebStreamingSyncImplementation({
...syncOptions,
workerDatabase: this.database as WorkerDBAdapter // This should always be the case
db: this.database as WebDBAdapter // This should always be the case
});
default:
return new WebStreamingSyncImplementation(syncOptions);
Expand Down
17 changes: 13 additions & 4 deletions packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import {
import Logger, { ILogger } from 'js-logger';
import { getNavigatorLocks } from '../..//shared/navigator';
import { AsyncDatabaseConnection } from './AsyncDatabaseConnection';
import { SharedConnectionWorker, WebDBAdapter } from './WebDBAdapter';
import { WorkerWrappedAsyncDatabaseConnection } from './WorkerWrappedAsyncDatabaseConnection';

/**
* @internal
Expand All @@ -31,12 +33,14 @@ export type LockedAsyncDatabaseAdapterListener = DBAdapterListener & {
* Wraps a {@link AsyncDatabaseConnection} and provides exclusive locking functions in
* order to implement {@link DBAdapter}.
*/
export class LockedAsyncDatabaseAdapter extends BaseObserver<LockedAsyncDatabaseAdapterListener> implements DBAdapter {
export class LockedAsyncDatabaseAdapter
extends BaseObserver<LockedAsyncDatabaseAdapterListener>
implements WebDBAdapter
{
private logger: ILogger;
private dbGetHelpers: DBGetUtils | null;
private debugMode: boolean;
private _dbIdentifier: string;
private _isInitialized = false;
protected initPromise: Promise<void>;
private _db: AsyncDatabaseConnection | null = null;
protected _disposeTableChangeListener: (() => void) | null = null;
Expand Down Expand Up @@ -79,6 +83,13 @@ export class LockedAsyncDatabaseAdapter extends BaseObserver<LockedAsyncDatabase
return this._dbIdentifier;
}

async shareConnection(): Promise<SharedConnectionWorker> {
if (false == this._db instanceof WorkerWrappedAsyncDatabaseConnection) {
throw new Error(`Only worker connections can be shared`);
}
return this._db.shareConnection();
}

/**
* Registers a table change notification callback with the base database.
* This can be extended by custom implementations in order to handle proxy events.
Expand All @@ -100,8 +111,6 @@ export class LockedAsyncDatabaseAdapter extends BaseObserver<LockedAsyncDatabase
this._db = await this.options.openConnection();
await this._db.init();
await this.registerOnChangeListener(this._db);

this._isInitialized = true;
this.iterateListeners((cb) => cb.initialized?.());
}

Expand Down
13 changes: 13 additions & 0 deletions packages/web/src/db/adapters/WebDBAdapter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { DBAdapter } from '@powersync/common';

export type SharedConnectionWorker = {
identifier: string;
port: MessagePort;
};

export interface WebDBAdapter extends DBAdapter {
/**
* Get a MessagePort which can be used to share the internals of this connection.
*/
shareConnection(): Promise<SharedConnectionWorker>;
}
8 changes: 0 additions & 8 deletions packages/web/src/db/adapters/WorkerDBAdapter.ts

This file was deleted.

47 changes: 0 additions & 47 deletions packages/web/src/db/adapters/WorkerLockedAsyncDatabaseAdapter.ts

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import * as Comlink from 'comlink';
import { AsyncDatabaseConnection, OnTableChangeCallback, ProxiedQueryResult } from './AsyncDatabaseConnection';

export type SharedConnectionWorker = {
identifier: string;
port: MessagePort;
};

export type WrappedWorkerConnectionOptions = {
baseConnection: AsyncDatabaseConnection;
identifier: string;
worker: Worker | MessagePort;
};

/**
* Wraps a provided instance of {@link AsyncDatabaseConnection}, providing necessary proxy
* functions for worker listeners.
*/
export class WorkerWrappedAsyncDatabaseConnection implements AsyncDatabaseConnection {
constructor(protected options: WrappedWorkerConnectionOptions) {}

protected get baseConnection() {
return this.options.baseConnection;
}

init(): Promise<void> {
return this.baseConnection.init();
}

/**
* Get a MessagePort which can be used to share the internals of this connection.
*/
async shareConnection(): Promise<SharedConnectionWorker> {
const { identifier, worker } = this.options;
if (worker instanceof Worker) {
// We can't transfer a Worker instance, need a MessagePort
// Comlink provides a nice utility for exposing a MessagePort
// from a Worker
const temp = Comlink.wrap(worker);
const newPort = await temp[Comlink.createEndpoint]();
return { port: newPort, identifier };
}

return {
identifier: identifier,
port: worker
};
}

/**
* Registers a table change notification callback with the base database.
* This can be extended by custom implementations in order to handle proxy events.
*/
async registerOnTableChange(callback: OnTableChangeCallback) {
return this.baseConnection.registerOnTableChange(Comlink.proxy(callback));
}

close(): Promise<void> {
return this.baseConnection.close();
}

execute(sql: string, params?: any[]): Promise<ProxiedQueryResult> {
return this.baseConnection.execute(sql, params);
}

executeBatch(sql: string, params?: any[]): Promise<ProxiedQueryResult> {
return this.baseConnection.executeBatch(sql, params);
}
}
123 changes: 30 additions & 93 deletions packages/web/src/db/adapters/wa-sqlite/WASQLiteDBAdapter.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import { type PowerSyncOpenFactoryOptions } from '@powersync/common';
import * as Comlink from 'comlink';
import { OpenAsyncDatabaseConnection } from '../AsyncDatabaseConnection';
import { LockedAsyncDatabaseAdapter } from '../LockedAsyncDatabaseAdapter';
import { ResolvedWebSQLOpenOptions, WebSQLFlags } from '../web-sql-flags';
import { WASQLiteVFS } from './WASQLiteConnection';
import { WASQLiteOpenFactory } from './WASQLiteOpenFactory';

/**
* These flags are the same as {@link WebSQLFlags}.
Expand All @@ -25,96 +29,29 @@ export interface WASQLiteDBAdapterOptions extends Omit<PowerSyncOpenFactoryOptio
/**
* Adapter for WA-SQLite SQLite connections.
*/
// FIXME
// export class WASQLiteDBAdapter extends BaseObserver<DBAdapterListener> implements DBAdapter {
// private initialized: Promise<void>;
// private logger: ILogger;
// private dbGetHelpers: DBGetUtils | null;
// private methods: DBFunctionsInterface | null;
// private debugMode: boolean;

// constructor(protected options: WASQLiteDBAdapterOptions) {
// super();
// this.logger = Logger.get('WASQLite');
// this.dbGetHelpers = null;
// this.methods = null;
// this.debugMode = options.debugMode ?? false;
// if (this.debugMode) {
// const originalExecute = this._execute.bind(this);
// this._execute = async (sql, bindings) => {
// const start = performance.now();
// try {
// const r = await originalExecute(sql, bindings);
// performance.measure(`[SQL] ${sql}`, { start });
// return r;
// } catch (e: any) {
// performance.measure(`[SQL] [ERROR: ${e.message}] ${sql}`, { start });
// throw e;
// }
// };
// }
// this.initialized = this.init();
// this.dbGetHelpers = this.generateDBHelpers({
// execute: (query, params) => this.acquireLock(() => this._execute(query, params))
// });
// }

// get name() {
// return this.options.dbFilename;
// }

// protected get flags(): Required<WASQLiteFlags> {
// return resolveWebSQLFlags(this.options.flags ?? {});
// }

// getWorker() {}

// protected async init() {
// const { enableMultiTabs, useWebWorker } = this.flags;
// if (!enableMultiTabs) {
// this.logger.warn('Multiple tabs are not enabled in this browser');
// }

// if (useWebWorker) {
// const optionsDbWorker = this.options.worker;

// const dbOpener = this.options.workerPort
// ? Comlink.wrap<OpenDB>(this.options.workerPort)
// : typeof optionsDbWorker === 'function'
// ? Comlink.wrap<OpenDB>(
// resolveWorkerDatabasePortFactory(() =>
// optionsDbWorker({
// ...this.options,
// flags: this.flags
// })
// )
// )
// : getWorkerDatabaseOpener(this.options.dbFilename, enableMultiTabs, optionsDbWorker);

// this.methods = await dbOpener({
// dbFileName: this.options.dbFilename,
// vfs: this.options.vfs
// });
// this.methods.registerOnTableChange(
// Comlink.proxy((event) => {
// this.iterateListeners((cb) => cb.tablesUpdated?.(event));
// })
// );

// return;
// }

// // Not using a worker
// const connection = new WASqliteConnection({
// dbFileName: this.options.dbFilename
// });
// await connection.init();

// this.methods = connection;
// this.methods.registerOnTableChange((event) => {
// this.iterateListeners((cb) => cb.tablesUpdated?.(event));
// });
// }

// async refreshSchema(): Promise<void> {}
// }
export class WASQLiteDBAdapter extends LockedAsyncDatabaseAdapter {
constructor(options: WASQLiteDBAdapterOptions) {
super({
name: options.dbFilename,
openConnection: async () => {
const { workerPort } = options;
if (workerPort) {
const wrapped = Comlink.wrap<OpenAsyncDatabaseConnection>(workerPort);
return wrapped(options);
}
const openFactory = new WASQLiteOpenFactory({
dbFilename: options.dbFilename,
dbLocation: options.dbLocation,
debugMode: options.debugMode,
flags: options.flags,
logger: options.logger,
vfs: options.vfs,
worker: options.worker
});
return openFactory.openConnection();
},
debugMode: options.debugMode,
logger: options.logger
});
}
}
Loading

0 comments on commit 2b3b44d

Please sign in to comment.