Skip to content

Commit

Permalink
Merge pull request #41 from powersync-ja/feature/better-crud-uploads
Browse files Browse the repository at this point in the history
[Improvement] CRUD Upload Triggers
  • Loading branch information
stevensJourney authored Jan 15, 2024
2 parents b0b0c4f + ff6a918 commit 8298b84
Show file tree
Hide file tree
Showing 8 changed files with 3,950 additions and 125 deletions.
5 changes: 5 additions & 0 deletions .changeset/many-hairs-scream.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@journeyapps/powersync-sdk-common': patch
---

Improved connector CRUD uploads to be triggered whenever an internal CRUD operation change is triggered. Improved CRUD upload debouncing to rather use a throttled approach - executing multiple continous write/CRUD operations will now trigger a connector upload at most (every) 1 second (by default).
2 changes: 1 addition & 1 deletion apps/supabase-todolist
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ import { UploadQueueStats } from '../db/crud/UploadQueueStatus';
import { PowerSyncBackendConnector } from './connection/PowerSyncBackendConnector';
import {
AbstractStreamingSyncImplementation,
DEFAULT_CRUD_UPLOAD_THROTTLE_MS,
StreamingSyncImplementationListener
} from './sync/stream/AbstractStreamingSyncImplementation';
import { CrudBatch } from './sync/bucket/CrudBatch';
import { CrudTransaction } from './sync/bucket/CrudTransaction';
import { BucketStorageAdapter } from './sync/bucket/BucketStorageAdapter';
import { BucketStorageAdapter, PSInternalTable } from './sync/bucket/BucketStorageAdapter';
import { CrudEntry } from './sync/bucket/CrudEntry';
import { mutexRunExclusive } from '../utils/mutex';
import { BaseObserver } from '../utils/BaseObserver';
Expand All @@ -21,14 +22,29 @@ import { EventIterator } from 'event-iterator';
export interface PowerSyncDatabaseOptions {
schema: Schema;
database: DBAdapter;
/**
* Delay for retrying sync streaming operations
* from the PowerSync backend after an error occurs.
*/
retryDelay?: number;
/**
* Backend Connector CRUD operations are throttled
* to occur at most every `crudUploadThrottleMs`
* milliseconds.
*/
crudUploadThrottleMs?: number;
logger?: ILogger;
}

export interface SQLWatchOptions {
signal?: AbortSignal;
tables?: string[];
throttleMs?: number;
/**
* Allows for watching any SQL table
* by not removing PowerSync table name prefixes
*/
rawTableNames?: boolean;
}

