Skip to content

Commit

Permalink
Merge pull request #7 from journeyapps/feature/concurrent-transactions
Browse files Browse the repository at this point in the history
[Feature] Concurrent DB Connections and Transactions
  • Loading branch information
stevensJourney authored Oct 30, 2023
2 parents af0031b + ec1f993 commit b612362
Show file tree
Hide file tree
Showing 12 changed files with 230 additions and 170 deletions.
11 changes: 11 additions & 0 deletions .changeset/chilled-walls-promise.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
'@journeyapps/powersync-sdk-react-native': patch
'@journeyapps/powersync-sdk-common': patch
---

Updated logic to correspond with React Native Quick SQLite concurrent transactions. Added helper methods on transaction contexts.

API changes include:
- Removal of synchronous DB operations in transactions: `execute`, `commit`, `rollback` are now async functions. `executeAsync`, `commitAsync` and `rollbackAsync` have been removed.
- Transaction contexts now have `get`, `getAll` and `getOptional` helpers.
- Added a default lock timeout of 2 minutes to aide with potential recursive lock/transaction requests.
5 changes: 5 additions & 0 deletions .changeset/dry-pets-yawn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@journeyapps/powersync-sdk-react-native': patch
---

Update README polyfill command.
5 changes: 5 additions & 0 deletions .changeset/friendly-shrimps-fry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@journeyapps/powersync-sdk-common': patch
---

Fix update trigger for local only watched tables.
132 changes: 55 additions & 77 deletions packages/powersync-sdk-common/src/client/AbstractPowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ export const DEFAULT_POWERSYNC_DB_OPTIONS = {
logger: Logger.get('PowerSyncDatabase')
};

/**
* Requesting nested or recursive locks can block the application in some circumstances.
* This default lock timeout will act as a failsafe to throw an error if a lock cannot
* be obtained.
*/
export const DEFAULT_LOCK_TIMEOUT_MS = 120_000; // 2 mins

