Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added watch() to Drizzle and Kysely integrations #414

Merged
merged 7 commits into from
Dec 2, 2024
Merged
5 changes: 5 additions & 0 deletions .changeset/calm-baboons-worry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/drizzle-driver': minor
---

Added `watch()` function to Drizzle wrapper to support watched queries. This function invokes `execute()` on the Drizzle query which improves support for complex queries such as those which are relational.
5 changes: 5 additions & 0 deletions .changeset/empty-chefs-smell.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/kysely-driver': minor
---

Added `watch()` function to Kysely wrapper to support watched queries. This function invokes `execute()` on the Kysely query which improves support for complex queries and Kysely plugins.
5 changes: 5 additions & 0 deletions .changeset/gold-beers-smoke.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/common': minor
---

Added `compilableQueryWatch()` utility function which allows any compilable query to be watched.
55 changes: 55 additions & 0 deletions packages/common/src/client/compilableQueryWatch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import { CompilableQuery } from './../types/types.js';
import { AbstractPowerSyncDatabase, SQLWatchOptions } from './AbstractPowerSyncDatabase.js';
import { runOnSchemaChange } from './runOnSchemaChange.js';

export interface CompilableQueryWatchHandler<T> {
onResult: (results: T[]) => void;
onError?: (error: Error) => void;
}

export function compilableQueryWatch<T>(
db: AbstractPowerSyncDatabase,
query: CompilableQuery<T>,
handler: CompilableQueryWatchHandler<T>,
options?: SQLWatchOptions
): void {
const { onResult, onError = (e: Error) => {} } = handler ?? {};
if (!onResult) {
throw new Error('onResult is required');
}

const watchQuery = async (abortSignal: AbortSignal) => {
try {
const toSql = query.compile();
const resolvedTables = await db.resolveTables(toSql.sql, toSql.parameters as [], options);

// Fetch initial data
const result = await query.execute();
onResult(result);

db.onChangeWithCallback(
{
onChange: async () => {
try {
const result = await query.execute();
onResult(result);
} catch (error: any) {
onError(error);
}
},
onError
},
{
...(options ?? {}),
tables: resolvedTables,
// Override the abort signal since we intercept it
signal: abortSignal
}
);
} catch (error: any) {
onError(error);
}
};

runOnSchemaChange(watchQuery, db, options);
}
1 change: 1 addition & 0 deletions packages/common/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export * from './client/connection/PowerSyncBackendConnector.js';
export * from './client/connection/PowerSyncCredentials.js';
export * from './client/sync/bucket/BucketStorageAdapter.js';
export { runOnSchemaChange } from './client/runOnSchemaChange.js';
export { CompilableQueryWatchHandler, compilableQueryWatch } from './client/compilableQueryWatch.js';
export { UpdateType, CrudEntry, OpId } from './client/sync/bucket/CrudEntry.js';
export * from './client/sync/bucket/SqliteBucketStorage.js';
export * from './client/sync/bucket/CrudBatch.js';
Expand Down
4 changes: 2 additions & 2 deletions packages/drizzle-driver/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { wrapPowerSyncWithDrizzle, type PowerSyncSQLiteDatabase } from './sqlite/db';
import { wrapPowerSyncWithDrizzle, type DrizzleQuery, type PowerSyncSQLiteDatabase } from './sqlite/db';
import { toCompilableQuery } from './utils/compilableQuery';

export { wrapPowerSyncWithDrizzle, toCompilableQuery, PowerSyncSQLiteDatabase };
export { wrapPowerSyncWithDrizzle, toCompilableQuery, DrizzleQuery, PowerSyncSQLiteDatabase };
79 changes: 52 additions & 27 deletions packages/drizzle-driver/src/sqlite/db.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
import { AbstractPowerSyncDatabase, QueryResult } from '@powersync/common';
import {
AbstractPowerSyncDatabase,
compilableQueryWatch,
CompilableQueryWatchHandler,
QueryResult,
SQLWatchOptions
} from '@powersync/common';
import { Query } from 'drizzle-orm';
import { DefaultLogger } from 'drizzle-orm/logger';
import {
createTableRelationsHelpers,
Expand All @@ -11,42 +18,60 @@ import { SQLiteTransaction } from 'drizzle-orm/sqlite-core';
import { BaseSQLiteDatabase } from 'drizzle-orm/sqlite-core/db';
import { SQLiteAsyncDialect } from 'drizzle-orm/sqlite-core/dialect';
import type { DrizzleConfig } from 'drizzle-orm/utils';
import { toCompilableQuery } from './../utils/compilableQuery';
import { PowerSyncSQLiteSession, PowerSyncSQLiteTransactionConfig } from './sqlite-session';

export interface PowerSyncSQLiteDatabase<TSchema extends Record<string, unknown> = Record<string, never>>
extends BaseSQLiteDatabase<'async', QueryResult, TSchema> {
transaction<T>(
export type DrizzleQuery<T> = { toSQL(): Query; execute(): Promise<T> };

export class PowerSyncSQLiteDatabase<
TSchema extends Record<string, unknown> = Record<string, never>
> extends BaseSQLiteDatabase<'async', QueryResult, TSchema> {
private db: AbstractPowerSyncDatabase;

constructor(db: AbstractPowerSyncDatabase, config: DrizzleConfig<TSchema> = {}) {
const dialect = new SQLiteAsyncDialect({ casing: config.casing });
let logger;
if (config.logger === true) {
logger = new DefaultLogger();
} else if (config.logger !== false) {
logger = config.logger;
}

let schema: RelationalSchemaConfig<TablesRelationalConfig> | undefined;
if (config.schema) {
const tablesConfig = extractTablesRelationalConfig(config.schema, createTableRelationsHelpers);
schema = {
fullSchema: config.schema,
schema: tablesConfig.tables,
tableNamesMap: tablesConfig.tableNamesMap
};
}

const session = new PowerSyncSQLiteSession(db, dialect, schema, {
logger
});

super('async', dialect, session as any, schema as any);
this.db = db;
}

override transaction<T>(
transaction: (
tx: SQLiteTransaction<'async', QueryResult, TSchema, ExtractTablesWithRelations<TSchema>>
) => Promise<T>,
config?: PowerSyncSQLiteTransactionConfig
): Promise<T>;
): Promise<T> {
return super.transaction(transaction, config);
}

watch<T>(query: DrizzleQuery<T>, handler: CompilableQueryWatchHandler<T>, options?: SQLWatchOptions): void {
compilableQueryWatch(this.db, toCompilableQuery(query), handler, options);
}
}

export function wrapPowerSyncWithDrizzle<TSchema extends Record<string, unknown> = Record<string, never>>(
db: AbstractPowerSyncDatabase,
config: DrizzleConfig<TSchema> = {}
): PowerSyncSQLiteDatabase<TSchema> {
const dialect = new SQLiteAsyncDialect({casing: config.casing});
let logger;
if (config.logger === true) {
logger = new DefaultLogger();
} else if (config.logger !== false) {
logger = config.logger;
}

let schema: RelationalSchemaConfig<TablesRelationalConfig> | undefined;
if (config.schema) {
const tablesConfig = extractTablesRelationalConfig(config.schema, createTableRelationsHelpers);
schema = {
fullSchema: config.schema,
schema: tablesConfig.tables,
tableNamesMap: tablesConfig.tableNamesMap
};
}

const session = new PowerSyncSQLiteSession(db, dialect, schema, {
logger
});
return new BaseSQLiteDatabase('async', dialect, session, schema) as PowerSyncSQLiteDatabase<TSchema>;
return new PowerSyncSQLiteDatabase<TSchema>(db, config);
}
Loading