Skip to content

Commit

Permalink
[Feature] Add warning if crud transactions are not completed (#254)
Browse files Browse the repository at this point in the history
  • Loading branch information
stevensJourney authored Aug 6, 2024
1 parent 8fe44fb commit 042589c
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 61 deletions.
7 changes: 7 additions & 0 deletions .changeset/mean-carrots-relax.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@powersync/common': minor
'@powersync/web': minor
'@powersync/react-native': minor
---

Added a warning if connector `uploadData` functions don't process CRUD items completely.
12 changes: 6 additions & 6 deletions demos/react-native-supabase-todolist/ios/Podfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ PODS:
- ExpoModulesCore
- ExpoKeepAwake (13.0.2):
- ExpoModulesCore
- ExpoModulesCore (1.12.13):
- ExpoModulesCore (1.12.19):
- DoubleConversion
- glog
- hermes-engine
Expand Down Expand Up @@ -1340,15 +1340,15 @@ DEPENDENCIES:
- boost (from `../../../node_modules/react-native/third-party-podspecs/boost.podspec`)
- DoubleConversion (from `../../../node_modules/react-native/third-party-podspecs/DoubleConversion.podspec`)
- EXConstants (from `../../../node_modules/expo-constants/ios`)
- Expo (from `../node_modules/expo`)
- Expo (from `../../../node_modules/expo`)
- ExpoAsset (from `../../../node_modules/expo-asset/ios`)
- ExpoCamera (from `../../../node_modules/expo-camera/ios`)
- ExpoCrypto (from `../../../node_modules/expo-crypto/ios`)
- ExpoFileSystem (from `../../../node_modules/expo-file-system/ios`)
- ExpoFont (from `../../../node_modules/expo-font/ios`)
- ExpoHead (from `../../../node_modules/expo-router/ios`)
- ExpoKeepAwake (from `../../../node_modules/expo-keep-awake/ios`)
- ExpoModulesCore (from `../node_modules/expo-modules-core`)
- ExpoModulesCore (from `../../../node_modules/expo-modules-core`)
- ExpoSecureStore (from `../../../node_modules/expo-secure-store/ios`)
- EXSplashScreen (from `../../../node_modules/expo-splash-screen/ios`)
- FBLazyVector (from `../../../node_modules/react-native/Libraries/FBLazyVector`)
Expand Down Expand Up @@ -1428,7 +1428,7 @@ EXTERNAL SOURCES:
EXConstants:
:path: "../../../node_modules/expo-constants/ios"
Expo:
:path: "../node_modules/expo"
:path: "../../../node_modules/expo"
ExpoAsset:
:path: "../../../node_modules/expo-asset/ios"
ExpoCamera:
Expand All @@ -1444,7 +1444,7 @@ EXTERNAL SOURCES:
ExpoKeepAwake:
:path: "../../../node_modules/expo-keep-awake/ios"
ExpoModulesCore:
:path: "../node_modules/expo-modules-core"
:path: "../../../node_modules/expo-modules-core"
ExpoSecureStore:
:path: "../../../node_modules/expo-secure-store/ios"
EXSplashScreen:
Expand Down Expand Up @@ -1583,7 +1583,7 @@ SPEC CHECKSUMS:
ExpoFont: e7f2275c10ca8573c991e007329ad6bf98086485
ExpoHead: 8eb4deb289c2fdd8bb624f996cd31414cd07f38a
ExpoKeepAwake: 3b8815d9dd1d419ee474df004021c69fdd316d08
ExpoModulesCore: a4b45b5f081f5fe9b8e87667906d180cd52f32d7
ExpoModulesCore: 734c1802786b23c9598f4d15273753a779969368
ExpoSecureStore: 060cebcb956b80ddae09821610ac1aa9e1ac74cd
EXSplashScreen: fbf0ec78e9cee911df188bf17b4fe51d15a84b87
FBLazyVector: 898d14d17bf19e2435cafd9ea2a1033efe445709
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { OpId } from './CrudEntry';
import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver';
import { CrudBatch } from './CrudBatch';
import { CrudEntry, OpId } from './CrudEntry';
import { SyncDataBatch } from './SyncDataBatch';
import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver';

export interface Checkpoint {
last_op_id: OpId;
Expand Down Expand Up @@ -62,6 +62,7 @@ export interface BucketStorageAdapter extends BaseObserver<BucketStorageListener

syncLocalDatabase(checkpoint: Checkpoint): Promise<{ checkpointValid: boolean; ready: boolean; failures?: any[] }>;

nextCrudItem(): Promise<CrudEntry | undefined>;
hasCrud(): Promise<boolean>;
getCrudBatch(limit?: number): Promise<CrudBatch | null>;

Expand Down
43 changes: 26 additions & 17 deletions packages/common/src/client/sync/bucket/SqliteBucketStorage.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { Mutex } from 'async-mutex';
import Logger, { ILogger } from 'js-logger';
import { DBAdapter, Transaction, extractTableUpdates } from '../../../db/DBAdapter';
import { BaseObserver } from '../../../utils/BaseObserver';
import {
BucketState,
BucketStorageAdapter,
Expand All @@ -8,12 +10,10 @@ import {
PSInternalTable,
SyncLocalDatabaseResult
} from './BucketStorageAdapter';
import { OpTypeEnum } from './OpType';
import { CrudBatch } from './CrudBatch';
import { CrudEntry } from './CrudEntry';
import { CrudEntry, CrudEntryJSON } from './CrudEntry';
import { OpTypeEnum } from './OpType';
import { SyncDataBatch } from './SyncDataBatch';
import Logger, { ILogger } from 'js-logger';
import { BaseObserver } from '../../../utils/BaseObserver';

const COMPACT_OPERATION_INTERVAL = 1_000;

Expand Down Expand Up @@ -51,10 +51,10 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp

async init() {
this._hasCompletedSync = false;
const existingTableRows = await this.db.execute(
const existingTableRows = await this.db.getAll<{ name: string }>(
`SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'`
);
for (const row of existingTableRows.rows?._array ?? []) {
for (const row of existingTableRows ?? []) {
this.tableNames.add(row.name);
}
}
Expand All @@ -72,10 +72,10 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
startSession(): void {}

async getBucketStates(): Promise<BucketState[]> {
const result = await this.db.execute(
const result = await this.db.getAll<BucketState>(
'SELECT name as bucket, cast(last_op as TEXT) as op_id FROM ps_buckets WHERE pending_delete = 0'
);
return result.rows?._array ?? [];
return result;
}

async saveSyncData(batch: SyncDataBatch) {
Expand Down Expand Up @@ -258,19 +258,20 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
}

async updateLocalTarget(cb: () => Promise<string>): Promise<boolean> {
const rs1 = await this.db.execute("SELECT target_op FROM ps_buckets WHERE name = '$local' AND target_op = ?", [
const rs1 = await this.db.getAll("SELECT target_op FROM ps_buckets WHERE name = '$local' AND target_op = ?", [
SqliteBucketStorage.MAX_OP_ID
]);
if (!rs1.rows?.length) {
if (!rs1.length) {
// Nothing to update
return false;
}
const rs = await this.db.execute("SELECT seq FROM sqlite_sequence WHERE name = 'ps_crud'");
if (!rs.rows?.length) {
const rs = await this.db.getAll<{ seq: number }>("SELECT seq FROM sqlite_sequence WHERE name = 'ps_crud'");
if (!rs.length) {
// Nothing to update
return false;
}
const seqBefore: number = rs.rows?.item(0)['seq'];

const seqBefore: number = rs[0]['seq'];

const opId = await cb();

Expand Down Expand Up @@ -304,9 +305,17 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
});
}

async nextCrudItem(): Promise<CrudEntry | undefined> {
const next = await this.db.getOptional<CrudEntryJSON>('SELECT * FROM ps_crud ORDER BY id ASC LIMIT 1');
if (!next) {
return;
}
return CrudEntry.fromRow(next);
}

async hasCrud(): Promise<boolean> {
const anyData = await this.db.execute('SELECT 1 FROM ps_crud LIMIT 1');
return !!anyData.rows?.length;
const anyData = await this.db.getOptional('SELECT 1 FROM ps_crud LIMIT 1');
return !!anyData;
}

/**
Expand All @@ -318,10 +327,10 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
return null;
}

const crudResult = await this.db.execute('SELECT * FROM ps_crud ORDER BY id ASC LIMIT ?', [limit]);
const crudResult = await this.db.getAll<CrudEntryJSON>('SELECT * FROM ps_crud ORDER BY id ASC LIMIT ?', [limit]);

const all: CrudEntry[] = [];
for (const row of crudResult.rows?._array ?? []) {
for (const row of crudResult) {
all.push(CrudEntry.fromRow(row));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@ import throttle from 'lodash/throttle';

import Logger, { ILogger } from 'js-logger';

import { SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus';
import { AbortOperation } from '../../../utils/AbortOperation';
import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver';
import { BucketChecksum, BucketStorageAdapter, Checkpoint } from '../bucket/BucketStorageAdapter';
import { CrudEntry } from '../bucket/CrudEntry';
import { SyncDataBucket } from '../bucket/SyncDataBucket';
import { AbstractRemote, SyncStreamOptions } from './AbstractRemote';
import {
BucketRequest,
StreamingSyncRequestParameterType,
Expand All @@ -11,12 +18,6 @@ import {
isStreamingSyncCheckpointDiff,
isStreamingSyncData
} from './streaming-sync-types';
import { AbstractRemote, SyncStreamOptions } from './AbstractRemote';
import { BucketChecksum, BucketStorageAdapter, Checkpoint } from '../bucket/BucketStorageAdapter';
import { SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus';
import { SyncDataBucket } from '../bucket/SyncDataBucket';
import { BaseObserver, BaseListener, Disposable } from '../../../utils/BaseObserver';
import { AbortOperation } from '../../../utils/AbortOperation';

export enum LockType {
CRUD = 'crud',
Expand Down Expand Up @@ -215,18 +216,40 @@ export abstract class AbstractStreamingSyncImplementation
return this.obtainLock({
type: LockType.CRUD,
callback: async () => {
this.updateSyncStatus({
dataFlow: {
uploading: true
}
});
/**
* Keep track of the first item in the CRUD queue for the last `uploadCrud` iteration.
*/
let checkedCrudItem: CrudEntry | undefined;

while (true) {
this.updateSyncStatus({
dataFlow: {
uploading: true
}
});
try {
const done = await this.uploadCrudBatch();
if (done) {
/**
* This is the first item in the FIFO CRUD queue.
*/
const nextCrudItem = await this.options.adapter.nextCrudItem();
if (nextCrudItem) {
if (nextCrudItem.id == checkedCrudItem?.id) {
// This will force a higher log level than exceptions which are caught here.
this.logger.warn(`Potentially previously uploaded CRUD entries are still present in the upload queue.
Make sure to handle uploads and complete CRUD transactions or batches by calling and awaiting their [.complete()] method.
The next upload iteration will be delayed.`);
throw new Error('Delaying due to previously encountered CRUD item.');
}

checkedCrudItem = nextCrudItem;
await this.options.uploadCrud();
} else {
// Uploading is completed
await this.options.adapter.updateLocalTarget(() => this.getWriteCheckpoint());
break;
}
} catch (ex) {
checkedCrudItem = undefined;
this.updateSyncStatus({
dataFlow: {
uploading: false
Expand All @@ -252,17 +275,6 @@ export abstract class AbstractStreamingSyncImplementation
});
}

protected async uploadCrudBatch(): Promise<boolean> {
const hasCrud = await this.options.adapter.hasCrud();
if (hasCrud) {
await this.options.uploadCrud();
return false;
} else {
await this.options.adapter.updateLocalTarget(() => this.getWriteCheckpoint());
return true;
}
}

async connect(options?: PowerSyncConnectionOptions) {
if (this.abortController) {
await this.disconnect();
Expand Down
37 changes: 25 additions & 12 deletions packages/web/tests/multiple_instances.test.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { AbstractPowerSyncDatabase, SqliteBucketStorage, SyncStatus } from '@powersync/common';
import {
PowerSyncDatabase,
SharedWebStreamingSyncImplementation,
WebRemote,
WebStreamingSyncImplementationOptions
} from '@powersync/web';
import { testSchema } from './utils/testDb';
import { TestConnector } from './utils/MockStreamOpenFactory';
import { Mutex } from 'async-mutex';
import Logger from 'js-logger';
import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest';
import { TestConnector } from './utils/MockStreamOpenFactory';
import { testSchema } from './utils/testDb';

describe('Multiple Instances', () => {
const dbFilename = 'test-multiple-instances.db';
Expand All @@ -23,6 +23,8 @@ describe('Multiple Instances', () => {
schema: testSchema
});

beforeAll(() => Logger.useDefaults());

beforeEach(() => {
db = openDatabase();
});
Expand Down Expand Up @@ -184,34 +186,44 @@ describe('Multiple Instances', () => {
});

// Create the first streaming client
const syncOptions1: WebStreamingSyncImplementationOptions = {
const stream1 = new SharedWebStreamingSyncImplementation({
adapter: new SqliteBucketStorage(db.database, new Mutex()),
remote: new WebRemote(connector1),
uploadCrud: async () => {
triggerUpload1();
connector1.uploadData(db);
},
identifier
};
const stream1 = new SharedWebStreamingSyncImplementation(syncOptions1);
identifier,
retryDelayMs: 100,
flags: {
broadcastLogs: true
}
});

// Generate the second streaming sync implementation
const connector2 = new TestConnector();
const spy2 = vi.spyOn(connector2, 'uploadData');
// The second connector will be called first to upload, we don't want it to actually upload
// This will cause the sync uploads to be delayed as the CRUD queue did not change
const spy2 = vi.spyOn(connector2, 'uploadData').mockImplementation(async () => {});

let triggerUpload2: () => void;
const upload2TriggeredPromise = new Promise<void>((resolve) => {
triggerUpload2 = resolve;
});
const syncOptions2: WebStreamingSyncImplementationOptions = {

const stream2 = new SharedWebStreamingSyncImplementation({
adapter: new SqliteBucketStorage(db.database, new Mutex()),
remote: new WebRemote(connector1),
uploadCrud: async () => {
triggerUpload2();
connector2.uploadData(db);
},
identifier
};
const stream2 = new SharedWebStreamingSyncImplementation(syncOptions2);
identifier,
retryDelayMs: 100,
flags: {
broadcastLogs: true
}
});

// Waits for the stream to be marked as connected
const stream2UpdatedPromise = new Promise<void>((resolve, reject) => {
Expand All @@ -230,6 +242,7 @@ describe('Multiple Instances', () => {

// The status in the second stream client should be updated
await stream2UpdatedPromise;

expect(stream2.isConnected).true;

// Create something with CRUD in it.
Expand Down

0 comments on commit 042589c

Please sign in to comment.