diff --git a/.changeset/chilled-walls-promise.md b/.changeset/chilled-walls-promise.md new file mode 100644 index 00000000..e4fd2496 --- /dev/null +++ b/.changeset/chilled-walls-promise.md @@ -0,0 +1,11 @@ +--- +'@journeyapps/powersync-sdk-react-native': patch +'@journeyapps/powersync-sdk-common': patch +--- + +Updated logic to correspond with React Native Quick SQLite concurrent transactions. Added helper methods on transaction contexts. + +API changes include: +- Removal of synchronous DB operations in transactions: `execute`, `commit`, `rollback` are now async functions. `executeAsync`, `commitAsync` and `rollbackAsync` have been removed. +- Transaction contexts now have `get`, `getAll` and `getOptional` helpers. +- Added a default lock timeout of 2 minutes to aide with potential recursive lock/transaction requests. \ No newline at end of file diff --git a/.changeset/dry-pets-yawn.md b/.changeset/dry-pets-yawn.md new file mode 100644 index 00000000..f103c429 --- /dev/null +++ b/.changeset/dry-pets-yawn.md @@ -0,0 +1,5 @@ +--- +'@journeyapps/powersync-sdk-react-native': patch +--- + +Update README polyfill command. diff --git a/.changeset/friendly-shrimps-fry.md b/.changeset/friendly-shrimps-fry.md new file mode 100644 index 00000000..1aea5671 --- /dev/null +++ b/.changeset/friendly-shrimps-fry.md @@ -0,0 +1,5 @@ +--- +'@journeyapps/powersync-sdk-common': patch +--- + +Fix update trigger for local only watched tables. diff --git a/packages/powersync-sdk-common/src/client/AbstractPowerSyncDatabase.ts b/packages/powersync-sdk-common/src/client/AbstractPowerSyncDatabase.ts index 8abe3598..1b5fcd8e 100644 --- a/packages/powersync-sdk-common/src/client/AbstractPowerSyncDatabase.ts +++ b/packages/powersync-sdk-common/src/client/AbstractPowerSyncDatabase.ts @@ -46,6 +46,13 @@ export const DEFAULT_POWERSYNC_DB_OPTIONS = { logger: Logger.get('PowerSyncDatabase') }; +/** + * Requesting nested or recursive locks can block the application in some circumstances. + * This default lock timeout will act as a failsafe to throw an error if a lock cannot + * be obtained. + */ +export const DEFAULT_LOCK_TIMEOUT_MS = 120_000; // 2 mins + export abstract class AbstractPowerSyncDatabase extends BaseObserver { /** * Transactions should be queued in the DBAdapter, but we also want to prevent @@ -70,9 +77,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver { await this._init(); await this.bucketStorageAdapter.init(); - await this.database.executeAsync('SELECT powersync_replace_schema(?)', [JSON.stringify(this.schema.toJSON())]); + await this.database.execute('SELECT powersync_replace_schema(?)', [JSON.stringify(this.schema.toJSON())]); + const version = await this.options.database.execute('SELECT powersync_rs_version()'); + this.sdkVersion = version.rows?.item(0)['powersync_rs_version()'] ?? ''; })(); await this.initialized; } @@ -111,7 +118,6 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver { @@ -142,12 +148,12 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver { - await tx.executeAsync('DELETE FROM ps_oplog WHERE 1'); - await tx.executeAsync('DELETE FROM ps_crud WHERE 1'); - await tx.executeAsync('DELETE FROM ps_buckets WHERE 1'); + await this.database.writeTransaction(async (tx) => { + await tx.execute('DELETE FROM ps_oplog WHERE 1'); + await tx.execute('DELETE FROM ps_crud WHERE 1'); + await tx.execute('DELETE FROM ps_buckets WHERE 1'); - const existingTableRows = await tx.executeAsync( + const existingTableRows = await tx.execute( "SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'" ); @@ -155,7 +161,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver { return this.readTransaction(async (tx) => { if (includeSize) { - const result = await tx.executeAsync( - 'SELECT SUM(cast(data as blob) + 20) as size, count(*) as count FROM ps_crud' - ); + const result = await tx.execute('SELECT SUM(cast(data as blob) + 20) as size, count(*) as count FROM ps_crud'); const row = result.rows.item(0); return new UploadQueueStats(row?.count ?? 0, row?.size ?? 0); } else { - const result = await tx.executeAsync('SELECT count(*) as count FROM ps_crud'); + const result = await tx.execute('SELECT count(*) as count FROM ps_crud'); const row = result.rows.item(0); return new UploadQueueStats(row?.count ?? 0); } @@ -213,7 +217,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver { - const result = await this.database.executeAsync('SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT ?', [ + const result = await this.database.execute('SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT ?', [ limit + 1 ]); @@ -231,11 +235,11 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver { await this.writeTransaction(async (tx) => { - await tx.executeAsync('DELETE FROM ps_crud WHERE id <= ?', [last.clientId]); - if (writeCheckpoint != null && (await tx.executeAsync('SELECT 1 FROM ps_crud LIMIT 1')) == null) { - await tx.executeAsync("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [writeCheckpoint]); + await tx.execute('DELETE FROM ps_crud WHERE id <= ?', [last.clientId]); + if (writeCheckpoint != null && (await tx.execute('SELECT 1 FROM ps_crud LIMIT 1')) == null) { + await tx.execute("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [writeCheckpoint]); } else { - await tx.executeAsync("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [ + await tx.execute("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [ this.bucketStorageAdapter.getMaxOpId() ]); } @@ -258,7 +262,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver { return await this.readTransaction(async (tx) => { - const first = await tx.executeAsync('SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT 1'); + const first = await tx.execute('SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT 1'); if (!first.rows.length) { return null; @@ -269,9 +273,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver CrudEntry.fromRow(row)); } @@ -281,14 +283,14 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver { await this.writeTransaction(async (tx) => { - await tx.executeAsync('DELETE FROM ps_crud WHERE id <= ?', [last.clientId]); + await tx.execute('DELETE FROM ps_crud WHERE id <= ?', [last.clientId]); if (writeCheckpoint) { - const check = await tx.executeAsync('SELECT 1 FROM ps_crud LIMIT 1'); + const check = await tx.execute('SELECT 1 FROM ps_crud LIMIT 1'); if (!check.rows?.length) { - await tx.executeAsync("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [writeCheckpoint]); + await tx.execute("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [writeCheckpoint]); } } else { - await tx.executeAsync("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [ + await tx.execute("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [ this.bucketStorageAdapter.getMaxOpId() ]); } @@ -303,36 +305,32 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver tx.executeAsync(sql, parameters)); - return res; + await this.initialized; + return this.database.execute(sql, parameters); } /** * Execute a read-only query and return results */ async getAll(sql: string, parameters?: any[]): Promise { - const res = await this.readTransaction((tx) => tx.executeAsync(sql, parameters)); - return res.rows?._array ?? []; + await this.initialized; + return this.database.getAll(sql, parameters); } /** * Execute a read-only query and return the first result, or null if the ResultSet is empty. */ async getOptional(sql: string, parameters?: any[]): Promise { - const res = await this.readTransaction((tx) => tx.executeAsync(sql, parameters)); - return res.rows?.item(0) ?? null; + await this.initialized; + return this.database.getOptional(sql, parameters); } /** * Execute a read-only query and return the first result, error if the ResultSet is empty. */ async get(sql: string, parameters?: any[]): Promise { - const res = await this.readTransaction((tx) => tx.executeAsync(sql, parameters)); - const first = res.rows?.item(0); - if (!first) { - throw new Error('Result set is empty'); - } - return first; + await this.initialized; + return this.database.get(sql, parameters); } /** @@ -358,40 +356,43 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver(callback: (tx: Transaction) => Promise, lockTimeout?: number): Promise { + async readTransaction( + callback: (tx: Transaction) => Promise, + lockTimeout: number = DEFAULT_LOCK_TIMEOUT_MS + ): Promise { await this.initialized; - return this.runLockedTransaction( - AbstractPowerSyncDatabase.transactionMutex, + return this.database.readTransaction( async (tx) => { - const res = await callback(tx); - await tx.rollbackAsync(); + const res = await callback({ ...tx }); + await tx.rollback(); return res; }, - lockTimeout + { timeoutMs: lockTimeout } ); } - async writeTransaction(callback: (tx: Transaction) => Promise, lockTimeout?: number): Promise { + async writeTransaction( + callback: (tx: Transaction) => Promise, + lockTimeout: number = DEFAULT_LOCK_TIMEOUT_MS + ): Promise { await this.initialized; - return this.runLockedTransaction( - AbstractPowerSyncDatabase.transactionMutex, + return this.database.writeTransaction( async (tx) => { const res = await callback(tx); - await tx.commitAsync(); + await tx.commit(); _.defer(() => this.syncStreamImplementation?.triggerCrudUpload()); return res; }, - lockTimeout + { timeoutMs: lockTimeout } ); } - async *watch(sql: string, parameters: any[], options?: SQLWatchOptions): AsyncIterable { + async *watch(sql: string, parameters?: any[], options?: SQLWatchOptions): AsyncIterable { //Fetch initial data yield await this.execute(sql, parameters); const resolvedTables = options?.tables ?? []; if (!options?.tables) { - // TODO get tables from sql if not specified const explained = await this.getAll(`EXPLAIN ${sql}`, parameters); const rootPages = _.chain(explained) .filter((row) => row['opcode'] == 'OpenRead' && row['p3'] == 0 && _.isNumber(row['p2'])) @@ -401,7 +402,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver resolvedTables.push(t.tbl_name.replace(/^ps_data__/, ''))); + tables.forEach((t) => resolvedTables.push(t.tbl_name.replace(POWERSYNC_TABLE_MATCH, ''))); } for await (const event of this.onChange({ ...(options ?? {}), @@ -458,27 +459,4 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver dispose(); }); } - - private runLockedTransaction( - mutex: Mutex, - callback: (tx: Transaction) => Promise, - lockTimeout?: number - ): Promise { - return mutexRunExclusive( - mutex, - () => { - return new Promise(async (resolve, reject) => { - try { - await this.database.transaction(async (tx) => { - const r = await callback(tx); - resolve(r); - }); - } catch (ex) { - reject(ex); - } - }); - }, - { timeoutMs: lockTimeout } - ); - } } diff --git a/packages/powersync-sdk-common/src/client/sync/bucket/SqliteBucketStorage.ts b/packages/powersync-sdk-common/src/client/sync/bucket/SqliteBucketStorage.ts index 0086b34a..69afc3cc 100644 --- a/packages/powersync-sdk-common/src/client/sync/bucket/SqliteBucketStorage.ts +++ b/packages/powersync-sdk-common/src/client/sync/bucket/SqliteBucketStorage.ts @@ -6,7 +6,6 @@ import { OpTypeEnum } from './OpType'; import { CrudBatch } from './CrudBatch'; import { CrudEntry } from './CrudEntry'; import { SyncDataBatch } from './SyncDataBatch'; -import { mutexRunExclusive } from '../../../utils/mutex'; import Logger, { ILogger } from 'js-logger'; const COMPACT_OPERATION_INTERVAL = 1_000; @@ -35,7 +34,7 @@ export class SqliteBucketStorage implements BucketStorageAdapter { async init() { this._hasCompletedSync = false; - const existingTableRows = await this.db.executeAsync( + const existingTableRows = await this.db.execute( `SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'` ); for (let row of existingTableRows.rows?._array ?? []) { @@ -52,7 +51,7 @@ export class SqliteBucketStorage implements BucketStorageAdapter { startSession(): void {} async getBucketStates(): Promise { - const result = await this.db.executeAsync( + const result = await this.db.execute( 'SELECT name as bucket, cast(last_op as TEXT) as op_id FROM ps_buckets WHERE pending_delete = 0' ); return result.rows?._array ?? []; @@ -62,7 +61,7 @@ export class SqliteBucketStorage implements BucketStorageAdapter { await this.writeTransaction(async (tx) => { let count = 0; for (let b of batch.buckets) { - const result = await tx.executeAsync('INSERT INTO powersync_operations(op, data) VALUES(?, ?)', [ + const result = await tx.execute('INSERT INTO powersync_operations(op, data) VALUES(?, ?)', [ 'save', JSON.stringify({ buckets: [b.toJSON()] }) ]); @@ -93,14 +92,14 @@ export class SqliteBucketStorage implements BucketStorageAdapter { this.logger.debug('Deleting bucket', bucket); // This await this.writeTransaction(async (tx) => { - await tx.executeAsync( + await tx.execute( `UPDATE ps_oplog SET op=${OpTypeEnum.REMOVE}, data=NULL WHERE op=${OpTypeEnum.PUT} AND superseded=0 AND bucket=?`, [bucket] ); // Rename bucket - await tx.executeAsync('UPDATE ps_oplog SET bucket=? WHERE bucket=?', [newName, bucket]); - await tx.executeAsync('DELETE FROM ps_buckets WHERE name = ?', [bucket]); - await tx.executeAsync( + await tx.execute('UPDATE ps_oplog SET bucket=? WHERE bucket=?', [newName, bucket]); + await tx.execute('DELETE FROM ps_buckets WHERE name = ?', [bucket]); + await tx.execute( 'INSERT INTO ps_buckets(name, pending_delete, last_op) SELECT ?, 1, IFNULL(MAX(op_id), 0) FROM ps_oplog WHERE bucket = ?', [newName, newName] ); @@ -113,9 +112,7 @@ export class SqliteBucketStorage implements BucketStorageAdapter { if (this._hasCompletedSync) { return true; } - const r = await this.db.executeAsync( - `SELECT name, last_applied_op FROM ps_buckets WHERE last_applied_op > 0 LIMIT 1` - ); + const r = await this.db.execute(`SELECT name, last_applied_op FROM ps_buckets WHERE last_applied_op > 0 LIMIT 1`); const completed = !!r.rows?.length; if (completed) { this._hasCompletedSync = true; @@ -135,13 +132,13 @@ export class SqliteBucketStorage implements BucketStorageAdapter { const bucketNames = checkpoint.buckets.map((b) => b.bucket); await this.writeTransaction(async (tx) => { - await tx.executeAsync( - `UPDATE ps_buckets SET last_op = ? WHERE name IN (SELECT json_each.value FROM json_each(?))`, - [checkpoint.last_op_id, JSON.stringify(bucketNames)] - ); + await tx.execute(`UPDATE ps_buckets SET last_op = ? WHERE name IN (SELECT json_each.value FROM json_each(?))`, [ + checkpoint.last_op_id, + JSON.stringify(bucketNames) + ]); if (checkpoint.write_checkpoint) { - await tx.executeAsync("UPDATE ps_buckets SET last_op = ? WHERE name = '$local'", [checkpoint.write_checkpoint]); + await tx.execute("UPDATE ps_buckets SET last_op = ? WHERE name = '$local'", [checkpoint.write_checkpoint]); } }); @@ -170,7 +167,7 @@ export class SqliteBucketStorage implements BucketStorageAdapter { * It's best to execute this on the same thread * https://github.com/journeyapps/powersync-sqlite-core/blob/40554dc0e71864fe74a0cb00f1e8ca4e328ff411/crates/sqlite/sqlite/sqlite3.h#L2578 */ - const { insertId: result } = await tx.executeAsync('INSERT INTO powersync_operations(op, data) VALUES(?, ?)', [ + const { insertId: result } = await tx.execute('INSERT INTO powersync_operations(op, data) VALUES(?, ?)', [ 'sync_local', '' ]); @@ -179,9 +176,7 @@ export class SqliteBucketStorage implements BucketStorageAdapter { } async validateChecksums(checkpoint: Checkpoint): Promise { - const rs = await this.db.executeAsync('SELECT powersync_validate_checkpoint(?) as result', [ - JSON.stringify(checkpoint) - ]); + const rs = await this.db.execute('SELECT powersync_validate_checkpoint(?) as result', [JSON.stringify(checkpoint)]); const resultItem = rs.rows?.item(0); this.logger.debug('validateChecksums result item', resultItem); @@ -224,10 +219,10 @@ export class SqliteBucketStorage implements BucketStorageAdapter { private async deletePendingBuckets() { if (this.pendingBucketDeletes !== false) { await this.writeTransaction(async (tx) => { - await tx.executeAsync( + await tx.execute( 'DELETE FROM ps_oplog WHERE bucket IN (SELECT name FROM ps_buckets WHERE pending_delete = 1 AND last_applied_op = last_op AND last_op >= target_op)' ); - await tx.executeAsync( + await tx.execute( 'DELETE FROM ps_buckets WHERE pending_delete = 1 AND last_applied_op = last_op AND last_op >= target_op' ); }); @@ -242,20 +237,20 @@ export class SqliteBucketStorage implements BucketStorageAdapter { } await this.writeTransaction(async (tx) => { - await tx.executeAsync('INSERT INTO powersync_operations(op, data) VALUES (?, ?)', ['clear_remove_ops', '']); + await tx.execute('INSERT INTO powersync_operations(op, data) VALUES (?, ?)', ['clear_remove_ops', '']); }); this.compactCounter = 0; } async updateLocalTarget(cb: () => Promise): Promise { - const rs1 = await this.db.executeAsync("SELECT target_op FROM ps_buckets WHERE name = '$local' AND target_op = ?", [ + const rs1 = await this.db.execute("SELECT target_op FROM ps_buckets WHERE name = '$local' AND target_op = ?", [ SqliteBucketStorage.MAX_OP_ID ]); if (!rs1.rows?.length) { // Nothing to update return false; } - const rs = await this.db.executeAsync("SELECT seq FROM sqlite_sequence WHERE name = 'ps_crud'"); + const rs = await this.db.execute("SELECT seq FROM sqlite_sequence WHERE name = 'ps_crud'"); if (!rs.rows?.length) { // Nothing to update return false; @@ -267,14 +262,14 @@ export class SqliteBucketStorage implements BucketStorageAdapter { this.logger.debug(`[updateLocalTarget] Updating target to checkpoint ${opId}`); return this.writeTransaction(async (tx) => { - const anyData = await tx.executeAsync('SELECT 1 FROM ps_crud LIMIT 1'); + const anyData = await tx.execute('SELECT 1 FROM ps_crud LIMIT 1'); if (!!anyData.rows?.length) { // if isNotEmpty this.logger.debug('updateLocalTarget', 'ps crud is not empty'); return false; } - const rs = await tx.executeAsync("SELECT seq FROM sqlite_sequence WHERE name = 'ps_crud'"); + const rs = await tx.execute("SELECT seq FROM sqlite_sequence WHERE name = 'ps_crud'"); if (!rs.rows?.length) { // assert isNotEmpty throw new Error('SQlite Sequence should not be empty'); @@ -288,14 +283,14 @@ export class SqliteBucketStorage implements BucketStorageAdapter { return false; } - const response = await tx.executeAsync("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [opId]); + const response = await tx.execute("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [opId]); this.logger.debug(['[updateLocalTarget] Response from updating target_op ', JSON.stringify(response)]); return true; }); } async hasCrud(): Promise { - const anyData = this.db.execute('SELECT 1 FROM ps_crud LIMIT 1'); + const anyData = await this.db.execute('SELECT 1 FROM ps_crud LIMIT 1'); return !!anyData.rows?.length; } @@ -308,7 +303,7 @@ export class SqliteBucketStorage implements BucketStorageAdapter { return null; } - const crudResult = await this.db.executeAsync('SELECT * FROM ps_crud ORDER BY id ASC LIMIT ?', [limit]); + const crudResult = await this.db.execute('SELECT * FROM ps_crud ORDER BY id ASC LIMIT ?', [limit]); let all: CrudEntry[] = []; for (let row of crudResult.rows?._array ?? []) { @@ -325,42 +320,22 @@ export class SqliteBucketStorage implements BucketStorageAdapter { haveMore: true, complete: async (writeCheckpoint?: string) => { return this.writeTransaction(async (tx) => { - await tx.executeAsync('DELETE FROM ps_crud WHERE id <= ?', [last.clientId]); + await tx.execute('DELETE FROM ps_crud WHERE id <= ?', [last.clientId]); if (writeCheckpoint) { - const crudResult = await tx.executeAsync('SELECT 1 FROM ps_crud LIMIT 1'); + const crudResult = await tx.execute('SELECT 1 FROM ps_crud LIMIT 1'); if (crudResult.rows?.length) { - await tx.executeAsync("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [writeCheckpoint]); + await tx.execute("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [writeCheckpoint]); } } else { - await tx.executeAsync("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [this.getMaxOpId()]); + await tx.execute("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [this.getMaxOpId()]); } }); } }; } - /// Note: The asynchronous nature of this is due to this needing a global - /// lock. The actual database operations are still synchronous, and it - /// is assumed that multiple functions on this instance won't be called - /// concurrently. async writeTransaction(callback: (tx: Transaction) => Promise, options?: { timeoutMs: number }): Promise { - return mutexRunExclusive( - this.mutex, - () => { - return new Promise(async (resolve, reject) => { - try { - await this.db.transaction(async (tx) => { - const r = await callback(tx); - await tx.commitAsync(); - resolve(r); - }); - } catch (ex) { - reject(ex); - } - }); - }, - options - ); + return this.db.writeTransaction(callback, options); } /** diff --git a/packages/powersync-sdk-common/src/db/DBAdapter.ts b/packages/powersync-sdk-common/src/db/DBAdapter.ts index 5a701154..4c6f234f 100644 --- a/packages/powersync-sdk-common/src/db/DBAdapter.ts +++ b/packages/powersync-sdk-common/src/db/DBAdapter.ts @@ -31,13 +31,19 @@ export type QueryResult = { }; }; -export interface Transaction { - commit: () => QueryResult; - commitAsync: () => Promise; - execute: (query: string, params?: any[]) => QueryResult; - executeAsync: (query: string, params?: any[] | undefined) => Promise; - rollback: () => QueryResult; - rollbackAsync: () => Promise; +export interface DBGetUtils { + getAll(sql: string, parameters?: any[]): Promise; + getOptional(sql: string, parameters?: any[]): Promise; + get(sql: string, parameters?: any[]): Promise; +} + +export interface LockContext extends DBGetUtils { + execute: (query: string, params?: any[] | undefined) => Promise; +} + +export interface Transaction extends LockContext { + commit: () => Promise; + rollback: () => Promise; } /** @@ -58,9 +64,15 @@ export interface DBAdapterListener extends BaseListener { tablesUpdated: (updateNotification: UpdateNotification) => void; } -export interface DBAdapter extends BaseObserverInterface { +export interface DBLockOptions { + timeoutMs?: number; +} + +export interface DBAdapter extends BaseObserverInterface, DBGetUtils { close: () => void; - transaction: (fn: (tx: Transaction) => Promise | void) => Promise; - execute: (query: string, params?: any[]) => QueryResult; - executeAsync: (query: string, params?: any[]) => Promise; + readLock: (fn: (tx: LockContext) => Promise, options?: DBLockOptions) => Promise; + readTransaction: (fn: (tx: Transaction) => Promise, options?: DBLockOptions) => Promise; + writeLock: (fn: (tx: LockContext) => Promise, options?: DBLockOptions) => Promise; + writeTransaction: (fn: (tx: Transaction) => Promise, options?: DBLockOptions) => Promise; + execute: (query: string, params?: any[]) => Promise; } diff --git a/packages/powersync-sdk-react-native/README.md b/packages/powersync-sdk-react-native/README.md index 167a491d..d1b0c2cb 100644 --- a/packages/powersync-sdk-react-native/README.md +++ b/packages/powersync-sdk-react-native/README.md @@ -63,7 +63,7 @@ import 'react-native-polyfill-globals/auto'; ``` ```bash - npx expo install -D @babel/plugin-transform-async-generator-functions + yarn add -D @babel/plugin-transform-async-generator-functions ``` Add the Babel plugin to your `babel.config.js` file @@ -427,9 +427,9 @@ export const ListsWidget = () => { try { await PowerSync.writeTransaction(async (tx) => { // Delete the main list - await tx.executeAsync(`DELETE FROM lists WHERE id = ?`, [item.id]); + await tx.execute(`DELETE FROM lists WHERE id = ?`, [item.id]); // Delete any children of the list - await tx.executeAsync(`DELETE FROM todos WHERE list_id = ?`, [item.id]); + await tx.execute(`DELETE FROM todos WHERE list_id = ?`, [item.id]); // Transactions are automatically committed at the end of execution // Transactions are automatically rolled back if an exception ocurred diff --git a/packages/powersync-sdk-react-native/package.json b/packages/powersync-sdk-react-native/package.json index 02abac1c..51864c30 100644 --- a/packages/powersync-sdk-react-native/package.json +++ b/packages/powersync-sdk-react-native/package.json @@ -24,7 +24,7 @@ }, "homepage": "https://docs.powersync.co/", "peerDependencies": { - "@journeyapps/react-native-quick-sqlite": "^0.0.1", + "@journeyapps/react-native-quick-sqlite": "0.0.2", "base-64": "^1.0.0", "react": "*", "react-native-fetch-api": "^3.0.0", @@ -35,12 +35,12 @@ "web-streams-polyfill": "^3.2.1" }, "dependencies": { - "@journeyapps/powersync-react": "^0.0.1", - "@journeyapps/powersync-sdk-common": "^0.0.1", + "@journeyapps/powersync-react": "0.0.1", + "@journeyapps/powersync-sdk-common": "0.0.1", "async-lock": "^1.4.0" }, "devDependencies": { - "@journeyapps/react-native-quick-sqlite": "^0.0.1", + "@journeyapps/react-native-quick-sqlite": "0.0.2", "@types/async-lock": "^1.4.0", "react": "18.2.0", "typescript": "^4.1.3" diff --git a/packages/powersync-sdk-react-native/src/db/adapters/react-native-quick-sqlite/RNQSDBAdapter.ts b/packages/powersync-sdk-react-native/src/db/adapters/react-native-quick-sqlite/RNQSDBAdapter.ts index e5a6c28d..02b4effd 100644 --- a/packages/powersync-sdk-react-native/src/db/adapters/react-native-quick-sqlite/RNQSDBAdapter.ts +++ b/packages/powersync-sdk-react-native/src/db/adapters/react-native-quick-sqlite/RNQSDBAdapter.ts @@ -1,32 +1,97 @@ -import { BaseObserver, DBAdapter, DBAdapterListener, Transaction } from '@journeyapps/powersync-sdk-common'; +import { + BaseObserver, + DBAdapter, + DBAdapterListener, + LockContext as PowerSyncLockContext, + Transaction as PowerSyncTransaction, + DBLockOptions, + DBGetUtils, + QueryResult +} from '@journeyapps/powersync-sdk-common'; import { QuickSQLiteConnection } from '@journeyapps/react-native-quick-sqlite'; /** * Adapter for React Native Quick SQLite - * This will be updated more once we implement concurrent transactions */ export class RNQSDBAdapter extends BaseObserver implements DBAdapter { + getAll: (sql: string, parameters?: any[]) => Promise; + getOptional: (sql: string, parameters?: any[]) => Promise; + get: (sql: string, parameters?: any[]) => Promise; + constructor(protected baseDB: QuickSQLiteConnection) { super(); // link table update commands baseDB.registerUpdateHook((update) => { this.iterateListeners((cb) => cb.tablesUpdated?.(update)); }); + + const topLevelUtils = this.generateDBHelpers({ execute: this.baseDB.execute }); + this.getAll = topLevelUtils.getAll; + this.getOptional = topLevelUtils.getOptional; + this.get = topLevelUtils.get; } close() { return this.baseDB.close(); } - transaction(fn: (tx: Transaction) => void | Promise) { - return this.baseDB.transaction(fn); + readLock(fn: (tx: PowerSyncLockContext) => Promise, options?: DBLockOptions): Promise { + return this.baseDB.readLock((dbTx) => fn(this.generateDBHelpers(dbTx)), options); + } + + readTransaction(fn: (tx: PowerSyncTransaction) => Promise, options?: DBLockOptions): Promise { + return this.baseDB.readTransaction((dbTx) => fn(this.generateDBHelpers(dbTx)), options); + } + + writeLock(fn: (tx: PowerSyncLockContext) => Promise, options?: DBLockOptions): Promise { + return this.baseDB.writeLock((dbTx) => fn(this.generateDBHelpers(dbTx)), options); + } + + writeTransaction(fn: (tx: PowerSyncTransaction) => Promise, options?: DBLockOptions): Promise { + return this.baseDB.writeTransaction((dbTx) => fn(this.generateDBHelpers(dbTx)), options); } execute(query: string, params?: any[]) { return this.baseDB.execute(query, params); } - executeAsync(query: string, params?: any[]) { - return this.baseDB.executeAsync(query, params); + /** + * Adds DB get utils to lock contexts and transaction contexts + * @param tx + * @returns + */ + private generateDBHelpers Promise }>( + tx: T + ): T & DBGetUtils { + return { + ...tx, + /** + * Execute a read-only query and return results + */ + async getAll(sql: string, parameters?: any[]): Promise { + const res = await tx.execute(sql, parameters); + return res.rows?._array ?? []; + }, + + /** + * Execute a read-only query and return the first result, or null if the ResultSet is empty. + */ + async getOptional(sql: string, parameters?: any[]): Promise { + const res = await tx.execute(sql, parameters); + return res.rows?.item(0) ?? null; + }, + + /** + * Execute a read-only query and return the first result, error if the ResultSet is empty. + */ + async get(sql: string, parameters?: any[]): Promise { + const res = await tx.execute(sql, parameters); + const first = res.rows?.item(0); + if (!first) { + throw new Error('Result set is empty'); + } + return first; + } + }; } } diff --git a/packages/powersync-sdk-react-native/src/db/adapters/react-native-quick-sqlite/RNQSDBOpenFactory.ts b/packages/powersync-sdk-react-native/src/db/adapters/react-native-quick-sqlite/RNQSDBOpenFactory.ts index 9c218b1b..c44530be 100644 --- a/packages/powersync-sdk-react-native/src/db/adapters/react-native-quick-sqlite/RNQSDBOpenFactory.ts +++ b/packages/powersync-sdk-react-native/src/db/adapters/react-native-quick-sqlite/RNQSDBOpenFactory.ts @@ -1,4 +1,4 @@ -import { open } from '@journeyapps/react-native-quick-sqlite'; +import { open, QuickSQLite, QuickSQLiteConnection } from '@journeyapps/react-native-quick-sqlite'; import { AbstractPowerSyncDatabase, @@ -19,7 +19,22 @@ export class RNQSPowerSyncDatabaseOpenFactory extends AbstractPowerSyncDatabaseO * in the options (if provided) * https://github.com/margelo/react-native-quick-sqlite/blob/main/README.md#loading-existing-dbs */ - return new RNQSDBAdapter(open({ name: this.options.dbFilename, location: this.options.dbLocation })); + const { dbFilename } = this.options; + const openOptions = { location: this.options.dbLocation }; + let DB: QuickSQLiteConnection; + try { + // Hot reloads can sometimes clear global JS state, but not close DB on native side + DB = open(dbFilename, openOptions); + } catch (ex) { + if (ex.message.includes('already open')) { + QuickSQLite.close(dbFilename); + DB = open(dbFilename, openOptions); + } else { + throw ex; + } + } + + return new RNQSDBAdapter(DB); } generateInstance(options: PowerSyncDatabaseOptions): AbstractPowerSyncDatabase { diff --git a/packages/powersync-sdk-react-native/src/sync/stream/ReactNativeRemote.ts b/packages/powersync-sdk-react-native/src/sync/stream/ReactNativeRemote.ts index c5249378..8af575d8 100644 --- a/packages/powersync-sdk-react-native/src/sync/stream/ReactNativeRemote.ts +++ b/packages/powersync-sdk-react-native/src/sync/stream/ReactNativeRemote.ts @@ -70,6 +70,7 @@ export class ReactNativeRemote extends AbstractRemote { error.status = res.status; throw error; } + return res.body; } } diff --git a/yarn.lock b/yarn.lock index a3e57c92..d004bfa7 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2134,19 +2134,12 @@ "@types/yargs" "^17.0.8" chalk "^4.0.0" -"@journeyapps/powersync-sdk-react-native@0.0.1-alpha.0": - version "0.0.1-alpha.0" - resolved "https://registry.npmjs.org/@journeyapps/powersync-sdk-react-native/-/powersync-sdk-react-native-0.0.1-alpha.0.tgz#f5cef94d61c9e910f1a197f8da147f996acdd78f" - integrity sha512-FcQW+ei899RO2vFXV4qF+tHCxAEdYmGFZCQEePtzXQ66KrMhpH5Pz31+97JAkFCw0TDsrq0l++4SDwvYV7JrQg== +"@journeyapps/react-native-quick-sqlite@0.0.2": + version "0.0.2" + resolved "https://registry.npmjs.org/@journeyapps/react-native-quick-sqlite/-/react-native-quick-sqlite-0.0.2.tgz#a6bb13de4446a11163d4dca441928f187f7ef859" + integrity sha512-5kYfBCHKr8qNdgxTFces6huXeigeTpkfcVMSjuZgPW+4ljHsH7brdocvBKmx8iHz2Q7pZ7cIyV0RZ0oz/htnLA== dependencies: - "@journeyapps/powersync-react" "^0.0.1-alpha.0" - "@journeyapps/powersync-sdk-common" "^0.0.1-alpha.0" - async-lock "^1.4.0" - -"@journeyapps/react-native-quick-sqlite@0.0.1", "@journeyapps/react-native-quick-sqlite@^0.0.1": - version "0.0.1" - resolved "https://registry.npmjs.org/@journeyapps/react-native-quick-sqlite/-/react-native-quick-sqlite-0.0.1.tgz#a5b731bfc658d6c8257ef3edebf243941a69a764" - integrity sha512-vUXWngjpTFX6xCsy6Y8jd9zcI1eajIw/rWfdkpJOdnazR0SzAmrMc0ViRB6K84fWq9CmAV3DZw+GCVEHubqhSw== + lodash "^4.17.21" "@jridgewell/gen-mapping@^0.3.0", "@jridgewell/gen-mapping@^0.3.2": version "0.3.3"