export interface WatchOnChangeEvent {
Expand All @@ -45,7 +61,8 @@ export const DEFAULT_WATCH_THROTTLE_MS = 30;

export const DEFAULT_POWERSYNC_DB_OPTIONS = {
retryDelay: 5000,
logger: Logger.get('PowerSyncDatabase')
logger: Logger.get('PowerSyncDatabase'),
crudUploadThrottleMs: DEFAULT_CRUD_UPLOAD_THROTTLE_MS
};

/**
Expand Down Expand Up @@ -135,6 +152,19 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
this.iterateListeners((cb) => cb.initialized?.());
}

/**
* Queues a CRUD upload when internal CRUD tables have been updated
*/
protected async watchCrudUploads() {
for await (const event of this.onChange({
tables: [PSInternalTable.CRUD],
rawTableNames: true,
signal: this.abortController?.signal
})) {
this.syncStreamImplementation?.triggerCrudUpload();
}
}

/**
* Wait for initialization to complete.
* While initializing is automatic, this helps to catch and report initialization errors.
Expand Down Expand Up @@ -163,6 +193,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
// Begin network stream
this.syncStreamImplementation.triggerCrudUpload();
this.syncStreamImplementation.streamingSync(this.abortController.signal);
this.watchCrudUploads();
}

async disconnect() {
Expand All @@ -182,9 +213,9 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB

// TODO DB name, verify this is necessary with extension
await this.database.writeTransaction(async (tx) => {
await tx.execute('DELETE FROM ps_oplog WHERE 1');
await tx.execute('DELETE FROM ps_crud WHERE 1');
await tx.execute('DELETE FROM ps_buckets WHERE 1');
await tx.execute(`DELETE FROM ${PSInternalTable.OPLOG} WHERE 1`);
await tx.execute(`DELETE FROM ${PSInternalTable.CRUD} WHERE 1`);
await tx.execute(`DELETE FROM ${PSInternalTable.BUCKETS} WHERE 1`);

const existingTableRows = await tx.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'"
Expand Down Expand Up @@ -220,12 +251,14 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
async getUploadQueueStats(includeSize?: boolean): Promise<UploadQueueStats> {
return this.readTransaction(async (tx) => {
if (includeSize) {
const result = await tx.execute('SELECT SUM(cast(data as blob) + 20) as size, count(*) as count FROM ps_crud');
const result = await tx.execute(
`SELECT SUM(cast(data as blob) + 20) as size, count(*) as count FROM ${PSInternalTable.CRUD}`
);

const row = result.rows.item(0);
return new UploadQueueStats(row?.count ?? 0, row?.size ?? 0);
} else {
const result = await tx.execute('SELECT count(*) as count FROM ps_crud');
const result = await tx.execute(`SELECT count(*) as count FROM ${PSInternalTable.CRUD}`);
const row = result.rows.item(0);
return new UploadQueueStats(row?.count ?? 0);
}
Expand All @@ -250,9 +283,10 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* and a single transaction may be split over multiple batches.
*/
async getCrudBatch(limit: number): Promise<CrudBatch | null> {
const result = await this.database.execute('SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT ?', [
limit + 1
]);
const result = await this.database.execute(
`SELECT id, tx_id, data FROM ${PSInternalTable.CRUD} ORDER BY id ASC LIMIT ?`,
[limit + 1]
);

const all: CrudEntry[] = result.rows?._array?.map((row) => CrudEntry.fromRow(row)) ?? [];

Expand All @@ -268,11 +302,13 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
const last = all[all.length - 1];
return new CrudBatch(all, haveMore, async (writeCheckpoint?: string) => {
await this.writeTransaction(async (tx) => {
await tx.execute('DELETE FROM ps_crud WHERE id <= ?', [last.clientId]);
if (writeCheckpoint != null && (await tx.execute('SELECT 1 FROM ps_crud LIMIT 1')) == null) {
await tx.execute("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [writeCheckpoint]);
await tx.execute(`DELETE FROM ${PSInternalTable.CRUD} WHERE id <= ?`, [last.clientId]);
if (writeCheckpoint != null && (await tx.execute(`SELECT 1 FROM ${PSInternalTable.CRUD} LIMIT 1`)) == null) {
await tx.execute(`UPDATE ${PSInternalTable.BUCKETS} SET target_op = ? WHERE name='$local'`, [
writeCheckpoint
]);
} else {
await tx.execute("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [
await tx.execute(`UPDATE ${PSInternalTable.BUCKETS} SET target_op = ? WHERE name='$local'`, [
this.bucketStorageAdapter.getMaxOpId()
]);
}
Expand All @@ -295,7 +331,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
*/
async getNextCrudTransaction(): Promise<CrudTransaction> {
return await this.readTransaction(async (tx) => {
const first = await tx.execute('SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT 1');
const first = await tx.execute(`SELECT id, tx_id, data FROM ${PSInternalTable.CRUD} ORDER BY id ASC LIMIT 1`);

if (!first.rows.length) {
return null;
Expand All @@ -306,7 +342,10 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
if (!txId) {
all = [CrudEntry.fromRow(first.rows.item(0))];
} else {
const result = await tx.execute('SELECT id, tx_id, data FROM ps_crud WHERE tx_id = ? ORDER BY id ASC', [txId]);
const result = await tx.execute(
`SELECT id, tx_id, data FROM ${PSInternalTable.CRUD} WHERE tx_id = ? ORDER BY id ASC`,
[txId]
);
all = result.rows._array.map((row) => CrudEntry.fromRow(row));
}

Expand All @@ -316,14 +355,16 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
all,
async (writeCheckpoint?: string) => {
await this.writeTransaction(async (tx) => {
await tx.execute('DELETE FROM ps_crud WHERE id <= ?', [last.clientId]);
await tx.execute(`DELETE FROM ${PSInternalTable.CRUD} WHERE id <= ?`, [last.clientId]);
if (writeCheckpoint) {
const check = await tx.execute('SELECT 1 FROM ps_crud LIMIT 1');
const check = await tx.execute(`SELECT 1 FROM ${PSInternalTable.CRUD} LIMIT 1`);
if (!check.rows?.length) {
await tx.execute("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [writeCheckpoint]);
await tx.execute(`UPDATE ${PSInternalTable.BUCKETS} SET target_op = ? WHERE name='$local'`, [
writeCheckpoint
]);
}
} else {
await tx.execute("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [
await tx.execute(`UPDATE ${PSInternalTable.BUCKETS} SET target_op = ? WHERE name='$local'`, [
this.bucketStorageAdapter.getMaxOpId()
]);
}
Expand All @@ -339,9 +380,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
*/
async execute(sql: string, parameters?: any[]) {
await this.waitForReady();
const result = await this.database.execute(sql, parameters);
_.defer(() => this.syncStreamImplementation?.triggerCrudUpload());
return result;
return this.database.execute(sql, parameters);
}

/**
Expand Down Expand Up @@ -386,7 +425,6 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
await this.waitForReady();
return mutexRunExclusive(AbstractPowerSyncDatabase.transactionMutex, async () => {
const res = await callback(this.database);
_.defer(() => this.syncStreamImplementation?.triggerCrudUpload());
return res;
});
}
Expand Down Expand Up @@ -415,7 +453,6 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
async (tx) => {
const res = await callback(tx);
await tx.commit();
_.defer(() => this.syncStreamImplementation?.triggerCrudUpload());
return res;
},
{ timeoutMs: lockTimeout }
Expand Down Expand Up @@ -475,10 +512,13 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
const dispose = this.database.registerListener({
tablesUpdated: async (update) => {
const { table } = update;
if (!table.match(POWERSYNC_TABLE_MATCH)) {
const { rawTableNames } = options;

if (!rawTableNames && !table.match(POWERSYNC_TABLE_MATCH)) {
return;
}
const tableName = table.replace(POWERSYNC_TABLE_MATCH, '');

const tableName = rawTableNames ? table : table.replace(POWERSYNC_TABLE_MATCH, '');
throttledTableUpdates.push(tableName);

flushTableUpdates();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { DBAdapter } from '../db/DBAdapter';
import { Schema } from '../db/schema/Schema';
import { AbstractPowerSyncDatabase, PowerSyncDatabaseOptions } from './AbstractPowerSyncDatabase';

export interface PowerSyncOpenFactoryOptions {
export interface PowerSyncOpenFactoryOptions extends Partial<PowerSyncDatabaseOptions> {
schema: Schema;
/**
* Filename for the database.
Expand All @@ -26,7 +26,8 @@ export abstract class AbstractPowerSyncDatabaseOpenFactory {
generateOptions(): PowerSyncDatabaseOptions {
return {
database: this.openDB(),
schema: this.schema
schema: this.schema,
...this.options
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ export interface BucketChecksum {
count: number;
}

export enum PSInternalTable {
DATA = 'ps_data',
CRUD = 'ps_crud',
BUCKETS = 'ps_buckets',
OPLOG = 'ps_oplog'
}

export interface BucketStorageAdapter {
init(): Promise<void>;
saveSyncData(batch: SyncDataBatch): Promise<void>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,27 @@ export interface AbstractStreamingSyncImplementationOptions {
uploadCrud: () => Promise<void>;
logger?: ILogger;
retryDelayMs?: number;
crudUploadThrottleMs?: number;
}

export interface StreamingSyncImplementationListener extends BaseListener {
statusChanged?: (status: SyncStatus) => void;
}

export const DEFAULT_CRUD_UPLOAD_THROTTLE_MS = 1000;

export const DEFAULT_STREAMING_SYNC_OPTIONS = {
retryDelayMs: 5000,
logger: Logger.get('PowerSyncStream')
logger: Logger.get('PowerSyncStream'),
crudUploadThrottleMs: DEFAULT_CRUD_UPLOAD_THROTTLE_MS
};

const CRUD_UPLOAD_DEBOUNCE_MS = 1000;

export abstract class AbstractStreamingSyncImplementation extends BaseObserver<StreamingSyncImplementationListener> {
protected _lastSyncedAt: Date | null;
protected options: AbstractStreamingSyncImplementationOptions;

syncStatus: SyncStatus;
triggerCrudUpload: () => void;

constructor(options: AbstractStreamingSyncImplementationOptions) {
super();
Expand All @@ -67,6 +70,17 @@ export abstract class AbstractStreamingSyncImplementation extends BaseObserver<S
downloading: false
}
});

this.triggerCrudUpload = _.throttle(
() => {
if (!this.syncStatus.connected || this.syncStatus.dataFlowStatus.uploading) {
return;
}
this._uploadAllCrud();
},
this.options.crudUploadThrottleMs,
{ trailing: true }
);
}

get lastSyncedAt() {
Expand All @@ -88,17 +102,6 @@ export abstract class AbstractStreamingSyncImplementation extends BaseObserver<S
return this.options.adapter.hasCompletedSync();
}

triggerCrudUpload = _.debounce(
() => {
if (!this.syncStatus.connected || this.syncStatus.dataFlowStatus.uploading) {
return;
}
this._uploadAllCrud();
},
CRUD_UPLOAD_DEBOUNCE_MS,
{ trailing: true }
);

protected async _uploadAllCrud(): Promise<void> {
return this.obtainLock({
type: LockType.CRUD,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
await this.waitForReady();
await connector.uploadData(this);
},
retryDelayMs: this.options.retryDelay
retryDelayMs: this.options.retryDelay,
crudUploadThrottleMs: this.options.crudUploadThrottleMs
});
}
}
Loading

0 comments on commit 8298b84

Please sign in to comment.