[Feature] Add warning if crud transactions are not completed #254

Merged · 4 commits · Aug 6, 2024
7 changes: 7 additions & 0 deletions .changeset/mean-carrots-relax.md
@@ -0,0 +1,7 @@
---
'@powersync/common': minor
'@powersync/web': minor
'@powersync/react-native': minor
---

Added a warning if connector `uploadData` functions don't process CRUD items completely.
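
For reference, a minimal connector sketch of the pattern this warning checks for: every CRUD transaction fetched in `uploadData` ends with an awaited `complete()` call once the backend has accepted the writes. The `backend` client below is a hypothetical placeholder; `getNextCrudTransaction` and `UpdateType` are the public APIs from `@powersync/common`.

```typescript
import { AbstractPowerSyncDatabase, UpdateType } from '@powersync/common';

// Hypothetical backend API, used purely for illustration.
declare const backend: {
  upsert(table: string, record: Record<string, unknown>): Promise<void>;
  remove(table: string, id: string): Promise<void>;
};

export async function uploadData(database: AbstractPowerSyncDatabase): Promise<void> {
  // Take the next queued transaction of local writes, if any.
  const transaction = await database.getNextCrudTransaction();
  if (!transaction) {
    return;
  }

  for (const op of transaction.crud) {
    switch (op.op) {
      case UpdateType.PUT:
      case UpdateType.PATCH:
        await backend.upsert(op.table, { id: op.id, ...op.opData });
        break;
      case UpdateType.DELETE:
        await backend.remove(op.table, op.id);
        break;
    }
  }

  // Completing the transaction removes its entries from the upload queue.
  // Skipping (or not awaiting) this call is exactly what the new warning flags.
  await transaction.complete();
}
```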
12 changes: 6 additions & 6 deletions demos/react-native-supabase-todolist/ios/Podfile.lock
@@ -21,7 +21,7 @@ PODS:
- ExpoModulesCore
- ExpoKeepAwake (13.0.2):
- ExpoModulesCore
- ExpoModulesCore (1.12.13):
- ExpoModulesCore (1.12.19):
- DoubleConversion
- glog
- hermes-engine
@@ -1340,15 +1340,15 @@ DEPENDENCIES:
- boost (from `../../../node_modules/react-native/third-party-podspecs/boost.podspec`)
- DoubleConversion (from `../../../node_modules/react-native/third-party-podspecs/DoubleConversion.podspec`)
- EXConstants (from `../../../node_modules/expo-constants/ios`)
- Expo (from `../node_modules/expo`)
- Expo (from `../../../node_modules/expo`)
- ExpoAsset (from `../../../node_modules/expo-asset/ios`)
- ExpoCamera (from `../../../node_modules/expo-camera/ios`)
- ExpoCrypto (from `../../../node_modules/expo-crypto/ios`)
- ExpoFileSystem (from `../../../node_modules/expo-file-system/ios`)
- ExpoFont (from `../../../node_modules/expo-font/ios`)
- ExpoHead (from `../../../node_modules/expo-router/ios`)
- ExpoKeepAwake (from `../../../node_modules/expo-keep-awake/ios`)
- ExpoModulesCore (from `../node_modules/expo-modules-core`)
- ExpoModulesCore (from `../../../node_modules/expo-modules-core`)
- ExpoSecureStore (from `../../../node_modules/expo-secure-store/ios`)
- EXSplashScreen (from `../../../node_modules/expo-splash-screen/ios`)
- FBLazyVector (from `../../../node_modules/react-native/Libraries/FBLazyVector`)
@@ -1428,7 +1428,7 @@ EXTERNAL SOURCES:
EXConstants:
:path: "../../../node_modules/expo-constants/ios"
Expo:
:path: "../node_modules/expo"
:path: "../../../node_modules/expo"
ExpoAsset:
:path: "../../../node_modules/expo-asset/ios"
ExpoCamera:
@@ -1444,7 +1444,7 @@
ExpoKeepAwake:
:path: "../../../node_modules/expo-keep-awake/ios"
ExpoModulesCore:
:path: "../node_modules/expo-modules-core"
:path: "../../../node_modules/expo-modules-core"
ExpoSecureStore:
:path: "../../../node_modules/expo-secure-store/ios"
EXSplashScreen:
@@ -1583,7 +1583,7 @@ SPEC CHECKSUMS:
ExpoFont: e7f2275c10ca8573c991e007329ad6bf98086485
ExpoHead: 8eb4deb289c2fdd8bb624f996cd31414cd07f38a
ExpoKeepAwake: 3b8815d9dd1d419ee474df004021c69fdd316d08
ExpoModulesCore: a4b45b5f081f5fe9b8e87667906d180cd52f32d7
ExpoModulesCore: 734c1802786b23c9598f4d15273753a779969368
ExpoSecureStore: 060cebcb956b80ddae09821610ac1aa9e1ac74cd
EXSplashScreen: fbf0ec78e9cee911df188bf17b4fe51d15a84b87
FBLazyVector: 898d14d17bf19e2435cafd9ea2a1033efe445709
packages/common/src/client/sync/bucket/BucketStorageAdapter.ts
@@ -1,7 +1,7 @@
import { OpId } from './CrudEntry';
import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver';
import { CrudBatch } from './CrudBatch';
import { CrudEntry, OpId } from './CrudEntry';
import { SyncDataBatch } from './SyncDataBatch';
import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver';

export interface Checkpoint {
last_op_id: OpId;
@@ -62,6 +62,7 @@ export interface BucketStorageAdapter extends BaseObserver<BucketStorageListener

syncLocalDatabase(checkpoint: Checkpoint): Promise<{ checkpointValid: boolean; ready: boolean; failures?: any[] }>;

nextCrudItem(): Promise<CrudEntry | undefined>;
hasCrud(): Promise<boolean>;
getCrudBatch(limit?: number): Promise<CrudBatch | null>;

43 changes: 26 additions & 17 deletions packages/common/src/client/sync/bucket/SqliteBucketStorage.ts
@@ -1,5 +1,7 @@
import { Mutex } from 'async-mutex';
import Logger, { ILogger } from 'js-logger';
import { DBAdapter, Transaction, extractTableUpdates } from '../../../db/DBAdapter';
import { BaseObserver } from '../../../utils/BaseObserver';
import {
BucketState,
BucketStorageAdapter,
@@ -8,12 +10,10 @@ import {
PSInternalTable,
SyncLocalDatabaseResult
} from './BucketStorageAdapter';
import { OpTypeEnum } from './OpType';
import { CrudBatch } from './CrudBatch';
import { CrudEntry } from './CrudEntry';
import { CrudEntry, CrudEntryJSON } from './CrudEntry';
import { OpTypeEnum } from './OpType';
import { SyncDataBatch } from './SyncDataBatch';
import Logger, { ILogger } from 'js-logger';
import { BaseObserver } from '../../../utils/BaseObserver';

const COMPACT_OPERATION_INTERVAL = 1_000;

@@ -51,10 +51,10 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp

async init() {
this._hasCompletedSync = false;
const existingTableRows = await this.db.execute(
const existingTableRows = await this.db.getAll<{ name: string }>(
`SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'`
);
for (const row of existingTableRows.rows?._array ?? []) {
for (const row of existingTableRows ?? []) {
this.tableNames.add(row.name);
}
}
@@ -72,10 +72,10 @@
startSession(): void {}

async getBucketStates(): Promise<BucketState[]> {
const result = await this.db.execute(
const result = await this.db.getAll<BucketState>(
'SELECT name as bucket, cast(last_op as TEXT) as op_id FROM ps_buckets WHERE pending_delete = 0'
);
return result.rows?._array ?? [];
return result;
}

async saveSyncData(batch: SyncDataBatch) {
@@ -258,19 +258,20 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
}

async updateLocalTarget(cb: () => Promise<string>): Promise<boolean> {
const rs1 = await this.db.execute("SELECT target_op FROM ps_buckets WHERE name = '$local' AND target_op = ?", [
const rs1 = await this.db.getAll("SELECT target_op FROM ps_buckets WHERE name = '$local' AND target_op = ?", [
SqliteBucketStorage.MAX_OP_ID
]);
if (!rs1.rows?.length) {
if (!rs1.length) {
// Nothing to update
return false;
}
const rs = await this.db.execute("SELECT seq FROM sqlite_sequence WHERE name = 'ps_crud'");
if (!rs.rows?.length) {
const rs = await this.db.getAll<{ seq: number }>("SELECT seq FROM sqlite_sequence WHERE name = 'ps_crud'");
if (!rs.length) {
// Nothing to update
return false;
}
const seqBefore: number = rs.rows?.item(0)['seq'];

const seqBefore: number = rs[0]['seq'];

const opId = await cb();

@@ -304,9 +305,17 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
});
}

async nextCrudItem(): Promise<CrudEntry | undefined> {
const next = await this.db.getOptional<CrudEntryJSON>('SELECT * FROM ps_crud ORDER BY id ASC LIMIT 1');
if (!next) {
return;
}
return CrudEntry.fromRow(next);
}

async hasCrud(): Promise<boolean> {
const anyData = await this.db.execute('SELECT 1 FROM ps_crud LIMIT 1');
return !!anyData.rows?.length;
const anyData = await this.db.getOptional('SELECT 1 FROM ps_crud LIMIT 1');
return !!anyData;
}

/**
@@ -318,10 +327,10 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
return null;
}

const crudResult = await this.db.execute('SELECT * FROM ps_crud ORDER BY id ASC LIMIT ?', [limit]);
const crudResult = await this.db.getAll<CrudEntryJSON>('SELECT * FROM ps_crud ORDER BY id ASC LIMIT ?', [limit]);

const all: CrudEntry[] = [];
for (const row of crudResult.rows?._array ?? []) {
for (const row of crudResult) {
all.push(CrudEntry.fromRow(row));
}

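
The storage changes above also migrate raw `execute()` calls to the typed `getAll()`/`getOptional()` helpers on `DBAdapter`. A rough before/after sketch of that pattern, reusing the table-name query from `init()` (both helpers are part of the existing `DBAdapter` interface; the function names here are illustrative):

```typescript
import { DBAdapter } from '@powersync/common';

interface NameRow {
  name: string;
}

// Old pattern: unwrap the QueryResult returned by execute().
async function dataTableNamesLegacy(db: DBAdapter): Promise<string[]> {
  const result = await db.execute(
    `SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'`
  );
  return (result.rows?._array ?? []).map((row: NameRow) => row.name);
}

// New pattern: getAll() returns the typed row array directly.
async function dataTableNames(db: DBAdapter): Promise<string[]> {
  const rows = await db.getAll<NameRow>(
    `SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'`
  );
  return rows.map((row) => row.name);
}
```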
packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts
@@ -2,6 +2,13 @@ import throttle from 'lodash/throttle';

import Logger, { ILogger } from 'js-logger';

import { SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus';
import { AbortOperation } from '../../../utils/AbortOperation';
import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver';
import { BucketChecksum, BucketStorageAdapter, Checkpoint } from '../bucket/BucketStorageAdapter';
import { CrudEntry } from '../bucket/CrudEntry';
import { SyncDataBucket } from '../bucket/SyncDataBucket';
import { AbstractRemote, SyncStreamOptions } from './AbstractRemote';
import {
BucketRequest,
StreamingSyncRequestParameterType,
@@ -11,12 +18,6 @@ import {
isStreamingSyncCheckpointDiff,
isStreamingSyncData
} from './streaming-sync-types';
import { AbstractRemote, SyncStreamOptions } from './AbstractRemote';
import { BucketChecksum, BucketStorageAdapter, Checkpoint } from '../bucket/BucketStorageAdapter';
import { SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus';
import { SyncDataBucket } from '../bucket/SyncDataBucket';
import { BaseObserver, BaseListener, Disposable } from '../../../utils/BaseObserver';
import { AbortOperation } from '../../../utils/AbortOperation';

export enum LockType {
CRUD = 'crud',
@@ -215,18 +216,40 @@ export abstract class AbstractStreamingSyncImplementation
return this.obtainLock({
type: LockType.CRUD,
callback: async () => {
this.updateSyncStatus({
dataFlow: {
uploading: true
}
});
/**
* Keep track of the first item in the CRUD queue for the last `uploadCrud` iteration.
*/
let checkedCrudItem: CrudEntry | undefined;

while (true) {
this.updateSyncStatus({
dataFlow: {
uploading: true
}
});
try {
const done = await this.uploadCrudBatch();
if (done) {
/**
* This is the first item in the FIFO CRUD queue.
*/
const nextCrudItem = await this.options.adapter.nextCrudItem();
if (nextCrudItem) {
if (nextCrudItem.id == checkedCrudItem?.id) {
// This will force a higher log level than exceptions which are caught here.
this.logger.warn(`Potentially previously uploaded CRUD entries are still present in the upload queue.
Make sure to handle uploads and complete CRUD transactions or batches by calling and awaiting their [.complete()] method.
The next upload iteration will be delayed.`);
throw new Error('Delaying due to previously encountered CRUD item.');
}

checkedCrudItem = nextCrudItem;
await this.options.uploadCrud();
} else {
// Uploading is completed
await this.options.adapter.updateLocalTarget(() => this.getWriteCheckpoint());
break;
}
} catch (ex) {
checkedCrudItem = undefined;
this.updateSyncStatus({
dataFlow: {
uploading: false
@@ -252,17 +275,6 @@
});
}

protected async uploadCrudBatch(): Promise<boolean> {
const hasCrud = await this.options.adapter.hasCrud();
if (hasCrud) {
await this.options.uploadCrud();
return false;
} else {
await this.options.adapter.updateLocalTarget(() => this.getWriteCheckpoint());
return true;
}
}

async connect(options?: PowerSyncConnectionOptions) {
if (this.abortController) {
await this.disconnect();
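
In short, the rewritten upload loop remembers the head of the `ps_crud` queue between iterations; if it is unchanged after an `uploadCrud()` call, the connector most likely never completed the transaction, so a warning is logged and the next attempt is pushed back through the normal retry path. A condensed sketch of just that detection logic, with locking and sync-status updates omitted (assuming `BucketStorageAdapter` and `CrudEntry` are importable from `@powersync/common`):

```typescript
import { BucketStorageAdapter, CrudEntry } from '@powersync/common';

// Condensed sketch of the stall detection above; status updates, locking and
// logger plumbing are omitted.
async function uploadAllCrud(
  adapter: BucketStorageAdapter,
  uploadCrud: () => Promise<void>,
  getWriteCheckpoint: () => Promise<string>
): Promise<void> {
  let checkedCrudItem: CrudEntry | undefined;

  while (true) {
    const nextCrudItem = await adapter.nextCrudItem();
    if (!nextCrudItem) {
      // Queue drained: record the write checkpoint and stop.
      await adapter.updateLocalTarget(getWriteCheckpoint);
      break;
    }
    if (nextCrudItem.id == checkedCrudItem?.id) {
      // Same head item as the last iteration: uploadCrud() ran but the queue
      // did not shrink, so warn and let the caller's retry delay kick in.
      console.warn('CRUD entries are still in the upload queue; did uploadData complete the transaction?');
      throw new Error('Delaying due to previously encountered CRUD item.');
    }
    checkedCrudItem = nextCrudItem;
    await uploadCrud();
  }
}
```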
37 changes: 25 additions & 12 deletions packages/web/tests/multiple_instances.test.ts
@@ -1,15 +1,15 @@
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { AbstractPowerSyncDatabase, SqliteBucketStorage, SyncStatus } from '@powersync/common';
import {
PowerSyncDatabase,
SharedWebStreamingSyncImplementation,
WebRemote,
WebStreamingSyncImplementationOptions
} from '@powersync/web';
import { testSchema } from './utils/testDb';
import { TestConnector } from './utils/MockStreamOpenFactory';
import { Mutex } from 'async-mutex';
import Logger from 'js-logger';
import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest';
import { TestConnector } from './utils/MockStreamOpenFactory';
import { testSchema } from './utils/testDb';

describe('Multiple Instances', () => {
const dbFilename = 'test-multiple-instances.db';
@@ -23,6 +23,8 @@ describe('Multiple Instances', () => {
schema: testSchema
});

beforeAll(() => Logger.useDefaults());

beforeEach(() => {
db = openDatabase();
});
@@ -184,34 +186,44 @@ describe('Multiple Instances', () => {
});

// Create the first streaming client
const syncOptions1: WebStreamingSyncImplementationOptions = {
const stream1 = new SharedWebStreamingSyncImplementation({
adapter: new SqliteBucketStorage(db.database, new Mutex()),
remote: new WebRemote(connector1),
uploadCrud: async () => {
triggerUpload1();
connector1.uploadData(db);
},
identifier
};
const stream1 = new SharedWebStreamingSyncImplementation(syncOptions1);
identifier,
retryDelayMs: 100,
flags: {
broadcastLogs: true
}
});

// Generate the second streaming sync implementation
const connector2 = new TestConnector();
const spy2 = vi.spyOn(connector2, 'uploadData');
// The second connector will be called first to upload, we don't want it to actually upload
// This will cause the sync uploads to be delayed as the CRUD queue did not change
const spy2 = vi.spyOn(connector2, 'uploadData').mockImplementation(async () => {});

let triggerUpload2: () => void;
const upload2TriggeredPromise = new Promise<void>((resolve) => {
triggerUpload2 = resolve;
});
const syncOptions2: WebStreamingSyncImplementationOptions = {

const stream2 = new SharedWebStreamingSyncImplementation({
adapter: new SqliteBucketStorage(db.database, new Mutex()),
remote: new WebRemote(connector1),
uploadCrud: async () => {
triggerUpload2();
connector2.uploadData(db);
},
identifier
};
const stream2 = new SharedWebStreamingSyncImplementation(syncOptions2);
identifier,
retryDelayMs: 100,
flags: {
broadcastLogs: true
}
});

// Waits for the stream to be marked as connected
const stream2UpdatedPromise = new Promise<void>((resolve, reject) => {
Expand All @@ -230,6 +242,7 @@ describe('Multiple Instances', () => {

// The status in the second stream client should be updated
await stream2UpdatedPromise;

expect(stream2.isConnected).true;

// Create something with CRUD in it.