From 8ff6325084d3f0752a5ba1ae163725683bbf983c Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Mon, 20 Nov 2023 13:44:07 -0700 Subject: [PATCH] better locks for CRUD uploads --- .changeset/dull-radios-relate.md | 5 +++ .../AbstractStreamingSyncImplementation.ts | 31 +++++++++++-------- 2 files changed, 23 insertions(+), 13 deletions(-) create mode 100644 .changeset/dull-radios-relate.md diff --git a/.changeset/dull-radios-relate.md b/.changeset/dull-radios-relate.md new file mode 100644 index 00000000..5e726e90 --- /dev/null +++ b/.changeset/dull-radios-relate.md @@ -0,0 +1,5 @@ +--- +'@journeyapps/powersync-sdk-common': patch +--- + +Added better locking for CRUD uploads diff --git a/packages/powersync-sdk-common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/powersync-sdk-common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index d0ec1672..6dc1d9c8 100644 --- a/packages/powersync-sdk-common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/powersync-sdk-common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -89,21 +89,26 @@ export abstract class AbstractStreamingSyncImplementation extends BaseObserver { - this.isUploadingCrud = true; - while (true) { - try { - const done = await this.uploadCrudBatch(); - if (done) { - this.isUploadingCrud = false; - break; + return this.obtainLock({ + type: LockType.CRUD, + callback: async () => { + this.isUploadingCrud = true; + while (true) { + try { + const done = await this.uploadCrudBatch(); + if (done) { + this.isUploadingCrud = false; + break; + } + } catch (ex) { + this.updateSyncStatus(false); + await this.delayRetry(); + this.isUploadingCrud = false; + break; + } } - } catch (ex) { - this.updateSyncStatus(false); - await this.delayRetry(); - this.isUploadingCrud = false; - break; } - } + }); } protected async uploadCrudBatch(): Promise {