Skip to content

Commit

Permalink
wip: split worker db interfaces and instantiation
Browse files Browse the repository at this point in the history
  • Loading branch information
stevensJourney committed Nov 28, 2024
1 parent e937099 commit e06f3a9
Show file tree
Hide file tree
Showing 23 changed files with 854 additions and 643 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { AppSchema } from '@/library/powersync/AppSchema';
import { SupabaseConnector } from '@/library/powersync/SupabaseConnector';
import { CircularProgress } from '@mui/material';
import { PowerSyncContext } from '@powersync/react';
import { PowerSyncDatabase } from '@powersync/web';
import { PowerSyncDatabase, WASQLiteOpenFactory, WASQLiteVFS } from '@powersync/web';
import Logger from 'js-logger';
import React, { Suspense } from 'react';
import { NavigationPanelContextProvider } from '../navigation/NavigationPanelContext';
Expand All @@ -13,18 +13,13 @@ export const useSupabase = () => React.useContext(SupabaseContext);

export const db = new PowerSyncDatabase({
schema: AppSchema,
database: {
dbFilename: 's.sqlite'
}
// database: new WASQLiteOpenFactory({
// dbFilename: 'examplse.db',
// vfs: WASQLiteVFS.OPFSCoopSyncVFS,
// // Can't use a shared worker for OPFS
// flags: { enableMultiTabs: false }
// }),
// flags: {
// enableMultiTabs: false
// database: {
// dbFilename: 's.sqlite'
// }
database: new WASQLiteOpenFactory({
dbFilename: 'examplse.db',
vfs: WASQLiteVFS.OPFSCoopSyncVFS
})
});

