Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Add warning if crud transactions are not completed #254

Merged
merged 4 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/mean-carrots-relax.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/common': minor
---

Added a warning if connector `uploadData` functions don't process CRUD items completely.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { OpId } from './CrudEntry';
import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver';
import { CrudBatch } from './CrudBatch';
import { CrudEntry, OpId } from './CrudEntry';
import { SyncDataBatch } from './SyncDataBatch';
import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver';

export interface Checkpoint {
last_op_id: OpId;
Expand Down Expand Up @@ -62,6 +62,7 @@ export interface BucketStorageAdapter extends BaseObserver<BucketStorageListener

syncLocalDatabase(checkpoint: Checkpoint): Promise<{ checkpointValid: boolean; ready: boolean; failures?: any[] }>;

nextCrudItem(): Promise<CrudEntry | undefined>;
hasCrud(): Promise<boolean>;
getCrudBatch(limit?: number): Promise<CrudBatch | null>;

Expand Down
43 changes: 26 additions & 17 deletions packages/common/src/client/sync/bucket/SqliteBucketStorage.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { Mutex } from 'async-mutex';
import Logger, { ILogger } from 'js-logger';
import { DBAdapter, Transaction, extractTableUpdates } from '../../../db/DBAdapter';
import { BaseObserver } from '../../../utils/BaseObserver';
import {
BucketState,
BucketStorageAdapter,
Expand All @@ -8,12 +10,10 @@ import {
PSInternalTable,
SyncLocalDatabaseResult
} from './BucketStorageAdapter';
import { OpTypeEnum } from './OpType';
import { CrudBatch } from './CrudBatch';
import { CrudEntry } from './CrudEntry';
import { CrudEntry, CrudEntryJSON } from './CrudEntry';
import { OpTypeEnum } from './OpType';
import { SyncDataBatch } from './SyncDataBatch';
import Logger, { ILogger } from 'js-logger';
import { BaseObserver } from '../../../utils/BaseObserver';

const COMPACT_OPERATION_INTERVAL = 1_000;

Expand Down Expand Up @@ -51,10 +51,10 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp

async init() {
this._hasCompletedSync = false;
const existingTableRows = await this.db.execute(
const existingTableRows = await this.db.getAll<{ name: string }>(
`SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'`
);
for (const row of existingTableRows.rows?._array ?? []) {
for (const row of existingTableRows ?? []) {
this.tableNames.add(row.name);
}
}
Expand All @@ -72,10 +72,10 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
startSession(): void {}

async getBucketStates(): Promise<BucketState[]> {
const result = await this.db.execute(
const result = await this.db.getAll<BucketState>(
'SELECT name as bucket, cast(last_op as TEXT) as op_id FROM ps_buckets WHERE pending_delete = 0'
);
return result.rows?._array ?? [];
return result;
}

async saveSyncData(batch: SyncDataBatch) {
Expand Down Expand Up @@ -258,19 +258,20 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
}

async updateLocalTarget(cb: () => Promise<string>): Promise<boolean> {
const rs1 = await this.db.execute("SELECT target_op FROM ps_buckets WHERE name = '$local' AND target_op = ?", [
const rs1 = await this.db.getAll("SELECT target_op FROM ps_buckets WHERE name = '$local' AND target_op = ?", [
SqliteBucketStorage.MAX_OP_ID
]);
if (!rs1.rows?.length) {
if (!rs1.length) {
// Nothing to update
return false;
}
const rs = await this.db.execute("SELECT seq FROM sqlite_sequence WHERE name = 'ps_crud'");
if (!rs.rows?.length) {
const rs = await this.db.getAll<{ seq: number }>("SELECT seq FROM sqlite_sequence WHERE name = 'ps_crud'");
if (!rs.length) {
// Nothing to update
return false;
}
const seqBefore: number = rs.rows?.item(0)['seq'];

const seqBefore: number = rs[0]['seq'];

const opId = await cb();

Expand Down Expand Up @@ -304,9 +305,17 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
});
}

async nextCrudItem(): Promise<CrudEntry | undefined> {
const next = await this.db.getOptional<CrudEntryJSON>('SELECT * FROM ps_crud ORDER BY id ASC LIMIT 1');
DominicGBauer marked this conversation as resolved.
Show resolved Hide resolved
if (!next) {
return;
}
return CrudEntry.fromRow(next);
}

async hasCrud(): Promise<boolean> {
const anyData = await this.db.execute('SELECT 1 FROM ps_crud LIMIT 1');
return !!anyData.rows?.length;
const anyData = await this.db.getOptional('SELECT 1 FROM ps_crud LIMIT 1');
return !!anyData;
}

/**
Expand All @@ -318,10 +327,10 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
return null;
}

const crudResult = await this.db.execute('SELECT * FROM ps_crud ORDER BY id ASC LIMIT ?', [limit]);
const crudResult = await this.db.getAll<CrudEntryJSON>('SELECT * FROM ps_crud ORDER BY id ASC LIMIT ?', [limit]);
DominicGBauer marked this conversation as resolved.
Show resolved Hide resolved

const all: CrudEntry[] = [];
for (const row of crudResult.rows?._array ?? []) {
for (const row of crudResult) {
all.push(CrudEntry.fromRow(row));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@ import throttle from 'lodash/throttle';

import Logger, { ILogger } from 'js-logger';

import { SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus';
import { AbortOperation } from '../../../utils/AbortOperation';
import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver';
import { BucketChecksum, BucketStorageAdapter, Checkpoint } from '../bucket/BucketStorageAdapter';
import { CrudEntry } from '../bucket/CrudEntry';
import { SyncDataBucket } from '../bucket/SyncDataBucket';
import { AbstractRemote, SyncStreamOptions } from './AbstractRemote';
import {
BucketRequest,
StreamingSyncRequestParameterType,
Expand All @@ -11,12 +18,6 @@ import {
isStreamingSyncCheckpointDiff,
isStreamingSyncData
} from './streaming-sync-types';
import { AbstractRemote, SyncStreamOptions } from './AbstractRemote';
import { BucketChecksum, BucketStorageAdapter, Checkpoint } from '../bucket/BucketStorageAdapter';
import { SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus';
import { SyncDataBucket } from '../bucket/SyncDataBucket';
import { BaseObserver, BaseListener, Disposable } from '../../../utils/BaseObserver';
import { AbortOperation } from '../../../utils/AbortOperation';

export enum LockType {
CRUD = 'crud',
Expand Down Expand Up @@ -215,18 +216,40 @@ export abstract class AbstractStreamingSyncImplementation
return this.obtainLock({
type: LockType.CRUD,
callback: async () => {
this.updateSyncStatus({
dataFlow: {
uploading: true
}
});
/**
* Keep track of the first item in the CRUD queue for the last `uploadCrud` iteration.
*/
let checkedCrudItem: CrudEntry | undefined;

while (true) {
this.updateSyncStatus({
dataFlow: {
uploading: true
}
});
try {
const done = await this.uploadCrudBatch();
if (done) {
/**
* This is the first item in the FIFO CRUD queue.
*/
const nextCrudItem = await this.options.adapter.nextCrudItem();
if (nextCrudItem) {
if (nextCrudItem.id == checkedCrudItem?.id) {
// This will force a higher log level than exceptions which are caught here.
this.logger.warn(`Potentially previously uploaded CRUD entries are still present in the upload queue.
Make sure to handle uploads and complete CRUD transactions or batches by calling and awaiting their [.complete()] method.
The next upload iteration will be delayed.`);
throw new Error('Delaying due to previously encountered CRUD item.');
}

checkedCrudItem = nextCrudItem;
await this.options.uploadCrud();
} else {
// Uploading is completed
await this.options.adapter.updateLocalTarget(() => this.getWriteCheckpoint());
break;
}
} catch (ex) {
checkedCrudItem = undefined;
this.updateSyncStatus({
dataFlow: {
uploading: false
Expand All @@ -252,17 +275,6 @@ export abstract class AbstractStreamingSyncImplementation
});
}

protected async uploadCrudBatch(): Promise<boolean> {
const hasCrud = await this.options.adapter.hasCrud();
if (hasCrud) {
await this.options.uploadCrud();
return false;
} else {
await this.options.adapter.updateLocalTarget(() => this.getWriteCheckpoint());
return true;
}
}

async connect(options?: PowerSyncConnectionOptions) {
if (this.abortController) {
await this.disconnect();
Expand Down
40 changes: 28 additions & 12 deletions packages/web/tests/multiple_instances.test.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { AbstractPowerSyncDatabase, SqliteBucketStorage, SyncStatus } from '@powersync/common';
import {
PowerSyncDatabase,
SharedWebStreamingSyncImplementation,
WebRemote,
WebStreamingSyncImplementationOptions
} from '@powersync/web';
import { testSchema } from './utils/testDb';
import { TestConnector } from './utils/MockStreamOpenFactory';
import { Mutex } from 'async-mutex';
import Logger from 'js-logger';
import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest';
import { TestConnector } from './utils/MockStreamOpenFactory';
import { testSchema } from './utils/testDb';

describe('Multiple Instances', () => {
const dbFilename = 'test-multiple-instances.db';
Expand All @@ -23,6 +23,8 @@ describe('Multiple Instances', () => {
schema: testSchema
});

beforeAll(() => Logger.useDefaults());

beforeEach(() => {
db = openDatabase();
});
Expand Down Expand Up @@ -184,34 +186,44 @@ describe('Multiple Instances', () => {
});

// Create the first streaming client
const syncOptions1: WebStreamingSyncImplementationOptions = {
const stream1 = new SharedWebStreamingSyncImplementation({
adapter: new SqliteBucketStorage(db.database, new Mutex()),
remote: new WebRemote(connector1),
uploadCrud: async () => {
triggerUpload1();
connector1.uploadData(db);
},
identifier
};
const stream1 = new SharedWebStreamingSyncImplementation(syncOptions1);
identifier,
retryDelayMs: 100,
flags: {
broadcastLogs: true
}
});

// Generate the second streaming sync implementation
const connector2 = new TestConnector();
const spy2 = vi.spyOn(connector2, 'uploadData');
// The second connector will be called first to upload, we don't want it to actually upload
// This will cause the sync uploads to be delayed as the CRUD queue did not change
stevensJourney marked this conversation as resolved.
Show resolved Hide resolved
const spy2 = vi.spyOn(connector2, 'uploadData').mockImplementation(async () => {});

let triggerUpload2: () => void;
const upload2TriggeredPromise = new Promise<void>((resolve) => {
triggerUpload2 = resolve;
});
const syncOptions2: WebStreamingSyncImplementationOptions = {

const stream2 = new SharedWebStreamingSyncImplementation({
adapter: new SqliteBucketStorage(db.database, new Mutex()),
remote: new WebRemote(connector1),
uploadCrud: async () => {
triggerUpload2();
connector2.uploadData(db);
},
identifier
};
const stream2 = new SharedWebStreamingSyncImplementation(syncOptions2);
identifier,
retryDelayMs: 100,
flags: {
stevensJourney marked this conversation as resolved.
Show resolved Hide resolved
broadcastLogs: true
}
});

// Waits for the stream to be marked as connected
const stream2UpdatedPromise = new Promise<void>((resolve, reject) => {
Expand All @@ -230,6 +242,8 @@ describe('Multiple Instances', () => {

// The status in the second stream client should be updated
await stream2UpdatedPromise;

console.log('stream 2 status updated');
expect(stream2.isConnected).true;

// Create something with CRUD in it.
Expand All @@ -243,6 +257,8 @@ describe('Multiple Instances', () => {
// The second connector should be called to upload
await upload2TriggeredPromise;

console.log('2 upload was triggered');

// It should call the latest connected client
expect(spy2).toHaveBeenCalledOnce();

Expand Down
Loading