From 1ea62aa81ee26a21f3fe26bc3cb7484757c2b2e6 Mon Sep 17 00:00:00 2001 From: jrcleber Date: Tue, 9 Jul 2024 09:41:54 -0300 Subject: [PATCH] websocket compatibility --- package.json | 10 +- src/app.module.ts | 11 +- src/config/env.config.ts | 19 ++- src/websocket/Readme.md | 108 +++++++++++++++ src/websocket/server.ts | 123 ++++++++++++++++++ .../controllers/instance.controller.ts | 13 +- src/whatsapp/dto/webhook.dto.ts | 2 + src/whatsapp/services/monitor.service.ts | 7 +- src/whatsapp/services/whatsapp.service.ts | 30 ++++- views/qrcode.hbs | 88 +++++++++++-- 10 files changed, 379 insertions(+), 32 deletions(-) create mode 100644 src/websocket/Readme.md create mode 100644 src/websocket/server.ts diff --git a/package.json b/package.json index 1ae84856..3d64a1e1 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "whatsapp-api", - "version": "1.3.1", + "version": "1.3.2", "description": "Rest api for communication with WhatsApp", "main": "./dist/src/main.js", "scripts": { @@ -45,7 +45,7 @@ "@hapi/boom": "^10.0.1", "@prisma/client": "^5.14.0", "@scalar/express-api-reference": "^0.4.42", - "@whiskeysockets/baileys": "6.6.0", + "@whiskeysockets/baileys": "6.7.5", "axios": "^1.6.2", "class-validator": "^0.14.0", "cross-env": "^7.0.3", @@ -69,7 +69,8 @@ "pino-pretty": "^11.0.0", "qrcode": "^1.5.1", "qrcode-terminal": "^0.12.0", - "uuid": "^9.0.0", + "uuid": "^10.0.0", + "ws": "^8.18.0", "yamljs": "^0.3.0" }, "devDependencies": { @@ -84,7 +85,8 @@ "@types/node": "^20.12.12", "@types/qrcode": "^1.5.5", "@types/qrcode-terminal": "^0.12.2", - "@types/uuid": "^9.0.7", + "@types/uuid": "^10.0.0", + "@types/ws": "^8.5.10", "@types/yamljs": "^0.2.34", "@typescript-eslint/eslint-plugin": "6.21.0", "@typescript-eslint/parser": "6.21.0", diff --git a/src/app.module.ts b/src/app.module.ts index c8bcacf1..9c7eed32 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -73,6 +73,8 @@ import { ErrorMiddle } from './middle/error.middle'; import 'express-async-errors'; import { docsRouter } from './config/scala.config'; import { ProviderFiles } from './provider/sessions'; +import { Websocket } from './websocket/server'; +import { createServer } from 'http'; export function describeRoutes( rootPath: string, @@ -105,6 +107,7 @@ export enum HttpStatus { export async function AppModule(context: Map) { const app = express(); + const server = createServer(app); const configService = new ConfigService(); @@ -118,11 +121,16 @@ export async function AppModule(context: Map) { await repository.onModuleInit(); logger.info('Repository - ON'); + const wss = new Websocket(configService); + wss.server(server); + logger.info('WebSocket Server - ON'); + const waMonitor = new WAMonitoringService( eventEmitter, configService, repository, providerFiles, + wss, ); logger.info('WAMonitoringService - ON'); @@ -151,6 +159,7 @@ export async function AppModule(context: Map) { eventEmitter, instanceService, providerFiles, + wss, ); logger.info('InstanceController - ON'); @@ -225,7 +234,7 @@ export async function AppModule(context: Map) { await providerFiles.onModuleDestroy(); }; - context.set('app', app); + context.set('app', server); context.set('module:logger', logger); context.set('module:repository', repository); context.set('module:redisCache', providerFiles); diff --git a/src/config/env.config.ts b/src/config/env.config.ts index e0862986..372b3446 100644 --- a/src/config/env.config.ts +++ b/src/config/env.config.ts @@ -178,7 +178,9 @@ export class ConfigService { CONTACTS: process.env?.DATABASE_SAVE_DATA_CONTACTS === 'true', CHATS: process.env?.DATABASE_SAVE_DATA_CHATS === 'true', LOGS: process.env?.DATABASE_SAVE_LOGS === 'true', - ACTIVITY_LOGS: process.env?.DATABASE_SAVE_ACTIVITY_LOGS ? process.env?.DATABASE_SAVE_ACTIVITY_LOGS === 'true' : true + ACTIVITY_LOGS: process.env?.DATABASE_SAVE_ACTIVITY_LOGS + ? process.env?.DATABASE_SAVE_ACTIVITY_LOGS === 'true' + : true, }, }, PROVIDER: { @@ -191,9 +193,10 @@ export class ConfigService { LEVEL: process.env?.LOG_LEVEL.split('|') as LogLevel[], COLOR: process.env?.LOG_COLOR === 'true', }, - INSTANCE_EXPIRATION_TIME: process.env?.INSTANCE_EXPIRATION_TIME === 'false' - ? false - : Number.parseInt(process.env?.INSTANCE_EXPIRATION_TIME || '5'), + INSTANCE_EXPIRATION_TIME: + process.env?.INSTANCE_EXPIRATION_TIME === 'false' + ? false + : Number.parseInt(process.env?.INSTANCE_EXPIRATION_TIME || '5'), GLOBAL_WEBHOOK: { URL: process.env?.WEBHOOK_GLOBAL_URL, ENABLED: process.env?.WEBHOOK_GLOBAL_ENABLED === 'true', @@ -205,8 +208,12 @@ export class ConfigService { QRCODE: { LIMIT: Number.parseInt(process.env?.QRCODE_LIMIT || '10'), EXPIRATION_TIME: Number.parseInt(process.env?.QRCODE_EXPIRATION_TIME || '60'), - LIGHT_COLOR: process.env?.QRCODE_LIGHT_COLOR ? process.env?.QRCODE_LIGHT_COLOR : '#ffffff', - DARK_COLOR: process.env?.QRCODE_DARK_COLOR ? process.env?.QRCODE_DARK_COLOR : '#198754' + LIGHT_COLOR: process.env?.QRCODE_LIGHT_COLOR + ? process.env?.QRCODE_LIGHT_COLOR + : '#ffffff', + DARK_COLOR: process.env?.QRCODE_DARK_COLOR + ? process.env?.QRCODE_DARK_COLOR + : '#198754', }, CONNECTION_TIMEOUT: Number.parseInt(process.env?.CONNECTION_TIMEOUT || '300'), AUTHENTICATION: { diff --git a/src/websocket/Readme.md b/src/websocket/Readme.md new file mode 100644 index 00000000..1a7bec0e --- /dev/null +++ b/src/websocket/Readme.md @@ -0,0 +1,108 @@ +## Websocket + +### 1. Inicialização da Conexão WebSocket + +O processo começa com a criação de uma nova instância do objeto `WebSocket`. O URL para a conexão inclui o caminho para o servidor WebSocket, um parâmetro de evento que especifica o tipo de evento ao qual o cliente está se inscrevendo e um token de autenticação. Este token será usado pelo servidor para verificar se o cliente tem permissão para conectar e receber esses eventos. + +```javascript +const ws = new WebSocket(`${url}?event=${encodeURIComponent(eventName)}&token=${encodeURIComponent(instanceToken)}`); +``` + +A URL é construída para incluir: + +- **`event`**: um parâmetro que especifica o tipo de evento de interesse, facilitando ao servidor rotear ou filtrar as mensagens de acordo com o tipo de dado ou comando desejado. +- **`token`**: um parâmetro de segurança que provavelmente é usado pelo servidor para autenticar e autorizar a conexão. + +### 2. Manipuladores de Eventos + +**`onopen`**: +- Este evento é disparado quando a conexão WebSocket é estabelecida com sucesso. +- No manipulador de `onopen`, uma mensagem de teste é enviada imediatamente ao servidor. Isso pode ser usado para confirmar que a via de comunicação está funcionando ou para informar ao servidor sobre o estado inicial desejado do cliente. + +```javascript +ws.onopen = () => { + console.log('Connected to the server'); + ws.send(JSON.stringify({ message: "test data" })); +}; +``` + +**`onmessage`**: +- Este evento é acionado sempre que uma mensagem é recebida do servidor. +- As mensagens recebidas são tratadas convertendo o conteúdo de `event.data` de uma string JSON para um objeto JavaScript, que é então passado para a função `callback` fornecida pelo usuário do script. + +```javascript +ws.onmessage = (event) => { + if (callback) { + const data = JSON.parse(event.data) + callback(data, event) + } +}; +``` + +**`onerror`**: +- Acionado quando ocorre um erro na conexão WebSocket. +- Pode ser usado para logar ou tratar erros de rede, falhas de transmissão, etc. + +```javascript +ws.onerror = (error) => { + console.log('Error:', error); +}; +``` + +**`onclose`**: +- Este evento é acionado quando a conexão WebSocket é fechada, seja por iniciativa do cliente, do servidor, ou devido a falhas de rede. +- O manipulador de eventos `onclose` tenta automaticamente reconectar-se ao servidor após um intervalo definido. Importante corrigir aqui, o `setTimeout` deve chamar `socket(eventName, callback)` ao invés de `socket(event)`, para garantir que a reconexão seja feita corretamente. + +```javascript +ws.onclose = (event) => { + console.log(`Connection closed with code ${event.code} and reason ${event.reason}, attempting to reconnect...`); + setTimeout(() => socket(eventName, callback), reconnectInterval); +}; +``` + +### 3. Reconexão Automática + +- Após a conexão ser fechada, o cliente tenta se reconectar usando um intervalo de tempo definido (`reconnectInterval`). Este comportamento garante que o cliente tente manter uma conexão persistente mesmo em face de problemas de rede ou reinícios do servidor. + +### 4. Exemplo completo + +```javascript +const url = 'ws://localhost:8084/ws/events'; +const reconnectInterval = 5000; // 5 segundos + +function socket(eventName, callback) { + const ws = new WebSocket(`${url}?event=${encodeURIComponent(eventName)}&token=${encodeURIComponent("{{auth.token}}")}`); + + ws.onopen = () => { + console.log('Connected to the server'); + }; + + ws.onmessage = (event) => { + if (callback) { + const data = JSON.parse(event.data) + callback(data, event) + } + }; + + ws.onerror = (error) => { + console.log('Error:', error); + }; + + ws.onclose = (event) => { + console.log(`Connection closed with code ${event.code} and reason ${event.reason}, attempting to reconnect...`); + setTimeout(() => socket(event), reconnectInterval); + }; +} + +// Será criado uma instância da função para cada evento +// Os eventos são os mesmos disparados pela webhook +// exceto o evento "messaging-history.set" + +socket("connection.update", (msg, event) => { + console.log(msg) +}) + +socket("messages.upsert", (msg, event) => { + console.log(msg) +}) +``` \ No newline at end of file diff --git a/src/websocket/server.ts b/src/websocket/server.ts new file mode 100644 index 00000000..69e9fd53 --- /dev/null +++ b/src/websocket/server.ts @@ -0,0 +1,123 @@ +/** + * ┌──────────────────────────────────────────────────────────────────────────────┐ + * │ @author jrCleber │ + * │ @filename server.ts │ + * │ Developed by: Cleber Wilson │ + * │ Creation date: Jul 08, 2024 │ + * │ Contact: contato@codechat.dev │ + * ├──────────────────────────────────────────────────────────────────────────────┤ + * │ @copyright © Cleber Wilson 2022. All rights reserved. │ + * │ Licensed under the Apache License, Version 2.0 │ + * │ │ + * │ @license "https://github.com/code-chat-br/whatsapp-api/blob/main/LICENSE" │ + * │ │ + * │ You may not use this file except in compliance with the License. │ + * │ You may obtain a copy of the License at │ + * │ │ + * │ http://www.apache.org/licenses/LICENSE-2.0 │ + * │ │ + * │ Unless required by applicable law or agreed to in writing, software │ + * │ distributed under the License is distributed on an "AS IS" BASIS, │ + * │ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. │ + * │ │ + * │ See the License for the specific language governing permissions and │ + * │ limitations under the License. │ + * │ │ + * │ @class │ + * │ @constructs Websocket │ + * │ @param {ConfigService} configService │ + * ├──────────────────────────────────────────────────────────────────────────────┤ + * │ @important │ + * │ For any future changes to the code in this file, it is recommended to │ + * │ contain, together with the modification, the information of the developer │ + * │ who changed it and the date of modification. │ + * └──────────────────────────────────────────────────────────────────────────────┘ + */ + +import Ws from 'ws'; +import { Logger } from '../config/logger.config'; +import { Auth, ConfigService } from '../config/env.config'; +import { Server } from 'http'; +import { isJWT } from 'class-validator'; +import { verify } from 'jsonwebtoken'; +import { JwtPayload } from '../whatsapp/services/instance.service'; +import { EventsType, ListEvents } from '../whatsapp/dto/webhook.dto'; + +export class Websocket { + constructor(private readonly configService: ConfigService) {} + + private readonly logger = new Logger(this.configService, Websocket.name); + + private readonly hub = new Map(); + + send(instance: string, event: EventsType, data: T): boolean { + const key = `${instance}_${event}`; + const client = this.hub.get(key); + + if (!client) { + return; + } + + const json = JSON.stringify(data); + client.send(json); + } + + server(server: Server) { + const wss = new Ws.Server({ noServer: true }); + + let key = ''; + let canActivate = false; + + wss.on('connection', (ws, req) => { + if (!canActivate) { + ws.close(401, ' HTTP/1.1 401 Unauthorized'); + return; + } + }); + + server.on('upgrade', (req, socket, head) => { + const url = new URL(req.url, `http://${req.headers.host}`); + const params = url.searchParams; + + const event = params.get('event') as EventsType; + const token = params.get('token'); + + try { + if ( + url.pathname !== '/ws/events' || + !event || + !token || + !isJWT(token) || + !ListEvents.includes(event) + ) { + socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n'); + socket.destroy(); + return; + } + + const jwtOpts = this.configService.get('AUTHENTICATION').JWT; + const decode = verify(token, jwtOpts.SECRET, { + ignoreExpiration: jwtOpts.EXPIRIN_IN === 0, + }) as JwtPayload; + + canActivate = true; + + if (!canActivate) { + socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n'); + socket.destroy(); + return; + } + key = `${decode.instanceName}_${event}`; + + wss.handleUpgrade(req, socket, head, (socket) => { + wss.emit('connection', socket, req); + }); + } catch (error) { + this.logger.error(error); + socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n'); + socket.destroy(); + return; + } + }); + } +} diff --git a/src/whatsapp/controllers/instance.controller.ts b/src/whatsapp/controllers/instance.controller.ts index c9989750..33aef8c3 100644 --- a/src/whatsapp/controllers/instance.controller.ts +++ b/src/whatsapp/controllers/instance.controller.ts @@ -55,6 +55,7 @@ import { InstanceService, OldToken } from '../services/instance.service'; import { WAStartupService } from '../services/whatsapp.service'; import { isString } from 'class-validator'; import { ProviderFiles } from '../../provider/sessions'; +import { Websocket } from '../../websocket/server'; export class InstanceController { constructor( @@ -64,6 +65,7 @@ export class InstanceController { private readonly eventEmitter: EventEmitter2, private readonly instanceService: InstanceService, private readonly providerFiles: ProviderFiles, + private readonly ws: Websocket, ) {} private readonly logger = new Logger(this.configService, InstanceController.name); @@ -111,18 +113,21 @@ export class InstanceController { try { let instance: WAStartupService; instance = this.waMonitor.waInstances.get(instanceName); - if ( - instance?.connectionStatus?.state === 'open' - ) { + if (instance?.connectionStatus?.state === 'open') { throw 'Instance already connected'; } - if (!instance || !instance.connectionStatus || instance?.connectionStatus?.state === 'refused') { + if ( + !instance || + !instance.connectionStatus || + instance?.connectionStatus?.state === 'refused' + ) { instance = new WAStartupService( this.configService, this.eventEmitter, this.repository, this.providerFiles, + this.ws, ); await instance.setInstanceName(instanceName); this.waMonitor.addInstance(instanceName, instance); diff --git a/src/whatsapp/dto/webhook.dto.ts b/src/whatsapp/dto/webhook.dto.ts index b60bf381..188b27fd 100644 --- a/src/whatsapp/dto/webhook.dto.ts +++ b/src/whatsapp/dto/webhook.dto.ts @@ -109,4 +109,6 @@ export const WebhookEventsEnum: Record = { callUpsert: 'call.upsert', }; +export const ListEvents: EventsType[] = Object.values(WebhookEventsEnum); + export type WebhookEventsMap = typeof WebhookEventsEnum; diff --git a/src/whatsapp/services/monitor.service.ts b/src/whatsapp/services/monitor.service.ts index b0dd8cf7..d06d9311 100644 --- a/src/whatsapp/services/monitor.service.ts +++ b/src/whatsapp/services/monitor.service.ts @@ -52,6 +52,7 @@ import { import { Repository } from '../../repository/repository.service'; import { Instance } from '@prisma/client'; import { ProviderFiles } from '../../provider/sessions'; +import { Websocket } from '../../websocket/server'; export class WAMonitoringService { constructor( @@ -59,6 +60,7 @@ export class WAMonitoringService { private readonly configService: ConfigService, private readonly repository: Repository, private readonly providerFiles: ProviderFiles, + private readonly ws: Websocket, ) { this.removeInstance(); this.noConnection(); @@ -126,7 +128,9 @@ export class WAMonitoringService { private clearListeners(instanceName: string) { try { - this.waInstances.get(instanceName)?.client?.ev.removeAllListeners('connection.update'); + this.waInstances + .get(instanceName) + ?.client?.ev.removeAllListeners('connection.update'); this.waInstances.get(instanceName)?.client?.ev.flush(); this.waInstances.delete(instanceName); } catch { @@ -147,6 +151,7 @@ export class WAMonitoringService { this.eventEmitter, this.repository, this.providerFiles, + this.ws, ); await init.setInstanceName(name); this.addInstance(init.instanceName, init); diff --git a/src/whatsapp/services/whatsapp.service.ts b/src/whatsapp/services/whatsapp.service.ts index 0e2a136f..5d1df3c3 100644 --- a/src/whatsapp/services/whatsapp.service.ts +++ b/src/whatsapp/services/whatsapp.service.ts @@ -133,6 +133,7 @@ import PrismType from '@prisma/client'; import * as s3Service from '../../integrations/minio/minio.utils'; import { TypebotSessionService } from '../../integrations/typebot/typebot.service'; import { ProviderFiles } from '../../provider/sessions'; +import { Websocket } from '../../websocket/server'; type InstanceQrCode = { count: number; @@ -151,6 +152,7 @@ export class WAStartupService { private readonly eventEmitter: EventEmitter2, private readonly repository: Repository, private readonly providerFiles: ProviderFiles, + private readonly ws: Websocket, ) { this.authStateProvider = new AuthStateProvider( this.configService, @@ -391,6 +393,8 @@ export class WAStartupService { this.instanceQr.base64 = base64; this.instanceQr.code = qr; + this.ws.send(this.instance.name, 'qrcode.updated', { code: qr, base64 }); + this.sendDataWebhook('qrcodeUpdated', { qrcode: { instance: this.instance.name, code: qr, base64 }, }); @@ -408,10 +412,13 @@ export class WAStartupService { this.stateConnection.state = connection; this.stateConnection.statusReason = (lastDisconnect?.error as Boom)?.output?.statusCode ?? 200; - this.sendDataWebhook('connectionUpdated', { + + const data = { instance: this.instance.name, ...this.stateConnection, - }); + }; + this.ws.send(this.instance.name, 'connection.update', data); + this.sendDataWebhook('connectionUpdated', data); } if (connection === 'close') { @@ -578,6 +585,8 @@ export class WAStartupService { } as PrismType.Chat); } + this.ws.send(this.instance.name, 'chats.upsert', chatsRaw); + await this.sendDataWebhook('chatsUpsert', chatsRaw); await this.repository.chat.createMany({ data: chatsRaw, @@ -596,6 +605,7 @@ export class WAStartupService { const chatsRaw: PrismType.Chat[] = chats.map((chat) => { return { remoteJid: chat.id, instanceId: this.instance.id } as PrismType.Chat; }); + this.ws.send(this.instance.name, 'chats.update', chatsRaw); await this.sendDataWebhook('chatsUpdated', chatsRaw); }, @@ -635,6 +645,9 @@ export class WAStartupService { instanceId: this.instance.id, } as unknown as PrismType.Contact); } + + this.ws.send(this.instance.name, 'contacts.upsert', contactsRaw); + await this.sendDataWebhook('contactsUpsert', contactsRaw); await this.repository.contact.createMany({ data: contactsRaw, @@ -663,6 +676,8 @@ export class WAStartupService { contactsRaw.push(data); } + this.ws.send(this.instance.name, 'contacts.update', contactsRaw); + await this.sendDataWebhook('contactsUpdated', contactsRaw); }, }; @@ -811,6 +826,8 @@ export class WAStartupService { this.logger.log('Type: ' + type); console.log(messageRaw); + this.ws.send(this.instance.name, 'messages.upsert', messageRaw); + await this.sendDataWebhook('messagesUpsert', messageRaw); if (s3Service.BUCKET?.ENABLE) { @@ -964,6 +981,9 @@ export class WAStartupService { dateTime: new Date(), instanceId: this.instance.id, }; + + this.ws.send(this.instance.name, 'messages.update', message); + await this.sendDataWebhook('messagesUpdated', message); if (this.databaseOptions.DB_OPTIONS.MESSAGE_UPDATE) { this.repository.message @@ -993,10 +1013,12 @@ export class WAStartupService { private readonly groupHandler = { 'groups.upsert': (groupMetadata: GroupMetadata[]) => { + this.ws.send(this.instance.name, 'groups.upsert', groupMetadata); this.sendDataWebhook('groupsUpsert', groupMetadata); }, 'groups.update': (groupMetadataUpdate: Partial[]) => { + this.ws.send(this.instance.name, 'groups.update', groupMetadataUpdate); this.sendDataWebhook('groupsUpdated', groupMetadataUpdate); }, @@ -1005,6 +1027,7 @@ export class WAStartupService { participants: string[]; action: ParticipantAction; }) => { + this.ws.send(this.instance.name, 'group-participants.update', participantsUpdate); this.sendDataWebhook('groupsParticipantsUpdated', participantsUpdate); }, }; @@ -1012,6 +1035,7 @@ export class WAStartupService { private readonly callHandler = { 'call.upsert': (call: WACallEvent[]) => { call.forEach((c) => { + this.ws.send(this.instance.name, 'call.upsert', c); this.sendDataWebhook('callUpsert', c); }); }, @@ -1045,6 +1069,7 @@ export class WAStartupService { if (events?.['presence.update']) { const payload = events['presence.update']; + this.ws.send(this.instance.name, 'presence.update', payload); this.sendDataWebhook('presenceUpdated', payload); } @@ -1299,6 +1324,7 @@ export class WAStartupService { messageSent.id = id; } + this.ws.send(this.instance.name, 'send.message', messageSent); this.sendDataWebhook('sendMessage', messageSent).catch((error) => this.logger.error(error), ); diff --git a/views/qrcode.hbs b/views/qrcode.hbs index f4bac69f..43576a89 100644 --- a/views/qrcode.hbs +++ b/views/qrcode.hbs @@ -11,13 +11,31 @@ - + Generate QRCode @@ -29,15 +47,16 @@

Connect to whatsapp

+ Instance name
-
-
-
+
+
+