Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Concurrent DB Connections and Transactions #7

Merged
merged 15 commits into from
Oct 30, 2023
11 changes: 11 additions & 0 deletions .changeset/chilled-walls-promise.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
'@journeyapps/powersync-sdk-react-native': patch
'@journeyapps/powersync-sdk-common': patch
---

Updated logic to correspond with React Native Quick SQLite concurrent transactions. Added helper methods on transaction contexts.

API changes include:
- Removal of synchronous DB operations in transactions: `execute`, `commit`, `rollback` are now async functions. `executeAsync`, `commitAsync` and `rollbackAsync` have been removed.
- Transaction contexts now have `get`, `getAll` and `getOptional` helpers.
- Added a default lock timeout of 2 minutes to aide with potential recursive lock/transaction requests.
5 changes: 5 additions & 0 deletions .changeset/dry-pets-yawn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@journeyapps/powersync-sdk-react-native': patch
---

Update README polyfill command.
5 changes: 5 additions & 0 deletions .changeset/friendly-shrimps-fry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@journeyapps/powersync-sdk-common': patch
---

Fix update trigger for local only watched tables.
132 changes: 55 additions & 77 deletions packages/powersync-sdk-common/src/client/AbstractPowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ export const DEFAULT_POWERSYNC_DB_OPTIONS = {
logger: Logger.get('PowerSyncDatabase')
};

/**
* Requesting nested or recursive locks can block the application in some circumstances.
* This default lock timeout will act as a failsafe to throw an error if a lock cannot
* be obtained.
*/
export const DEFAULT_LOCK_TIMEOUT_MS = 120_000; // 2 mins

