Skip to content

Commit

Permalink
CRUD improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
stevensJourney committed Jan 15, 2024
1 parent 9933c84 commit 9bc088d
Show file tree
Hide file tree
Showing 7 changed files with 3,938 additions and 121 deletions.
5 changes: 5 additions & 0 deletions .changeset/many-hairs-scream.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@journeyapps/powersync-sdk-common': patch
---

Improved connector CRUD uploads to be triggered whenever an internal CRUD operation change is triggered. Improved CRUD upload debouncing to rather use a throttled approach - executing multiple continous write/CRUD operations will now trigger a connector upload at most (every) 1 second (by default).
2 changes: 1 addition & 1 deletion apps/supabase-todolist
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ import { UploadQueueStats } from '../db/crud/UploadQueueStatus';
import { PowerSyncBackendConnector } from './connection/PowerSyncBackendConnector';
import {
AbstractStreamingSyncImplementation,
DEFAULT_CRUD_UPLOAD_THROTTLE_MS,
StreamingSyncImplementationListener
} from './sync/stream/AbstractStreamingSyncImplementation';
import { CrudBatch } from './sync/bucket/CrudBatch';
import { CrudTransaction } from './sync/bucket/CrudTransaction';
import { BucketStorageAdapter } from './sync/bucket/BucketStorageAdapter';
import { BucketStorageAdapter, PSInternalTable } from './sync/bucket/BucketStorageAdapter';
import { CrudEntry } from './sync/bucket/CrudEntry';
import { mutexRunExclusive } from '../utils/mutex';
import { BaseObserver } from '../utils/BaseObserver';
Expand All @@ -22,13 +23,19 @@ export interface PowerSyncDatabaseOptions {
schema: Schema;
database: DBAdapter;
retryDelay?: number;
crudUploadThrottleMs?: number;
logger?: ILogger;
}

export interface SQLWatchOptions {
signal?: AbortSignal;
tables?: string[];
throttleMs?: number;
/**
* Allows for watching any SQL table
* by not removing PowerSync table name prefixes
*/
rawTableNames?: boolean;
}

