Skip to content

Commit

Permalink
Merge pull request #102 from sqlitecloud/pub-sub
Browse files Browse the repository at this point in the history
feat(pubsub): initial implementation
  • Loading branch information
marcobambini authored Jul 12, 2024
2 parents 76b1bef + a59d2a8 commit cf44c29
Show file tree
Hide file tree
Showing 8 changed files with 376 additions and 3 deletions.
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,34 @@ We aim for full compatibility with the established [sqlite3 API](https://www.npm

The package is developed entirely in TypeScript and is fully compatible with JavaScript. It doesn't require any native libraries. This makes it a straightforward and effective tool for managing cloud-based databases in a familiar SQLite environment.

## Publish / Subscribe (Pub/Sub)

```ts
import { Database } from '@sqlitecloud/drivers'
import { PubSub, PUBSUB_ENTITY_TYPE } from '@sqlitecloud/drivers/lib/drivers/pubsub'

let database = new Database('sqlitecloud://user:[email protected]:8860/chinook.sqlite')
// or use sqlitecloud://xxx.sqlite.cloud:8860?apikey=xxxxxxx

const pubSub: PubSub = await database.getPubSub()

await pubSub.listen(PUBSUB_ENTITY_TYPE.TABLE, 'albums', (error, results, data) => {
if (results) {
// Changes on albums table will be received here as JSON object
console.log('Received message:', results)
}
})

await database.sql`INSERT INTO albums (Title, ArtistId) values ('Brand new song', 1)`

// Stop listening changes on the table
await pubSub.unlisten(PUBSUB_ENTITY_TYPE.TABLE, 'albums')
```

Pub/Sub is a messaging pattern that allows multiple applications to communicate with each other asynchronously. In the context of SQLiteCloud, Pub/Sub can be used to provide real-time updates and notifications to subscribed applications whenever data changes in the database or it can be used to send payloads (messages) to anyone subscribed to a channel.

Pub/Sub Documentation: [https://docs.sqlitecloud.io/docs/pub-sub](https://docs.sqlitecloud.io/docs/pub-sub)

## More

How do I deploy SQLite in the cloud?
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@sqlitecloud/drivers",
"version": "1.0.178",
"version": "1.0.193",
"description": "SQLiteCloud drivers for Typescript/Javascript in edge, web and node clients",
"main": "./lib/index.js",
"types": "./lib/index.d.ts",
Expand Down
3 changes: 2 additions & 1 deletion src/drivers/connection-tls.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,9 @@ export class SQLiteCloudTlsConnection extends SQLiteCloudConnection {

if (this.processCallback) {
this.processCallback(error, result)
// this.processCallback = undefined
}

this.buffer = Buffer.alloc(0)
}

/** Disconnect immediately, release connection, no events. */
Expand Down
21 changes: 21 additions & 0 deletions src/drivers/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { Statement } from './statement'
import { ErrorCallback, ResultsCallback, RowCallback, RowsCallback } from './types'
import EventEmitter from 'eventemitter3'
import { isBrowser } from './utilities'
import { PubSub } from './pubsub'

// Uses eventemitter3 instead of node events for browser compatibility
// https://github.com/primus/eventemitter3
Expand Down Expand Up @@ -483,4 +484,24 @@ export class Database extends EventEmitter {
})
})
}

/**
* PubSub class provides a Pub/Sub real-time updates and notifications system to
* allow multiple applications to communicate with each other asynchronously.
* It allows applications to subscribe to tables and receive notifications whenever
* data changes in the database table. It also enables sending messages to anyone
* subscribed to a specific channel.
* @returns {PubSub} A PubSub object
*/
public async getPubSub(): Promise<PubSub> {
return new Promise((resolve, reject) => {
this.getConnection((error, connection) => {
if (error || !connection) {
reject(error)
} else {
resolve(new PubSub(connection))
}
})
})
}
}
4 changes: 3 additions & 1 deletion src/drivers/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export const CMD_COMPRESSED = '%'
export const CMD_COMMAND = '^'
export const CMD_ARRAY = '='
// const CMD_RAWJSON = '{'
// const CMD_PUBSUB = '|'
export const CMD_PUBSUB = '|'
// const CMD_RECONNECT = '@'

