Skip to content

Commit

Permalink
feat: added watch() to Drizzle and Kysely integrations (#414)
Browse files Browse the repository at this point in the history
  • Loading branch information
Chriztiaan authored Dec 2, 2024
1 parent a919243 commit 77a9ed2
Show file tree
Hide file tree
Showing 11 changed files with 702 additions and 38 deletions.
5 changes: 5 additions & 0 deletions .changeset/calm-baboons-worry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/drizzle-driver': minor
---

Added `watch()` function to Drizzle wrapper to support watched queries. This function invokes `execute()` on the Drizzle query which improves support for complex queries such as those which are relational.
5 changes: 5 additions & 0 deletions .changeset/empty-chefs-smell.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/kysely-driver': minor
---

Added `watch()` function to Kysely wrapper to support watched queries. This function invokes `execute()` on the Kysely query which improves support for complex queries and Kysely plugins.
5 changes: 5 additions & 0 deletions .changeset/gold-beers-smoke.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/common': minor
---

Added `compilableQueryWatch()` utility function which allows any compilable query to be watched.
55 changes: 55 additions & 0 deletions packages/common/src/client/compilableQueryWatch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import { CompilableQuery } from './../types/types.js';
import { AbstractPowerSyncDatabase, SQLWatchOptions } from './AbstractPowerSyncDatabase.js';
import { runOnSchemaChange } from './runOnSchemaChange.js';

export interface CompilableQueryWatchHandler<T> {
onResult: (results: T[]) => void;
onError?: (error: Error) => void;
}

export function compilableQueryWatch<T>(
db: AbstractPowerSyncDatabase,
query: CompilableQuery<T>,
handler: CompilableQueryWatchHandler<T>,
options?: SQLWatchOptions
): void {
const { onResult, onError = (e: Error) => {} } = handler ?? {};
if (!onResult) {
throw new Error('onResult is required');
}

const watchQuery = async (abortSignal: AbortSignal) => {
try {
const toSql = query.compile();
const resolvedTables = await db.resolveTables(toSql.sql, toSql.parameters as [], options);

// Fetch initial data
const result = await query.execute();
onResult(result);

db.onChangeWithCallback(
{
onChange: async () => {
try {
const result = await query.execute();
onResult(result);
} catch (error: any) {
onError(error);
}
},
onError
},
{
...(options ?? {}),
tables: resolvedTables,
// Override the abort signal since we intercept it
signal: abortSignal
}
);
} catch (error: any) {
onError(error);
}
};

runOnSchemaChange(watchQuery, db, options);
}
1 change: 1 addition & 0 deletions packages/common/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export * from './client/connection/PowerSyncBackendConnector.js';
export * from './client/connection/PowerSyncCredentials.js';
export * from './client/sync/bucket/BucketStorageAdapter.js';
export { runOnSchemaChange } from './client/runOnSchemaChange.js';
export { CompilableQueryWatchHandler, compilableQueryWatch } from './client/compilableQueryWatch.js';
export { UpdateType, CrudEntry, OpId } from './client/sync/bucket/CrudEntry.js';
export * from './client/sync/bucket/SqliteBucketStorage.js';
export * from './client/sync/bucket/CrudBatch.js';
Expand Down
4 changes: 2 additions & 2 deletions packages/drizzle-driver/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { wrapPowerSyncWithDrizzle, type PowerSyncSQLiteDatabase } from './sqlite/db';
import { wrapPowerSyncWithDrizzle, type DrizzleQuery, type PowerSyncSQLiteDatabase } from './sqlite/db';
import { toCompilableQuery } from './utils/compilableQuery';

export { wrapPowerSyncWithDrizzle, toCompilableQuery, PowerSyncSQLiteDatabase };
export { wrapPowerSyncWithDrizzle, toCompilableQuery, DrizzleQuery, PowerSyncSQLiteDatabase };
79 changes: 52 additions & 27 deletions packages/drizzle-driver/src/sqlite/db.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
import { AbstractPowerSyncDatabase, QueryResult } from '@powersync/common';
import {
AbstractPowerSyncDatabase,
compilableQueryWatch,
CompilableQueryWatchHandler,
QueryResult,
SQLWatchOptions
} from '@powersync/common';
import { Query } from 'drizzle-orm';
import { DefaultLogger } from 'drizzle-orm/logger';
import {
createTableRelationsHelpers,
Expand All @@ -11,42 +18,60 @@ import { SQLiteTransaction } from 'drizzle-orm/sqlite-core';
import { BaseSQLiteDatabase } from 'drizzle-orm/sqlite-core/db';
import { SQLiteAsyncDialect } from 'drizzle-orm/sqlite-core/dialect';
import type { DrizzleConfig } from 'drizzle-orm/utils';
import { toCompilableQuery } from './../utils/compilableQuery';
import { PowerSyncSQLiteSession, PowerSyncSQLiteTransactionConfig } from './sqlite-session';

export interface PowerSyncSQLiteDatabase<TSchema extends Record<string, unknown> = Record<string, never>>
extends BaseSQLiteDatabase<'async', QueryResult, TSchema> {
transaction<T>(
export type DrizzleQuery<T> = { toSQL(): Query; execute(): Promise<T> };

export class PowerSyncSQLiteDatabase<
TSchema extends Record<string, unknown> = Record<string, never>
> extends BaseSQLiteDatabase<'async', QueryResult, TSchema> {
private db: AbstractPowerSyncDatabase;

constructor(db: AbstractPowerSyncDatabase, config: DrizzleConfig<TSchema> = {}) {
const dialect = new SQLiteAsyncDialect({ casing: config.casing });
let logger;
if (config.logger === true) {
logger = new DefaultLogger();
} else if (config.logger !== false) {
logger = config.logger;
}

let schema: RelationalSchemaConfig<TablesRelationalConfig> | undefined;
if (config.schema) {
const tablesConfig = extractTablesRelationalConfig(config.schema, createTableRelationsHelpers);
schema = {
fullSchema: config.schema,
schema: tablesConfig.tables,
tableNamesMap: tablesConfig.tableNamesMap
};
}

const session = new PowerSyncSQLiteSession(db, dialect, schema, {
logger
});

super('async', dialect, session as any, schema as any);
this.db = db;
}

override transaction<T>(
transaction: (
tx: SQLiteTransaction<'async', QueryResult, TSchema, ExtractTablesWithRelations<TSchema>>
) => Promise<T>,
config?: PowerSyncSQLiteTransactionConfig
): Promise<T>;
): Promise<T> {
return super.transaction(transaction, config);
}

watch<T>(query: DrizzleQuery<T>, handler: CompilableQueryWatchHandler<T>, options?: SQLWatchOptions): void {
compilableQueryWatch(this.db, toCompilableQuery(query), handler, options);
}
}

export function wrapPowerSyncWithDrizzle<TSchema extends Record<string, unknown> = Record<string, never>>(
db: AbstractPowerSyncDatabase,
config: DrizzleConfig<TSchema> = {}
): PowerSyncSQLiteDatabase<TSchema> {
const dialect = new SQLiteAsyncDialect({casing: config.casing});
let logger;
if (config.logger === true) {
logger = new DefaultLogger();
} else if (config.logger !== false) {
logger = config.logger;
}

let schema: RelationalSchemaConfig<TablesRelationalConfig> | undefined;
if (config.schema) {
const tablesConfig = extractTablesRelationalConfig(config.schema, createTableRelationsHelpers);
schema = {
fullSchema: config.schema,
schema: tablesConfig.tables,
tableNamesMap: tablesConfig.tableNamesMap
};
}

const session = new PowerSyncSQLiteSession(db, dialect, schema, {
logger
});
return new BaseSQLiteDatabase('async', dialect, session, schema) as PowerSyncSQLiteDatabase<TSchema>;
return new PowerSyncSQLiteDatabase<TSchema>(db, config);
}
Loading

0 comments on commit 77a9ed2

Please sign in to comment.