export const SystemProvider = ({ children }: { children: React.ReactNode }) => {
Expand Down
8 changes: 6 additions & 2 deletions packages/web/src/db/PowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,22 @@ import {
StreamingSyncImplementation
} from '@powersync/common';
import { Mutex } from 'async-mutex';
import { getNavigatorLocks } from '../shared/navigator';
import { WASQLiteOpenFactory } from './adapters/wa-sqlite/WASQLiteOpenFactory';
import {
DEFAULT_WEB_SQL_FLAGS,
ResolvedWebSQLOpenOptions,
resolveWebSQLFlags,
WebSQLFlags
} from './adapters/web-sql-flags';
import { WorkerDBAdapter } from './adapters/WorkerDBAdapter';
import { SharedWebStreamingSyncImplementation } from './sync/SharedWebStreamingSyncImplementation';
import { SSRStreamingSyncImplementation } from './sync/SSRWebStreamingSyncImplementation';
import { WebRemote } from './sync/WebRemote';
import {
WebStreamingSyncImplementation,
WebStreamingSyncImplementationOptions
} from './sync/WebStreamingSyncImplementation';
import { getNavigatorLocks } from '../shared/navigator';

export interface WebPowerSyncFlags extends WebSQLFlags {
/**
Expand Down Expand Up @@ -191,7 +192,10 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
const logger = this.options.logger;
logger ? logger.warn(warning) : console.warn(warning);
}
return new SharedWebStreamingSyncImplementation(syncOptions);
return new SharedWebStreamingSyncImplementation({
...syncOptions,
workerDatabase: this.database as WorkerDBAdapter // This should always be the case
});
default:
return new WebStreamingSyncImplementation(syncOptions);
}
Expand Down
3 changes: 3 additions & 0 deletions packages/web/src/db/adapters/AbstractWebSQLOpenFactory.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import { DBAdapter, SQLOpenFactory } from '@powersync/common';
import Logger, { ILogger } from 'js-logger';
import { SSRDBAdapter } from './SSRDBAdapter';
import { ResolvedWebSQLFlags, WebSQLOpenFactoryOptions, isServerSide, resolveWebSQLFlags } from './web-sql-flags';

export abstract class AbstractWebSQLOpenFactory implements SQLOpenFactory {
protected resolvedFlags: ResolvedWebSQLFlags;
protected logger: ILogger;

constructor(protected options: WebSQLOpenFactoryOptions) {
this.resolvedFlags = resolveWebSQLFlags(options.flags);
this.logger = options.logger ?? Logger.get(`AbstractWebSQLOpenFactory - ${this.options.dbFilename}`);
}

/**
Expand Down
8 changes: 6 additions & 2 deletions packages/web/src/db/adapters/AsyncDatabaseConnection.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { BatchedUpdateNotification, QueryResult } from '@powersync/common';
import { BatchedUpdateNotification, QueryResult, SQLOpenOptions } from '@powersync/common';

/**
* Proxied query result does not contain a function for accessing row values
Expand All @@ -21,5 +21,9 @@ export interface AsyncDatabaseConnection {
close(): Promise<void>;
execute(sql: string, params?: any[]): Promise<ProxiedQueryResult>;
executeBatch(sql: string, params?: any[]): Promise<ProxiedQueryResult>;
registerOnTableChange(callback: OnTableChangeCallback): () => void;
registerOnTableChange(callback: OnTableChangeCallback): Promise<() => void>;
}

export type OpenAsyncDatabaseConnection<Options extends SQLOpenOptions = SQLOpenOptions> = (
options: Options
) => AsyncDatabaseConnection;
269 changes: 269 additions & 0 deletions packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
import {
BaseObserver,
DBAdapter,
DBAdapterListener,
DBGetUtils,
DBLockOptions,
LockContext,
QueryResult,
Transaction
} from '@powersync/common';
import Logger, { ILogger } from 'js-logger';
import { getNavigatorLocks } from '../..//shared/navigator';
import { AsyncDatabaseConnection } from './AsyncDatabaseConnection';

/**
* @internal
*/
export interface LockedAsyncDatabaseAdapterOptions {
name: string;
openConnection: () => Promise<AsyncDatabaseConnection>;
debugMode?: boolean;
logger?: ILogger;
}

type LockedAsyncDatabaseAdapterListener = DBAdapterListener & {
initialized?: () => void;
};

/**
* @internal
* Wraps a {@link AsyncDatabaseConnection} and provides exclusive locking functions in
* order to implement {@link DBAdapter}.
*/
export class LockedAsyncDatabaseAdapter extends BaseObserver<LockedAsyncDatabaseAdapterListener> implements DBAdapter {
private logger: ILogger;
private dbGetHelpers: DBGetUtils | null;
private debugMode: boolean;
private _dbIdentifier: string;
private _isInitialized = false;
private _db: AsyncDatabaseConnection | null = null;
private _disposeTableChangeListener: (() => void) | null = null;

constructor(protected options: LockedAsyncDatabaseAdapterOptions) {
super();
this._dbIdentifier = options.name;
this.logger = options.logger ?? Logger.get(`LockedAsyncDatabaseAdapter - ${this._dbIdentifier}`);
// Set the name if provided. We can query for the name if not available yet
this.debugMode = options.debugMode ?? false;
if (this.debugMode) {
const originalExecute = this._execute.bind(this);
this._execute = async (sql, bindings) => {
const start = performance.now();
try {
const r = await originalExecute(sql, bindings);
performance.measure(`[SQL] ${sql}`, { start });
return r;
} catch (e: any) {
performance.measure(`[SQL] [ERROR: ${e.message}] ${sql}`, { start });
throw e;
}
};
}

this.dbGetHelpers = this.generateDBHelpers({
execute: (query, params) => this.acquireLock(() => this._execute(query, params))
});
}

protected get baseDB() {
if (!this._db) {
throw new Error(`Initialization has not completed yet. Cannot access base db`);
}
return this._db;
}

get name() {
return this._dbIdentifier;
}

async init() {
this._db = await this.options.openConnection();
await this._db.init();
this._disposeTableChangeListener = await this._db.registerOnTableChange((event) => {
this.iterateListeners((cb) => cb.tablesUpdated?.(event));
});
this._isInitialized = true;
this.iterateListeners((cb) => cb.initialized?.());
}

protected async waitForInitialized() {
if (this._isInitialized) {
return;
}
return new Promise<void>((resolve) => {
const l = this.registerListener({
initialized: () => {
resolve();
l();
}
});
});
}

/**
* This is currently a no-op on web
*/
async refreshSchema(): Promise<void> {}

async execute(query: string, params?: any[] | undefined): Promise<QueryResult> {
return this.writeLock((ctx) => ctx.execute(query, params));
}

async executeBatch(query: string, params?: any[][]): Promise<QueryResult> {
return this.writeLock((ctx) => this._executeBatch(query, params));
}

/**
* Attempts to close the connection.
* Shared workers might not actually close the connection if other
* tabs are still using it.
*/
close() {
this._disposeTableChangeListener?.();
this.baseDB?.close?.();
}

async getAll<T>(sql: string, parameters?: any[] | undefined): Promise<T[]> {
await this.waitForInitialized();
return this.dbGetHelpers!.getAll(sql, parameters);
}

async getOptional<T>(sql: string, parameters?: any[] | undefined): Promise<T | null> {
await this.waitForInitialized();
return this.dbGetHelpers!.getOptional(sql, parameters);
}

async get<T>(sql: string, parameters?: any[] | undefined): Promise<T> {
await this.waitForInitialized();
return this.dbGetHelpers!.get(sql, parameters);
}

async readLock<T>(fn: (tx: LockContext) => Promise<T>, options?: DBLockOptions | undefined): Promise<T> {
await this.waitForInitialized();
return this.acquireLock(async () => fn(this.generateDBHelpers({ execute: this._execute })));
}

async writeLock<T>(fn: (tx: LockContext) => Promise<T>, options?: DBLockOptions | undefined): Promise<T> {
await this.waitForInitialized();
return this.acquireLock(async () => fn(this.generateDBHelpers({ execute: this._execute })));
}

protected acquireLock(callback: () => Promise<any>): Promise<any> {
return getNavigatorLocks().request(`db-lock-${this._dbIdentifier}`, callback);
}

async readTransaction<T>(fn: (tx: Transaction) => Promise<T>, options?: DBLockOptions | undefined): Promise<T> {
return this.readLock(this.wrapTransaction(fn));
}

writeTransaction<T>(fn: (tx: Transaction) => Promise<T>, options?: DBLockOptions | undefined): Promise<T> {
return this.writeLock(this.wrapTransaction(fn));
}

private generateDBHelpers<T extends { execute: (sql: string, params?: any[]) => Promise<QueryResult> }>(
tx: T
): T & DBGetUtils {
return {
...tx,
/**
* Execute a read-only query and return results
*/
async getAll<T>(sql: string, parameters?: any[]): Promise<T[]> {
const res = await tx.execute(sql, parameters);
return res.rows?._array ?? [];
},

/**
* Execute a read-only query and return the first result, or null if the ResultSet is empty.
*/
async getOptional<T>(sql: string, parameters?: any[]): Promise<T | null> {
const res = await tx.execute(sql, parameters);
return res.rows?.item(0) ?? null;
},

/**
* Execute a read-only query and return the first result, error if the ResultSet is empty.
*/
async get<T>(sql: string, parameters?: any[]): Promise<T> {
const res = await tx.execute(sql, parameters);
const first = res.rows?.item(0);
if (!first) {
throw new Error('Result set is empty');
}
return first;
}
};
}

/**
* Wraps a lock context into a transaction context
*/
private wrapTransaction<T>(cb: (tx: Transaction) => Promise<T>) {
return async (tx: LockContext): Promise<T> => {
await this._execute('BEGIN TRANSACTION');
let finalized = false;
const commit = async (): Promise<QueryResult> => {
if (finalized) {
return { rowsAffected: 0 };
}
finalized = true;
return this._execute('COMMIT');
};

const rollback = () => {
finalized = true;
return this._execute('ROLLBACK');
};

try {
const result = await cb({
...tx,
commit,
rollback
});

if (!finalized) {
await commit();
}
return result;
} catch (ex) {
this.logger.debug('Caught ex in transaction', ex);
try {
await rollback();
} catch (ex2) {
// In rare cases, a rollback may fail.
// Safe to ignore.
}
throw ex;
}
};
}

/**
* Wraps the worker execute function, awaiting for it to be available
*/
private _execute = async (sql: string, bindings?: any[]): Promise<QueryResult> => {
await this.waitForInitialized();
const result = await this.baseDB.execute(sql, bindings);
return {
...result,
rows: {
...result.rows,
item: (idx: number) => result.rows._array[idx]
}
};
};

/**
* Wraps the worker executeBatch function, awaiting for it to be available
*/
private _executeBatch = async (query: string, params?: any[]): Promise<QueryResult> => {
await this.waitForInitialized();
const result = await this.baseDB.executeBatch(query, params);
return {
...result,
rows: undefined
};
};
}
21 changes: 21 additions & 0 deletions packages/web/src/db/adapters/ProxiedAsyncDatabaseConnection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import * as Comlink from 'comlink';
import { AsyncDatabaseConnection, OnTableChangeCallback } from './AsyncDatabaseConnection';

/**
* @internal
* Proxies an {@link AsyncDatabaseConnection} which allows for registering table change notification
* callbacks over a worker channel
*/
export function ProxiedAsyncDatabaseConnection(base: AsyncDatabaseConnection) {
return new Proxy(base, {
get(target, prop: keyof AsyncDatabaseConnection, receiver) {
const original = Reflect.get(target, prop, receiver);
if (typeof original === 'function' && prop === 'registerOnTableChange') {
return function (callback: OnTableChangeCallback) {
return base.registerOnTableChange(Comlink.proxy(callback));
};
}
return original;
}
});
}
Loading

0 comments on commit e06f3a9

Please sign in to comment.