// To mark the end of the Rowset, the special string /LEN 0 0 0 is sent (LEN is always 6 in this case)
Expand Down Expand Up @@ -298,6 +298,8 @@ export function popData(buffer: Buffer): { data: SQLiteCloudDataTypes | SQLiteCl
return popResults(buffer.subarray(spaceIndex + 1, commandEnd - 1).toString('utf8'))
case CMD_COMMAND:
return popResults(buffer.subarray(spaceIndex + 1, commandEnd).toString('utf8'))
case CMD_PUBSUB:
return popResults(buffer.subarray(spaceIndex + 1, commandEnd).toString('utf8'))
case CMD_JSON:
return popResults(JSON.parse(buffer.subarray(spaceIndex + 1, commandEnd).toString('utf8')))
case CMD_BLOB:
Expand Down
109 changes: 109 additions & 0 deletions src/drivers/pubsub.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import { SQLiteCloudConnection } from './connection'
import SQLiteCloudTlsConnection from './connection-tls'
import { PubSubCallback } from './types'

export enum PUBSUB_ENTITY_TYPE {
TABLE = 'TABLE',
CHANNEL = 'CHANNEL'
}

/**
* Pub/Sub class to receive changes on database tables or to send messages to channels.
*/
export class PubSub {
constructor(connection: SQLiteCloudConnection) {
this.connection = connection
this.connectionPubSub = new SQLiteCloudTlsConnection(connection.getConfig())
}

private connection: SQLiteCloudConnection
private connectionPubSub: SQLiteCloudConnection

/**
* Listen for a table or channel and start to receive messages to the provided callback.
* @param entityType One of TABLE or CHANNEL'
* @param entityName Name of the table or the channel
* @param callback Callback to be called when a message is received
* @param data Extra data to be passed to the callback
*/
public async listen(entityType: PUBSUB_ENTITY_TYPE, entityName: string, callback: PubSubCallback, data?: any): Promise<any> {
const entity = entityType === 'TABLE' ? 'TABLE ' : ''

const authCommand: string = await this.connection.sql(`LISTEN ${entity}${entityName};`)

return new Promise((resolve, reject) => {
this.connectionPubSub.sendCommands(authCommand, (error, results) => {
if (error) {
callback.call(this, error, null, data)
reject(error)
} else {
// skip results from pubSub auth command
if (results !== 'OK') {
callback.call(this, null, results, data)
}
resolve(results)
}
})
})
}

/**
* Stop receive messages from a table or channel.
* @param entityType One of TABLE or CHANNEL
* @param entityName Name of the table or the channel
*/
public async unlisten(entityType: string, entityName: string): Promise<any> {
const subject = entityType === 'TABLE' ? 'TABLE ' : ''

return this.connection.sql(`UNLISTEN ${subject}?;`, entityName)
}

/**
* Create a channel to send messages to.
* @param name Channel name
* @param failIfExists Raise an error if the channel already exists
*/
public async createChannel(name: string, failIfExists: boolean = true): Promise<any> {
let notExistsCommand = ''
if (!failIfExists) {
notExistsCommand = 'IF NOT EXISTS;'
}

return this.connection.sql(`CREATE CHANNEL ? ${notExistsCommand}`, name)
}

/**
* Send a message to the channel.
*/
public notifyChannel(channelName: string, message: string): Promise<any> {
return this.connection.sql`NOTIFY ${channelName} ${message};`
}

/**
* Ask the server to close the connection to the database and
* to keep only open the Pub/Sub connection.
* Only interaction with Pub/Sub commands will be allowed.
*/
public setPubSubOnly(): Promise<any> {
return new Promise((resolve, reject) => {
this.connection.sendCommands('PUBSUB ONLY;', (error, results) => {
if (error) {
reject(error)
} else {
this.connection.close()
resolve(results)
}
})
})
}

/** True if Pub/Sub connection is open. */
public connected(): boolean {
return this.connectionPubSub.connected
}

/** Close Pub/Sub connection. */
public close(): void {
this.connectionPubSub.close()
}
}
1 change: 1 addition & 0 deletions src/drivers/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ export type ResultsCallback<T = any> = (error: Error | null, results?: T) => voi
export type RowsCallback<T = Record<string, any>> = (error: Error | null, rows?: T[]) => void
export type RowCallback<T = Record<string, any>> = (error: Error | null, row?: T) => void
export type RowCountCallback = (error: Error | null, rowCount?: number) => void
export type PubSubCallback<T = any> = (error: Error | null, results?: T, extraData?: T) => void

/**
* Certain responses include arrays with various types of metadata.
Expand Down
Loading

0 comments on commit cf44c29

Please sign in to comment.