From 77a9ed279fa69b4805ab330e3b9c50926d009d2f Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Mon, 2 Dec 2024 10:42:51 +0200 Subject: [PATCH] feat: added `watch()` to Drizzle and Kysely integrations (#414) --- .changeset/calm-baboons-worry.md | 5 + .changeset/empty-chefs-smell.md | 5 + .changeset/gold-beers-smoke.md | 5 + .../common/src/client/compilableQueryWatch.ts | 55 ++++ packages/common/src/index.ts | 1 + packages/drizzle-driver/src/index.ts | 4 +- packages/drizzle-driver/src/sqlite/db.ts | 79 +++-- .../drizzle-driver/tests/sqlite/watch.test.ts | 283 ++++++++++++++++++ packages/kysely-driver/src/index.ts | 3 +- packages/kysely-driver/src/sqlite/db.ts | 36 ++- .../kysely-driver/tests/sqlite/watch.test.ts | 264 ++++++++++++++++ 11 files changed, 702 insertions(+), 38 deletions(-) create mode 100644 .changeset/calm-baboons-worry.md create mode 100644 .changeset/empty-chefs-smell.md create mode 100644 .changeset/gold-beers-smoke.md create mode 100644 packages/common/src/client/compilableQueryWatch.ts create mode 100644 packages/drizzle-driver/tests/sqlite/watch.test.ts create mode 100644 packages/kysely-driver/tests/sqlite/watch.test.ts diff --git a/.changeset/calm-baboons-worry.md b/.changeset/calm-baboons-worry.md new file mode 100644 index 00000000..74383228 --- /dev/null +++ b/.changeset/calm-baboons-worry.md @@ -0,0 +1,5 @@ +--- +'@powersync/drizzle-driver': minor +--- + +Added `watch()` function to Drizzle wrapper to support watched queries. This function invokes `execute()` on the Drizzle query which improves support for complex queries such as those which are relational. diff --git a/.changeset/empty-chefs-smell.md b/.changeset/empty-chefs-smell.md new file mode 100644 index 00000000..735fece9 --- /dev/null +++ b/.changeset/empty-chefs-smell.md @@ -0,0 +1,5 @@ +--- +'@powersync/kysely-driver': minor +--- + +Added `watch()` function to Kysely wrapper to support watched queries. This function invokes `execute()` on the Kysely query which improves support for complex queries and Kysely plugins. diff --git a/.changeset/gold-beers-smoke.md b/.changeset/gold-beers-smoke.md new file mode 100644 index 00000000..52c73bae --- /dev/null +++ b/.changeset/gold-beers-smoke.md @@ -0,0 +1,5 @@ +--- +'@powersync/common': minor +--- + +Added `compilableQueryWatch()` utility function which allows any compilable query to be watched. diff --git a/packages/common/src/client/compilableQueryWatch.ts b/packages/common/src/client/compilableQueryWatch.ts new file mode 100644 index 00000000..0b973567 --- /dev/null +++ b/packages/common/src/client/compilableQueryWatch.ts @@ -0,0 +1,55 @@ +import { CompilableQuery } from './../types/types.js'; +import { AbstractPowerSyncDatabase, SQLWatchOptions } from './AbstractPowerSyncDatabase.js'; +import { runOnSchemaChange } from './runOnSchemaChange.js'; + +export interface CompilableQueryWatchHandler { + onResult: (results: T[]) => void; + onError?: (error: Error) => void; +} + +export function compilableQueryWatch( + db: AbstractPowerSyncDatabase, + query: CompilableQuery, + handler: CompilableQueryWatchHandler, + options?: SQLWatchOptions +): void { + const { onResult, onError = (e: Error) => {} } = handler ?? {}; + if (!onResult) { + throw new Error('onResult is required'); + } + + const watchQuery = async (abortSignal: AbortSignal) => { + try { + const toSql = query.compile(); + const resolvedTables = await db.resolveTables(toSql.sql, toSql.parameters as [], options); + + // Fetch initial data + const result = await query.execute(); + onResult(result); + + db.onChangeWithCallback( + { + onChange: async () => { + try { + const result = await query.execute(); + onResult(result); + } catch (error: any) { + onError(error); + } + }, + onError + }, + { + ...(options ?? {}), + tables: resolvedTables, + // Override the abort signal since we intercept it + signal: abortSignal + } + ); + } catch (error: any) { + onError(error); + } + }; + + runOnSchemaChange(watchQuery, db, options); +} diff --git a/packages/common/src/index.ts b/packages/common/src/index.ts index b55cca84..99497d15 100644 --- a/packages/common/src/index.ts +++ b/packages/common/src/index.ts @@ -5,6 +5,7 @@ export * from './client/connection/PowerSyncBackendConnector.js'; export * from './client/connection/PowerSyncCredentials.js'; export * from './client/sync/bucket/BucketStorageAdapter.js'; export { runOnSchemaChange } from './client/runOnSchemaChange.js'; +export { CompilableQueryWatchHandler, compilableQueryWatch } from './client/compilableQueryWatch.js'; export { UpdateType, CrudEntry, OpId } from './client/sync/bucket/CrudEntry.js'; export * from './client/sync/bucket/SqliteBucketStorage.js'; export * from './client/sync/bucket/CrudBatch.js'; diff --git a/packages/drizzle-driver/src/index.ts b/packages/drizzle-driver/src/index.ts index 6f5c1537..6dd870c8 100644 --- a/packages/drizzle-driver/src/index.ts +++ b/packages/drizzle-driver/src/index.ts @@ -1,4 +1,4 @@ -import { wrapPowerSyncWithDrizzle, type PowerSyncSQLiteDatabase } from './sqlite/db'; +import { wrapPowerSyncWithDrizzle, type DrizzleQuery, type PowerSyncSQLiteDatabase } from './sqlite/db'; import { toCompilableQuery } from './utils/compilableQuery'; -export { wrapPowerSyncWithDrizzle, toCompilableQuery, PowerSyncSQLiteDatabase }; +export { wrapPowerSyncWithDrizzle, toCompilableQuery, DrizzleQuery, PowerSyncSQLiteDatabase }; diff --git a/packages/drizzle-driver/src/sqlite/db.ts b/packages/drizzle-driver/src/sqlite/db.ts index 77d2e54c..e9276a4f 100644 --- a/packages/drizzle-driver/src/sqlite/db.ts +++ b/packages/drizzle-driver/src/sqlite/db.ts @@ -1,4 +1,11 @@ -import { AbstractPowerSyncDatabase, QueryResult } from '@powersync/common'; +import { + AbstractPowerSyncDatabase, + compilableQueryWatch, + CompilableQueryWatchHandler, + QueryResult, + SQLWatchOptions +} from '@powersync/common'; +import { Query } from 'drizzle-orm'; import { DefaultLogger } from 'drizzle-orm/logger'; import { createTableRelationsHelpers, @@ -11,42 +18,60 @@ import { SQLiteTransaction } from 'drizzle-orm/sqlite-core'; import { BaseSQLiteDatabase } from 'drizzle-orm/sqlite-core/db'; import { SQLiteAsyncDialect } from 'drizzle-orm/sqlite-core/dialect'; import type { DrizzleConfig } from 'drizzle-orm/utils'; +import { toCompilableQuery } from './../utils/compilableQuery'; import { PowerSyncSQLiteSession, PowerSyncSQLiteTransactionConfig } from './sqlite-session'; -export interface PowerSyncSQLiteDatabase = Record> - extends BaseSQLiteDatabase<'async', QueryResult, TSchema> { - transaction( +export type DrizzleQuery = { toSQL(): Query; execute(): Promise }; + +export class PowerSyncSQLiteDatabase< + TSchema extends Record = Record +> extends BaseSQLiteDatabase<'async', QueryResult, TSchema> { + private db: AbstractPowerSyncDatabase; + + constructor(db: AbstractPowerSyncDatabase, config: DrizzleConfig = {}) { + const dialect = new SQLiteAsyncDialect({ casing: config.casing }); + let logger; + if (config.logger === true) { + logger = new DefaultLogger(); + } else if (config.logger !== false) { + logger = config.logger; + } + + let schema: RelationalSchemaConfig | undefined; + if (config.schema) { + const tablesConfig = extractTablesRelationalConfig(config.schema, createTableRelationsHelpers); + schema = { + fullSchema: config.schema, + schema: tablesConfig.tables, + tableNamesMap: tablesConfig.tableNamesMap + }; + } + + const session = new PowerSyncSQLiteSession(db, dialect, schema, { + logger + }); + + super('async', dialect, session as any, schema as any); + this.db = db; + } + + override transaction( transaction: ( tx: SQLiteTransaction<'async', QueryResult, TSchema, ExtractTablesWithRelations> ) => Promise, config?: PowerSyncSQLiteTransactionConfig - ): Promise; + ): Promise { + return super.transaction(transaction, config); + } + + watch(query: DrizzleQuery, handler: CompilableQueryWatchHandler, options?: SQLWatchOptions): void { + compilableQueryWatch(this.db, toCompilableQuery(query), handler, options); + } } export function wrapPowerSyncWithDrizzle = Record>( db: AbstractPowerSyncDatabase, config: DrizzleConfig = {} ): PowerSyncSQLiteDatabase { - const dialect = new SQLiteAsyncDialect({casing: config.casing}); - let logger; - if (config.logger === true) { - logger = new DefaultLogger(); - } else if (config.logger !== false) { - logger = config.logger; - } - - let schema: RelationalSchemaConfig | undefined; - if (config.schema) { - const tablesConfig = extractTablesRelationalConfig(config.schema, createTableRelationsHelpers); - schema = { - fullSchema: config.schema, - schema: tablesConfig.tables, - tableNamesMap: tablesConfig.tableNamesMap - }; - } - - const session = new PowerSyncSQLiteSession(db, dialect, schema, { - logger - }); - return new BaseSQLiteDatabase('async', dialect, session, schema) as PowerSyncSQLiteDatabase; + return new PowerSyncSQLiteDatabase(db, config); } diff --git a/packages/drizzle-driver/tests/sqlite/watch.test.ts b/packages/drizzle-driver/tests/sqlite/watch.test.ts new file mode 100644 index 00000000..54ed2b40 --- /dev/null +++ b/packages/drizzle-driver/tests/sqlite/watch.test.ts @@ -0,0 +1,283 @@ +import { AbstractPowerSyncDatabase, column, Schema, Table } from '@powersync/common'; +import { PowerSyncDatabase } from '@powersync/web'; +import { count, eq, sql } from 'drizzle-orm'; +import { integer, sqliteTable, text, uniqueIndex } from 'drizzle-orm/sqlite-core'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import * as SUT from '../../src/sqlite/db'; + +vi.useRealTimers(); + +const assetsPs = new Table( + { + created_at: column.text, + make: column.text, + model: column.text, + serial_number: column.text, + quantity: column.integer, + user_id: column.text, + customer_id: column.text, + description: column.text + }, + { indexes: { makemodel: ['make, model'] } } +); + +const customersPs = new Table({ + name: column.text, + email: column.text +}); + +const PsSchema = new Schema({ assets: assetsPs, customers: customersPs }); + +const assets = sqliteTable( + 'assets', + { + id: text('id'), + created_at: text('created_at'), + make: text('make'), + model: text('model'), + serial_number: text('serial_number'), + quantity: integer('quantity'), + user_id: text('user_id'), + customer_id: text('customer_id'), + description: text('description') + }, + (table) => ({ + makemodelIndex: uniqueIndex('makemodel').on(table.make, table.model) + }) +); + +const customers = sqliteTable('customers', { + id: text('id'), + name: text('name'), + email: text('email') +}); + +const DrizzleSchema = { assets, customers }; + +/** + * There seems to be an issue with Vitest browser mode's setTimeout and + * fake timer functionality. + * e.g. calling: + * await new Promise((resolve) => setTimeout(resolve, 10)); + * waits for 1 second instead of 10ms. + * Setting this to 1 second as a work around. + */ +const throttleDuration = 1000; + +describe('Watch Tests', () => { + let powerSyncDb: AbstractPowerSyncDatabase; + let db: SUT.PowerSyncSQLiteDatabase; + + beforeEach(async () => { + powerSyncDb = new PowerSyncDatabase({ + database: { + dbFilename: 'test.db' + }, + schema: PsSchema + }); + db = SUT.wrapPowerSyncWithDrizzle(powerSyncDb, { schema: DrizzleSchema, logger: { logQuery: () => {} } }); + + await powerSyncDb.init(); + }); + + afterEach(async () => { + await powerSyncDb.disconnectAndClear(); + }); + + it('watch outside throttle limits', async () => { + const abortController = new AbortController(); + + const updatesCount = 2; + let receivedUpdatesCount = 0; + + /** + * Promise which resolves once we received the same amount of update + * notifications as there are inserts. + */ + const receivedUpdates = new Promise((resolve) => { + const onUpdate = () => { + receivedUpdatesCount++; + + if (receivedUpdatesCount == updatesCount) { + abortController.abort(); + resolve(); + } + }; + + const query = db + .select({ count: count() }) + .from(assets) + .innerJoin(customers, eq(customers.id, assets.customer_id)); + + db.watch(query, { onResult: onUpdate }, { signal: abortController.signal, throttleMs: throttleDuration }); + }); + + for (let updateCount = 0; updateCount < updatesCount; updateCount++) { + await db + .insert(assets) + .values({ + id: sql`uuid()`, + make: 'test', + customer_id: sql`uuid()` + }) + .execute(); + + // Wait the throttle duration, ensuring a watch update for each insert + await new Promise((resolve) => setTimeout(resolve, throttleDuration)); + } + + await receivedUpdates; + expect(receivedUpdatesCount).equals(updatesCount); + }); + + it('watch inside throttle limits', async () => { + const abortController = new AbortController(); + + const updatesCount = 5; + let receivedUpdatesCount = 0; + + const onUpdate = () => { + receivedUpdatesCount++; + }; + const query = db.select({ count: count() }).from(assets).innerJoin(customers, eq(customers.id, assets.customer_id)); + db.watch(query, { onResult: onUpdate }, { signal: abortController.signal, throttleMs: throttleDuration }); + + // Create the inserts as fast as possible + for (let updateCount = 0; updateCount < updatesCount; updateCount++) { + await db + .insert(assets) + .values({ + id: sql`uuid()`, + make: 'test', + customer_id: sql`uuid()` + }) + .execute(); + } + + await new Promise((resolve) => setTimeout(resolve, throttleDuration * 2)); + abortController.abort(); + + // There should be one initial result plus one throttled result + expect(receivedUpdatesCount).equals(2); + }); + + it('should only watch tables inside query', async () => { + const assetsAbortController = new AbortController(); + + let receivedAssetsUpdatesCount = 0; + const onWatchAssets = () => { + receivedAssetsUpdatesCount++; + }; + + const queryAssets = db.select({ count: count() }).from(assets); + + db.watch( + queryAssets, + { onResult: onWatchAssets }, + { + signal: assetsAbortController.signal + } + ); + + const customersAbortController = new AbortController(); + + let receivedCustomersUpdatesCount = 0; + const onWatchCustomers = () => { + receivedCustomersUpdatesCount++; + }; + + const queryCustomers = db.select({ count: count() }).from(customers); + db.watch( + queryCustomers, + { onResult: onWatchCustomers }, + { + signal: customersAbortController.signal + } + ); + + // Ensures insert doesn't form part of initial result + await new Promise((resolve) => setTimeout(resolve, throttleDuration)); + + await db + .insert(assets) + .values({ + id: sql`uuid()`, + make: 'test', + customer_id: sql`uuid()` + }) + .execute(); + + await new Promise((resolve) => setTimeout(resolve, throttleDuration * 2)); + assetsAbortController.abort(); + customersAbortController.abort(); + + // There should be one initial result plus one throttled result + expect(receivedAssetsUpdatesCount).equals(2); + + // Only the initial result should have yielded. + expect(receivedCustomersUpdatesCount).equals(1); + }); + + it('should handle watch onError', async () => { + const abortController = new AbortController(); + const onResult = () => {}; // no-op + let receivedErrorCount = 0; + + const receivedError = new Promise(async (resolve) => { + const onError = () => { + receivedErrorCount++; + resolve(); + }; + + const query = db + .select({ + id: sql`fakeFunction()` // Simulate an error with invalid function + }) + .from(assets); + + db.watch(query, { onResult, onError }, { signal: abortController.signal, throttleMs: throttleDuration }); + }); + abortController.abort(); + + await receivedError; + expect(receivedErrorCount).equals(1); + }); + + it('should throttle watch overflow', async () => { + const overflowAbortController = new AbortController(); + const updatesCount = 25; + + let receivedWithManagedOverflowCount = 0; + const firstResultReceived = new Promise((resolve) => { + const onResultOverflow = () => { + if (receivedWithManagedOverflowCount === 0) { + resolve(); + } + receivedWithManagedOverflowCount++; + }; + const query = db.select({ count: count() }).from(assets); + db.watch(query, { onResult: onResultOverflow }, { signal: overflowAbortController.signal, throttleMs: 1 }); + }); + + await firstResultReceived; + + // Perform a large number of inserts to trigger overflow + for (let i = 0; i < updatesCount; i++) { + db.insert(assets) + .values({ + id: sql`uuid()`, + make: 'test', + customer_id: sql`uuid()` + }) + .execute(); + } + + await new Promise((resolve) => setTimeout(resolve, 1 * throttleDuration)); + + overflowAbortController.abort(); + + // This fluctuates between 3 and 4 based on timing, but should never be 25 + expect(receivedWithManagedOverflowCount).greaterThan(2); + expect(receivedWithManagedOverflowCount).toBeLessThanOrEqual(4); + }); +}); diff --git a/packages/kysely-driver/src/index.ts b/packages/kysely-driver/src/index.ts index 12f341e6..750e3047 100644 --- a/packages/kysely-driver/src/index.ts +++ b/packages/kysely-driver/src/index.ts @@ -1,4 +1,4 @@ -import { wrapPowerSyncWithKysely } from './sqlite/db'; +import { wrapPowerSyncWithKysely, type PowerSyncKyselyDatabase } from './sqlite/db'; import { type ColumnType, type Insertable, @@ -19,5 +19,6 @@ export { KyselyConfig, sql, Kysely, + PowerSyncKyselyDatabase, wrapPowerSyncWithKysely }; diff --git a/packages/kysely-driver/src/sqlite/db.ts b/packages/kysely-driver/src/sqlite/db.ts index 15e209a7..f0c9f509 100644 --- a/packages/kysely-driver/src/sqlite/db.ts +++ b/packages/kysely-driver/src/sqlite/db.ts @@ -1,4 +1,10 @@ -import { type AbstractPowerSyncDatabase } from '@powersync/common'; +import { + CompilableQuery, + compilableQueryWatch, + CompilableQueryWatchHandler, + SQLWatchOptions, + type AbstractPowerSyncDatabase +} from '@powersync/common'; import { Dialect, Kysely, type KyselyConfig } from 'kysely'; import { PowerSyncDialect } from './sqlite-dialect'; @@ -9,11 +15,25 @@ export type PowerSyncKyselyOptions = Omit & { dialect?: Dialect; }; -export const wrapPowerSyncWithKysely = (db: AbstractPowerSyncDatabase, options?: PowerSyncKyselyOptions) => { - return new Kysely({ - dialect: new PowerSyncDialect({ - db - }), - ...options - }); +export class PowerSyncKyselyDatabase extends Kysely { + private db: AbstractPowerSyncDatabase; + + constructor(db: AbstractPowerSyncDatabase, options?: PowerSyncKyselyOptions) { + super({ + dialect: new PowerSyncDialect({ db }), + ...options + }); + this.db = db; + } + + watch(query: CompilableQuery, handler: CompilableQueryWatchHandler, options?: SQLWatchOptions): void { + compilableQueryWatch(this.db, query, handler, options); + } +} + +export const wrapPowerSyncWithKysely = ( + db: AbstractPowerSyncDatabase, + options?: PowerSyncKyselyOptions +): PowerSyncKyselyDatabase => { + return new PowerSyncKyselyDatabase(db, options); }; diff --git a/packages/kysely-driver/tests/sqlite/watch.test.ts b/packages/kysely-driver/tests/sqlite/watch.test.ts new file mode 100644 index 00000000..1c39a32e --- /dev/null +++ b/packages/kysely-driver/tests/sqlite/watch.test.ts @@ -0,0 +1,264 @@ +import { AbstractPowerSyncDatabase, column, Schema, Table } from '@powersync/common'; +import { PowerSyncDatabase } from '@powersync/web'; +import { sql } from 'kysely'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import * as SUT from '../../src/sqlite/db'; + +vi.useRealTimers(); + +const assetsPs = new Table( + { + created_at: column.text, + make: column.text, + model: column.text, + serial_number: column.text, + quantity: column.integer, + user_id: column.text, + customer_id: column.text, + description: column.text + }, + { indexes: { makemodel: ['make, model'] } } +); + +const customersPs = new Table({ + name: column.text, + email: column.text +}); + +const PsSchema = new Schema({ assets: assetsPs, customers: customersPs }); +export type Database = (typeof PsSchema)['types']; + +/** + * There seems to be an issue with Vitest browser mode's setTimeout and + * fake timer functionality. + * e.g. calling: + * await new Promise((resolve) => setTimeout(resolve, 10)); + * waits for 1 second instead of 10ms. + * Setting this to 1 second as a work around. + */ +const throttleDuration = 1000; + +describe('Watch Tests', () => { + let powerSyncDb: AbstractPowerSyncDatabase; + let db: SUT.PowerSyncKyselyDatabase; + + beforeEach(async () => { + powerSyncDb = new PowerSyncDatabase({ + database: { + dbFilename: 'test.db' + }, + schema: PsSchema + }); + db = SUT.wrapPowerSyncWithKysely(powerSyncDb); + + await powerSyncDb.init(); + }); + + afterEach(async () => { + await powerSyncDb.disconnectAndClear(); + }); + + it('watch outside throttle limits', async () => { + const abortController = new AbortController(); + + const updatesCount = 2; + let receivedUpdatesCount = 0; + + /** + * Promise which resolves once we received the same amount of update + * notifications as there are inserts. + */ + const receivedUpdates = new Promise((resolve) => { + const onUpdate = () => { + receivedUpdatesCount++; + + if (receivedUpdatesCount == updatesCount) { + abortController.abort(); + resolve(); + } + }; + + const query = db + .selectFrom('assets') + .innerJoin('customers', 'customers.id', 'assets.customer_id') + .select(db.fn.count('assets.id').as('count')); + + db.watch(query, { onResult: onUpdate }, { signal: abortController.signal, throttleMs: throttleDuration }); + }); + + for (let updateCount = 0; updateCount < updatesCount; updateCount++) { + await db + .insertInto('assets') + .values({ + id: sql`uuid()`, + make: 'test', + customer_id: sql`uuid()` + }) + .execute(); + + // Wait the throttle duration, ensuring a watch update for each insert + await new Promise((resolve) => setTimeout(resolve, throttleDuration)); + } + + await receivedUpdates; + expect(receivedUpdatesCount).equals(updatesCount); + }); + + it('watch inside throttle limits', async () => { + const abortController = new AbortController(); + + const updatesCount = 5; + let receivedUpdatesCount = 0; + + const onUpdate = () => { + receivedUpdatesCount++; + }; + + const query = db + .selectFrom('assets') + .innerJoin('customers', 'customers.id', 'assets.customer_id') + .select(db.fn.count('assets.id').as('count')); + + db.watch(query, { onResult: onUpdate }, { signal: abortController.signal, throttleMs: throttleDuration }); + + // Create the inserts as fast as possible + for (let updateCount = 0; updateCount < updatesCount; updateCount++) { + await db + .insertInto('assets') + .values({ + id: sql`uuid()`, + make: 'test', + customer_id: sql`uuid()` + }) + .execute(); + } + + await new Promise((resolve) => setTimeout(resolve, throttleDuration * 2)); + abortController.abort(); + + // There should be one initial result plus one throttled result + expect(receivedUpdatesCount).equals(2); + }); + + it('should only watch tables inside query', async () => { + const assetsAbortController = new AbortController(); + + let receivedAssetsUpdatesCount = 0; + const onWatchAssets = () => { + receivedAssetsUpdatesCount++; + }; + + const queryAssets = db.selectFrom('assets').select(db.fn.count('assets.id').as('count')); + db.watch( + queryAssets, + { onResult: onWatchAssets }, + { + signal: assetsAbortController.signal + } + ); + + const customersAbortController = new AbortController(); + + let receivedCustomersUpdatesCount = 0; + const onWatchCustomers = () => { + receivedCustomersUpdatesCount++; + }; + + const queryCustomers = db.selectFrom('customers').select(db.fn.count('customers.id').as('count')); + + db.watch( + queryCustomers, + { onResult: onWatchCustomers }, + { + signal: customersAbortController.signal + } + ); + + // Ensures insert doesn't form part of initial result + await new Promise((resolve) => setTimeout(resolve, throttleDuration)); + + await db + .insertInto('assets') + .values({ + id: sql`uuid()`, + make: 'test', + customer_id: sql`uuid()` + }) + .execute(); + + await new Promise((resolve) => setTimeout(resolve, throttleDuration * 2)); + assetsAbortController.abort(); + customersAbortController.abort(); + + // There should be one initial result plus one throttled result + expect(receivedAssetsUpdatesCount).equals(2); + + // Only the initial result should have yielded. + expect(receivedCustomersUpdatesCount).equals(1); + }); + + it('should handle watch onError', async () => { + const abortController = new AbortController(); + const onResult = () => {}; // no-op + let receivedErrorCount = 0; + + const receivedError = new Promise(async (resolve) => { + const onError = () => { + receivedErrorCount++; + resolve(); + }; + + const query = db.selectFrom('assets').select([ + () => { + const fullName = sql`fakeFunction()`; // Simulate an error with invalid function + return fullName.as('full_name'); + } + ]); + + db.watch(query, { onResult, onError }, { signal: abortController.signal, throttleMs: throttleDuration }); + }); + abortController.abort(); + + await receivedError; + expect(receivedErrorCount).equals(1); + }); + + it('should throttle watch overflow', async () => { + const overflowAbortController = new AbortController(); + const updatesCount = 25; + + let receivedWithManagedOverflowCount = 0; + const firstResultReceived = new Promise((resolve) => { + const onResultOverflow = () => { + if (receivedWithManagedOverflowCount === 0) { + resolve(); + } + receivedWithManagedOverflowCount++; + }; + + const query = db.selectFrom('assets').select(db.fn.count('assets.id').as('count')); + db.watch(query, { onResult: onResultOverflow }, { signal: overflowAbortController.signal, throttleMs: 1 }); + }); + + await firstResultReceived; + + // Perform a large number of inserts to trigger overflow + for (let i = 0; i < updatesCount; i++) { + db.insertInto('assets') + .values({ + id: sql`uuid()`, + make: 'test', + customer_id: sql`uuid()` + }) + .execute(); + } + + await new Promise((resolve) => setTimeout(resolve, 1 * throttleDuration)); + + overflowAbortController.abort(); + + // This fluctuates between 3 and 4 based on timing, but should never be 25 + expect(receivedWithManagedOverflowCount).greaterThan(2); + expect(receivedWithManagedOverflowCount).toBeLessThanOrEqual(4); + }); +});