export interface WatchOnChangeEvent {
Expand All @@ -45,7 +52,8 @@ export const DEFAULT_WATCH_THROTTLE_MS = 30;

export const DEFAULT_POWERSYNC_DB_OPTIONS = {
retryDelay: 5000,
logger: Logger.get('PowerSyncDatabase')
logger: Logger.get('PowerSyncDatabase'),
crudUploadThrottleMs: DEFAULT_CRUD_UPLOAD_THROTTLE_MS
};

/**
Expand Down Expand Up @@ -133,6 +141,21 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
this.sdkVersion = version.rows?.item(0)['powersync_rs_version()'] ?? '';
this.ready = true;
this.iterateListeners((cb) => cb.initialized?.());
this.watchCrudUploads();
}

/**
* Queues a CRUD upload when internal CRUD tables have been updated
*/
protected async watchCrudUploads() {
for await (const event of this.onChange({
tables: [PSInternalTable.CRUD],
rawTableNames: true
})) {
if (this.connected) {
this.syncStreamImplementation?.triggerCrudUpload();
}
}
}

/**
Expand Down Expand Up @@ -182,9 +205,9 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB

// TODO DB name, verify this is necessary with extension
await this.database.writeTransaction(async (tx) => {
await tx.execute('DELETE FROM ps_oplog WHERE 1');
await tx.execute('DELETE FROM ps_crud WHERE 1');
await tx.execute('DELETE FROM ps_buckets WHERE 1');
await tx.execute(`DELETE FROM ${PSInternalTable.OPLOG} WHERE 1`);
await tx.execute(`DELETE FROM ${PSInternalTable.CRUD} WHERE 1`);
await tx.execute(`DELETE FROM ${PSInternalTable.BUCKETS} WHERE 1`);

const existingTableRows = await tx.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'"
Expand Down Expand Up @@ -220,12 +243,14 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
async getUploadQueueStats(includeSize?: boolean): Promise<UploadQueueStats> {
return this.readTransaction(async (tx) => {
if (includeSize) {
const result = await tx.execute('SELECT SUM(cast(data as blob) + 20) as size, count(*) as count FROM ps_crud');
const result = await tx.execute(
`SELECT SUM(cast(data as blob) + 20) as size, count(*) as count FROM ${PSInternalTable.CRUD}`
);

const row = result.rows.item(0);
return new UploadQueueStats(row?.count ?? 0, row?.size ?? 0);
} else {
const result = await tx.execute('SELECT count(*) as count FROM ps_crud');
const result = await tx.execute(`SELECT count(*) as count FROM ${PSInternalTable.CRUD}`);
const row = result.rows.item(0);
return new UploadQueueStats(row?.count ?? 0);
}
Expand All @@ -250,9 +275,10 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* and a single transaction may be split over multiple batches.
*/
async getCrudBatch(limit: number): Promise<CrudBatch | null> {
const result = await this.database.execute('SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT ?', [
limit + 1
]);
const result = await this.database.execute(
`SELECT id, tx_id, data FROM ${PSInternalTable.CRUD} ORDER BY id ASC LIMIT ?`,
[limit + 1]
);

const all: CrudEntry[] = result.rows?._array?.map((row) => CrudEntry.fromRow(row)) ?? [];

Expand All @@ -268,11 +294,13 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
const last = all[all.length - 1];
return new CrudBatch(all, haveMore, async (writeCheckpoint?: string) => {
await this.writeTransaction(async (tx) => {
await tx.execute('DELETE FROM ps_crud WHERE id <= ?', [last.clientId]);
if (writeCheckpoint != null && (await tx.execute('SELECT 1 FROM ps_crud LIMIT 1')) == null) {
await tx.execute("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [writeCheckpoint]);
await tx.execute(`DELETE FROM ${PSInternalTable.CRUD} WHERE id <= ?`, [last.clientId]);
if (writeCheckpoint != null && (await tx.execute(`SELECT 1 FROM ${PSInternalTable.CRUD} LIMIT 1`)) == null) {
await tx.execute(`UPDATE ${PSInternalTable.BUCKETS} SET target_op = ? WHERE name='$local'`, [
writeCheckpoint
]);
} else {
await tx.execute("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [
await tx.execute(`UPDATE ${PSInternalTable.BUCKETS} SET target_op = ? WHERE name='$local'`, [
this.bucketStorageAdapter.getMaxOpId()
]);
}
Expand All @@ -295,7 +323,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
*/
async getNextCrudTransaction(): Promise<CrudTransaction> {
return await this.readTransaction(async (tx) => {
const first = await tx.execute('SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT 1');
const first = await tx.execute(`SELECT id, tx_id, data FROM ${PSInternalTable.CRUD} ORDER BY id ASC LIMIT 1`);

if (!first.rows.length) {
return null;
Expand All @@ -306,7 +334,10 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
if (!txId) {
all = [CrudEntry.fromRow(first.rows.item(0))];
} else {
const result = await tx.execute('SELECT id, tx_id, data FROM ps_crud WHERE tx_id = ? ORDER BY id ASC', [txId]);
const result = await tx.execute(
`SELECT id, tx_id, data FROM ${PSInternalTable.CRUD} WHERE tx_id = ? ORDER BY id ASC`,
[txId]
);
all = result.rows._array.map((row) => CrudEntry.fromRow(row));
}

Expand All @@ -316,14 +347,16 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
all,
async (writeCheckpoint?: string) => {
await this.writeTransaction(async (tx) => {
await tx.execute('DELETE FROM ps_crud WHERE id <= ?', [last.clientId]);
await tx.execute(`DELETE FROM ${PSInternalTable.CRUD} WHERE id <= ?`, [last.clientId]);
if (writeCheckpoint) {
const check = await tx.execute('SELECT 1 FROM ps_crud LIMIT 1');
const check = await tx.execute(`SELECT 1 FROM ${PSInternalTable.CRUD} LIMIT 1`);
if (!check.rows?.length) {
await tx.execute("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [writeCheckpoint]);
await tx.execute(`UPDATE ${PSInternalTable.BUCKETS} SET target_op = ? WHERE name='$local'`, [
writeCheckpoint
]);
}
} else {
await tx.execute("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [
await tx.execute(`UPDATE ${PSInternalTable.BUCKETS} SET target_op = ? WHERE name='$local'`, [
this.bucketStorageAdapter.getMaxOpId()
]);
}
Expand All @@ -340,7 +373,6 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
async execute(sql: string, parameters?: any[]) {
await this.waitForReady();
const result = await this.database.execute(sql, parameters);
_.defer(() => this.syncStreamImplementation?.triggerCrudUpload());
return result;
}

Expand Down Expand Up @@ -386,7 +418,6 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
await this.waitForReady();
return mutexRunExclusive(AbstractPowerSyncDatabase.transactionMutex, async () => {
const res = await callback(this.database);
_.defer(() => this.syncStreamImplementation?.triggerCrudUpload());
return res;
});
}
Expand Down Expand Up @@ -415,7 +446,6 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
async (tx) => {
const res = await callback(tx);
await tx.commit();
_.defer(() => this.syncStreamImplementation?.triggerCrudUpload());
return res;
},
{ timeoutMs: lockTimeout }
Expand Down Expand Up @@ -475,10 +505,13 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
const dispose = this.database.registerListener({
tablesUpdated: async (update) => {
const { table } = update;
if (!table.match(POWERSYNC_TABLE_MATCH)) {
const { rawTableNames } = options;

if (!rawTableNames && !table.match(POWERSYNC_TABLE_MATCH)) {
return;
}
const tableName = table.replace(POWERSYNC_TABLE_MATCH, '');

const tableName = rawTableNames ? table : table.replace(POWERSYNC_TABLE_MATCH, '');
throttledTableUpdates.push(tableName);

flushTableUpdates();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ export interface BucketChecksum {
count: number;
}

export enum PSInternalTable {
DATA = 'ps_data',
CRUD = 'ps_crud',
BUCKETS = 'ps_buckets',
OPLOG = 'ps_oplog'
}

export interface BucketStorageAdapter {
init(): Promise<void>;
saveSyncData(batch: SyncDataBatch): Promise<void>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,27 @@ export interface AbstractStreamingSyncImplementationOptions {
uploadCrud: () => Promise<void>;
logger?: ILogger;
retryDelayMs?: number;
crudUploadThrottleMs?: number;
}

export interface StreamingSyncImplementationListener extends BaseListener {
statusChanged?: (status: SyncStatus) => void;
}

export const DEFAULT_CRUD_UPLOAD_THROTTLE_MS = 1000;

export const DEFAULT_STREAMING_SYNC_OPTIONS = {
retryDelayMs: 5000,
logger: Logger.get('PowerSyncStream')
logger: Logger.get('PowerSyncStream'),
crudUploadThrottleMs: DEFAULT_CRUD_UPLOAD_THROTTLE_MS
};

const CRUD_UPLOAD_DEBOUNCE_MS = 1000;

export abstract class AbstractStreamingSyncImplementation extends BaseObserver<StreamingSyncImplementationListener> {
protected _lastSyncedAt: Date | null;
protected options: AbstractStreamingSyncImplementationOptions;

syncStatus: SyncStatus;
triggerCrudUpload: () => void;

constructor(options: AbstractStreamingSyncImplementationOptions) {
super();
Expand All @@ -67,6 +70,17 @@ export abstract class AbstractStreamingSyncImplementation extends BaseObserver<S
downloading: false
}
});

this.triggerCrudUpload = _.throttle(
() => {
if (!this.syncStatus.connected || this.syncStatus.dataFlowStatus.uploading) {
return;
}
this._uploadAllCrud();
},
this.options.crudUploadThrottleMs,
{ trailing: true }
);
}

get lastSyncedAt() {
Expand All @@ -88,17 +102,6 @@ export abstract class AbstractStreamingSyncImplementation extends BaseObserver<S
return this.options.adapter.hasCompletedSync();
}

triggerCrudUpload = _.debounce(
() => {
if (!this.syncStatus.connected || this.syncStatus.dataFlowStatus.uploading) {
return;
}
this._uploadAllCrud();
},
CRUD_UPLOAD_DEBOUNCE_MS,
{ trailing: true }
);

protected async _uploadAllCrud(): Promise<void> {
return this.obtainLock({
type: LockType.CRUD,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
await this.waitForReady();
await connector.uploadData(this);
},
retryDelayMs: this.options.retryDelay
retryDelayMs: this.options.retryDelay,
crudUploadThrottleMs: this.options.crudUploadThrottleMs
});
}
}
Loading

0 comments on commit 9bc088d

Please sign in to comment.