From 557a4e35cce1dff6dd99f66a94a1cae394100b42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miles=20St=C3=B6tzner?= Date: Wed, 20 Sep 2023 17:25:36 +0200 Subject: [PATCH 01/10] bi --- package.json | 1 - src/core/actions.ts | 9 +++++- src/core/bridge.ts | 11 +++++++- src/core/message.ts | 17 ++++++++++-- src/core/types.ts | 5 ++++ src/source/can.ts | 17 ++++++++++-- src/source/console.ts | 3 +- src/source/socketio.ts | 16 ++++++++++- src/source/source.ts | 8 ++++-- src/target/can.ts | 16 +++++++++-- src/target/socketio.ts | 11 +++++++- src/target/target.ts | 9 ++++++ yarn.lock | 63 ++---------------------------------------- 13 files changed, 110 insertions(+), 76 deletions(-) create mode 100644 src/core/types.ts diff --git a/package.json b/package.json index ea9e2b8..a191bf4 100644 --- a/package.json +++ b/package.json @@ -82,7 +82,6 @@ "eslint-plugin-import": "^2.27.5", "license-checker": "^25.0.1", "mocha": "^10.2.0", - "nodemon": "^2.0.22", "nyc": "^15.1.0", "prettier": "^2.8.7", "prettier-plugin-organize-imports": "^3.2.2", diff --git a/src/core/actions.ts b/src/core/actions.ts index 40603c1..2626c53 100644 --- a/src/core/actions.ts +++ b/src/core/actions.ts @@ -66,7 +66,11 @@ export async function stopVCAN(options: VCANOptions) { } function createSource(options: BridgeOptions) { - if (options.source === 'can') return new CANSource({name: options.sourceName ?? 'can2x'}) + if (options.source === 'can') + return new CANSource({ + name: options.sourceName ?? 'can2x', + bidirectional: true, // TODO: implement, docs + }) if (options.source === 'console') { assert.isDefined(options.sourceId, '--source-id undefined') @@ -103,6 +107,7 @@ function createSource(options: BridgeOptions) { port: options.sourcePort ? Number(options.sourcePort) : 3000, host: options.sourceHost ?? 'localhost', event: options.sourceEvent ?? 'can2x', + bidirectional: true, // TODO: implement, docs }) if (options.source === 'ws') @@ -118,6 +123,7 @@ function createTarget(options: BridgeOptions) { if (options.target === 'can') return new CANTarget({ name: options.targetName ?? 'can2x', + bidirectional: true, // TODO: implement, docs }) if (options.target === 'console') return new ConsoleTarget() @@ -149,6 +155,7 @@ function createTarget(options: BridgeOptions) { return new SocketIOTarget({ endpoint: options.targetEndpoint, event: options.targetEvent ?? 'can2x', + bidirectional: true, // TODO: implement, docs }) } diff --git a/src/core/bridge.ts b/src/core/bridge.ts index 8062dd2..b1a20f3 100644 --- a/src/core/bridge.ts +++ b/src/core/bridge.ts @@ -1,6 +1,7 @@ import {Source} from '#/source/source' import {Target} from '#/target/target' import * as assert from '#assert' +import {validateMessage} from '#core/message' import std from '#std' import hae from '#utils/hae' @@ -29,7 +30,7 @@ export class Bridge { std.log('bridge started') await this.source.receive( hae.log(async message => { - std.log('bridging', {message}) + std.log('bridging forward', {message}) assert.isNumber(message.id) assert.isNumbers(message.data) @@ -38,6 +39,14 @@ export class Bridge { if (!this.source.continuous) await this.stop() }) ) + + await this.target.receive( + hae.log(async message => { + std.log('bridging backward', {message}) + validateMessage(message) + await this.source.send(message) + }) + ) } async stop() { diff --git a/src/core/message.ts b/src/core/message.ts index 6808643..977b32a 100644 --- a/src/core/message.ts +++ b/src/core/message.ts @@ -1,9 +1,20 @@ +import * as assert from '#assert' +import {Message as CANMessage} from '*can.node' + export type Message = { id: number data: number[] } -export type BufferMessage = { - id: number - data: Buffer +export function fromCAN(message: CANMessage): Message { + return {id: message.id, data: Array.from(message.data)} +} + +export function toCAN(message: Message): CANMessage { + return {id: message.id, data: Buffer.from(message.data), ext: false, rtr: false} +} + +export function validateMessage(message: Message) { + assert.isNumber(message.id) + assert.isNumbers(message.data) } diff --git a/src/core/types.ts b/src/core/types.ts new file mode 100644 index 0000000..472fcb3 --- /dev/null +++ b/src/core/types.ts @@ -0,0 +1,5 @@ +import {Message} from '#core/message' + +export type Processor = (message: Message) => Promise + +// TODO: merge this message.ts? diff --git a/src/source/can.ts b/src/source/can.ts index 703e9e2..61c4999 100644 --- a/src/source/can.ts +++ b/src/source/can.ts @@ -1,11 +1,13 @@ import {Source} from '#/source/source' import * as check from '#check' +import {Message, fromCAN, toCAN} from '#core/message' import std from '#std' import {Message as CANMessage, RawChannel} from '*can.node' import * as can from 'socketcan' export type CANSourceOptions = { name: string + bidirectional: boolean } export class CANSource extends Source { @@ -20,13 +22,14 @@ export class CANSource extends Source { async start() { std.log('starting can source', {options: this.options}) - this.source = can.createRawChannel(this.options.name) + // TODO: non_block_send: false!? + this.source = can.createRawChannelWithOptions(this.options.name, {non_block_send: true}) this.source.start() this.source.addListener('onMessage', (message: CANMessage) => { std.log('can source received', {message}) if (check.isUndefined(this.processor)) return std.log('no processor defined') - this.processor({id: message.id, data: Array.from(message.data)}) + this.processor(fromCAN(message)) }) this.readyPromise.resolve() @@ -39,4 +42,14 @@ export class CANSource extends Source { this.source.stop() std.log('can source stopped') } + + async send(message: Message) { + std.log('sending can source') + if (!this.options.bidirectional) return std.log('can source not bidirectional') + + if (check.isUndefined(this.source)) return std.log('can source not defined') + this.source.send(toCAN(message)) + + std.log('can source sent') + } } diff --git a/src/source/console.ts b/src/source/console.ts index c828a30..49f4619 100644 --- a/src/source/console.ts +++ b/src/source/console.ts @@ -1,5 +1,6 @@ -import {Processor, Source} from '#/source/source' +import {Source} from '#/source/source' import {Message} from '#core/message' +import {Processor} from '#core/types' import std from '#std' export type ConsoleSourceOptions = Message diff --git a/src/source/socketio.ts b/src/source/socketio.ts index 31b5b18..2bb08b2 100644 --- a/src/source/socketio.ts +++ b/src/source/socketio.ts @@ -9,12 +9,14 @@ export type SocketIOSourceOptions = { port: number host: string event: string + bidirectional: boolean } export class SocketIOSource extends Source { io?: SocketIO.Server server?: http.Server options: SocketIOSourceOptions + socket?: SocketIO.Socket constructor(options: SocketIOSourceOptions) { super() @@ -30,7 +32,8 @@ export class SocketIOSource extends Source { this.io.on('connection', socket => { std.log(`socketio source connected`, {id: socket.id}) - socket.on(this.options.event, (message: Message) => { + this.socket = socket + this.socket.on(this.options.event, (message: Message) => { std.log('socketio source received', {message}) if (check.isUndefined(this.processor)) return std.log('no processor defined') this.processor(message) @@ -47,6 +50,17 @@ export class SocketIOSource extends Source { }) } + async send(message: Message) { + std.log('sending socketio source') + if (!this.options.bidirectional) return std.log('socketio source not bidirectional') + + // TODO: broadcast? + if (check.isUndefined(this.socket)) return std.log('socketio socket not defined') + this.socket.emit(this.options.event, message) + + std.log('socketio source sent') + } + async stop() { std.log('stopping socketio source') await this.stopServer() diff --git a/src/source/source.ts b/src/source/source.ts index a78bf3f..176258e 100644 --- a/src/source/source.ts +++ b/src/source/source.ts @@ -1,10 +1,10 @@ import {Message} from '#core/message' +import {Processor} from '#core/types' import * as utils from '#utils' -export type Processor = (message: Message) => Promise - export abstract class Source { processor?: Processor + // TODO: remove this? options = {} protected readyPromise @@ -29,5 +29,9 @@ export abstract class Source { this.processor = processor } + async send(message: Message): Promise { + throw new Error(`Not Implemented`) + } + continuous = true } diff --git a/src/target/can.ts b/src/target/can.ts index 2591596..0c6ea60 100644 --- a/src/target/can.ts +++ b/src/target/can.ts @@ -1,13 +1,14 @@ import {Target} from '#/target/target' import * as assert from '#assert' import * as check from '#check' -import {Message} from '#core/message' +import {fromCAN, Message, toCAN} from '#core/message' import std from '#std' -import {RawChannel} from '*can.node' +import {Message as CANMessage, RawChannel} from '*can.node' import * as can from 'socketcan' export type CANTargetOptions = { name: string + bidirectional: boolean } export class CANTarget extends Target { @@ -24,6 +25,15 @@ export class CANTarget extends Target { this.target = can.createRawChannel(this.options.name) // TODO: does this have a site-effect on the os? this.target.start() + + if (this.options.bidirectional) { + this.target.addListener('onMessage', (message: CANMessage) => { + std.log('can source received', {message}) + if (check.isUndefined(this.processor)) return std.log('no processor defined') + this.processor(fromCAN(message)) + }) + } + this.readyPromise.resolve() std.log('can target started') } @@ -31,7 +41,7 @@ export class CANTarget extends Target { async send(message: Message) { std.log('can target sending', {message}) assert.isDefined(this.target, 'can target not started') - this.target.send({ext: false, rtr: false, id: message.id, data: Buffer.from(message.data)}) + this.target.send(toCAN(message)) std.log('can target sent') } diff --git a/src/target/socketio.ts b/src/target/socketio.ts index 39c446c..fc251e5 100644 --- a/src/target/socketio.ts +++ b/src/target/socketio.ts @@ -8,6 +8,7 @@ import SocketIOClient, {Socket} from 'socket.io-client' export type SocketIOTargetOptions = { endpoint: string event: string + bidirectional: boolean } export class SocketIOTarget extends Target { @@ -27,7 +28,7 @@ export class SocketIOTarget extends Target { this.target.on('connect', () => { std.log(`socketio target connected`, {id: this.target!.id}) this.readyPromise.resolve() - std.log('websocket target started') + std.log('socketio target started') }) this.target.on('connect_error', error => { @@ -41,6 +42,14 @@ export class SocketIOTarget extends Target { this.target.on('disconnect', reason => { std.log(`socketio target disconnected`, {reason}) }) + + if (this.options.bidirectional) { + this.target.on(this.options.event, (message: Message) => { + std.log('socketio target received') + if (check.isUndefined(this.processor)) return std.log('no processor defined') + this.processor(message) + }) + } } async send(message: Message) { diff --git a/src/target/target.ts b/src/target/target.ts index 7aad0bd..4a2de2f 100644 --- a/src/target/target.ts +++ b/src/target/target.ts @@ -1,7 +1,12 @@ import {Message} from '#core/message' +import {Processor} from '#core/types' import * as utils from '#utils' +// TODO: merge this class with source? + export abstract class Target { + processor?: Processor + protected readyPromise protected constructor() { this.readyPromise = utils.createDecomposedPromise() @@ -20,4 +25,8 @@ export abstract class Target { } abstract send(message: Message): Promise + + async receive(processor: Processor) { + this.processor = processor + } } diff --git a/yarn.lock b/yarn.lock index 4f59423..4588f03 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1138,7 +1138,7 @@ check-error@^1.0.2: resolved "https://registry.yarnpkg.com/check-error/-/check-error-1.0.2.tgz#574d312edd88bb5dd8912e9286dd6c0aed4aac82" integrity sha512-BrgHpW9NURQgzoNyjfq0Wu6VFO6D7IZEmJNdtgNqpzGG8RuNFHt2jQxWlAs4HMe119chBnv+34syEZtc6IhLtA== -chokidar@3.5.3, chokidar@^3.5.2, chokidar@^3.5.3: +chokidar@3.5.3, chokidar@^3.5.3: version "3.5.3" resolved "https://registry.yarnpkg.com/chokidar/-/chokidar-3.5.3.tgz#1cf37c8707b932bd1af1ae22c0432e2acd1903bd" integrity sha512-Dr3sfKRP6oTcjf2JmUmFJfeVMvXBdegxB0iVQ5eb2V10uFJUCAS8OByZdVAyVb8xXNz3GjjTgj9kLWsZTqE6kw== @@ -2387,11 +2387,6 @@ ieee754@^1.1.13, ieee754@^1.2.1: resolved "https://registry.yarnpkg.com/ieee754/-/ieee754-1.2.1.tgz#8eb7a10a63fff25d15a57b001586d177d1b0d352" integrity sha512-dcyqhDvX1C46lXZcVqCpK+FtMRQVdIMN6/Df5js2zouUsqG7I6sFxitIC+7KYK29KdXOLHdu9zL4sFnoVQnqaA== -ignore-by-default@^1.0.1: - version "1.0.1" - resolved "https://registry.yarnpkg.com/ignore-by-default/-/ignore-by-default-1.0.1.tgz#48ca6d72f6c6a3af00a9ad4ae6876be3889e2b09" - integrity sha512-Ius2VYcGNk7T90CppJqcIkS5ooHUZyIQK+ClZfMfMNFEF9VSE73Fq+906u/CWu92x4gzZMWOwfFYckPObzdEbA== - ignore@^5.2.0: version "5.2.4" resolved "https://registry.yarnpkg.com/ignore/-/ignore-5.2.4.tgz#a291c0c6178ff1b960befe47fcdec301674a6324" @@ -3209,22 +3204,6 @@ node-releases@^2.0.13: resolved "https://registry.yarnpkg.com/node-releases/-/node-releases-2.0.13.tgz#d5ed1627c23e3461e819b02e57b75e4899b1c81d" integrity sha512-uYr7J37ae/ORWdZeQ1xxMJe3NtdmqMC/JZK+geofDrkLUApKRHPd18/TxtBOJ4A0/+uUIliorNrfYV6s1b02eQ== -nodemon@^2.0.22: - version "2.0.22" - resolved "https://registry.yarnpkg.com/nodemon/-/nodemon-2.0.22.tgz#182c45c3a78da486f673d6c1702e00728daf5258" - integrity sha512-B8YqaKMmyuCO7BowF1Z1/mkPqLk6cs/l63Ojtd6otKjMx47Dq1utxfRxcavH1I7VSaL8n5BUaoutadnsX3AAVQ== - dependencies: - chokidar "^3.5.2" - debug "^3.2.7" - ignore-by-default "^1.0.1" - minimatch "^3.1.2" - pstree.remy "^1.1.8" - semver "^5.7.1" - simple-update-notifier "^1.0.7" - supports-color "^5.5.0" - touch "^3.1.0" - undefsafe "^2.0.5" - nopt@^4.0.1: version "4.0.3" resolved "https://registry.yarnpkg.com/nopt/-/nopt-4.0.3.tgz#a375cad9d02fd921278d954c2254d5aa57e15e48" @@ -3240,13 +3219,6 @@ nopt@^6.0.0: dependencies: abbrev "^1.0.0" -nopt@~1.0.10: - version "1.0.10" - resolved "https://registry.yarnpkg.com/nopt/-/nopt-1.0.10.tgz#6ddd21bd2a31417b92727dd585f8a6f37608ebee" - integrity sha512-NWmpvLSqUrgrAC9HCuxEvb+PSloHpqVu+FqcO4eeF2h5qYRhA7ev6KvelyQAKtegUbC6RypJnlEOhd8vloNKYg== - dependencies: - abbrev "1" - normalize-package-data@^2.0.0: version "2.5.0" resolved "https://registry.yarnpkg.com/normalize-package-data/-/normalize-package-data-2.5.0.tgz#e66db1838b200c1dfc233225d12cb36520e234a8" @@ -3613,11 +3585,6 @@ proxy-addr@~2.0.7: forwarded "0.2.0" ipaddr.js "1.9.1" -pstree.remy@^1.1.8: - version "1.1.8" - resolved "https://registry.yarnpkg.com/pstree.remy/-/pstree.remy-1.1.8.tgz#c242224f4a67c21f686839bbdb4ac282b8373d3a" - integrity sha512-77DZwxQmxKnu3aR542U+X8FypNzbfJ+C5XQDk3uWjWxn6151aIMGthWYRXTqT1E5oJvg+ljaa2OJi+VfvCOQ8w== - punycode@^2.1.0: version "2.3.0" resolved "https://registry.yarnpkg.com/punycode/-/punycode-2.3.0.tgz#f67fa67c94da8f4d0cfff981aee4118064199b8f" @@ -3851,7 +3818,7 @@ sax@>=0.6.0: resolved "https://registry.yarnpkg.com/sax/-/sax-1.2.4.tgz#2816234e2378bddc4e5354fab5caa895df7100d9" integrity sha512-NqVDv9TpANUjFm0N8uM5GxL36UgKi9/atZw+x7YFnQ8ckwFGKrl4xX4yWtrey3UJm5nP1kUbnYgLopqWNSRhWw== -"semver@2 || 3 || 4 || 5", semver@^5.5.0, semver@^5.7.1: +"semver@2 || 3 || 4 || 5", semver@^5.5.0: version "5.7.2" resolved "https://registry.yarnpkg.com/semver/-/semver-5.7.2.tgz#48d55db737c3287cd4835e17fa13feace1c41ef8" integrity sha512-cBznnQ9KjJqU67B52RMC65CMarK2600WFnbkcaiwWq3xy/5haFJlshgnpjovMVJ+Hff49d8GEn0b87C5pDQ10g== @@ -3868,11 +3835,6 @@ semver@^7.3.5, semver@^7.3.7, semver@^7.5.3: dependencies: lru-cache "^6.0.0" -semver@~7.0.0: - version "7.0.0" - resolved "https://registry.yarnpkg.com/semver/-/semver-7.0.0.tgz#5f3ca35761e47e05b206c6daff2cf814f0316b8e" - integrity sha512-+GB6zVA9LWh6zovYQLALHwv5rb2PHGlJi3lfiqIHxR0uuwCgefcOJc59v9fv1w8GbStwxuuqqAjI9NMAOOgq1A== - send@0.18.0: version "0.18.0" resolved "https://registry.yarnpkg.com/send/-/send-0.18.0.tgz#670167cc654b05f5aa4a767f9113bb371bc706be" @@ -3950,13 +3912,6 @@ signal-exit@^4.0.1: resolved "https://registry.yarnpkg.com/signal-exit/-/signal-exit-4.1.0.tgz#952188c1cbd546070e2dd20d0f41c0ae0530cb04" integrity sha512-bzyZ1e88w9O1iNJbKnOlvYTrWPDl46O1bG0D3XInv+9tkPrxrN8jUUTiFlDkkmKWgn1M6CfIA13SuGqOa9Korw== -simple-update-notifier@^1.0.7: - version "1.1.0" - resolved "https://registry.yarnpkg.com/simple-update-notifier/-/simple-update-notifier-1.1.0.tgz#67694c121de354af592b347cdba798463ed49c82" - integrity sha512-VpsrsJSUcJEseSbMHkrsrAVSdvVS5I96Qo1QAQ4FxQ9wXFcB+pjj7FB7/us9+GcgfW4ziHtYMc1J0PLczb55mg== - dependencies: - semver "~7.0.0" - slash@^3.0.0: version "3.0.0" resolved "https://registry.yarnpkg.com/slash/-/slash-3.0.0.tgz#6539be870c165adbd5240220dbe361f1bc4d4634" @@ -4230,7 +4185,7 @@ supports-color@8.1.1: dependencies: has-flag "^4.0.0" -supports-color@^5.3.0, supports-color@^5.5.0: +supports-color@^5.3.0: version "5.5.0" resolved "https://registry.yarnpkg.com/supports-color/-/supports-color-5.5.0.tgz#e2e69a44ac8772f78a1ec0b35b689df6530efc8f" integrity sha512-QjVjwdXIt408MIiAqCX4oUKsgU2EqAGzs2Ppkm4aQYbjm+ZEWEcW4SfFNTr4uMNZma0ey4f5lgLrkB0aX0QMow== @@ -4297,13 +4252,6 @@ toidentifier@1.0.1: resolved "https://registry.yarnpkg.com/toidentifier/-/toidentifier-1.0.1.tgz#3be34321a88a820ed1bd80dfaa33e479fbb8dd35" integrity sha512-o5sSPKEkg/DIQNmH43V0/uerLrpzVedkUh8tGNvaeXpfpuwjKenlSox/2O/BTlZUtEe+JG7s5YhEz608PlAHRA== -touch@^3.1.0: - version "3.1.0" - resolved "https://registry.yarnpkg.com/touch/-/touch-3.1.0.tgz#fe365f5f75ec9ed4e56825e0bb76d24ab74af83b" - integrity sha512-WBx8Uy5TLtOSRtIq+M03/sKDrXCLHxwDcquSP2c43Le03/9serjQBIztjRz6FkJez9D/hleyAXTBGLwwZUw9lA== - dependencies: - nopt "~1.0.10" - tr46@~0.0.3: version "0.0.3" resolved "https://registry.yarnpkg.com/tr46/-/tr46-0.0.3.tgz#8184fd347dac9cdc185992f3a6622e14b9d9ab6a" @@ -4472,11 +4420,6 @@ unbox-primitive@^1.0.2: has-symbols "^1.0.3" which-boxed-primitive "^1.0.2" -undefsafe@^2.0.5: - version "2.0.5" - resolved "https://registry.yarnpkg.com/undefsafe/-/undefsafe-2.0.5.tgz#38733b9327bdcd226db889fb723a6efd162e6e2c" - integrity sha512-WxONCrssBM8TSPRqN5EmsjVrsv4A8X12J4ArBiiayv3DyyG3ZlIg6yysuuSYdZsVz3TKcTg2fd//Ujd4CHV1iA== - unique-filename@^3.0.0: version "3.0.0" resolved "https://registry.yarnpkg.com/unique-filename/-/unique-filename-3.0.0.tgz#48ba7a5a16849f5080d26c760c86cf5cf05770ea" From 82d082c4c2f133ce3a8b51f4323f57b811f291f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miles=20St=C3=B6tzner?= Date: Wed, 20 Sep 2023 18:57:47 +0200 Subject: [PATCH 02/10] bi ws --- src/core/actions.ts | 14 ++++++++++---- src/core/message.ts | 26 ++++++++++++++++++++++++++ src/index.ts | 2 ++ src/source/mqtt.ts | 11 +++++++++-- src/source/ws.ts | 22 ++++++++++++++++++---- src/target/mqtt.ts | 11 ++++++++++- src/target/ws.ts | 13 +++++++++++-- src/utils/assert.ts | 4 ++++ src/utils/check.ts | 4 ++++ 9 files changed, 94 insertions(+), 13 deletions(-) diff --git a/src/core/actions.ts b/src/core/actions.ts index 2626c53..3b583a3 100644 --- a/src/core/actions.ts +++ b/src/core/actions.ts @@ -27,12 +27,14 @@ export type BridgeOptions = { sourceId?: string sourceData?: string[] sourceFile?: string + sourceBidirectional?: boolean target?: string targetEndpoint?: string targetEvent?: string targetTopic?: string targetName?: string targetFile?: string + targetBidirectional?: boolean } export async function startBridge(options: BridgeOptions) { @@ -69,7 +71,7 @@ function createSource(options: BridgeOptions) { if (options.source === 'can') return new CANSource({ name: options.sourceName ?? 'can2x', - bidirectional: true, // TODO: implement, docs + bidirectional: options.sourceBidirectional ?? true, // TODO: docs }) if (options.source === 'console') { @@ -100,6 +102,7 @@ function createSource(options: BridgeOptions) { port: options.sourcePort ? Number(options.sourcePort) : 3000, host: options.sourceHost ?? 'localhost', topic: options.sourceTopic ?? 'can2x', + bidirectional: options.sourceBidirectional ?? true, // TODO: docs }) if (options.source === 'socketio') @@ -107,13 +110,14 @@ function createSource(options: BridgeOptions) { port: options.sourcePort ? Number(options.sourcePort) : 3000, host: options.sourceHost ?? 'localhost', event: options.sourceEvent ?? 'can2x', - bidirectional: true, // TODO: implement, docs + bidirectional: options.sourceBidirectional ?? true, // TODO: docs }) if (options.source === 'ws') return new WSSource({ port: options.sourcePort ? Number(options.sourcePort) : 3000, host: options.sourceHost ?? 'localhost', + bidirectional: options.sourceBidirectional ?? true, // TODO: docs }) throw new Error(`Source of type "${options.source}" unknown`) @@ -123,7 +127,7 @@ function createTarget(options: BridgeOptions) { if (options.target === 'can') return new CANTarget({ name: options.targetName ?? 'can2x', - bidirectional: true, // TODO: implement, docs + bidirectional: options.targetBidirectional ?? true, // TODO: docs }) if (options.target === 'console') return new ConsoleTarget() @@ -147,6 +151,7 @@ function createTarget(options: BridgeOptions) { return new MQTTTarget({ endpoint: options.targetEndpoint, topic: options.targetTopic ?? 'can2x', + bidirectional: options.targetBidirectional ?? true, // TODO: docs }) } @@ -155,7 +160,7 @@ function createTarget(options: BridgeOptions) { return new SocketIOTarget({ endpoint: options.targetEndpoint, event: options.targetEvent ?? 'can2x', - bidirectional: true, // TODO: implement, docs + bidirectional: options.targetBidirectional ?? true, // TODO: docs }) } @@ -163,6 +168,7 @@ function createTarget(options: BridgeOptions) { assert.isDefined(options.targetEndpoint, '--target-endpoint must be defined') return new WSTarget({ endpoint: options.targetEndpoint, + bidirectional: options.targetBidirectional ?? true, // TODO: docs }) } diff --git a/src/core/message.ts b/src/core/message.ts index 977b32a..499c38d 100644 --- a/src/core/message.ts +++ b/src/core/message.ts @@ -6,6 +6,25 @@ export type Message = { data: number[] } +export function fromJSON(message: Message) { + // TODO + return message +} + +export function toJSON(message: Message) { + // TODO + return message +} + +export function fromString(message: string): Message { + assert.isString(message) + return JSON.parse(message) +} + +export function toString(message: Message) { + return JSON.stringify(message) +} + export function fromCAN(message: CANMessage): Message { return {id: message.id, data: Array.from(message.data)} } @@ -18,3 +37,10 @@ export function validateMessage(message: Message) { assert.isNumber(message.id) assert.isNumbers(message.data) } + +export function fromBuffer(message: ArrayBuffer | ArrayBuffer[]) { + assert.isBuffer(message) + return fromString(new TextDecoder().decode(message)) +} + +// TODO: class Message diff --git a/src/index.ts b/src/index.ts index dd12c45..c3117df 100644 --- a/src/index.ts +++ b/src/index.ts @@ -30,6 +30,7 @@ bridge .option('--source-name [string]', '', 'can2x') .option('--source-id [number]', '') .option('--source-data [numbers...]', '') + .option('--source-bidirectional [boolean]', '', true) .addOption( new Option('--target [string]', '') .default('console') @@ -40,6 +41,7 @@ bridge .option('--target-topic [string]', '', 'can2x') .option('--target-name [string]', '', 'can2x') .option('--target-file [string]', '') + .option('--target-bidirectional [boolean]', '', true) .action( hae.exit(async options => { await actions.startBridge(options) diff --git a/src/source/mqtt.ts b/src/source/mqtt.ts index 3db363f..112640d 100644 --- a/src/source/mqtt.ts +++ b/src/source/mqtt.ts @@ -1,5 +1,5 @@ import {Source} from '#/source/source' -import {Message} from '#core/message' +import {fromString} from '#core/message' import std from '#std' import * as check from '#utils/check' import Aedes from 'aedes' @@ -9,6 +9,7 @@ export type MQTTSourceOptions = { port: number host: string topic: string + bidirectional: boolean // TODO } export class MQTTSource extends Source { @@ -53,7 +54,7 @@ export class MQTTSource extends Source { if (topic !== this.options.topic) return std.log('topic unknown', {topic}) if (check.isUndefined(this.processor)) return std.log('no processor defined') - this.processor(JSON.parse(message) as Message) + this.processor(fromString(message)) }) this.server.listen({port: this.options.port, host: this.options.host}, () => { @@ -66,6 +67,12 @@ export class MQTTSource extends Source { }) } + // TODO: send + /** + async send(message: Message) { + } + **/ + async stop() { std.log('stopping mqtt source') diff --git a/src/source/ws.ts b/src/source/ws.ts index 987164e..47fbd3c 100644 --- a/src/source/ws.ts +++ b/src/source/ws.ts @@ -1,18 +1,20 @@ import {Source} from '#/source/source' -import {Message} from '#core/message' +import {Message, toString} from '#core/message' import std from '#std' import * as check from '#utils/check' import http from 'http' -import * as ws from 'ws' +import WebSocket, * as ws from 'ws' export type WSSourceOptions = { port: number host: string + bidirectional: boolean } export class WSSource extends Source { server?: http.Server options: WSSourceOptions + ws?: WebSocket constructor(options: WSSourceOptions) { super() @@ -30,11 +32,13 @@ export class WSSource extends Source { wss.on('connection', ws => { std.log('websocket source connected') - ws.on('error', error => { + this.ws = ws + + this.ws.on('error', error => { std.log('websocket source error', {error}) }) - ws.on('message', (message: string) => { + this.ws.on('message', (message: string) => { std.log('websocket source received', {message}) if (check.isUndefined(this.processor)) return std.log('no processor defined') this.processor(JSON.parse(message) as Message) @@ -57,6 +61,16 @@ export class WSSource extends Source { std.log('websocket source stopped') } + async send(message: Message) { + std.log('sending can source') + if (!this.options.bidirectional) return std.log('websocket source not bidirectional') + + if (check.isUndefined(this.ws)) return std.log('websocket source not defined') + this.ws.send(toString(message)) + + std.log('can source sent') + } + private async stopServer() { if (check.isUndefined(this.server)) return std.log('websocket http server not defined') const server = this.server diff --git a/src/target/mqtt.ts b/src/target/mqtt.ts index d705c66..fa0fe6f 100644 --- a/src/target/mqtt.ts +++ b/src/target/mqtt.ts @@ -1,13 +1,14 @@ import {Target} from '#/target/target' import * as assert from '#assert' import * as check from '#check' -import {Message} from '#core/message' +import {fromString, Message} from '#core/message' import std from '#std' import * as mqtt from 'mqtt' export type MQTTTargetOptions = { endpoint: string topic: string + bidirectional: boolean } export class MQTTTarget extends Target { @@ -48,6 +49,14 @@ export class MQTTTarget extends Target { std.log(`mqtt target ended`) }) + if (this.options.bidirectional) { + this.target.on('message', (message: string) => { + std.log('mqtt target received', {message}) + if (check.isUndefined(this.processor)) return std.log('no processor defined') + this.processor(fromString(message)) + }) + } + this.readyPromise.resolve() std.log('mqtt target started') } diff --git a/src/target/ws.ts b/src/target/ws.ts index b95c319..efad762 100644 --- a/src/target/ws.ts +++ b/src/target/ws.ts @@ -1,12 +1,13 @@ import {Target} from '#/target/target' import * as assert from '#assert' import * as check from '#check' -import {Message} from '#core/message' +import {fromBuffer, Message, toString} from '#core/message' import std from '#std' import WebSocket from 'ws' export type WSTargetOptions = { endpoint: string + bidirectional: boolean } export class WSTarget extends Target { @@ -36,12 +37,20 @@ export class WSTarget extends Target { this.target.on('close', reason => { std.log(`websocket target closed`, {reason}) }) + + if (this.options.bidirectional) { + this.target.on('message', message => { + std.log('websocket target received', {message}) + if (check.isUndefined(this.processor)) return std.log('no processor defined') + this.processor(fromBuffer(message)) + }) + } } async send(message: Message) { std.log('websocket target sending', {message}) assert.isDefined(this.target, 'websocket target not started') - this.target.send(JSON.stringify(message)) + this.target.send(toString(message)) std.log('websocket target sent') } diff --git a/src/utils/assert.ts b/src/utils/assert.ts index b0c46d9..a36bece 100644 --- a/src/utils/assert.ts +++ b/src/utils/assert.ts @@ -47,3 +47,7 @@ export function isName(name: string) { if (!check.isName(name)) throw new Error(`Name "${name}" not allowed. Only small characters, numbers, hyphens, and dots are allowed.`) } + +export function isBuffer(element: unknown): asserts element is Buffer { + if (!check.isBuffer(element)) throw new Error(`Element "${element} is not a buffer`) +} diff --git a/src/utils/check.ts b/src/utils/check.ts index 86c2ec3..0d7c39f 100644 --- a/src/utils/check.ts +++ b/src/utils/check.ts @@ -37,3 +37,7 @@ export function isObject(element: unknown): element is object { export function isName(name: string) { return name.match(/^[a-z-0-9.]+$/) } + +export function isBuffer(element: unknown): element is Buffer { + return Buffer.isBuffer(element) +} From 7b9bcce5356b216d2e25bcfe968281c6989deabe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miles=20St=C3=B6tzner?= Date: Wed, 20 Sep 2023 19:18:03 +0200 Subject: [PATCH 03/10] bi ws and mqtt --- src/core/message.ts | 7 ++++++- src/source/mqtt.ts | 28 +++++++++++++++++++++++----- src/target/mqtt.ts | 13 ++++++++----- src/target/ws.ts | 4 ++-- 4 files changed, 39 insertions(+), 13 deletions(-) diff --git a/src/core/message.ts b/src/core/message.ts index 499c38d..581292d 100644 --- a/src/core/message.ts +++ b/src/core/message.ts @@ -38,9 +38,14 @@ export function validateMessage(message: Message) { assert.isNumbers(message.data) } -export function fromBuffer(message: ArrayBuffer | ArrayBuffer[]) { +export function fromArrayBuffer(message: ArrayBuffer | ArrayBuffer[]) { assert.isBuffer(message) return fromString(new TextDecoder().decode(message)) } +export function fromBuffer(message: Buffer) { + assert.isBuffer(message) + return fromString(message.toString('utf-8')) +} + // TODO: class Message diff --git a/src/source/mqtt.ts b/src/source/mqtt.ts index 112640d..27e09c8 100644 --- a/src/source/mqtt.ts +++ b/src/source/mqtt.ts @@ -1,5 +1,5 @@ import {Source} from '#/source/source' -import {fromString} from '#core/message' +import {fromString, Message, toString} from '#core/message' import std from '#std' import * as check from '#utils/check' import Aedes from 'aedes' @@ -9,7 +9,7 @@ export type MQTTSourceOptions = { port: number host: string topic: string - bidirectional: boolean // TODO + bidirectional: boolean } export class MQTTSource extends Source { @@ -67,11 +67,29 @@ export class MQTTSource extends Source { }) } - // TODO: send - /** async send(message: Message) { + std.log('sending mqtt source') + if (!this.options.bidirectional) return std.log('mqtt source not bidirectional') + + return new Promise((resolve, reject) => { + if (check.isUndefined(this.aedes)) return std.log('mqtt source not defined') + this.aedes.publish( + { + qos: 0, + cmd: 'publish', + dup: false, + payload: toString(message), + retain: false, + topic: this.options.topic, + }, + error => { + if (check.isDefined(error)) return reject(error) + std.log('mqtt source sent') + return resolve() + } + ) + }) } - **/ async stop() { std.log('stopping mqtt source') diff --git a/src/target/mqtt.ts b/src/target/mqtt.ts index fa0fe6f..1bf19b0 100644 --- a/src/target/mqtt.ts +++ b/src/target/mqtt.ts @@ -1,7 +1,7 @@ import {Target} from '#/target/target' import * as assert from '#assert' import * as check from '#check' -import {fromString, Message} from '#core/message' +import {fromBuffer, Message, toString} from '#core/message' import std from '#std' import * as mqtt from 'mqtt' @@ -50,10 +50,13 @@ export class MQTTTarget extends Target { }) if (this.options.bidirectional) { - this.target.on('message', (message: string) => { - std.log('mqtt target received', {message}) + this.target.on('message', (topic, message) => { + std.log('mqtt target received', {message, topic}) + if (topic.startsWith('$SYS')) return + if (topic !== this.options.topic) return std.log('topic unknown', {topic}) + if (check.isUndefined(this.processor)) return std.log('no processor defined') - this.processor(fromString(message)) + this.processor(fromBuffer(message)) }) } @@ -64,7 +67,7 @@ export class MQTTTarget extends Target { async send(message: Message) { std.log('mqtt target publishing', {message}) assert.isDefined(this.target, 'mqtt target not started') - await this.target.publishAsync(this.options.topic, JSON.stringify(message)) + await this.target.publishAsync(this.options.topic, toString(message)) std.log('mqtt target published') } diff --git a/src/target/ws.ts b/src/target/ws.ts index efad762..598587f 100644 --- a/src/target/ws.ts +++ b/src/target/ws.ts @@ -1,7 +1,7 @@ import {Target} from '#/target/target' import * as assert from '#assert' import * as check from '#check' -import {fromBuffer, Message, toString} from '#core/message' +import {fromArrayBuffer, Message, toString} from '#core/message' import std from '#std' import WebSocket from 'ws' @@ -42,7 +42,7 @@ export class WSTarget extends Target { this.target.on('message', message => { std.log('websocket target received', {message}) if (check.isUndefined(this.processor)) return std.log('no processor defined') - this.processor(fromBuffer(message)) + this.processor(fromArrayBuffer(message)) }) } } From 12bd06073182a7b2ef74ba11d4ee1d994e14018b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miles=20St=C3=B6tzner?= Date: Wed, 20 Sep 2023 19:46:45 +0200 Subject: [PATCH 04/10] Message --- src/core/bridge.ts | 11 +---- src/core/message.ts | 74 +++++++++++++++++---------------- src/core/types.ts | 2 +- src/source/can.ts | 8 ++-- src/source/console.ts | 8 ++-- src/source/file.ts | 8 ++-- src/source/http.ts | 4 +- src/source/mqtt.ts | 8 ++-- src/source/socketio.ts | 10 ++--- src/source/source.ts | 4 +- src/source/ws.ts | 8 ++-- src/target/can.ts | 8 ++-- src/target/console.ts | 4 +- src/target/file.ts | 6 +-- src/target/http.ts | 6 +-- src/target/mqtt.ts | 8 ++-- src/target/socketio.ts | 8 ++-- src/target/target.ts | 4 +- src/target/ws.ts | 8 ++-- tests/can/can.test.ts | 6 +-- tests/complex/complex.test.ts | 6 +-- tests/file/file.test.ts | 9 ++-- tests/http/http.test.ts | 6 +-- tests/mqtt/mqtt.test.ts | 6 +-- tests/socketio/socketio.test.ts | 6 +-- tests/ws/ws.test.ts | 6 +-- 26 files changed, 117 insertions(+), 125 deletions(-) diff --git a/src/core/bridge.ts b/src/core/bridge.ts index b1a20f3..80990a0 100644 --- a/src/core/bridge.ts +++ b/src/core/bridge.ts @@ -1,7 +1,5 @@ -import {Source} from '#/source/source' -import {Target} from '#/target/target' -import * as assert from '#assert' -import {validateMessage} from '#core/message' +import Source from '#/source/source' +import Target from '#/target/target' import std from '#std' import hae from '#utils/hae' @@ -31,10 +29,6 @@ export class Bridge { await this.source.receive( hae.log(async message => { std.log('bridging forward', {message}) - - assert.isNumber(message.id) - assert.isNumbers(message.data) - await this.target.send(message) if (!this.source.continuous) await this.stop() }) @@ -43,7 +37,6 @@ export class Bridge { await this.target.receive( hae.log(async message => { std.log('bridging backward', {message}) - validateMessage(message) await this.source.send(message) }) ) diff --git a/src/core/message.ts b/src/core/message.ts index 581292d..e9f46c5 100644 --- a/src/core/message.ts +++ b/src/core/message.ts @@ -1,51 +1,53 @@ import * as assert from '#assert' import {Message as CANMessage} from '*can.node' -export type Message = { +export default class Message { id: number data: number[] -} -export function fromJSON(message: Message) { - // TODO - return message -} + // TODO: ext + // TODO: rtr -export function toJSON(message: Message) { - // TODO - return message -} + private constructor(id: number, data: number[]) { + assert.isNumber(id) + assert.isNumbers(data) -export function fromString(message: string): Message { - assert.isString(message) - return JSON.parse(message) -} + this.id = id + this.data = data + } -export function toString(message: Message) { - return JSON.stringify(message) -} + static fromJSON(message: {id: number; data: number[]}) { + return new Message(message.id, message.data) + } -export function fromCAN(message: CANMessage): Message { - return {id: message.id, data: Array.from(message.data)} -} + toJSON() { + return {id: this.id, data: this.id} + } -export function toCAN(message: Message): CANMessage { - return {id: message.id, data: Buffer.from(message.data), ext: false, rtr: false} -} + static fromString(message: string) { + assert.isString(message) + return this.fromJSON(JSON.parse(message)) + } -export function validateMessage(message: Message) { - assert.isNumber(message.id) - assert.isNumbers(message.data) -} + toString() { + return JSON.stringify(this.toJSON()) + } -export function fromArrayBuffer(message: ArrayBuffer | ArrayBuffer[]) { - assert.isBuffer(message) - return fromString(new TextDecoder().decode(message)) -} + static fromCAN(message: CANMessage): Message { + return this.fromJSON({id: message.id, data: Array.from(message.data)}) + } -export function fromBuffer(message: Buffer) { - assert.isBuffer(message) - return fromString(message.toString('utf-8')) -} + toCAN(): CANMessage { + return {id: this.id, data: Buffer.from(this.data), ext: false, rtr: false} + } + + static fromArrayBuffer(message: ArrayBuffer | ArrayBuffer[]) { + assert.isBuffer(message) + return this.fromString(new TextDecoder().decode(message)) + } -// TODO: class Message + static fromBuffer(message: Buffer) { + assert.isBuffer(message) + return this.fromString(message.toString('utf-8')) + } +} diff --git a/src/core/types.ts b/src/core/types.ts index 472fcb3..ba0da76 100644 --- a/src/core/types.ts +++ b/src/core/types.ts @@ -1,4 +1,4 @@ -import {Message} from '#core/message' +import Message from '#core/message' export type Processor = (message: Message) => Promise diff --git a/src/source/can.ts b/src/source/can.ts index 61c4999..1bddaa8 100644 --- a/src/source/can.ts +++ b/src/source/can.ts @@ -1,6 +1,6 @@ -import {Source} from '#/source/source' +import Source from '#/source/source' import * as check from '#check' -import {Message, fromCAN, toCAN} from '#core/message' +import Message from '#core/message' import std from '#std' import {Message as CANMessage, RawChannel} from '*can.node' import * as can from 'socketcan' @@ -29,7 +29,7 @@ export class CANSource extends Source { this.source.addListener('onMessage', (message: CANMessage) => { std.log('can source received', {message}) if (check.isUndefined(this.processor)) return std.log('no processor defined') - this.processor(fromCAN(message)) + this.processor(Message.fromCAN(message)) }) this.readyPromise.resolve() @@ -48,7 +48,7 @@ export class CANSource extends Source { if (!this.options.bidirectional) return std.log('can source not bidirectional') if (check.isUndefined(this.source)) return std.log('can source not defined') - this.source.send(toCAN(message)) + this.source.send(message.toCAN()) std.log('can source sent') } diff --git a/src/source/console.ts b/src/source/console.ts index 49f4619..3560e50 100644 --- a/src/source/console.ts +++ b/src/source/console.ts @@ -1,9 +1,9 @@ -import {Source} from '#/source/source' -import {Message} from '#core/message' +import Source from '#/source/source' +import Message from '#core/message' import {Processor} from '#core/types' import std from '#std' -export type ConsoleSourceOptions = Message +export type ConsoleSourceOptions = {id: number; data: number[]} export class ConsoleSource extends Source { options: ConsoleSourceOptions @@ -14,7 +14,7 @@ export class ConsoleSource extends Source { } async receive(processor: Processor) { - const message = {id: this.options.id, data: this.options.data} + const message = Message.fromJSON({id: this.options.id, data: this.options.data}) std.log('console received', {message}) this.processor = processor this.processor(message) diff --git a/src/source/file.ts b/src/source/file.ts index bda5f1b..8df0d11 100644 --- a/src/source/file.ts +++ b/src/source/file.ts @@ -1,5 +1,5 @@ -import {Source} from '#/source/source' -import {Message} from '#core/message' +import Source from '#/source/source' +import Message from '#core/message' import std from '#std' import * as check from '#utils/check' import {Tail} from 'tail' @@ -22,10 +22,10 @@ export class FileSource extends Source { this.source = new Tail(this.options.file) - this.source.on('line', message => { + this.source.on('line', (message: string) => { std.log('file source received', {message}) if (check.isUndefined(this.processor)) return std.log('no processor defined') - this.processor(JSON.parse(message) as Message) + this.processor(Message.fromString(message)) }) this.source.on('error', error => { diff --git a/src/source/http.ts b/src/source/http.ts index fe87c7f..d7c0369 100644 --- a/src/source/http.ts +++ b/src/source/http.ts @@ -1,5 +1,5 @@ -import {Source} from '#/source/source' -import {Message} from '#core/message' +import Source from '#/source/source' +import Message from '#core/message' import std from '#std' import * as check from '#utils/check' import hae from '#utils/hae' diff --git a/src/source/mqtt.ts b/src/source/mqtt.ts index 27e09c8..d8248c2 100644 --- a/src/source/mqtt.ts +++ b/src/source/mqtt.ts @@ -1,5 +1,5 @@ -import {Source} from '#/source/source' -import {fromString, Message, toString} from '#core/message' +import Source from '#/source/source' +import Message from '#core/message' import std from '#std' import * as check from '#utils/check' import Aedes from 'aedes' @@ -54,7 +54,7 @@ export class MQTTSource extends Source { if (topic !== this.options.topic) return std.log('topic unknown', {topic}) if (check.isUndefined(this.processor)) return std.log('no processor defined') - this.processor(fromString(message)) + this.processor(Message.fromString(message)) }) this.server.listen({port: this.options.port, host: this.options.host}, () => { @@ -78,7 +78,7 @@ export class MQTTSource extends Source { qos: 0, cmd: 'publish', dup: false, - payload: toString(message), + payload: message.toString(), retain: false, topic: this.options.topic, }, diff --git a/src/source/socketio.ts b/src/source/socketio.ts index 2bb08b2..895c0f6 100644 --- a/src/source/socketio.ts +++ b/src/source/socketio.ts @@ -1,5 +1,5 @@ -import {Source} from '#/source/source' -import {Message} from '#core/message' +import Source from '#/source/source' +import Message from '#core/message' import std from '#std' import * as check from '#utils/check' import http from 'http' @@ -33,10 +33,10 @@ export class SocketIOSource extends Source { std.log(`socketio source connected`, {id: socket.id}) this.socket = socket - this.socket.on(this.options.event, (message: Message) => { + this.socket.on(this.options.event, (message: any) => { std.log('socketio source received', {message}) if (check.isUndefined(this.processor)) return std.log('no processor defined') - this.processor(message) + this.processor(Message.fromJSON(message)) }) }) @@ -56,7 +56,7 @@ export class SocketIOSource extends Source { // TODO: broadcast? if (check.isUndefined(this.socket)) return std.log('socketio socket not defined') - this.socket.emit(this.options.event, message) + this.socket.emit(this.options.event, message.toJSON()) std.log('socketio source sent') } diff --git a/src/source/source.ts b/src/source/source.ts index 176258e..cfa0ddb 100644 --- a/src/source/source.ts +++ b/src/source/source.ts @@ -1,8 +1,8 @@ -import {Message} from '#core/message' +import Message from '#core/message' import {Processor} from '#core/types' import * as utils from '#utils' -export abstract class Source { +export default abstract class Source { processor?: Processor // TODO: remove this? options = {} diff --git a/src/source/ws.ts b/src/source/ws.ts index 47fbd3c..2cbee49 100644 --- a/src/source/ws.ts +++ b/src/source/ws.ts @@ -1,5 +1,5 @@ -import {Source} from '#/source/source' -import {Message, toString} from '#core/message' +import Source from '#/source/source' +import Message from '#core/message' import std from '#std' import * as check from '#utils/check' import http from 'http' @@ -41,7 +41,7 @@ export class WSSource extends Source { this.ws.on('message', (message: string) => { std.log('websocket source received', {message}) if (check.isUndefined(this.processor)) return std.log('no processor defined') - this.processor(JSON.parse(message) as Message) + this.processor(Message.fromString(message)) }) }) @@ -66,7 +66,7 @@ export class WSSource extends Source { if (!this.options.bidirectional) return std.log('websocket source not bidirectional') if (check.isUndefined(this.ws)) return std.log('websocket source not defined') - this.ws.send(toString(message)) + this.ws.send(message.toString()) std.log('can source sent') } diff --git a/src/target/can.ts b/src/target/can.ts index 0c6ea60..e3bb442 100644 --- a/src/target/can.ts +++ b/src/target/can.ts @@ -1,7 +1,7 @@ -import {Target} from '#/target/target' +import Target from '#/target/target' import * as assert from '#assert' import * as check from '#check' -import {fromCAN, Message, toCAN} from '#core/message' +import Message from '#core/message' import std from '#std' import {Message as CANMessage, RawChannel} from '*can.node' import * as can from 'socketcan' @@ -30,7 +30,7 @@ export class CANTarget extends Target { this.target.addListener('onMessage', (message: CANMessage) => { std.log('can source received', {message}) if (check.isUndefined(this.processor)) return std.log('no processor defined') - this.processor(fromCAN(message)) + this.processor(Message.fromCAN(message)) }) } @@ -41,7 +41,7 @@ export class CANTarget extends Target { async send(message: Message) { std.log('can target sending', {message}) assert.isDefined(this.target, 'can target not started') - this.target.send(toCAN(message)) + this.target.send(message.toCAN()) std.log('can target sent') } diff --git a/src/target/console.ts b/src/target/console.ts index 6baaebe..13f0687 100644 --- a/src/target/console.ts +++ b/src/target/console.ts @@ -1,5 +1,5 @@ -import {Target} from '#/target/target' -import {Message} from '#core/message' +import Target from '#/target/target' +import Message from '#core/message' import std from '#std' export class ConsoleTarget extends Target { diff --git a/src/target/file.ts b/src/target/file.ts index 009c5bc..37b2606 100644 --- a/src/target/file.ts +++ b/src/target/file.ts @@ -1,5 +1,5 @@ -import {Target} from '#/target/target' -import {Message} from '#core/message' +import Target from '#/target/target' +import Message from '#core/message' import * as files from '#files' import std from '#std' @@ -18,7 +18,7 @@ export class FileTarget extends Target { async send(message: Message) { std.log('file target sending', {message}) await files.createFile(this.options.file) - await files.appendFile(this.options.file, JSON.stringify(message) + '\n') + await files.appendFile(this.options.file, message.toString() + '\n') std.log('file target sent') } } diff --git a/src/target/http.ts b/src/target/http.ts index c534740..96da8ff 100644 --- a/src/target/http.ts +++ b/src/target/http.ts @@ -1,5 +1,5 @@ -import {Target} from '#/target/target' -import {Message} from '#core/message' +import Target from '#/target/target' +import Message from '#core/message' import std from '#std' import fetch from 'cross-fetch' @@ -19,7 +19,7 @@ export class HTTPTarget extends Target { std.log('http target sending', {message}) await fetch(this.options.endpoint, { method: 'POST', - body: JSON.stringify(message), + body: message.toString(), headers: {'Content-Type': 'application/json'}, }) std.log('http target sent') diff --git a/src/target/mqtt.ts b/src/target/mqtt.ts index 1bf19b0..3faafa1 100644 --- a/src/target/mqtt.ts +++ b/src/target/mqtt.ts @@ -1,7 +1,7 @@ -import {Target} from '#/target/target' +import Target from '#/target/target' import * as assert from '#assert' import * as check from '#check' -import {fromBuffer, Message, toString} from '#core/message' +import Message from '#core/message' import std from '#std' import * as mqtt from 'mqtt' @@ -56,7 +56,7 @@ export class MQTTTarget extends Target { if (topic !== this.options.topic) return std.log('topic unknown', {topic}) if (check.isUndefined(this.processor)) return std.log('no processor defined') - this.processor(fromBuffer(message)) + this.processor(Message.fromBuffer(message)) }) } @@ -67,7 +67,7 @@ export class MQTTTarget extends Target { async send(message: Message) { std.log('mqtt target publishing', {message}) assert.isDefined(this.target, 'mqtt target not started') - await this.target.publishAsync(this.options.topic, toString(message)) + await this.target.publishAsync(this.options.topic, message.toString()) std.log('mqtt target published') } diff --git a/src/target/socketio.ts b/src/target/socketio.ts index fc251e5..207cba3 100644 --- a/src/target/socketio.ts +++ b/src/target/socketio.ts @@ -1,7 +1,7 @@ -import {Target} from '#/target/target' +import Target from '#/target/target' import * as assert from '#assert' import * as check from '#check' -import {Message} from '#core/message' +import Message from '#core/message' import std from '#std' import SocketIOClient, {Socket} from 'socket.io-client' @@ -44,10 +44,10 @@ export class SocketIOTarget extends Target { }) if (this.options.bidirectional) { - this.target.on(this.options.event, (message: Message) => { + this.target.on(this.options.event, (message: any) => { std.log('socketio target received') if (check.isUndefined(this.processor)) return std.log('no processor defined') - this.processor(message) + this.processor(Message.fromJSON(message)) }) } } diff --git a/src/target/target.ts b/src/target/target.ts index 4a2de2f..380168e 100644 --- a/src/target/target.ts +++ b/src/target/target.ts @@ -1,10 +1,10 @@ -import {Message} from '#core/message' +import Message from '#core/message' import {Processor} from '#core/types' import * as utils from '#utils' // TODO: merge this class with source? -export abstract class Target { +export default abstract class Target { processor?: Processor protected readyPromise diff --git a/src/target/ws.ts b/src/target/ws.ts index 598587f..4adbe50 100644 --- a/src/target/ws.ts +++ b/src/target/ws.ts @@ -1,7 +1,7 @@ -import {Target} from '#/target/target' +import Target from '#/target/target' import * as assert from '#assert' import * as check from '#check' -import {fromArrayBuffer, Message, toString} from '#core/message' +import Message from '#core/message' import std from '#std' import WebSocket from 'ws' @@ -42,7 +42,7 @@ export class WSTarget extends Target { this.target.on('message', message => { std.log('websocket target received', {message}) if (check.isUndefined(this.processor)) return std.log('no processor defined') - this.processor(fromArrayBuffer(message)) + this.processor(Message.fromArrayBuffer(message)) }) } } @@ -50,7 +50,7 @@ export class WSTarget extends Target { async send(message: Message) { std.log('websocket target sending', {message}) assert.isDefined(this.target, 'websocket target not started') - this.target.send(toString(message)) + this.target.send(message.toString()) std.log('websocket target sent') } diff --git a/tests/can/can.test.ts b/tests/can/can.test.ts index aa2d372..505151b 100644 --- a/tests/can/can.test.ts +++ b/tests/can/can.test.ts @@ -1,5 +1,5 @@ import * as actions from '#core/actions' -import {Message} from '#core/message' +import Message from '#core/message' import * as files from '#files' import std from '#std' import * as utils from '#utils' @@ -20,7 +20,7 @@ describe.skip('can', () => { }) it('source-target', async () => { - const message: Message = {id: 69, data: [1, 2, 3]} + const message = Message.fromJSON({id: 69, data: [1, 2, 3]}) const output = files.temporary() // Start can source with file target @@ -43,7 +43,7 @@ describe.skip('can', () => { std.log('waiting for message being bridged') await utils.sleep(25) - expect(files.loadFile(output).trim()).to.equal(JSON.stringify(message)) + expect(files.loadFile(output).trim()).to.equal(message.toString()) await files.deleteFile(output) await target.stop() diff --git a/tests/complex/complex.test.ts b/tests/complex/complex.test.ts index c420533..bf67d27 100644 --- a/tests/complex/complex.test.ts +++ b/tests/complex/complex.test.ts @@ -1,5 +1,5 @@ import * as actions from '#core/actions' -import {Message} from '#core/message' +import Message from '#core/message' import * as files from '#files' import std from '#std' import * as utils from '#utils' @@ -29,7 +29,7 @@ describe.skip('complex', () => { }) it('source-target', async () => { - const message: Message = {id: 69, data: [1, 2, 3]} + const message = Message.fromJSON({id: 69, data: [1, 2, 3]}) const output = files.temporary() const can2file = await actions.startBridge({ @@ -63,7 +63,7 @@ describe.skip('complex', () => { std.log('waiting for message being bridged') await utils.sleep(50) - expect(files.loadFile(output).trim()).to.equal(JSON.stringify(message)) + expect(files.loadFile(output).trim()).to.equal(message.toString()) await files.deleteFile(output) await console2can.stop() diff --git a/tests/file/file.test.ts b/tests/file/file.test.ts index 57f7dea..a151b9f 100644 --- a/tests/file/file.test.ts +++ b/tests/file/file.test.ts @@ -1,5 +1,5 @@ import * as actions from '#core/actions' -import {Message} from '#core/message' +import Message from '#core/message' import * as files from '#files' import std from '#std' import * as utils from '#utils' @@ -7,7 +7,7 @@ import {expect} from 'chai' describe('file', () => { it('file2file', async () => { - const message: Message = {id: 69, data: [1, 2, 3]} + const message = Message.fromJSON({id: 69, data: [1, 2, 3]}) const input = files.temporary() await files.createFile(input) @@ -41,10 +41,7 @@ describe('file', () => { std.log('waiting for message being bridged') await utils.sleep(25) - expect(files.loadFile(output).trim().split('\n')).to.deep.equal([ - JSON.stringify(message), - JSON.stringify(message), - ]) + expect(files.loadFile(output).trim().split('\n')).to.deep.equal([message.toString(), message.toString()]) await second.stop() await first.stop() diff --git a/tests/http/http.test.ts b/tests/http/http.test.ts index 39b7f76..3178486 100644 --- a/tests/http/http.test.ts +++ b/tests/http/http.test.ts @@ -1,5 +1,5 @@ import * as actions from '#core/actions' -import {Message} from '#core/message' +import Message from '#core/message' import * as files from '#files' import std from '#std' import * as utils from '#utils' @@ -7,7 +7,7 @@ import {expect} from 'chai' describe('http', () => { it('source-target', async () => { - const message: Message = {id: 69, data: [1, 2, 3]} + const message = Message.fromJSON({id: 69, data: [1, 2, 3]}) const output = files.temporary() const port = 2999 @@ -31,7 +31,7 @@ describe('http', () => { std.log('waiting for message being bridged') await utils.sleep(25) - expect(files.loadFile(output).trim()).to.equal(JSON.stringify(message)) + expect(files.loadFile(output).trim()).to.equal(message.toString()) await files.deleteFile(output) await target.stop() diff --git a/tests/mqtt/mqtt.test.ts b/tests/mqtt/mqtt.test.ts index c16afe0..34f9ba8 100644 --- a/tests/mqtt/mqtt.test.ts +++ b/tests/mqtt/mqtt.test.ts @@ -1,5 +1,5 @@ import * as actions from '#core/actions' -import {Message} from '#core/message' +import Message from '#core/message' import * as files from '#files' import std from '#std' import * as utils from '#utils' @@ -7,7 +7,7 @@ import {expect} from 'chai' describe('mqtt', () => { it('source-target', async () => { - const message: Message = {id: 69, data: [1, 2, 3]} + const message = Message.fromJSON({id: 69, data: [1, 2, 3]}) const output = files.temporary() const port = 3000 @@ -31,7 +31,7 @@ describe('mqtt', () => { std.log('waiting for message being bridged') await utils.sleep(25) - expect(files.loadFile(output).trim()).to.equal(JSON.stringify(message)) + expect(files.loadFile(output).trim()).to.equal(message.toString()) await files.deleteFile(output) await target.stop() diff --git a/tests/socketio/socketio.test.ts b/tests/socketio/socketio.test.ts index 466ca90..adc1923 100644 --- a/tests/socketio/socketio.test.ts +++ b/tests/socketio/socketio.test.ts @@ -1,5 +1,5 @@ import * as actions from '#core/actions' -import {Message} from '#core/message' +import Message from '#core/message' import * as files from '#files' import std from '#std' import * as utils from '#utils' @@ -7,7 +7,7 @@ import {expect} from 'chai' describe('socketio', () => { it('source-target', async () => { - const message: Message = {id: 69, data: [1, 2, 3]} + const message = Message.fromJSON({id: 69, data: [1, 2, 3]}) const output = files.temporary() const port = 3001 @@ -32,7 +32,7 @@ describe('socketio', () => { std.log('waiting for message being bridged') await utils.sleep(25) - expect(files.loadFile(output).trim()).to.equal(JSON.stringify(message)) + expect(files.loadFile(output).trim()).to.equal(message.toString()) await files.deleteFile(output) await target.stop() diff --git a/tests/ws/ws.test.ts b/tests/ws/ws.test.ts index 0eb3751..29e7327 100644 --- a/tests/ws/ws.test.ts +++ b/tests/ws/ws.test.ts @@ -1,5 +1,5 @@ import * as actions from '#core/actions' -import {Message} from '#core/message' +import Message from '#core/message' import * as files from '#files' import std from '#std' import * as utils from '#utils' @@ -7,7 +7,7 @@ import {expect} from 'chai' describe('websocket', () => { it('source-target', async () => { - const message: Message = {id: 69, data: [1, 2, 3]} + const message = Message.fromJSON({id: 69, data: [1, 2, 3]}) const output = files.temporary() const port = 3002 @@ -31,7 +31,7 @@ describe('websocket', () => { std.log('waiting for message being bridged') await utils.sleep(25) - expect(files.loadFile(output).trim()).to.equal(JSON.stringify(message)) + expect(files.loadFile(output).trim()).to.equal(message.toString()) await files.deleteFile(output) await target.stop() From 9df3e4528782d5806d2cd02cff6f2ca0844c32d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miles=20St=C3=B6tzner?= Date: Thu, 21 Sep 2023 00:01:58 +0200 Subject: [PATCH 05/10] fix --- src/core/actions.ts | 4 ++++ src/core/bridge.ts | 3 +++ src/core/message.ts | 19 ++++++++++--------- src/index.ts | 2 ++ src/source/console.ts | 9 +++++++-- src/source/http.ts | 2 +- src/source/ws.ts | 4 ++-- src/target/console.ts | 2 +- src/utils/assert.ts | 5 +++++ 9 files changed, 35 insertions(+), 15 deletions(-) diff --git a/src/core/actions.ts b/src/core/actions.ts index 3b583a3..67395fb 100644 --- a/src/core/actions.ts +++ b/src/core/actions.ts @@ -26,6 +26,8 @@ export type BridgeOptions = { sourceName?: string sourceId?: string sourceData?: string[] + sourceExt?: boolean + sourceRtr?: boolean sourceFile?: string sourceBidirectional?: boolean target?: string @@ -81,6 +83,8 @@ function createSource(options: BridgeOptions) { return new ConsoleSource({ id: Number(options.sourceId), data: options.sourceData.map(Number), + ext: options.sourceExt ?? false, + rtr: options.sourceRtr ?? false, }) } diff --git a/src/core/bridge.ts b/src/core/bridge.ts index 80990a0..cee8cd3 100644 --- a/src/core/bridge.ts +++ b/src/core/bridge.ts @@ -1,5 +1,6 @@ import Source from '#/source/source' import Target from '#/target/target' +import * as assert from '#assert' import std from '#std' import hae from '#utils/hae' @@ -29,6 +30,7 @@ export class Bridge { await this.source.receive( hae.log(async message => { std.log('bridging forward', {message}) + assert.isMessage(message) await this.target.send(message) if (!this.source.continuous) await this.stop() }) @@ -37,6 +39,7 @@ export class Bridge { await this.target.receive( hae.log(async message => { std.log('bridging backward', {message}) + assert.isMessage(message) await this.source.send(message) }) ) diff --git a/src/core/message.ts b/src/core/message.ts index e9f46c5..c400ee4 100644 --- a/src/core/message.ts +++ b/src/core/message.ts @@ -4,24 +4,25 @@ import {Message as CANMessage} from '*can.node' export default class Message { id: number data: number[] + ext: boolean + rtr: boolean - // TODO: ext - // TODO: rtr - - private constructor(id: number, data: number[]) { + private constructor(id: number, data: number[], ext: boolean, rtr: boolean) { assert.isNumber(id) assert.isNumbers(data) this.id = id this.data = data + this.ext = ext + this.rtr = rtr } - static fromJSON(message: {id: number; data: number[]}) { - return new Message(message.id, message.data) + static fromJSON(message: {id: number; data: number[]; ext: boolean; rtr: boolean}) { + return new Message(message.id, message.data, message.ext, message.rtr) } toJSON() { - return {id: this.id, data: this.id} + return {id: this.id, data: this.data} } static fromString(message: string) { @@ -34,11 +35,11 @@ export default class Message { } static fromCAN(message: CANMessage): Message { - return this.fromJSON({id: message.id, data: Array.from(message.data)}) + return this.fromJSON({id: message.id, data: Array.from(message.data), ext: message.ext, rtr: message.rtr}) } toCAN(): CANMessage { - return {id: this.id, data: Buffer.from(this.data), ext: false, rtr: false} + return {id: this.id, data: Buffer.from(this.data), ext: this.ext, rtr: this.rtr} } static fromArrayBuffer(message: ArrayBuffer | ArrayBuffer[]) { diff --git a/src/index.ts b/src/index.ts index c3117df..1354673 100644 --- a/src/index.ts +++ b/src/index.ts @@ -30,6 +30,8 @@ bridge .option('--source-name [string]', '', 'can2x') .option('--source-id [number]', '') .option('--source-data [numbers...]', '') + .option('--source-ext [boolean]', '', false) // TODO: docs + .option('--source-rtr [boolean]', '', false) // TODO: docs .option('--source-bidirectional [boolean]', '', true) .addOption( new Option('--target [string]', '') diff --git a/src/source/console.ts b/src/source/console.ts index 3560e50..0f58187 100644 --- a/src/source/console.ts +++ b/src/source/console.ts @@ -3,7 +3,7 @@ import Message from '#core/message' import {Processor} from '#core/types' import std from '#std' -export type ConsoleSourceOptions = {id: number; data: number[]} +export type ConsoleSourceOptions = {id: number; data: number[]; ext: boolean; rtr: boolean} export class ConsoleSource extends Source { options: ConsoleSourceOptions @@ -14,7 +14,12 @@ export class ConsoleSource extends Source { } async receive(processor: Processor) { - const message = Message.fromJSON({id: this.options.id, data: this.options.data}) + const message = Message.fromJSON({ + id: this.options.id, + data: this.options.data, + ext: this.options.ext, + rtr: this.options.rtr, + }) std.log('console received', {message}) this.processor = processor this.processor(message) diff --git a/src/source/http.ts b/src/source/http.ts index d7c0369..177014f 100644 --- a/src/source/http.ts +++ b/src/source/http.ts @@ -40,7 +40,7 @@ export class HTTPSource extends Source { std.log('http source received', {message: req.body}) if (check.isDefined(this.processor)) { - this.processor(req.body) + this.processor(Message.fromJSON(req.body)) } else { std.log('no processor defined') } diff --git a/src/source/ws.ts b/src/source/ws.ts index 2cbee49..3bb1d7e 100644 --- a/src/source/ws.ts +++ b/src/source/ws.ts @@ -38,10 +38,10 @@ export class WSSource extends Source { std.log('websocket source error', {error}) }) - this.ws.on('message', (message: string) => { + this.ws.on('message', (message: Buffer) => { std.log('websocket source received', {message}) if (check.isUndefined(this.processor)) return std.log('no processor defined') - this.processor(Message.fromString(message)) + this.processor(Message.fromBuffer(message)) }) }) diff --git a/src/target/console.ts b/src/target/console.ts index 13f0687..d44db68 100644 --- a/src/target/console.ts +++ b/src/target/console.ts @@ -9,7 +9,7 @@ export class ConsoleTarget extends Target { async send(message: Message) { std.log('console target sending', {message}) - std.out(message.id, message.data) + std.out(message.toString()) std.log('console target sent') } } diff --git a/src/utils/assert.ts b/src/utils/assert.ts index a36bece..f8c2812 100644 --- a/src/utils/assert.ts +++ b/src/utils/assert.ts @@ -1,4 +1,5 @@ import * as check from '#check' +import Message from '#core/message' import * as utils from '#utils' export function isDefined(element: T | undefined | null, msg: string): asserts element is T { @@ -51,3 +52,7 @@ export function isName(name: string) { export function isBuffer(element: unknown): asserts element is Buffer { if (!check.isBuffer(element)) throw new Error(`Element "${element} is not a buffer`) } + +export function isMessage(element: any): asserts element is Message { + if (!(element instanceof Message)) throw new Error(`Object "${element}" is not a message"`) +} From 55236266c5b4cebc00964fe375fe334f479b10d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miles=20St=C3=B6tzner?= Date: Thu, 21 Sep 2023 00:07:33 +0200 Subject: [PATCH 06/10] fix --- src/core/message.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/message.ts b/src/core/message.ts index c400ee4..2542671 100644 --- a/src/core/message.ts +++ b/src/core/message.ts @@ -17,8 +17,8 @@ export default class Message { this.rtr = rtr } - static fromJSON(message: {id: number; data: number[]; ext: boolean; rtr: boolean}) { - return new Message(message.id, message.data, message.ext, message.rtr) + static fromJSON(message: {id: number; data: number[]; ext?: boolean; rtr?: boolean}) { + return new Message(message.id, message.data, message.ext ?? false, message.rtr ?? false) } toJSON() { From 2ee2a4efc17c0f7b7ff89219a477d8f742095852 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miles=20St=C3=B6tzner?= Date: Thu, 21 Sep 2023 11:10:19 +0200 Subject: [PATCH 07/10] docs --- README.md | 108 ++++++++++++++++++++++++-------------------- src/core/actions.ts | 16 +++---- src/index.ts | 5 +- 3 files changed, 70 insertions(+), 59 deletions(-) diff --git a/README.md b/README.md index 45f6ba4..ad9c3dd 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ > This project is a research prototype and should not be used in production. -`can2x` is a simple utility for connecting a CAN bus unidirectional with another CAN bus over the network using common web protocols, such as HTTP, MQTT, Socket.IO, and WebSockets. +`can2x` is a simple utility for connecting a CAN bus bidirectional with another CAN bus over the network using common web protocols, such as HTTP, MQTT, Socket.IO, and WebSockets. ## Overview @@ -98,23 +98,27 @@ can2x bridge start [options] The following options are supported. -| Option | Type | Default | Required | Description | -|---------------------|------------------------------------------------------------|-------------|----------|-------------| -| `--source` | `can`, `console`, `file`, `http`, `mqtt`, `socketio`, `ws` | `can` | false | | -| `--source-port` | number | `3000` | false | | -| `--source-host` | string | `localhost` | false | | -| `--source-event` | string | `can2x` | false | | -| `--source-topic` | string | `can2x` | false | | -| `--source-name` | string | `can2x` | false | | -| `--source-id` | number | none | false | | -| `--source-data` | number[] | none | false | | -| `--source-file` | string | none | false | | -| `--target` | `can`, `console`, `file`, `http`, `mqtt`, `socketio`, `ws` | `console` | false | | -| `--target-endpoint` | string | none | false | | -| `--target-event` | string | `can2x` | false | | -| `--target-topic` | string | `can2x` | false | | -| `--target-name` | string | `can2x` | false | | -| `--target-file` | string | none | false | | +| Option | Type | Default | Required | Description | +|----------------------------|------------------------------------------------------------|-------------|----------|-------------| +| `--source` | `can`, `console`, `file`, `http`, `mqtt`, `socketio`, `ws` | `can` | false | | +| `--source-port` | number | `3000` | false | | +| `--source-host` | string | `localhost` | false | | +| `--source-event` | string | `can2x` | false | | +| `--source-topic` | string | `can2x` | false | | +| `--source-name` | string | `can2x` | false | | +| `--source-id` | number | none | false | | +| `--source-data` | number[] | none | false | | +| `--source-ext` | boolean | none | false | | +| `--source-rtr` | boolean | none | false | | +| `--source-file` | string | none | false | | +| `--source-bidirectional` | boolean | `true` | false | | +| `--target` | `can`, `console`, `file`, `http`, `mqtt`, `socketio`, `ws` | `console` | false | | +| `--target-endpoint` | string | none | false | | +| `--target-event` | string | `can2x` | false | | +| `--target-topic` | string | `can2x` | false | | +| `--target-name` | string | `can2x` | false | | +| `--target-file` | string | none | false | | +| `--target-bidirectional` | boolean | `true` | false | | ### vCAN Start @@ -171,10 +175,11 @@ The following options are supported. `can2x` supports a `console2x` bridge, i.e., `--source console`. The following options are supported. -| Option | Type | Default | Required | Description | -|---------------------|------------------------------------------------------------|-------------|----------|-------------| -| `--source-id` | number | none | true | | -| `--source-data` | number[] | none | true | | +| Option | Type | Default | Required | Description | +|----------------------------|------------------------------------------------------------|-------------|----------|-------------| +| `--source-id` | number | none | true | | +| `--source-data` | number[] | none | true | | +| `--source-bidirectional` | boolean | `true` | false | | ### File @@ -200,32 +205,35 @@ The following options are supported. `can2x` supports a `mqtt2x` bridge, i.e., `--source mqtt`. The following options are supported. -| Option | Type | Default | Required | Description | -|---------------------|------------------------------------------------------------|-------------|----------|-------------| -| `--source-port` | number | `3000` | false | | -| `--source-host` | string | `localhost` | false | | -| `--source-topic` | string | `can2x` | false | | +| Option | Type | Default | Required | Description | +|----------------------------|------------------------------------------------------------|-------------|----------|-------------| +| `--source-port` | number | `3000` | false | | +| `--source-host` | string | `localhost` | false | | +| `--source-topic` | string | `can2x` | false | | +| `--source-bidirectional` | boolean | `true` | false | | ### Socket.IO `can2x` supports a `socketio2x` bridge, i.e., `--source socketio`. The following options are supported. -| Option | Type | Default | Required | Description | -|---------------------|------------------------------------------------------------|-------------|----------|-------------| -| `--source-port` | number | `3000` | false | | -| `--source-host` | string | `localhost` | false | | -| `--source-event` | string | `can2x` | false | | +| Option | Type | Default | Required | Description | +|----------------------------|------------------------------------------------------------|-------------|----------|-------------| +| `--source-port` | number | `3000` | false | | +| `--source-host` | string | `localhost` | false | | +| `--source-event` | string | `can2x` | false | | +| `--source-bidirectional` | boolean | `true` | false | | ### WebSocket `can2x` supports a `ws2x` bridge, i.e., `--source ws`. The following options are supported. -| Option | Type | Default | Required | Description | -|---------------------|------------------------------------------------------------|-------------|----------|-------------| -| `--source-port` | number | `3000` | false | | -| `--source-host` | string | `localhost` | false | | +| Option | Type | Default | Required | Description | +|-----------------------------|------------------------------------------------------------|-------------|----------|-------------| +| `--source-port` | number | `3000` | false | | +| `--source-host` | string | `localhost` | false | | +| `--source-bidirectional` | boolean | `true` | false | | ## Targets @@ -268,40 +276,42 @@ The following options are supported. `can2x` supports a `x2mqtt` bridge, i.e., `--target mqtt`. The following options are supported. -| Option | Type | Default | Required | Description | -|---------------------|------------------------------------------------------------|-------------|----------|-------------| -| `--target-endpoint` | string | none | true | | -| `--target-topic` | string | `can2x` | false | | +| Option | Type | Default | Required | Description | +|--------------------------|------------------------------------------------------------|-------------|----------|-------------| +| `--target-endpoint` | string | none | true | | +| `--target-topic` | string | `can2x` | false | | +| `--target-bidirectional` | boolean | `true` | false | | ### Socket.IO `can2x` supports a `x2socketio` bridge, i.e., `--target socketio`. The following options are supported. -| Option | Type | Default | Required | Description | -|---------------------|------------------------------------------------------------|-------------|----------|-------------| -| `--target-endpoint` | string | none | true | | -| `--target-event` | string | `can2x` | false | | +| Option | Type | Default | Required | Description | +|----------------------------|------------------------------------------------------------|-------------|----------|-------------| +| `--target-endpoint` | string | none | true | | +| `--target-event` | string | `can2x` | false | | +| `--target-bidirectional` | boolean | `true` | false | | ### Websocket `can2x` supports a `x2ws` bridge, i.e., `--target ws`. The following options are supported. -| Option | Type | Default | Required | Description | -|---------------------|------------------------------------------------------------|-------------|----------|-------------| -| `--target-endpoint` | string | none | true | | +| Option | Type | Default | Required | Description | +|----------------------------|------------------------------------------------------------|-------------|----------|-------------| +| `--target-endpoint` | string | none | true | | +| `--target-bidirectional` | boolean | `true` | false | | ## Limitations - `can2x` currently only supports `id` and `data` of a CAN message. -- bridges are unidirectional - security aspects, such as encryption, authentication, and authorization, are not supported - messages are not guaranteed to be delivered ## Similar Projects -It is worth to checkout the following projects. +It is worth to check out the following projects. - [`can2udp`](https://opensource.lely.com/canopen/docs/can2udp) - [`can2mqtt`](https://github.com/c3re/can2mqtt) diff --git a/src/core/actions.ts b/src/core/actions.ts index 67395fb..1baa207 100644 --- a/src/core/actions.ts +++ b/src/core/actions.ts @@ -73,7 +73,7 @@ function createSource(options: BridgeOptions) { if (options.source === 'can') return new CANSource({ name: options.sourceName ?? 'can2x', - bidirectional: options.sourceBidirectional ?? true, // TODO: docs + bidirectional: options.sourceBidirectional ?? true, }) if (options.source === 'console') { @@ -106,7 +106,7 @@ function createSource(options: BridgeOptions) { port: options.sourcePort ? Number(options.sourcePort) : 3000, host: options.sourceHost ?? 'localhost', topic: options.sourceTopic ?? 'can2x', - bidirectional: options.sourceBidirectional ?? true, // TODO: docs + bidirectional: options.sourceBidirectional ?? true, }) if (options.source === 'socketio') @@ -114,14 +114,14 @@ function createSource(options: BridgeOptions) { port: options.sourcePort ? Number(options.sourcePort) : 3000, host: options.sourceHost ?? 'localhost', event: options.sourceEvent ?? 'can2x', - bidirectional: options.sourceBidirectional ?? true, // TODO: docs + bidirectional: options.sourceBidirectional ?? true, }) if (options.source === 'ws') return new WSSource({ port: options.sourcePort ? Number(options.sourcePort) : 3000, host: options.sourceHost ?? 'localhost', - bidirectional: options.sourceBidirectional ?? true, // TODO: docs + bidirectional: options.sourceBidirectional ?? true, }) throw new Error(`Source of type "${options.source}" unknown`) @@ -131,7 +131,7 @@ function createTarget(options: BridgeOptions) { if (options.target === 'can') return new CANTarget({ name: options.targetName ?? 'can2x', - bidirectional: options.targetBidirectional ?? true, // TODO: docs + bidirectional: options.targetBidirectional ?? true, }) if (options.target === 'console') return new ConsoleTarget() @@ -155,7 +155,7 @@ function createTarget(options: BridgeOptions) { return new MQTTTarget({ endpoint: options.targetEndpoint, topic: options.targetTopic ?? 'can2x', - bidirectional: options.targetBidirectional ?? true, // TODO: docs + bidirectional: options.targetBidirectional ?? true, }) } @@ -164,7 +164,7 @@ function createTarget(options: BridgeOptions) { return new SocketIOTarget({ endpoint: options.targetEndpoint, event: options.targetEvent ?? 'can2x', - bidirectional: options.targetBidirectional ?? true, // TODO: docs + bidirectional: options.targetBidirectional ?? true, }) } @@ -172,7 +172,7 @@ function createTarget(options: BridgeOptions) { assert.isDefined(options.targetEndpoint, '--target-endpoint must be defined') return new WSTarget({ endpoint: options.targetEndpoint, - bidirectional: options.targetBidirectional ?? true, // TODO: docs + bidirectional: options.targetBidirectional ?? true, }) } diff --git a/src/index.ts b/src/index.ts index 1354673..57fe3b8 100644 --- a/src/index.ts +++ b/src/index.ts @@ -30,8 +30,9 @@ bridge .option('--source-name [string]', '', 'can2x') .option('--source-id [number]', '') .option('--source-data [numbers...]', '') - .option('--source-ext [boolean]', '', false) // TODO: docs - .option('--source-rtr [boolean]', '', false) // TODO: docs + .option('--source-ext [boolean]', '', false) + .option('--source-rtr [boolean]', '', false) + .option('--source-file [boolean]', '') .option('--source-bidirectional [boolean]', '', true) .addOption( new Option('--target [string]', '') From c95f8f8c43e5584cc0405fce2a03b549c5753c3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miles=20St=C3=B6tzner?= Date: Thu, 21 Sep 2023 11:23:35 +0200 Subject: [PATCH 08/10] JSONMessage --- src/core/message.ts | 15 +++++++++++---- src/core/types.ts | 2 +- src/source/console.ts | 11 +++-------- src/source/socketio.ts | 4 ++-- src/source/source.ts | 2 -- src/target/socketio.ts | 4 ++-- src/target/target.ts | 3 +-- tests/can/can.test.ts | 2 +- tests/complex/complex.test.ts | 2 +- tests/file/file.test.ts | 2 +- tests/http/http.test.ts | 2 +- tests/mqtt/mqtt.test.ts | 2 +- tests/socketio/socketio.test.ts | 2 +- tests/ws/ws.test.ts | 2 +- 14 files changed, 27 insertions(+), 28 deletions(-) diff --git a/src/core/message.ts b/src/core/message.ts index 2542671..b6d4b83 100644 --- a/src/core/message.ts +++ b/src/core/message.ts @@ -1,6 +1,13 @@ import * as assert from '#assert' import {Message as CANMessage} from '*can.node' +export type JSONMessage = { + id: number + data: number[] + ext: boolean + rtr: boolean +} + export default class Message { id: number data: number[] @@ -17,12 +24,12 @@ export default class Message { this.rtr = rtr } - static fromJSON(message: {id: number; data: number[]; ext?: boolean; rtr?: boolean}) { - return new Message(message.id, message.data, message.ext ?? false, message.rtr ?? false) + static fromJSON(message: JSONMessage) { + return new Message(message.id, message.data, message.ext, message.rtr) } - toJSON() { - return {id: this.id, data: this.data} + toJSON(): JSONMessage { + return {id: this.id, data: this.data, ext: this.ext, rtr: this.rtr} } static fromString(message: string) { diff --git a/src/core/types.ts b/src/core/types.ts index ba0da76..3af09da 100644 --- a/src/core/types.ts +++ b/src/core/types.ts @@ -2,4 +2,4 @@ import Message from '#core/message' export type Processor = (message: Message) => Promise -// TODO: merge this message.ts? +export type JSONObject = string | number | boolean | JSONObject[] | {[key: string]: JSONObject} diff --git a/src/source/console.ts b/src/source/console.ts index 0f58187..d6120f7 100644 --- a/src/source/console.ts +++ b/src/source/console.ts @@ -1,9 +1,9 @@ import Source from '#/source/source' -import Message from '#core/message' +import Message, {JSONMessage} from '#core/message' import {Processor} from '#core/types' import std from '#std' -export type ConsoleSourceOptions = {id: number; data: number[]; ext: boolean; rtr: boolean} +export type ConsoleSourceOptions = JSONMessage export class ConsoleSource extends Source { options: ConsoleSourceOptions @@ -14,12 +14,7 @@ export class ConsoleSource extends Source { } async receive(processor: Processor) { - const message = Message.fromJSON({ - id: this.options.id, - data: this.options.data, - ext: this.options.ext, - rtr: this.options.rtr, - }) + const message = Message.fromJSON(this.options) std.log('console received', {message}) this.processor = processor this.processor(message) diff --git a/src/source/socketio.ts b/src/source/socketio.ts index 895c0f6..cd10d9e 100644 --- a/src/source/socketio.ts +++ b/src/source/socketio.ts @@ -1,5 +1,5 @@ import Source from '#/source/source' -import Message from '#core/message' +import Message, {JSONMessage} from '#core/message' import std from '#std' import * as check from '#utils/check' import http from 'http' @@ -33,7 +33,7 @@ export class SocketIOSource extends Source { std.log(`socketio source connected`, {id: socket.id}) this.socket = socket - this.socket.on(this.options.event, (message: any) => { + this.socket.on(this.options.event, (message: JSONMessage) => { std.log('socketio source received', {message}) if (check.isUndefined(this.processor)) return std.log('no processor defined') this.processor(Message.fromJSON(message)) diff --git a/src/source/source.ts b/src/source/source.ts index cfa0ddb..40493c3 100644 --- a/src/source/source.ts +++ b/src/source/source.ts @@ -4,8 +4,6 @@ import * as utils from '#utils' export default abstract class Source { processor?: Processor - // TODO: remove this? - options = {} protected readyPromise diff --git a/src/target/socketio.ts b/src/target/socketio.ts index 207cba3..5d39852 100644 --- a/src/target/socketio.ts +++ b/src/target/socketio.ts @@ -1,7 +1,7 @@ import Target from '#/target/target' import * as assert from '#assert' import * as check from '#check' -import Message from '#core/message' +import Message, {JSONMessage} from '#core/message' import std from '#std' import SocketIOClient, {Socket} from 'socket.io-client' @@ -44,7 +44,7 @@ export class SocketIOTarget extends Target { }) if (this.options.bidirectional) { - this.target.on(this.options.event, (message: any) => { + this.target.on(this.options.event, (message: JSONMessage) => { std.log('socketio target received') if (check.isUndefined(this.processor)) return std.log('no processor defined') this.processor(Message.fromJSON(message)) diff --git a/src/target/target.ts b/src/target/target.ts index 380168e..f880927 100644 --- a/src/target/target.ts +++ b/src/target/target.ts @@ -2,12 +2,11 @@ import Message from '#core/message' import {Processor} from '#core/types' import * as utils from '#utils' -// TODO: merge this class with source? - export default abstract class Target { processor?: Processor protected readyPromise + protected constructor() { this.readyPromise = utils.createDecomposedPromise() } diff --git a/tests/can/can.test.ts b/tests/can/can.test.ts index 505151b..f6a01a8 100644 --- a/tests/can/can.test.ts +++ b/tests/can/can.test.ts @@ -20,7 +20,7 @@ describe.skip('can', () => { }) it('source-target', async () => { - const message = Message.fromJSON({id: 69, data: [1, 2, 3]}) + const message = Message.fromJSON({id: 69, data: [1, 2, 3], ext: false, rtr: false}) const output = files.temporary() // Start can source with file target diff --git a/tests/complex/complex.test.ts b/tests/complex/complex.test.ts index bf67d27..295153a 100644 --- a/tests/complex/complex.test.ts +++ b/tests/complex/complex.test.ts @@ -29,7 +29,7 @@ describe.skip('complex', () => { }) it('source-target', async () => { - const message = Message.fromJSON({id: 69, data: [1, 2, 3]}) + const message = Message.fromJSON({id: 69, data: [1, 2, 3], ext: false, rtr: false}) const output = files.temporary() const can2file = await actions.startBridge({ diff --git a/tests/file/file.test.ts b/tests/file/file.test.ts index a151b9f..756db9b 100644 --- a/tests/file/file.test.ts +++ b/tests/file/file.test.ts @@ -7,7 +7,7 @@ import {expect} from 'chai' describe('file', () => { it('file2file', async () => { - const message = Message.fromJSON({id: 69, data: [1, 2, 3]}) + const message = Message.fromJSON({id: 69, data: [1, 2, 3], ext: false, rtr: false}) const input = files.temporary() await files.createFile(input) diff --git a/tests/http/http.test.ts b/tests/http/http.test.ts index 3178486..064fa48 100644 --- a/tests/http/http.test.ts +++ b/tests/http/http.test.ts @@ -7,7 +7,7 @@ import {expect} from 'chai' describe('http', () => { it('source-target', async () => { - const message = Message.fromJSON({id: 69, data: [1, 2, 3]}) + const message = Message.fromJSON({id: 69, data: [1, 2, 3], ext: false, rtr: false}) const output = files.temporary() const port = 2999 diff --git a/tests/mqtt/mqtt.test.ts b/tests/mqtt/mqtt.test.ts index 34f9ba8..8e03f7a 100644 --- a/tests/mqtt/mqtt.test.ts +++ b/tests/mqtt/mqtt.test.ts @@ -7,7 +7,7 @@ import {expect} from 'chai' describe('mqtt', () => { it('source-target', async () => { - const message = Message.fromJSON({id: 69, data: [1, 2, 3]}) + const message = Message.fromJSON({id: 69, data: [1, 2, 3], ext: false, rtr: false}) const output = files.temporary() const port = 3000 diff --git a/tests/socketio/socketio.test.ts b/tests/socketio/socketio.test.ts index adc1923..45912d9 100644 --- a/tests/socketio/socketio.test.ts +++ b/tests/socketio/socketio.test.ts @@ -7,7 +7,7 @@ import {expect} from 'chai' describe('socketio', () => { it('source-target', async () => { - const message = Message.fromJSON({id: 69, data: [1, 2, 3]}) + const message = Message.fromJSON({id: 69, data: [1, 2, 3], ext: false, rtr: false}) const output = files.temporary() const port = 3001 diff --git a/tests/ws/ws.test.ts b/tests/ws/ws.test.ts index 29e7327..eacfc74 100644 --- a/tests/ws/ws.test.ts +++ b/tests/ws/ws.test.ts @@ -7,7 +7,7 @@ import {expect} from 'chai' describe('websocket', () => { it('source-target', async () => { - const message = Message.fromJSON({id: 69, data: [1, 2, 3]}) + const message = Message.fromJSON({id: 69, data: [1, 2, 3], ext: false, rtr: false}) const output = files.temporary() const port = 3002 From 4373ce8b1ff4a163b18f0a61ecd5e3aaa12a10b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miles=20St=C3=B6tzner?= Date: Thu, 21 Sep 2023 11:35:55 +0200 Subject: [PATCH 09/10] minor --- src/core/message.ts | 6 +++++- src/source/can.ts | 4 ++-- src/source/ws.ts | 4 ++-- src/target/can.ts | 4 ++-- src/utils/assert.ts | 2 +- src/utils/check.ts | 5 +++++ 6 files changed, 17 insertions(+), 8 deletions(-) diff --git a/src/core/message.ts b/src/core/message.ts index b6d4b83..57a0fdc 100644 --- a/src/core/message.ts +++ b/src/core/message.ts @@ -1,5 +1,7 @@ import * as assert from '#assert' -import {Message as CANMessage} from '*can.node' +import {Message as _CANMessage} from '*can.node' + +export type CANMessage = _CANMessage export type JSONMessage = { id: number @@ -17,6 +19,8 @@ export default class Message { private constructor(id: number, data: number[], ext: boolean, rtr: boolean) { assert.isNumber(id) assert.isNumbers(data) + assert.isBoolean(ext) + assert.isBoolean(rtr) this.id = id this.data = data diff --git a/src/source/can.ts b/src/source/can.ts index 1bddaa8..6ef6778 100644 --- a/src/source/can.ts +++ b/src/source/can.ts @@ -1,8 +1,8 @@ import Source from '#/source/source' import * as check from '#check' -import Message from '#core/message' +import Message, {CANMessage} from '#core/message' import std from '#std' -import {Message as CANMessage, RawChannel} from '*can.node' +import {RawChannel} from '*can.node' import * as can from 'socketcan' export type CANSourceOptions = { diff --git a/src/source/ws.ts b/src/source/ws.ts index 3bb1d7e..975d6ff 100644 --- a/src/source/ws.ts +++ b/src/source/ws.ts @@ -3,7 +3,7 @@ import Message from '#core/message' import std from '#std' import * as check from '#utils/check' import http from 'http' -import WebSocket, * as ws from 'ws' +import {WebSocket, WebSocketServer} from 'ws' export type WSSourceOptions = { port: number @@ -25,7 +25,7 @@ export class WSSource extends Source { std.log('starting websocket source', {options: this.options}) this.server = http.createServer() - const wss = new ws.WebSocketServer({ + const wss = new WebSocketServer({ server: this.server, }) diff --git a/src/target/can.ts b/src/target/can.ts index e3bb442..1b08130 100644 --- a/src/target/can.ts +++ b/src/target/can.ts @@ -1,9 +1,9 @@ import Target from '#/target/target' import * as assert from '#assert' import * as check from '#check' -import Message from '#core/message' +import Message, {CANMessage} from '#core/message' import std from '#std' -import {Message as CANMessage, RawChannel} from '*can.node' +import {RawChannel} from '*can.node' import * as can from 'socketcan' export type CANTargetOptions = { diff --git a/src/utils/assert.ts b/src/utils/assert.ts index f8c2812..d83bc5f 100644 --- a/src/utils/assert.ts +++ b/src/utils/assert.ts @@ -54,5 +54,5 @@ export function isBuffer(element: unknown): asserts element is Buffer { } export function isMessage(element: any): asserts element is Message { - if (!(element instanceof Message)) throw new Error(`Object "${element}" is not a message"`) + if (!check.isMessage(element)) throw new Error(`Object "${element}" is not a message"`) } diff --git a/src/utils/check.ts b/src/utils/check.ts index 0d7c39f..3d3aa0f 100644 --- a/src/utils/check.ts +++ b/src/utils/check.ts @@ -1,3 +1,5 @@ +import Message from '#core/message' + export function isTrue(element?: boolean) { return isDefined(element) && element === true } @@ -41,3 +43,6 @@ export function isName(name: string) { export function isBuffer(element: unknown): element is Buffer { return Buffer.isBuffer(element) } +export function isMessage(element: any): element is Message { + return element instanceof Message +} From 4d2b0331259df24c90084cee50b70e736cf96336 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miles=20St=C3=B6tzner?= Date: Thu, 21 Sep 2023 11:37:14 +0200 Subject: [PATCH 10/10] minor --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index ad9c3dd..6d89dc6 100644 --- a/README.md +++ b/README.md @@ -156,6 +156,8 @@ A CAN message is internally represented as follows. |--------|----------|-----------------------------------------| | `id` | number | The decimal id of the CAN message. | | `data` | number[] | The decimal payload of the CAN message. | +| `ext` | boolean | | +| `rtr` | boolean | | ## Sources @@ -305,7 +307,6 @@ The following options are supported. ## Limitations -- `can2x` currently only supports `id` and `data` of a CAN message. - security aspects, such as encryption, authentication, and authorization, are not supported - messages are not guaranteed to be delivered