Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Add warning if crud transactions are not completed #172

Merged
merged 7 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions packages/powersync/lib/src/bucket_storage.dart
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,12 @@ class BucketStorage {
});
}

Future<CrudEntry?> nextCrudItem() async {
var next = await _internalDb
.getOptional('SELECT * FROM ps_crud ORDER BY id ASC LIMIT 1');
return next == null ? null : CrudEntry.fromRow(next);
}

Future<bool> hasCrud() async {
final anyData = await select('SELECT 1 FROM ps_crud LIMIT 1');
return anyData.isNotEmpty;
Expand Down
77 changes: 47 additions & 30 deletions packages/powersync/lib/src/streaming_sync.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import 'package:sqlite_async/mutex.dart';

import 'bucket_storage.dart';
import 'connector.dart';
import 'crud.dart';
import 'stream_utils.dart';
import 'sync_status.dart';
import 'sync_types.dart';
Expand Down Expand Up @@ -102,6 +103,10 @@ class StreamingSyncImplementation {
return _abort?.aborted ?? false;
}

bool get isConnected {
return lastStatus.connected;
}

Future<void> streamingSync() async {
try {
_abort = AbortController();
Expand Down Expand Up @@ -159,40 +164,52 @@ class StreamingSyncImplementation {
}

Future<void> uploadAllCrud() async {
while (true) {
try {
bool done = await uploadCrudBatch();
_updateStatus(uploadError: _noError);
if (done) {
break;
}
} catch (e, stacktrace) {
isolateLogger.warning('Data upload error', e, stacktrace);
_updateStatus(uploading: false, uploadError: e);
await Future.delayed(retryDelay);
}
}
_updateStatus(uploading: false);
}

Future<bool> uploadCrudBatch() async {
return crudMutex.lock(() async {
if ((await adapter.hasCrud())) {
// Keep track of the first item in the CRUD queue for the last `uploadCrud` iteration.
CrudEntry? checkedCrudItem;

while (true) {
_updateStatus(uploading: true);
mugikhan marked this conversation as resolved.
Show resolved Hide resolved
await uploadCrud();
return false;
} else {
// This isolate is the only one triggering
final updated = await adapter.updateLocalTarget(() async {
return getWriteCheckpoint();
});
if (updated) {
_localPingController.add(null);
try {
// This is the first item in the FIFO CRUD queue.
CrudEntry? nextCrudItem = await adapter.nextCrudItem();
if (nextCrudItem != null) {
if (nextCrudItem.clientId == checkedCrudItem?.clientId) {
// This will force a higher log level than exceptions which are caught here.
isolateLogger.warning(
"""Potentially previously uploaded CRUD entries are still present in the upload queue.
Make sure to handle uploads and complete CRUD transactions or batches by calling and awaiting their [.complete()] method.
The next upload iteration will be delayed.""");
throw Exception(
'Delaying due to previously encountered CRUD item.');
}

checkedCrudItem = nextCrudItem;
await uploadCrud();
_updateStatus(uploadError: _noError);
} else {
// Uploading is completed
await adapter.updateLocalTarget(() => getWriteCheckpoint());
break;
}
} catch (e, stacktrace) {
checkedCrudItem = null;
isolateLogger.warning('Data upload error', e, stacktrace);
_updateStatus(uploading: false, uploadError: e);
await Future.delayed(retryDelay);
if (!isConnected) {
// Exit the upload loop if the sync stream is no longer connected
break;
}
isolateLogger.warning(
"Caught exception when uploading. Upload will retry after a delay",
e,
stacktrace);
} finally {
_updateStatus(uploading: false);
}

return true;
}
}, timeout: retryDelay);
});
}

Future<String> getWriteCheckpoint() async {
Expand Down
91 changes: 91 additions & 0 deletions packages/powersync/test/upload_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
@TestOn('!browser')

import 'package:powersync/powersync.dart';
import 'package:test/test.dart';

import 'test_server.dart';
import 'utils/abstract_test_utils.dart';
import 'utils/test_utils_impl.dart';

final testUtils = TestUtils();
const testId = "2290de4f-0488-4e50-abed-f8e8eb1d0b42";
const testId2 = "2290de4f-0488-4e50-abed-f8e8eb1d0b43";
const partialWarning =
'Potentially previously uploaded CRUD entries are still present';

class TestConnector extends PowerSyncBackendConnector {
final Function _fetchCredentials;
final Future<void> Function(PowerSyncDatabase database) _uploadData;

TestConnector(this._fetchCredentials, this._uploadData);

@override
Future<PowerSyncCredentials?> fetchCredentials() {
return _fetchCredentials();
}

@override
Future<void> uploadData(PowerSyncDatabase database) async {
return _uploadData(database);
}
}

void main() {
group('CRUD Tests', () {
late PowerSyncDatabase powersync;
late String path;

setUp(() async {
path = testUtils.dbPath();
await testUtils.cleanDb(path: path);
});

tearDown(() async {
// await powersync.disconnectAndClear();
await powersync.close();
});

test('should warn for missing upload operations in uploadData', () async {
var server = await createServer();

credentialsCallback() async {
return PowerSyncCredentials(
endpoint: server.endpoint,
token: 'token',
userId: 'userId',
);
}

uploadData(PowerSyncDatabase db) async {
// Do nothing
}

final records = <String>[];
final sub =
testWarningLogger.onRecord.listen((log) => records.add(log.message));

powersync =
await testUtils.setupPowerSync(path: path, logger: testWarningLogger);
powersync.retryDelay = Duration(milliseconds: 0);
var connector = TestConnector(credentialsCallback, uploadData);
powersync.connect(connector: connector);

// Create something with CRUD in it.
await powersync.execute(
'INSERT INTO assets(id, description) VALUES(?, ?)', [testId, 'test']);

// Wait for the uploadData to be called.
await Future.delayed(Duration(milliseconds: 100));

// Create something else with CRUD in it.
await powersync.execute(
'INSERT INTO assets(id, description) VALUES(?, ?)',
[testId2, 'test2']);

sub.cancel();

expect(records, hasLength(2));
expect(records, anyElement(contains(partialWarning)));
});
});
}
10 changes: 6 additions & 4 deletions packages/powersync/test/utils/abstract_test_utils.dart
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ const defaultSchema = schema;

final testLogger = _makeTestLogger();

Logger _makeTestLogger() {
final testWarningLogger = _makeTestLogger(level: Level.WARNING);

Logger _makeTestLogger({Level level = Level.ALL}) {
final logger = Logger.detached('PowerSync Tests');
logger.level = Level.ALL;
logger.level = level;
logger.onRecord.listen((record) {
print(
'[${record.loggerName}] ${record.level.name}: ${record.time}: ${record.message}');
Expand Down Expand Up @@ -70,9 +72,9 @@ abstract class AbstractTestUtils {

/// Creates a SqliteDatabaseConnection
Future<PowerSyncDatabase> setupPowerSync(
{String? path, Schema? schema}) async {
{String? path, Schema? schema, Logger? logger}) async {
final db = PowerSyncDatabase.withFactory(await testFactory(path: path),
schema: schema ?? defaultSchema, logger: testLogger);
schema: schema ?? defaultSchema, logger: logger ?? testLogger);
await db.initialize();
return db;
}
Expand Down
3 changes: 2 additions & 1 deletion packages/powersync/test/utils/web_test_utils.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import 'dart:async';
import 'dart:html';

import 'package:js/js.dart';
import 'package:logging/logging.dart';
import 'package:powersync/powersync.dart';
import 'package:sqlite_async/sqlite3_common.dart';
import 'package:sqlite_async/sqlite_async.dart';
Expand Down Expand Up @@ -51,7 +52,7 @@ class TestUtils extends AbstractTestUtils {

@override
Future<PowerSyncDatabase> setupPowerSync(
{String? path, Schema? schema}) async {
{String? path, Schema? schema, Logger? logger}) async {
await _isInitialized;
return super.setupPowerSync(path: path, schema: schema);
}
Expand Down
Loading