From 42a6294bc47a4cdd88327eccaf90aa64eb5c6c86 Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Tue, 19 Nov 2024 15:30:05 +0200 Subject: [PATCH 1/6] Added watch function to drizzle db API. --- .changeset/calm-baboons-worry.md | 5 + packages/drizzle-driver/src/sqlite/db.ts | 63 +++- .../drizzle-driver/tests/sqlite/watch.test.ts | 283 ++++++++++++++++++ 3 files changed, 348 insertions(+), 3 deletions(-) create mode 100644 .changeset/calm-baboons-worry.md create mode 100644 packages/drizzle-driver/tests/sqlite/watch.test.ts diff --git a/.changeset/calm-baboons-worry.md b/.changeset/calm-baboons-worry.md new file mode 100644 index 00000000..37c26d19 --- /dev/null +++ b/.changeset/calm-baboons-worry.md @@ -0,0 +1,5 @@ +--- +'@powersync/drizzle-driver': minor +--- + +Added `watch()` function to support watched queries. This function invokes `execute()` on the Drizzle query which improves support for complex queries such as those which are relational. diff --git a/packages/drizzle-driver/src/sqlite/db.ts b/packages/drizzle-driver/src/sqlite/db.ts index 77d2e54c..5d70121e 100644 --- a/packages/drizzle-driver/src/sqlite/db.ts +++ b/packages/drizzle-driver/src/sqlite/db.ts @@ -1,4 +1,11 @@ -import { AbstractPowerSyncDatabase, QueryResult } from '@powersync/common'; +import { + AbstractPowerSyncDatabase, + QueryResult, + runOnSchemaChange, + SQLWatchOptions, + WatchHandler +} from '@powersync/common'; +import { Query } from 'drizzle-orm'; import { DefaultLogger } from 'drizzle-orm/logger'; import { createTableRelationsHelpers, @@ -13,6 +20,8 @@ import { SQLiteAsyncDialect } from 'drizzle-orm/sqlite-core/dialect'; import type { DrizzleConfig } from 'drizzle-orm/utils'; import { PowerSyncSQLiteSession, PowerSyncSQLiteTransactionConfig } from './sqlite-session'; +type WatchQuery = { toSQL(): Query; execute(): Promise }; + export interface PowerSyncSQLiteDatabase = Record> extends BaseSQLiteDatabase<'async', QueryResult, TSchema> { transaction( @@ -21,13 +30,15 @@ export interface PowerSyncSQLiteDatabase ) => Promise, config?: PowerSyncSQLiteTransactionConfig ): Promise; + + watch(query: WatchQuery, handler?: WatchHandler, options?: SQLWatchOptions): void; } export function wrapPowerSyncWithDrizzle = Record>( db: AbstractPowerSyncDatabase, config: DrizzleConfig = {} ): PowerSyncSQLiteDatabase { - const dialect = new SQLiteAsyncDialect({casing: config.casing}); + const dialect = new SQLiteAsyncDialect({ casing: config.casing }); let logger; if (config.logger === true) { logger = new DefaultLogger(); @@ -48,5 +59,51 @@ export function wrapPowerSyncWithDrizzle const session = new PowerSyncSQLiteSession(db, dialect, schema, { logger }); - return new BaseSQLiteDatabase('async', dialect, session, schema) as PowerSyncSQLiteDatabase; + + const watch = (query: WatchQuery, handler?: WatchHandler, options?: SQLWatchOptions): void => { + const { onResult, onError = (e: Error) => {} } = handler ?? {}; + if (!onResult) { + throw new Error('onResult is required'); + } + + const watchQuery = async (abortSignal: AbortSignal) => { + try { + const toSql = query.toSQL(); + const resolvedTables = await db.resolveTables(toSql.sql, toSql.params, options); + + // Fetch initial data + const result = await query.execute(); + onResult(result); + + db.onChangeWithCallback( + { + onChange: async () => { + try { + const result = await query.execute(); + onResult(result); + } catch (error: any) { + onError(error); + } + }, + onError + }, + { + ...(options ?? {}), + tables: resolvedTables, + // Override the abort signal since we intercept it + signal: abortSignal + } + ); + } catch (error: any) { + onError(error); + } + }; + + runOnSchemaChange(watchQuery, db, options); + }; + + const baseDatabase = new BaseSQLiteDatabase('async', dialect, session, schema) as PowerSyncSQLiteDatabase; + return Object.assign(baseDatabase, { + watch: (query: WatchQuery, handler?: WatchHandler, options?: SQLWatchOptions) => watch(query, handler, options) + }); } diff --git a/packages/drizzle-driver/tests/sqlite/watch.test.ts b/packages/drizzle-driver/tests/sqlite/watch.test.ts new file mode 100644 index 00000000..54ed2b40 --- /dev/null +++ b/packages/drizzle-driver/tests/sqlite/watch.test.ts @@ -0,0 +1,283 @@ +import { AbstractPowerSyncDatabase, column, Schema, Table } from '@powersync/common'; +import { PowerSyncDatabase } from '@powersync/web'; +import { count, eq, sql } from 'drizzle-orm'; +import { integer, sqliteTable, text, uniqueIndex } from 'drizzle-orm/sqlite-core'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import * as SUT from '../../src/sqlite/db'; + +vi.useRealTimers(); + +const assetsPs = new Table( + { + created_at: column.text, + make: column.text, + model: column.text, + serial_number: column.text, + quantity: column.integer, + user_id: column.text, + customer_id: column.text, + description: column.text + }, + { indexes: { makemodel: ['make, model'] } } +); + +const customersPs = new Table({ + name: column.text, + email: column.text +}); + +const PsSchema = new Schema({ assets: assetsPs, customers: customersPs }); + +const assets = sqliteTable( + 'assets', + { + id: text('id'), + created_at: text('created_at'), + make: text('make'), + model: text('model'), + serial_number: text('serial_number'), + quantity: integer('quantity'), + user_id: text('user_id'), + customer_id: text('customer_id'), + description: text('description') + }, + (table) => ({ + makemodelIndex: uniqueIndex('makemodel').on(table.make, table.model) + }) +); + +const customers = sqliteTable('customers', { + id: text('id'), + name: text('name'), + email: text('email') +}); + +const DrizzleSchema = { assets, customers }; + +/** + * There seems to be an issue with Vitest browser mode's setTimeout and + * fake timer functionality. + * e.g. calling: + * await new Promise((resolve) => setTimeout(resolve, 10)); + * waits for 1 second instead of 10ms. + * Setting this to 1 second as a work around. + */ +const throttleDuration = 1000; + +describe('Watch Tests', () => { + let powerSyncDb: AbstractPowerSyncDatabase; + let db: SUT.PowerSyncSQLiteDatabase; + + beforeEach(async () => { + powerSyncDb = new PowerSyncDatabase({ + database: { + dbFilename: 'test.db' + }, + schema: PsSchema + }); + db = SUT.wrapPowerSyncWithDrizzle(powerSyncDb, { schema: DrizzleSchema, logger: { logQuery: () => {} } }); + + await powerSyncDb.init(); + }); + + afterEach(async () => { + await powerSyncDb.disconnectAndClear(); + }); + + it('watch outside throttle limits', async () => { + const abortController = new AbortController(); + + const updatesCount = 2; + let receivedUpdatesCount = 0; + + /** + * Promise which resolves once we received the same amount of update + * notifications as there are inserts. + */ + const receivedUpdates = new Promise((resolve) => { + const onUpdate = () => { + receivedUpdatesCount++; + + if (receivedUpdatesCount == updatesCount) { + abortController.abort(); + resolve(); + } + }; + + const query = db + .select({ count: count() }) + .from(assets) + .innerJoin(customers, eq(customers.id, assets.customer_id)); + + db.watch(query, { onResult: onUpdate }, { signal: abortController.signal, throttleMs: throttleDuration }); + }); + + for (let updateCount = 0; updateCount < updatesCount; updateCount++) { + await db + .insert(assets) + .values({ + id: sql`uuid()`, + make: 'test', + customer_id: sql`uuid()` + }) + .execute(); + + // Wait the throttle duration, ensuring a watch update for each insert + await new Promise((resolve) => setTimeout(resolve, throttleDuration)); + } + + await receivedUpdates; + expect(receivedUpdatesCount).equals(updatesCount); + }); + + it('watch inside throttle limits', async () => { + const abortController = new AbortController(); + + const updatesCount = 5; + let receivedUpdatesCount = 0; + + const onUpdate = () => { + receivedUpdatesCount++; + }; + const query = db.select({ count: count() }).from(assets).innerJoin(customers, eq(customers.id, assets.customer_id)); + db.watch(query, { onResult: onUpdate }, { signal: abortController.signal, throttleMs: throttleDuration }); + + // Create the inserts as fast as possible + for (let updateCount = 0; updateCount < updatesCount; updateCount++) { + await db + .insert(assets) + .values({ + id: sql`uuid()`, + make: 'test', + customer_id: sql`uuid()` + }) + .execute(); + } + + await new Promise((resolve) => setTimeout(resolve, throttleDuration * 2)); + abortController.abort(); + + // There should be one initial result plus one throttled result + expect(receivedUpdatesCount).equals(2); + }); + + it('should only watch tables inside query', async () => { + const assetsAbortController = new AbortController(); + + let receivedAssetsUpdatesCount = 0; + const onWatchAssets = () => { + receivedAssetsUpdatesCount++; + }; + + const queryAssets = db.select({ count: count() }).from(assets); + + db.watch( + queryAssets, + { onResult: onWatchAssets }, + { + signal: assetsAbortController.signal + } + ); + + const customersAbortController = new AbortController(); + + let receivedCustomersUpdatesCount = 0; + const onWatchCustomers = () => { + receivedCustomersUpdatesCount++; + }; + + const queryCustomers = db.select({ count: count() }).from(customers); + db.watch( + queryCustomers, + { onResult: onWatchCustomers }, + { + signal: customersAbortController.signal + } + ); + + // Ensures insert doesn't form part of initial result + await new Promise((resolve) => setTimeout(resolve, throttleDuration)); + + await db + .insert(assets) + .values({ + id: sql`uuid()`, + make: 'test', + customer_id: sql`uuid()` + }) + .execute(); + + await new Promise((resolve) => setTimeout(resolve, throttleDuration * 2)); + assetsAbortController.abort(); + customersAbortController.abort(); + + // There should be one initial result plus one throttled result + expect(receivedAssetsUpdatesCount).equals(2); + + // Only the initial result should have yielded. + expect(receivedCustomersUpdatesCount).equals(1); + }); + + it('should handle watch onError', async () => { + const abortController = new AbortController(); + const onResult = () => {}; // no-op + let receivedErrorCount = 0; + + const receivedError = new Promise(async (resolve) => { + const onError = () => { + receivedErrorCount++; + resolve(); + }; + + const query = db + .select({ + id: sql`fakeFunction()` // Simulate an error with invalid function + }) + .from(assets); + + db.watch(query, { onResult, onError }, { signal: abortController.signal, throttleMs: throttleDuration }); + }); + abortController.abort(); + + await receivedError; + expect(receivedErrorCount).equals(1); + }); + + it('should throttle watch overflow', async () => { + const overflowAbortController = new AbortController(); + const updatesCount = 25; + + let receivedWithManagedOverflowCount = 0; + const firstResultReceived = new Promise((resolve) => { + const onResultOverflow = () => { + if (receivedWithManagedOverflowCount === 0) { + resolve(); + } + receivedWithManagedOverflowCount++; + }; + const query = db.select({ count: count() }).from(assets); + db.watch(query, { onResult: onResultOverflow }, { signal: overflowAbortController.signal, throttleMs: 1 }); + }); + + await firstResultReceived; + + // Perform a large number of inserts to trigger overflow + for (let i = 0; i < updatesCount; i++) { + db.insert(assets) + .values({ + id: sql`uuid()`, + make: 'test', + customer_id: sql`uuid()` + }) + .execute(); + } + + await new Promise((resolve) => setTimeout(resolve, 1 * throttleDuration)); + + overflowAbortController.abort(); + + // This fluctuates between 3 and 4 based on timing, but should never be 25 + expect(receivedWithManagedOverflowCount).greaterThan(2); + expect(receivedWithManagedOverflowCount).toBeLessThanOrEqual(4); + }); +}); From ff11c5ae6686d76a24df4cdde4d9ea5ccc19f682 Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Thu, 21 Nov 2024 13:56:24 +0200 Subject: [PATCH 2/6] Introducing compilableQueryWatch() util to common, using in Drizzle. --- .../common/src/client/compilableQueryWatch.ts | 55 ++++++++++++++++++ packages/common/src/index.ts | 1 + packages/drizzle-driver/src/sqlite/db.ts | 57 +++---------------- 3 files changed, 65 insertions(+), 48 deletions(-) create mode 100644 packages/common/src/client/compilableQueryWatch.ts diff --git a/packages/common/src/client/compilableQueryWatch.ts b/packages/common/src/client/compilableQueryWatch.ts new file mode 100644 index 00000000..0b973567 --- /dev/null +++ b/packages/common/src/client/compilableQueryWatch.ts @@ -0,0 +1,55 @@ +import { CompilableQuery } from './../types/types.js'; +import { AbstractPowerSyncDatabase, SQLWatchOptions } from './AbstractPowerSyncDatabase.js'; +import { runOnSchemaChange } from './runOnSchemaChange.js'; + +export interface CompilableQueryWatchHandler { + onResult: (results: T[]) => void; + onError?: (error: Error) => void; +} + +export function compilableQueryWatch( + db: AbstractPowerSyncDatabase, + query: CompilableQuery, + handler: CompilableQueryWatchHandler, + options?: SQLWatchOptions +): void { + const { onResult, onError = (e: Error) => {} } = handler ?? {}; + if (!onResult) { + throw new Error('onResult is required'); + } + + const watchQuery = async (abortSignal: AbortSignal) => { + try { + const toSql = query.compile(); + const resolvedTables = await db.resolveTables(toSql.sql, toSql.parameters as [], options); + + // Fetch initial data + const result = await query.execute(); + onResult(result); + + db.onChangeWithCallback( + { + onChange: async () => { + try { + const result = await query.execute(); + onResult(result); + } catch (error: any) { + onError(error); + } + }, + onError + }, + { + ...(options ?? {}), + tables: resolvedTables, + // Override the abort signal since we intercept it + signal: abortSignal + } + ); + } catch (error: any) { + onError(error); + } + }; + + runOnSchemaChange(watchQuery, db, options); +} diff --git a/packages/common/src/index.ts b/packages/common/src/index.ts index b55cca84..99497d15 100644 --- a/packages/common/src/index.ts +++ b/packages/common/src/index.ts @@ -5,6 +5,7 @@ export * from './client/connection/PowerSyncBackendConnector.js'; export * from './client/connection/PowerSyncCredentials.js'; export * from './client/sync/bucket/BucketStorageAdapter.js'; export { runOnSchemaChange } from './client/runOnSchemaChange.js'; +export { CompilableQueryWatchHandler, compilableQueryWatch } from './client/compilableQueryWatch.js'; export { UpdateType, CrudEntry, OpId } from './client/sync/bucket/CrudEntry.js'; export * from './client/sync/bucket/SqliteBucketStorage.js'; export * from './client/sync/bucket/CrudBatch.js'; diff --git a/packages/drizzle-driver/src/sqlite/db.ts b/packages/drizzle-driver/src/sqlite/db.ts index 5d70121e..29eeb015 100644 --- a/packages/drizzle-driver/src/sqlite/db.ts +++ b/packages/drizzle-driver/src/sqlite/db.ts @@ -1,9 +1,9 @@ import { AbstractPowerSyncDatabase, + compilableQueryWatch, + CompilableQueryWatchHandler, QueryResult, - runOnSchemaChange, - SQLWatchOptions, - WatchHandler + SQLWatchOptions } from '@powersync/common'; import { Query } from 'drizzle-orm'; import { DefaultLogger } from 'drizzle-orm/logger'; @@ -18,9 +18,10 @@ import { SQLiteTransaction } from 'drizzle-orm/sqlite-core'; import { BaseSQLiteDatabase } from 'drizzle-orm/sqlite-core/db'; import { SQLiteAsyncDialect } from 'drizzle-orm/sqlite-core/dialect'; import type { DrizzleConfig } from 'drizzle-orm/utils'; +import { toCompilableQuery } from './../utils/compilableQuery'; import { PowerSyncSQLiteSession, PowerSyncSQLiteTransactionConfig } from './sqlite-session'; -type WatchQuery = { toSQL(): Query; execute(): Promise }; +type WatchQuery = { toSQL(): Query; execute(): Promise }; export interface PowerSyncSQLiteDatabase = Record> extends BaseSQLiteDatabase<'async', QueryResult, TSchema> { @@ -31,7 +32,7 @@ export interface PowerSyncSQLiteDatabase config?: PowerSyncSQLiteTransactionConfig ): Promise; - watch(query: WatchQuery, handler?: WatchHandler, options?: SQLWatchOptions): void; + watch(query: WatchQuery, handler?: CompilableQueryWatchHandler, options?: SQLWatchOptions): void; } export function wrapPowerSyncWithDrizzle = Record>( @@ -60,50 +61,10 @@ export function wrapPowerSyncWithDrizzle logger }); - const watch = (query: WatchQuery, handler?: WatchHandler, options?: SQLWatchOptions): void => { - const { onResult, onError = (e: Error) => {} } = handler ?? {}; - if (!onResult) { - throw new Error('onResult is required'); - } - - const watchQuery = async (abortSignal: AbortSignal) => { - try { - const toSql = query.toSQL(); - const resolvedTables = await db.resolveTables(toSql.sql, toSql.params, options); - - // Fetch initial data - const result = await query.execute(); - onResult(result); - - db.onChangeWithCallback( - { - onChange: async () => { - try { - const result = await query.execute(); - onResult(result); - } catch (error: any) { - onError(error); - } - }, - onError - }, - { - ...(options ?? {}), - tables: resolvedTables, - // Override the abort signal since we intercept it - signal: abortSignal - } - ); - } catch (error: any) { - onError(error); - } - }; - - runOnSchemaChange(watchQuery, db, options); - }; - const baseDatabase = new BaseSQLiteDatabase('async', dialect, session, schema) as PowerSyncSQLiteDatabase; return Object.assign(baseDatabase, { - watch: (query: WatchQuery, handler?: WatchHandler, options?: SQLWatchOptions) => watch(query, handler, options) + watch: (query: WatchQuery, handler: CompilableQueryWatchHandler, options?: SQLWatchOptions) => { + compilableQueryWatch(db, toCompilableQuery(query), handler, options); + } }); } From c84161234cfe2bd47657d8154016f112250de9ef Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Thu, 21 Nov 2024 16:02:28 +0200 Subject: [PATCH 3/6] Renamed WatchQuery type in Drizzle DB to DrizzleQuery. Exporting this type. --- packages/drizzle-driver/src/index.ts | 4 ++-- packages/drizzle-driver/src/sqlite/db.ts | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/drizzle-driver/src/index.ts b/packages/drizzle-driver/src/index.ts index 6f5c1537..6dd870c8 100644 --- a/packages/drizzle-driver/src/index.ts +++ b/packages/drizzle-driver/src/index.ts @@ -1,4 +1,4 @@ -import { wrapPowerSyncWithDrizzle, type PowerSyncSQLiteDatabase } from './sqlite/db'; +import { wrapPowerSyncWithDrizzle, type DrizzleQuery, type PowerSyncSQLiteDatabase } from './sqlite/db'; import { toCompilableQuery } from './utils/compilableQuery'; -export { wrapPowerSyncWithDrizzle, toCompilableQuery, PowerSyncSQLiteDatabase }; +export { wrapPowerSyncWithDrizzle, toCompilableQuery, DrizzleQuery, PowerSyncSQLiteDatabase }; diff --git a/packages/drizzle-driver/src/sqlite/db.ts b/packages/drizzle-driver/src/sqlite/db.ts index 29eeb015..66879e49 100644 --- a/packages/drizzle-driver/src/sqlite/db.ts +++ b/packages/drizzle-driver/src/sqlite/db.ts @@ -21,7 +21,7 @@ import type { DrizzleConfig } from 'drizzle-orm/utils'; import { toCompilableQuery } from './../utils/compilableQuery'; import { PowerSyncSQLiteSession, PowerSyncSQLiteTransactionConfig } from './sqlite-session'; -type WatchQuery = { toSQL(): Query; execute(): Promise }; +export type DrizzleQuery = { toSQL(): Query; execute(): Promise }; export interface PowerSyncSQLiteDatabase = Record> extends BaseSQLiteDatabase<'async', QueryResult, TSchema> { @@ -32,7 +32,7 @@ export interface PowerSyncSQLiteDatabase config?: PowerSyncSQLiteTransactionConfig ): Promise; - watch(query: WatchQuery, handler?: CompilableQueryWatchHandler, options?: SQLWatchOptions): void; + watch(query: DrizzleQuery, handler?: CompilableQueryWatchHandler, options?: SQLWatchOptions): void; } export function wrapPowerSyncWithDrizzle = Record>( @@ -63,7 +63,7 @@ export function wrapPowerSyncWithDrizzle const baseDatabase = new BaseSQLiteDatabase('async', dialect, session, schema) as PowerSyncSQLiteDatabase; return Object.assign(baseDatabase, { - watch: (query: WatchQuery, handler: CompilableQueryWatchHandler, options?: SQLWatchOptions) => { + watch: (query: DrizzleQuery, handler: CompilableQueryWatchHandler, options?: SQLWatchOptions) => { compilableQueryWatch(db, toCompilableQuery(query), handler, options); } }); From febcd221be5041d829297425421e68fbc1005fde Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Thu, 21 Nov 2024 16:03:49 +0200 Subject: [PATCH 4/6] Added watch method to Kysely DB Wrapper API. --- packages/kysely-driver/src/index.ts | 3 +- packages/kysely-driver/src/sqlite/db.ts | 25 +- .../kysely-driver/tests/sqlite/watch.test.ts | 264 ++++++++++++++++++ 3 files changed, 288 insertions(+), 4 deletions(-) create mode 100644 packages/kysely-driver/tests/sqlite/watch.test.ts diff --git a/packages/kysely-driver/src/index.ts b/packages/kysely-driver/src/index.ts index 12f341e6..750e3047 100644 --- a/packages/kysely-driver/src/index.ts +++ b/packages/kysely-driver/src/index.ts @@ -1,4 +1,4 @@ -import { wrapPowerSyncWithKysely } from './sqlite/db'; +import { wrapPowerSyncWithKysely, type PowerSyncKyselyDatabase } from './sqlite/db'; import { type ColumnType, type Insertable, @@ -19,5 +19,6 @@ export { KyselyConfig, sql, Kysely, + PowerSyncKyselyDatabase, wrapPowerSyncWithKysely }; diff --git a/packages/kysely-driver/src/sqlite/db.ts b/packages/kysely-driver/src/sqlite/db.ts index 15e209a7..0fe213d1 100644 --- a/packages/kysely-driver/src/sqlite/db.ts +++ b/packages/kysely-driver/src/sqlite/db.ts @@ -1,4 +1,10 @@ -import { type AbstractPowerSyncDatabase } from '@powersync/common'; +import { + CompilableQuery, + compilableQueryWatch, + CompilableQueryWatchHandler, + SQLWatchOptions, + type AbstractPowerSyncDatabase +} from '@powersync/common'; import { Dialect, Kysely, type KyselyConfig } from 'kysely'; import { PowerSyncDialect } from './sqlite-dialect'; @@ -9,11 +15,24 @@ export type PowerSyncKyselyOptions = Omit & { dialect?: Dialect; }; -export const wrapPowerSyncWithKysely = (db: AbstractPowerSyncDatabase, options?: PowerSyncKyselyOptions) => { - return new Kysely({ +export type PowerSyncKyselyDatabase = Kysely & { + watch: (query: CompilableQuery, handler: CompilableQueryWatchHandler, options?: SQLWatchOptions) => void; +}; + +export const wrapPowerSyncWithKysely = ( + db: AbstractPowerSyncDatabase, + options?: PowerSyncKyselyOptions +): PowerSyncKyselyDatabase => { + const kysely = new Kysely({ dialect: new PowerSyncDialect({ db }), ...options }); + + return Object.assign(kysely, { + watch: (query: CompilableQuery, handler: CompilableQueryWatchHandler, options?: SQLWatchOptions) => { + compilableQueryWatch(db, query, handler, options); + } + }); }; diff --git a/packages/kysely-driver/tests/sqlite/watch.test.ts b/packages/kysely-driver/tests/sqlite/watch.test.ts new file mode 100644 index 00000000..1c39a32e --- /dev/null +++ b/packages/kysely-driver/tests/sqlite/watch.test.ts @@ -0,0 +1,264 @@ +import { AbstractPowerSyncDatabase, column, Schema, Table } from '@powersync/common'; +import { PowerSyncDatabase } from '@powersync/web'; +import { sql } from 'kysely'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import * as SUT from '../../src/sqlite/db'; + +vi.useRealTimers(); + +const assetsPs = new Table( + { + created_at: column.text, + make: column.text, + model: column.text, + serial_number: column.text, + quantity: column.integer, + user_id: column.text, + customer_id: column.text, + description: column.text + }, + { indexes: { makemodel: ['make, model'] } } +); + +const customersPs = new Table({ + name: column.text, + email: column.text +}); + +const PsSchema = new Schema({ assets: assetsPs, customers: customersPs }); +export type Database = (typeof PsSchema)['types']; + +/** + * There seems to be an issue with Vitest browser mode's setTimeout and + * fake timer functionality. + * e.g. calling: + * await new Promise((resolve) => setTimeout(resolve, 10)); + * waits for 1 second instead of 10ms. + * Setting this to 1 second as a work around. + */ +const throttleDuration = 1000; + +describe('Watch Tests', () => { + let powerSyncDb: AbstractPowerSyncDatabase; + let db: SUT.PowerSyncKyselyDatabase; + + beforeEach(async () => { + powerSyncDb = new PowerSyncDatabase({ + database: { + dbFilename: 'test.db' + }, + schema: PsSchema + }); + db = SUT.wrapPowerSyncWithKysely(powerSyncDb); + + await powerSyncDb.init(); + }); + + afterEach(async () => { + await powerSyncDb.disconnectAndClear(); + }); + + it('watch outside throttle limits', async () => { + const abortController = new AbortController(); + + const updatesCount = 2; + let receivedUpdatesCount = 0; + + /** + * Promise which resolves once we received the same amount of update + * notifications as there are inserts. + */ + const receivedUpdates = new Promise((resolve) => { + const onUpdate = () => { + receivedUpdatesCount++; + + if (receivedUpdatesCount == updatesCount) { + abortController.abort(); + resolve(); + } + }; + + const query = db + .selectFrom('assets') + .innerJoin('customers', 'customers.id', 'assets.customer_id') + .select(db.fn.count('assets.id').as('count')); + + db.watch(query, { onResult: onUpdate }, { signal: abortController.signal, throttleMs: throttleDuration }); + }); + + for (let updateCount = 0; updateCount < updatesCount; updateCount++) { + await db + .insertInto('assets') + .values({ + id: sql`uuid()`, + make: 'test', + customer_id: sql`uuid()` + }) + .execute(); + + // Wait the throttle duration, ensuring a watch update for each insert + await new Promise((resolve) => setTimeout(resolve, throttleDuration)); + } + + await receivedUpdates; + expect(receivedUpdatesCount).equals(updatesCount); + }); + + it('watch inside throttle limits', async () => { + const abortController = new AbortController(); + + const updatesCount = 5; + let receivedUpdatesCount = 0; + + const onUpdate = () => { + receivedUpdatesCount++; + }; + + const query = db + .selectFrom('assets') + .innerJoin('customers', 'customers.id', 'assets.customer_id') + .select(db.fn.count('assets.id').as('count')); + + db.watch(query, { onResult: onUpdate }, { signal: abortController.signal, throttleMs: throttleDuration }); + + // Create the inserts as fast as possible + for (let updateCount = 0; updateCount < updatesCount; updateCount++) { + await db + .insertInto('assets') + .values({ + id: sql`uuid()`, + make: 'test', + customer_id: sql`uuid()` + }) + .execute(); + } + + await new Promise((resolve) => setTimeout(resolve, throttleDuration * 2)); + abortController.abort(); + + // There should be one initial result plus one throttled result + expect(receivedUpdatesCount).equals(2); + }); + + it('should only watch tables inside query', async () => { + const assetsAbortController = new AbortController(); + + let receivedAssetsUpdatesCount = 0; + const onWatchAssets = () => { + receivedAssetsUpdatesCount++; + }; + + const queryAssets = db.selectFrom('assets').select(db.fn.count('assets.id').as('count')); + db.watch( + queryAssets, + { onResult: onWatchAssets }, + { + signal: assetsAbortController.signal + } + ); + + const customersAbortController = new AbortController(); + + let receivedCustomersUpdatesCount = 0; + const onWatchCustomers = () => { + receivedCustomersUpdatesCount++; + }; + + const queryCustomers = db.selectFrom('customers').select(db.fn.count('customers.id').as('count')); + + db.watch( + queryCustomers, + { onResult: onWatchCustomers }, + { + signal: customersAbortController.signal + } + ); + + // Ensures insert doesn't form part of initial result + await new Promise((resolve) => setTimeout(resolve, throttleDuration)); + + await db + .insertInto('assets') + .values({ + id: sql`uuid()`, + make: 'test', + customer_id: sql`uuid()` + }) + .execute(); + + await new Promise((resolve) => setTimeout(resolve, throttleDuration * 2)); + assetsAbortController.abort(); + customersAbortController.abort(); + + // There should be one initial result plus one throttled result + expect(receivedAssetsUpdatesCount).equals(2); + + // Only the initial result should have yielded. + expect(receivedCustomersUpdatesCount).equals(1); + }); + + it('should handle watch onError', async () => { + const abortController = new AbortController(); + const onResult = () => {}; // no-op + let receivedErrorCount = 0; + + const receivedError = new Promise(async (resolve) => { + const onError = () => { + receivedErrorCount++; + resolve(); + }; + + const query = db.selectFrom('assets').select([ + () => { + const fullName = sql`fakeFunction()`; // Simulate an error with invalid function + return fullName.as('full_name'); + } + ]); + + db.watch(query, { onResult, onError }, { signal: abortController.signal, throttleMs: throttleDuration }); + }); + abortController.abort(); + + await receivedError; + expect(receivedErrorCount).equals(1); + }); + + it('should throttle watch overflow', async () => { + const overflowAbortController = new AbortController(); + const updatesCount = 25; + + let receivedWithManagedOverflowCount = 0; + const firstResultReceived = new Promise((resolve) => { + const onResultOverflow = () => { + if (receivedWithManagedOverflowCount === 0) { + resolve(); + } + receivedWithManagedOverflowCount++; + }; + + const query = db.selectFrom('assets').select(db.fn.count('assets.id').as('count')); + db.watch(query, { onResult: onResultOverflow }, { signal: overflowAbortController.signal, throttleMs: 1 }); + }); + + await firstResultReceived; + + // Perform a large number of inserts to trigger overflow + for (let i = 0; i < updatesCount; i++) { + db.insertInto('assets') + .values({ + id: sql`uuid()`, + make: 'test', + customer_id: sql`uuid()` + }) + .execute(); + } + + await new Promise((resolve) => setTimeout(resolve, 1 * throttleDuration)); + + overflowAbortController.abort(); + + // This fluctuates between 3 and 4 based on timing, but should never be 25 + expect(receivedWithManagedOverflowCount).greaterThan(2); + expect(receivedWithManagedOverflowCount).toBeLessThanOrEqual(4); + }); +}); From 451dbc3789cf6008676e66aab1c1f845b0402257 Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Thu, 21 Nov 2024 16:08:47 +0200 Subject: [PATCH 5/6] Added changeset entries. --- .changeset/calm-baboons-worry.md | 2 +- .changeset/empty-chefs-smell.md | 5 +++++ .changeset/gold-beers-smoke.md | 5 +++++ 3 files changed, 11 insertions(+), 1 deletion(-) create mode 100644 .changeset/empty-chefs-smell.md create mode 100644 .changeset/gold-beers-smoke.md diff --git a/.changeset/calm-baboons-worry.md b/.changeset/calm-baboons-worry.md index 37c26d19..74383228 100644 --- a/.changeset/calm-baboons-worry.md +++ b/.changeset/calm-baboons-worry.md @@ -2,4 +2,4 @@ '@powersync/drizzle-driver': minor --- -Added `watch()` function to support watched queries. This function invokes `execute()` on the Drizzle query which improves support for complex queries such as those which are relational. +Added `watch()` function to Drizzle wrapper to support watched queries. This function invokes `execute()` on the Drizzle query which improves support for complex queries such as those which are relational. diff --git a/.changeset/empty-chefs-smell.md b/.changeset/empty-chefs-smell.md new file mode 100644 index 00000000..735fece9 --- /dev/null +++ b/.changeset/empty-chefs-smell.md @@ -0,0 +1,5 @@ +--- +'@powersync/kysely-driver': minor +--- + +Added `watch()` function to Kysely wrapper to support watched queries. This function invokes `execute()` on the Kysely query which improves support for complex queries and Kysely plugins. diff --git a/.changeset/gold-beers-smoke.md b/.changeset/gold-beers-smoke.md new file mode 100644 index 00000000..52c73bae --- /dev/null +++ b/.changeset/gold-beers-smoke.md @@ -0,0 +1,5 @@ +--- +'@powersync/common': minor +--- + +Added `compilableQueryWatch()` utility function which allows any compilable query to be watched. From d6d9c103de03953a3d524ed81cbd302846ffa093 Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Wed, 27 Nov 2024 14:19:33 +0200 Subject: [PATCH 6/6] Dropped Object.assign usage in favour of extending the classes. --- packages/drizzle-driver/src/sqlite/db.ts | 73 +++++++++++++----------- packages/kysely-driver/src/sqlite/db.ts | 31 +++++----- 2 files changed, 56 insertions(+), 48 deletions(-) diff --git a/packages/drizzle-driver/src/sqlite/db.ts b/packages/drizzle-driver/src/sqlite/db.ts index 66879e49..e9276a4f 100644 --- a/packages/drizzle-driver/src/sqlite/db.ts +++ b/packages/drizzle-driver/src/sqlite/db.ts @@ -23,48 +23,55 @@ import { PowerSyncSQLiteSession, PowerSyncSQLiteTransactionConfig } from './sqli export type DrizzleQuery = { toSQL(): Query; execute(): Promise }; -export interface PowerSyncSQLiteDatabase = Record> - extends BaseSQLiteDatabase<'async', QueryResult, TSchema> { - transaction( +export class PowerSyncSQLiteDatabase< + TSchema extends Record = Record +> extends BaseSQLiteDatabase<'async', QueryResult, TSchema> { + private db: AbstractPowerSyncDatabase; + + constructor(db: AbstractPowerSyncDatabase, config: DrizzleConfig = {}) { + const dialect = new SQLiteAsyncDialect({ casing: config.casing }); + let logger; + if (config.logger === true) { + logger = new DefaultLogger(); + } else if (config.logger !== false) { + logger = config.logger; + } + + let schema: RelationalSchemaConfig | undefined; + if (config.schema) { + const tablesConfig = extractTablesRelationalConfig(config.schema, createTableRelationsHelpers); + schema = { + fullSchema: config.schema, + schema: tablesConfig.tables, + tableNamesMap: tablesConfig.tableNamesMap + }; + } + + const session = new PowerSyncSQLiteSession(db, dialect, schema, { + logger + }); + + super('async', dialect, session as any, schema as any); + this.db = db; + } + + override transaction( transaction: ( tx: SQLiteTransaction<'async', QueryResult, TSchema, ExtractTablesWithRelations> ) => Promise, config?: PowerSyncSQLiteTransactionConfig - ): Promise; + ): Promise { + return super.transaction(transaction, config); + } - watch(query: DrizzleQuery, handler?: CompilableQueryWatchHandler, options?: SQLWatchOptions): void; + watch(query: DrizzleQuery, handler: CompilableQueryWatchHandler, options?: SQLWatchOptions): void { + compilableQueryWatch(this.db, toCompilableQuery(query), handler, options); + } } export function wrapPowerSyncWithDrizzle = Record>( db: AbstractPowerSyncDatabase, config: DrizzleConfig = {} ): PowerSyncSQLiteDatabase { - const dialect = new SQLiteAsyncDialect({ casing: config.casing }); - let logger; - if (config.logger === true) { - logger = new DefaultLogger(); - } else if (config.logger !== false) { - logger = config.logger; - } - - let schema: RelationalSchemaConfig | undefined; - if (config.schema) { - const tablesConfig = extractTablesRelationalConfig(config.schema, createTableRelationsHelpers); - schema = { - fullSchema: config.schema, - schema: tablesConfig.tables, - tableNamesMap: tablesConfig.tableNamesMap - }; - } - - const session = new PowerSyncSQLiteSession(db, dialect, schema, { - logger - }); - - const baseDatabase = new BaseSQLiteDatabase('async', dialect, session, schema) as PowerSyncSQLiteDatabase; - return Object.assign(baseDatabase, { - watch: (query: DrizzleQuery, handler: CompilableQueryWatchHandler, options?: SQLWatchOptions) => { - compilableQueryWatch(db, toCompilableQuery(query), handler, options); - } - }); + return new PowerSyncSQLiteDatabase(db, config); } diff --git a/packages/kysely-driver/src/sqlite/db.ts b/packages/kysely-driver/src/sqlite/db.ts index 0fe213d1..f0c9f509 100644 --- a/packages/kysely-driver/src/sqlite/db.ts +++ b/packages/kysely-driver/src/sqlite/db.ts @@ -15,24 +15,25 @@ export type PowerSyncKyselyOptions = Omit & { dialect?: Dialect; }; -export type PowerSyncKyselyDatabase = Kysely & { - watch: (query: CompilableQuery, handler: CompilableQueryWatchHandler, options?: SQLWatchOptions) => void; -}; +export class PowerSyncKyselyDatabase extends Kysely { + private db: AbstractPowerSyncDatabase; + + constructor(db: AbstractPowerSyncDatabase, options?: PowerSyncKyselyOptions) { + super({ + dialect: new PowerSyncDialect({ db }), + ...options + }); + this.db = db; + } + + watch(query: CompilableQuery, handler: CompilableQueryWatchHandler, options?: SQLWatchOptions): void { + compilableQueryWatch(this.db, query, handler, options); + } +} export const wrapPowerSyncWithKysely = ( db: AbstractPowerSyncDatabase, options?: PowerSyncKyselyOptions ): PowerSyncKyselyDatabase => { - const kysely = new Kysely({ - dialect: new PowerSyncDialect({ - db - }), - ...options - }); - - return Object.assign(kysely, { - watch: (query: CompilableQuery, handler: CompilableQueryWatchHandler, options?: SQLWatchOptions) => { - compilableQueryWatch(db, query, handler, options); - } - }); + return new PowerSyncKyselyDatabase(db, options); };