export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDBListener> {
/**
* Transactions should be queued in the DBAdapter, but we also want to prevent
Expand All @@ -70,9 +77,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
this.closed = true;
this.options = { ...DEFAULT_POWERSYNC_DB_OPTIONS, ...options };
this.bucketStorageAdapter = this.generateBucketStorageAdapter();
this.sdkVersion = this.options.database.execute('SELECT powersync_rs_version()').rows?.item(0)[
'powersync_rs_version()'
];
this.sdkVersion = '';
}

get schema() {
Expand All @@ -98,7 +103,9 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
this.initialized = (async () => {
await this._init();
await this.bucketStorageAdapter.init();
await this.database.executeAsync('SELECT powersync_replace_schema(?)', [JSON.stringify(this.schema.toJSON())]);
await this.database.execute('SELECT powersync_replace_schema(?)', [JSON.stringify(this.schema.toJSON())]);
const version = await this.options.database.execute('SELECT powersync_rs_version()');
this.sdkVersion = version.rows?.item(0)['powersync_rs_version()'] ?? '';
})();
await this.initialized;
}
Expand All @@ -111,7 +118,6 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
await this.disconnect();

await this.initialized;

this.syncStreamImplementation = this.generateSyncStreamImplementation(connector);
this.syncStatusListenerDisposer = this.syncStreamImplementation.registerListener({
statusChanged: (status) => {
Expand Down Expand Up @@ -142,20 +148,20 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
await this.disconnect();

// TODO DB name, verify this is necessary with extension
await this.database.transaction(async (tx) => {
await tx.executeAsync('DELETE FROM ps_oplog WHERE 1');
await tx.executeAsync('DELETE FROM ps_crud WHERE 1');
await tx.executeAsync('DELETE FROM ps_buckets WHERE 1');
await this.database.writeTransaction(async (tx) => {
await tx.execute('DELETE FROM ps_oplog WHERE 1');
await tx.execute('DELETE FROM ps_crud WHERE 1');
await tx.execute('DELETE FROM ps_buckets WHERE 1');

const existingTableRows = await tx.executeAsync(
const existingTableRows = await tx.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'"
);

if (!existingTableRows.rows.length) {
return;
}
for (const row of existingTableRows.rows._array) {
await tx.executeAsync(`DELETE FROM ${row.name} WHERE 1`);
await tx.execute(`DELETE FROM ${row.name} WHERE 1`);
}
});
}
Expand All @@ -181,14 +187,12 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
async getUploadQueueStats(includeSize?: boolean): Promise<UploadQueueStats> {
return this.readTransaction(async (tx) => {
if (includeSize) {
const result = await tx.executeAsync(
'SELECT SUM(cast(data as blob) + 20) as size, count(*) as count FROM ps_crud'
);
const result = await tx.execute('SELECT SUM(cast(data as blob) + 20) as size, count(*) as count FROM ps_crud');

const row = result.rows.item(0);
return new UploadQueueStats(row?.count ?? 0, row?.size ?? 0);
} else {
const result = await tx.executeAsync('SELECT count(*) as count FROM ps_crud');
const result = await tx.execute('SELECT count(*) as count FROM ps_crud');
const row = result.rows.item(0);
return new UploadQueueStats(row?.count ?? 0);
}
Expand All @@ -213,7 +217,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* and a single transaction may be split over multiple batches.
*/
async getCrudBatch(limit: number): Promise<CrudBatch | null> {
const result = await this.database.executeAsync('SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT ?', [
const result = await this.database.execute('SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT ?', [
limit + 1
]);

Expand All @@ -231,11 +235,11 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
const last = all[all.length - 1];
return new CrudBatch(all, haveMore, async (writeCheckpoint?: string) => {
await this.writeTransaction(async (tx) => {
await tx.executeAsync('DELETE FROM ps_crud WHERE id <= ?', [last.clientId]);
if (writeCheckpoint != null && (await tx.executeAsync('SELECT 1 FROM ps_crud LIMIT 1')) == null) {
await tx.executeAsync("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [writeCheckpoint]);
await tx.execute('DELETE FROM ps_crud WHERE id <= ?', [last.clientId]);
if (writeCheckpoint != null && (await tx.execute('SELECT 1 FROM ps_crud LIMIT 1')) == null) {
await tx.execute("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [writeCheckpoint]);
} else {
await tx.executeAsync("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [
await tx.execute("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [
this.bucketStorageAdapter.getMaxOpId()
]);
}
Expand All @@ -258,7 +262,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
*/
async getNextCrudTransaction(): Promise<CrudTransaction> {
return await this.readTransaction(async (tx) => {
const first = await tx.executeAsync('SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT 1');
const first = await tx.execute('SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT 1');

if (!first.rows.length) {
return null;
Expand All @@ -269,9 +273,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
if (!txId) {
all = [CrudEntry.fromRow(first.rows.item(0))];
} else {
const result = await tx.executeAsync('SELECT id, tx_id, data FROM ps_crud WHERE tx_id = ? ORDER BY id ASC', [
txId
]);
const result = await tx.execute('SELECT id, tx_id, data FROM ps_crud WHERE tx_id = ? ORDER BY id ASC', [txId]);
all = result.rows._array.map((row) => CrudEntry.fromRow(row));
}

Expand All @@ -281,14 +283,14 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
all,
async (writeCheckpoint?: string) => {
await this.writeTransaction(async (tx) => {
await tx.executeAsync('DELETE FROM ps_crud WHERE id <= ?', [last.clientId]);
await tx.execute('DELETE FROM ps_crud WHERE id <= ?', [last.clientId]);
if (writeCheckpoint) {
const check = await tx.executeAsync('SELECT 1 FROM ps_crud LIMIT 1');
const check = await tx.execute('SELECT 1 FROM ps_crud LIMIT 1');
if (!check.rows?.length) {
await tx.executeAsync("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [writeCheckpoint]);
await tx.execute("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [writeCheckpoint]);
}
} else {
await tx.executeAsync("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [
await tx.execute("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [
this.bucketStorageAdapter.getMaxOpId()
]);
}
Expand All @@ -303,36 +305,32 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* Execute a statement and optionally return results
*/
async execute(sql: string, parameters?: any[]) {
const res = await this.writeLock((tx) => tx.executeAsync(sql, parameters));
return res;
await this.initialized;
return this.database.execute(sql, parameters);
}

/**
* Execute a read-only query and return results
*/
async getAll<T>(sql: string, parameters?: any[]): Promise<T[]> {
const res = await this.readTransaction((tx) => tx.executeAsync(sql, parameters));
return res.rows?._array ?? [];
await this.initialized;
return this.database.getAll(sql, parameters);
}

/**
* Execute a read-only query and return the first result, or null if the ResultSet is empty.
*/
async getOptional<T>(sql: string, parameters?: any[]): Promise<T | null> {
const res = await this.readTransaction((tx) => tx.executeAsync(sql, parameters));
return res.rows?.item(0) ?? null;
await this.initialized;
return this.database.getOptional(sql, parameters);
}

/**
* Execute a read-only query and return the first result, error if the ResultSet is empty.
*/
async get<T>(sql: string, parameters?: any[]): Promise<T> {
const res = await this.readTransaction((tx) => tx.executeAsync(sql, parameters));
const first = res.rows?.item(0);
if (!first) {
throw new Error('Result set is empty');
}
return first;
await this.initialized;
return this.database.get(sql, parameters);
}

/**
Expand All @@ -358,40 +356,43 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
});
}

async readTransaction<T>(callback: (tx: Transaction) => Promise<T>, lockTimeout?: number): Promise<T> {
async readTransaction<T>(
callback: (tx: Transaction) => Promise<T>,
lockTimeout: number = DEFAULT_LOCK_TIMEOUT_MS
): Promise<T> {
await this.initialized;
return this.runLockedTransaction(
AbstractPowerSyncDatabase.transactionMutex,
return this.database.readTransaction(
async (tx) => {
const res = await callback(tx);
await tx.rollbackAsync();
const res = await callback({ ...tx });
await tx.rollback();
return res;
},
lockTimeout
{ timeoutMs: lockTimeout }
);
}

async writeTransaction<T>(callback: (tx: Transaction) => Promise<T>, lockTimeout?: number): Promise<T> {
async writeTransaction<T>(
callback: (tx: Transaction) => Promise<T>,
lockTimeout: number = DEFAULT_LOCK_TIMEOUT_MS
): Promise<T> {
await this.initialized;
return this.runLockedTransaction(
AbstractPowerSyncDatabase.transactionMutex,
return this.database.writeTransaction(
async (tx) => {
const res = await callback(tx);
await tx.commitAsync();
await tx.commit();
_.defer(() => this.syncStreamImplementation?.triggerCrudUpload());
return res;
},
lockTimeout
{ timeoutMs: lockTimeout }
);
}

async *watch(sql: string, parameters: any[], options?: SQLWatchOptions): AsyncIterable<QueryResult> {
async *watch(sql: string, parameters?: any[], options?: SQLWatchOptions): AsyncIterable<QueryResult> {
//Fetch initial data
yield await this.execute(sql, parameters);

const resolvedTables = options?.tables ?? [];
if (!options?.tables) {
// TODO get tables from sql if not specified
const explained = await this.getAll(`EXPLAIN ${sql}`, parameters);
const rootPages = _.chain(explained)
.filter((row) => row['opcode'] == 'OpenRead' && row['p3'] == 0 && _.isNumber(row['p2']))
Expand All @@ -401,7 +402,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
`SELECT tbl_name FROM sqlite_master WHERE rootpage IN (SELECT json_each.value FROM json_each(?))`,
[JSON.stringify(rootPages)]
);
tables.forEach((t) => resolvedTables.push(t.tbl_name.replace(/^ps_data__/, '')));
tables.forEach((t) => resolvedTables.push(t.tbl_name.replace(POWERSYNC_TABLE_MATCH, '')));
}
for await (const event of this.onChange({
...(options ?? {}),
Expand Down Expand Up @@ -458,27 +459,4 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
return () => dispose();
});
}

private runLockedTransaction<T>(
mutex: Mutex,
callback: (tx: Transaction) => Promise<T>,
lockTimeout?: number
): Promise<T> {
return mutexRunExclusive(
mutex,
() => {
return new Promise<T>(async (resolve, reject) => {
try {
await this.database.transaction(async (tx) => {
const r = await callback(tx);
resolve(r);
});
} catch (ex) {
reject(ex);
}
});
},
{ timeoutMs: lockTimeout }
);
}
}
Loading

0 comments on commit b612362

Please sign in to comment.