export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDBListener> {
/**
* Transactions should be queued in the DBAdapter, but we also want to prevent
Expand All @@ -70,9 +77,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
this.closed = true;
this.options = { ...DEFAULT_POWERSYNC_DB_OPTIONS, ...options };
this.bucketStorageAdapter = this.generateBucketStorageAdapter();
this.sdkVersion = this.options.database.execute('SELECT powersync_rs_version()').rows?.item(0)[
'powersync_rs_version()'
];
this.sdkVersion = '';
}

get schema() {
Expand All @@ -98,7 +103,9 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
this.initialized = (async () => {
await this._init();
await this.bucketStorageAdapter.init();
await this.database.executeAsync('SELECT powersync_replace_schema(?)', [JSON.stringify(this.schema.toJSON())]);
await this.database.execute('SELECT powersync_replace_schema(?)', [JSON.stringify(this.schema.toJSON())]);
const version = await this.options.database.execute('SELECT powersync_rs_version()');
this.sdkVersion = version.rows?.item(0)['powersync_rs_version()'] ?? '';
})();
await this.initialized;
}
Expand All @@ -111,7 +118,6 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
await this.disconnect();

await this.initialized;

this.syncStreamImplementation = this.generateSyncStreamImplementation(connector);
this.syncStatusListenerDisposer = this.syncStreamImplementation.registerListener({
statusChanged: (status) => {
Expand Down Expand Up @@ -142,20 +148,20 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
await this.disconnect();

// TODO DB name, verify this is necessary with extension
await this.database.transaction(async (tx) => {
await tx.executeAsync('DELETE FROM ps_oplog WHERE 1');
await tx.executeAsync('DELETE FROM ps_crud WHERE 1');
await tx.executeAsync('DELETE FROM ps_buckets WHERE 1');
await this.database.writeTransaction(async (tx) => {
await tx.execute('DELETE FROM ps_oplog WHERE 1');
await tx.execute('DELETE FROM ps_crud WHERE 1');
await tx.execute('DELETE FROM ps_buckets WHERE 1');

const existingTableRows = await tx.executeAsync(
const existingTableRows = await tx.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'"
);

if (!existingTableRows.rows.length) {
return;
}
for (const row of existingTableRows.rows._array) {
await tx.executeAsync(`DELETE FROM ${row.name} WHERE 1`);
await tx.execute(`DELETE FROM ${row.name} WHERE 1`);
}
});
}
Expand All @@ -181,14 +187,12 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
async getUploadQueueStats(includeSize?: boolean): Promise<UploadQueueStats> {
return this.readTransaction(async (tx) => {
if (includeSize) {
const result = await tx.executeAsync(
'SELECT SUM(cast(data as blob) + 20) as size, count(*) as count FROM ps_crud'
);
const result = await tx.execute('SELECT SUM(cast(data as blob) + 20) as size, count(*) as count FROM ps_crud');

const row = result.rows.item(0);
return new UploadQueueStats(row?.count ?? 0, row?.size ?? 0);
} else {
const result = await tx.executeAsync('SELECT count(*) as count FROM ps_crud');
const result = await tx.execute('SELECT count(*) as count FROM ps_crud');
const row = result.rows.item(0);
return new UploadQueueStats(row?.count ?? 0);
}
Expand All @@ -213,7 +217,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* and a single transaction may be split over multiple batches.
*/
async getCrudBatch(limit: number): Promise<CrudBatch | null> {
const result = await this.database.executeAsync('SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT ?', [
const result = await this.database.execute('SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT ?', [
limit + 1
]);

Expand All @@ -231,11 +235,11 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
const last = all[all.length - 1];
return new CrudBatch(all, haveMore, async (writeCheckpoint?: string) => {
await this.writeTransaction(async (tx) => {
await tx.executeAsync('DELETE FROM ps_crud WHERE id <= ?', [last.clientId]);
if (writeCheckpoint != null && (await tx.executeAsync('SELECT 1 FROM ps_crud LIMIT 1')) == null) {
await tx.executeAsync("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [writeCheckpoint]);
await tx.execute('DELETE FROM ps_crud WHERE id <= ?', [last.clientId]);
if (writeCheckpoint != null && (await tx.execute('SELECT 1 FROM ps_crud LIMIT 1')) == null) {
await tx.execute("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [writeCheckpoint]);
} else {
await tx.executeAsync("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [
await tx.execute("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [
this.bucketStorageAdapter.getMaxOpId()
]);
}
Expand All @@ -258,7 +262,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
*/
async getNextCrudTransaction(): Promise<CrudTransaction> {
return await this.readTransaction(async (tx) => {
const first = await tx.executeAsync('SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT 1');
const first = await tx.execute('SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT 1');

if (!first.rows.length) {
return null;
Expand All @@ -269,9 +273,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
if (!txId) {
all = [CrudEntry.fromRow(first.rows.item(0))];
} else {
const result = await tx.executeAsync('SELECT id, tx_id, data FROM ps_crud WHERE tx_id = ? ORDER BY id ASC', [
txId
]);
const result = await tx.execute('SELECT id, tx_id, data FROM ps_crud WHERE tx_id = ? ORDER BY id ASC', [txId]);
all = result.rows._array.map((row) => CrudEntry.fromRow(row));
}

Expand All @@ -281,14 +283,14 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
all,
async (writeCheckpoint?: string) => {
await this.writeTransaction(async (tx) => {
await tx.executeAsync('DELETE FROM ps_crud WHERE id <= ?', [last.clientId]);
await tx.execute('DELETE FROM ps_crud WHERE id <= ?', [last.clientId]);
if (writeCheckpoint) {
const check = await tx.executeAsync('SELECT 1 FROM ps_crud LIMIT 1');
const check = await tx.execute('SELECT 1 FROM ps_crud LIMIT 1');
if (!check.rows?.length) {
await tx.executeAsync("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [writeCheckpoint]);
await tx.execute("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [writeCheckpoint]);
}
} else {
await tx.executeAsync("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [
await tx.execute("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [
this.bucketStorageAdapter.getMaxOpId()
]);
}
Expand All @@ -303,36 +305,32 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* Execute a statement and optionally return results
*/
async execute(sql: string, parameters?: any[]) {
const res = await this.writeLock((tx) => tx.executeAsync(sql, parameters));
return res;
await this.initialized;
return this.database.execute(sql, parameters);
}

/**
* Execute a read-only query and return results
*/
async getAll<T>(sql: string, parameters?: any[]): Promise<T[]> {
const res = await this.readTransaction((tx) => tx.executeAsync(sql, parameters));
return res.rows?._array ?? [];
await this.initialized;
return this.database.getAll(sql, parameters);
}

/**
* Execute a read-only query and return the first result, or null if the ResultSet is empty.
*/
async getOptional<T>(sql: string, parameters?: any[]): Promise<T | null> {
const res = await this.readTransaction((tx) => tx.executeAsync(sql, parameters));
return res.rows?.item(0) ?? null;
await this.initialized;
return this.database.getOptional(sql, parameters);
}

/**
* Execute a read-only query and return the first result, error if the ResultSet is empty.
*/
async get<T>(sql: string, parameters?: any[]): Promise<T> {
const res = await this.readTransaction((tx) => tx.executeAsync(sql, parameters));
const first = res.rows?.item(0);
if (!first) {
throw new Error('Result set is empty');
}
return first;
await this.initialized;
return this.database.get(sql, parameters);
}

/**
Expand All @@ -358,40 +356,43 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
});
}

async readTransaction<T>(callback: (tx: Transaction) => Promise<T>, lockTimeout?: number): Promise<T> {
async readTransaction<T>(
callback: (tx: Transaction) => Promise<T>,
lockTimeout: number = DEFAULT_LOCK_TIMEOUT_MS
): Promise<T> {
await this.initialized;
return this.runLockedTransaction(
AbstractPowerSyncDatabase.transactionMutex,
return this.database.readTransaction(
async (tx) => {
const res = await callback(tx);
await tx.rollbackAsync();
const res = await callback({ ...tx });
await tx.rollback();
return res;
},
lockTimeout
{ timeoutMs: lockTimeout }
);
}

async writeTransaction<T>(callback: (tx: Transaction) => Promise<T>, lockTimeout?: number): Promise<T> {
async writeTransaction<T>(
callback: (tx: Transaction) => Promise<T>,
lockTimeout: number = DEFAULT_LOCK_TIMEOUT_MS
): Promise<T> {
await this.initialized;
return this.runLockedTransaction(
AbstractPowerSyncDatabase.transactionMutex,
return this.database.writeTransaction(
async (tx) => {
const res = await callback(tx);
await tx.commitAsync();
await tx.commit();
_.defer(() => this.syncStreamImplementation?.triggerCrudUpload());
return res;
},
lockTimeout
{ timeoutMs: lockTimeout }
);
}

async *watch(sql: string, parameters: any[], options?: SQLWatchOptions): AsyncIterable<QueryResult> {
async *watch(sql: string, parameters?: any[], options?: SQLWatchOptions): AsyncIterable<QueryResult> {
//Fetch initial data
yield await this.execute(sql, parameters);

const resolvedTables = options?.tables ?? [];
if (!options?.tables) {
// TODO get tables from sql if not specified
const explained = await this.getAll(`EXPLAIN ${sql}`, parameters);
const rootPages = _.chain(explained)
.filter((row) => row['opcode'] == 'OpenRead' && row['p3'] == 0 && _.isNumber(row['p2']))
Expand All @@ -401,7 +402,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
`SELECT tbl_name FROM sqlite_master WHERE rootpage IN (SELECT json_each.value FROM json_each(?))`,
[JSON.stringify(rootPages)]
);
tables.forEach((t) => resolvedTables.push(t.tbl_name.replace(/^ps_data__/, '')));
tables.forEach((t) => resolvedTables.push(t.tbl_name.replace(POWERSYNC_TABLE_MATCH, '')));
}
for await (const event of this.onChange({
...(options ?? {}),
Expand Down Expand Up @@ -458,27 +459,4 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
return () => dispose();
});
}

private runLockedTransaction<T>(
mutex: Mutex,
callback: (tx: Transaction) => Promise<T>,
lockTimeout?: number
): Promise<T> {
return mutexRunExclusive(
mutex,
() => {
return new Promise<T>(async (resolve, reject) => {
try {
await this.database.transaction(async (tx) => {
const r = await callback(tx);
resolve(r);
});
} catch (ex) {
reject(ex);
}
});
},
{ timeoutMs: lockTimeout }
);
}
}
Loading
Loading