Skip to content

Commit

Permalink
added waitForReady method
Browse files Browse the repository at this point in the history
  • Loading branch information
stevensJourney committed Nov 6, 2023
1 parent 1391a69 commit e2fab7f
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 28 deletions.
4 changes: 2 additions & 2 deletions .changeset/five-turtles-dream.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
'@journeyapps/powersync-sdk-common': patch
---

- Removed `user-id` header from backend connector and remote headers
- Mark PowerSync client as pending initialization before `init` has been called.
- Removed `user-id` header from backend connector and remote headers.
- Added `waitForReady` method on PowerSyncDatabase client which resolves once initialization is complete.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
protected static transactionMutex: Mutex = new Mutex();

closed: boolean;
ready: boolean;

currentStatus?: SyncStatus;
syncStreamImplementation?: AbstractStreamingSyncImplementation;
Expand All @@ -71,28 +72,16 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
private abortController: AbortController | null;
protected bucketStorageAdapter: BucketStorageAdapter;
private syncStatusListenerDisposer?: () => void;
protected initialized: Promise<void>;
protected _isReadyPromise: Promise<void> | null;

constructor(protected options: PowerSyncDatabaseOptions) {
super();
this.currentStatus = null;
this.closed = true;
this.ready = false;
this.options = { ...DEFAULT_POWERSYNC_DB_OPTIONS, ...options };
this.bucketStorageAdapter = this.generateBucketStorageAdapter();
this.sdkVersion = '';
/**
* This will resolve once the DB has been initialized
* This allows methods such as `execute` to await initialization before
* executing statements
*/
this.initialized = new Promise((resolve) => {
const l = this.registerListener({
initialized: () => {
resolve();
l?.();
}
});
});
}

get schema() {
Expand All @@ -113,6 +102,28 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB

protected abstract generateBucketStorageAdapter(): BucketStorageAdapter;

/**
* @returns A promise which will resolve once initialization is completed.
*/
async waitForReady(): Promise<void> {
if (this.ready) {
return;
}

return (
this._isReadyPromise ||
(this._isReadyPromise = new Promise((resolve) => {
const l = this.registerListener({
initialized: () => {
this.ready = true;
resolve();
l?.();
}
});
}))
);
}

abstract _init(): Promise<void>;

/**
Expand All @@ -134,7 +145,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
// close connection if one is open
await this.disconnect();

await this.initialized;
await this.waitForReady();
this.syncStreamImplementation = this.generateSyncStreamImplementation(connector);
this.syncStatusListenerDisposer = this.syncStreamImplementation.registerListener({
statusChanged: (status) => {
Expand Down Expand Up @@ -192,7 +203,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* must be constructed.
*/
async close() {
await this.initialized;
await this.waitForReady();

await this.disconnect();
this.database.close();
Expand Down Expand Up @@ -322,31 +333,31 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* Execute a statement and optionally return results
*/
async execute(sql: string, parameters?: any[]) {
await this.initialized;
await this.waitForReady();
return this.database.execute(sql, parameters);
}

/**
* Execute a read-only query and return results
*/
async getAll<T>(sql: string, parameters?: any[]): Promise<T[]> {
await this.initialized;
await this.waitForReady();
return this.database.getAll(sql, parameters);
}

/**
* Execute a read-only query and return the first result, or null if the ResultSet is empty.
*/
async getOptional<T>(sql: string, parameters?: any[]): Promise<T | null> {
await this.initialized;
await this.waitForReady();
return this.database.getOptional(sql, parameters);
}

/**
* Execute a read-only query and return the first result, error if the ResultSet is empty.
*/
async get<T>(sql: string, parameters?: any[]): Promise<T> {
await this.initialized;
await this.waitForReady();
return this.database.get(sql, parameters);
}

Expand All @@ -356,7 +367,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* In most cases, [readTransaction] should be used instead.
*/
async readLock<T>(callback: (db: DBAdapter) => Promise<T>) {
await this.initialized;
await this.waitForReady();
return mutexRunExclusive(AbstractPowerSyncDatabase.transactionMutex, () => callback(this.database));
}

Expand All @@ -365,7 +376,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* In most cases, [writeTransaction] should be used instead.
*/
async writeLock<T>(callback: (db: DBAdapter) => Promise<T>) {
await this.initialized;
await this.waitForReady();
return mutexRunExclusive(AbstractPowerSyncDatabase.transactionMutex, async () => {
const res = await callback(this.database);
_.defer(() => this.syncStreamImplementation?.triggerCrudUpload());
Expand All @@ -377,7 +388,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
callback: (tx: Transaction) => Promise<T>,
lockTimeout: number = DEFAULT_LOCK_TIMEOUT_MS
): Promise<T> {
await this.initialized;
await this.waitForReady();
return this.database.readTransaction(
async (tx) => {
const res = await callback({ ...tx });
Expand All @@ -392,7 +403,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
callback: (tx: Transaction) => Promise<T>,
lockTimeout: number = DEFAULT_LOCK_TIMEOUT_MS
): Promise<T> {
await this.initialized;
await this.waitForReady();
return this.database.writeTransaction(
async (tx) => {
const res = await callback(tx);
Expand Down Expand Up @@ -478,7 +489,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
}

private async executeReadOnly(sql: string, params: any[]) {
await this.initialized;
await this.waitForReady();
return this.database.readLock((tx) => tx.execute(sql, params));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
adapter: this.bucketStorageAdapter,
remote,
uploadCrud: async () => {
await this.initialized;
await this.waitForReady();
await connector.uploadData(this);
},
retryDelayMs: this.options.retryDelay
Expand Down

0 comments on commit e2fab7f

Please sign in to comment.