Skip to content

Commit

Permalink
websocket compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
jrCleber committed Jul 9, 2024
1 parent e2a28e3 commit 1ea62aa
Show file tree
Hide file tree
Showing 10 changed files with 379 additions and 32 deletions.
10 changes: 6 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "whatsapp-api",
"version": "1.3.1",
"version": "1.3.2",
"description": "Rest api for communication with WhatsApp",
"main": "./dist/src/main.js",
"scripts": {
Expand Down Expand Up @@ -45,7 +45,7 @@
"@hapi/boom": "^10.0.1",
"@prisma/client": "^5.14.0",
"@scalar/express-api-reference": "^0.4.42",
"@whiskeysockets/baileys": "6.6.0",
"@whiskeysockets/baileys": "6.7.5",
"axios": "^1.6.2",
"class-validator": "^0.14.0",
"cross-env": "^7.0.3",
Expand All @@ -69,7 +69,8 @@
"pino-pretty": "^11.0.0",
"qrcode": "^1.5.1",
"qrcode-terminal": "^0.12.0",
"uuid": "^9.0.0",
"uuid": "^10.0.0",
"ws": "^8.18.0",
"yamljs": "^0.3.0"
},
"devDependencies": {
Expand All @@ -84,7 +85,8 @@
"@types/node": "^20.12.12",
"@types/qrcode": "^1.5.5",
"@types/qrcode-terminal": "^0.12.2",
"@types/uuid": "^9.0.7",
"@types/uuid": "^10.0.0",
"@types/ws": "^8.5.10",
"@types/yamljs": "^0.2.34",
"@typescript-eslint/eslint-plugin": "6.21.0",
"@typescript-eslint/parser": "6.21.0",
Expand Down
11 changes: 10 additions & 1 deletion src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ import { ErrorMiddle } from './middle/error.middle';
import 'express-async-errors';
import { docsRouter } from './config/scala.config';
import { ProviderFiles } from './provider/sessions';
import { Websocket } from './websocket/server';
import { createServer } from 'http';

export function describeRoutes(
rootPath: string,
Expand Down Expand Up @@ -105,6 +107,7 @@ export enum HttpStatus {

export async function AppModule(context: Map<string, any>) {
const app = express();
const server = createServer(app);

const configService = new ConfigService();

Expand All @@ -118,11 +121,16 @@ export async function AppModule(context: Map<string, any>) {
await repository.onModuleInit();
logger.info('Repository - ON');

const wss = new Websocket(configService);
wss.server(server);
logger.info('WebSocket Server - ON');

const waMonitor = new WAMonitoringService(
eventEmitter,
configService,
repository,
providerFiles,
wss,
);

logger.info('WAMonitoringService - ON');
Expand Down Expand Up @@ -151,6 +159,7 @@ export async function AppModule(context: Map<string, any>) {
eventEmitter,
instanceService,
providerFiles,
wss,
);
logger.info('InstanceController - ON');

Expand Down Expand Up @@ -225,7 +234,7 @@ export async function AppModule(context: Map<string, any>) {
await providerFiles.onModuleDestroy();
};

context.set('app', app);
context.set('app', server);
context.set('module:logger', logger);
context.set('module:repository', repository);
context.set('module:redisCache', providerFiles);
Expand Down
19 changes: 13 additions & 6 deletions src/config/env.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,9 @@ export class ConfigService {
CONTACTS: process.env?.DATABASE_SAVE_DATA_CONTACTS === 'true',
CHATS: process.env?.DATABASE_SAVE_DATA_CHATS === 'true',
LOGS: process.env?.DATABASE_SAVE_LOGS === 'true',
ACTIVITY_LOGS: process.env?.DATABASE_SAVE_ACTIVITY_LOGS ? process.env?.DATABASE_SAVE_ACTIVITY_LOGS === 'true' : true
ACTIVITY_LOGS: process.env?.DATABASE_SAVE_ACTIVITY_LOGS
? process.env?.DATABASE_SAVE_ACTIVITY_LOGS === 'true'
: true,
},
},
PROVIDER: {
Expand All @@ -191,9 +193,10 @@ export class ConfigService {
LEVEL: process.env?.LOG_LEVEL.split('|') as LogLevel[],
COLOR: process.env?.LOG_COLOR === 'true',
},
INSTANCE_EXPIRATION_TIME: process.env?.INSTANCE_EXPIRATION_TIME === 'false'
? false
: Number.parseInt(process.env?.INSTANCE_EXPIRATION_TIME || '5'),
INSTANCE_EXPIRATION_TIME:
process.env?.INSTANCE_EXPIRATION_TIME === 'false'
? false
: Number.parseInt(process.env?.INSTANCE_EXPIRATION_TIME || '5'),
GLOBAL_WEBHOOK: {
URL: process.env?.WEBHOOK_GLOBAL_URL,
ENABLED: process.env?.WEBHOOK_GLOBAL_ENABLED === 'true',
Expand All @@ -205,8 +208,12 @@ export class ConfigService {
QRCODE: {
LIMIT: Number.parseInt(process.env?.QRCODE_LIMIT || '10'),
EXPIRATION_TIME: Number.parseInt(process.env?.QRCODE_EXPIRATION_TIME || '60'),
LIGHT_COLOR: process.env?.QRCODE_LIGHT_COLOR ? process.env?.QRCODE_LIGHT_COLOR : '#ffffff',
DARK_COLOR: process.env?.QRCODE_DARK_COLOR ? process.env?.QRCODE_DARK_COLOR : '#198754'
LIGHT_COLOR: process.env?.QRCODE_LIGHT_COLOR
? process.env?.QRCODE_LIGHT_COLOR
: '#ffffff',
DARK_COLOR: process.env?.QRCODE_DARK_COLOR
? process.env?.QRCODE_DARK_COLOR
: '#198754',
},
CONNECTION_TIMEOUT: Number.parseInt(process.env?.CONNECTION_TIMEOUT || '300'),
AUTHENTICATION: {
Expand Down
108 changes: 108 additions & 0 deletions src/websocket/Readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
## Websocket

### 1. Inicialização da Conexão WebSocket

O processo começa com a criação de uma nova instância do objeto `WebSocket`. O URL para a conexão inclui o caminho para o servidor WebSocket, um parâmetro de evento que especifica o tipo de evento ao qual o cliente está se inscrevendo e um token de autenticação. Este token será usado pelo servidor para verificar se o cliente tem permissão para conectar e receber esses eventos.

```javascript
const ws = new WebSocket(`${url}?event=${encodeURIComponent(eventName)}&token=${encodeURIComponent(instanceToken)}`);
```

A URL é construída para incluir:

- **`event`**: um parâmetro que especifica o tipo de evento de interesse, facilitando ao servidor rotear ou filtrar as mensagens de acordo com o tipo de dado ou comando desejado.
- **`token`**: um parâmetro de segurança que provavelmente é usado pelo servidor para autenticar e autorizar a conexão.

### 2. Manipuladores de Eventos

**`onopen`**:
- Este evento é disparado quando a conexão WebSocket é estabelecida com sucesso.
- No manipulador de `onopen`, uma mensagem de teste é enviada imediatamente ao servidor. Isso pode ser usado para confirmar que a via de comunicação está funcionando ou para informar ao servidor sobre o estado inicial desejado do cliente.

```javascript
ws.onopen = () => {
console.log('Connected to the server');
ws.send(JSON.stringify({ message: "test data" }));
};
```

**`onmessage`**:
- Este evento é acionado sempre que uma mensagem é recebida do servidor.
- As mensagens recebidas são tratadas convertendo o conteúdo de `event.data` de uma string JSON para um objeto JavaScript, que é então passado para a função `callback` fornecida pelo usuário do script.

```javascript
ws.onmessage = (event) => {
if (callback) {
const data = JSON.parse(event.data)
callback(data, event)
}
};
```

**`onerror`**:
- Acionado quando ocorre um erro na conexão WebSocket.
- Pode ser usado para logar ou tratar erros de rede, falhas de transmissão, etc.

```javascript
ws.onerror = (error) => {
console.log('Error:', error);
};
```

**`onclose`**:
- Este evento é acionado quando a conexão WebSocket é fechada, seja por iniciativa do cliente, do servidor, ou devido a falhas de rede.
- O manipulador de eventos `onclose` tenta automaticamente reconectar-se ao servidor após um intervalo definido. Importante corrigir aqui, o `setTimeout` deve chamar `socket(eventName, callback)` ao invés de `socket(event)`, para garantir que a reconexão seja feita corretamente.

```javascript
ws.onclose = (event) => {
console.log(`Connection closed with code ${event.code} and reason ${event.reason}, attempting to reconnect...`);
setTimeout(() => socket(eventName, callback), reconnectInterval);
};
```

### 3. Reconexão Automática

- Após a conexão ser fechada, o cliente tenta se reconectar usando um intervalo de tempo definido (`reconnectInterval`). Este comportamento garante que o cliente tente manter uma conexão persistente mesmo em face de problemas de rede ou reinícios do servidor.

### 4. Exemplo completo

```javascript
const url = 'ws://localhost:8084/ws/events';
const reconnectInterval = 5000; // 5 segundos

function socket(eventName, callback) {
const ws = new WebSocket(`${url}?event=${encodeURIComponent(eventName)}&token=${encodeURIComponent("{{auth.token}}")}`);

ws.onopen = () => {
console.log('Connected to the server');
};

ws.onmessage = (event) => {
if (callback) {
const data = JSON.parse(event.data)
callback(data, event)
}
};

ws.onerror = (error) => {
console.log('Error:', error);
};

ws.onclose = (event) => {
console.log(`Connection closed with code ${event.code} and reason ${event.reason}, attempting to reconnect...`);
setTimeout(() => socket(event), reconnectInterval);
};
}

// Será criado uma instância da função para cada evento
// Os eventos são os mesmos disparados pela webhook
// exceto o evento "messaging-history.set"

socket("connection.update", (msg, event) => {
console.log(msg)
})

socket("messages.upsert", (msg, event) => {
console.log(msg)
})
```
123 changes: 123 additions & 0 deletions src/websocket/server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/**
* ┌──────────────────────────────────────────────────────────────────────────────┐
* │ @author jrCleber │
* │ @filename server.ts │
* │ Developed by: Cleber Wilson │
* │ Creation date: Jul 08, 2024 │
* │ Contact: [email protected]
* ├──────────────────────────────────────────────────────────────────────────────┤
* │ @copyright © Cleber Wilson 2022. All rights reserved. │
* │ Licensed under the Apache License, Version 2.0 │
* │ │
* │ @license "https://github.com/code-chat-br/whatsapp-api/blob/main/LICENSE" │
* │ │
* │ You may not use this file except in compliance with the License. │
* │ You may obtain a copy of the License at │
* │ │
* │ http://www.apache.org/licenses/LICENSE-2.0 │
* │ │
* │ Unless required by applicable law or agreed to in writing, software │
* │ distributed under the License is distributed on an "AS IS" BASIS, │
* │ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. │
* │ │
* │ See the License for the specific language governing permissions and │
* │ limitations under the License. │
* │ │
* │ @class │
* │ @constructs Websocket │
* │ @param {ConfigService} configService │
* ├──────────────────────────────────────────────────────────────────────────────┤
* │ @important │
* │ For any future changes to the code in this file, it is recommended to │
* │ contain, together with the modification, the information of the developer │
* │ who changed it and the date of modification. │
* └──────────────────────────────────────────────────────────────────────────────┘
*/

import Ws from 'ws';
import { Logger } from '../config/logger.config';
import { Auth, ConfigService } from '../config/env.config';
import { Server } from 'http';
import { isJWT } from 'class-validator';
import { verify } from 'jsonwebtoken';
import { JwtPayload } from '../whatsapp/services/instance.service';
import { EventsType, ListEvents } from '../whatsapp/dto/webhook.dto';

export class Websocket {
constructor(private readonly configService: ConfigService) {}

private readonly logger = new Logger(this.configService, Websocket.name);

private readonly hub = new Map<string, Ws>();

send<T>(instance: string, event: EventsType, data: T): boolean {
const key = `${instance}_${event}`;
const client = this.hub.get(key);

if (!client) {
return;
}

const json = JSON.stringify(data);
client.send(json);
}

server(server: Server) {
const wss = new Ws.Server({ noServer: true });

let key = '';
let canActivate = false;

wss.on('connection', (ws, req) => {
if (!canActivate) {
ws.close(401, ' HTTP/1.1 401 Unauthorized');
return;
}
});

server.on('upgrade', (req, socket, head) => {
const url = new URL(req.url, `http://${req.headers.host}`);
const params = url.searchParams;

const event = params.get('event') as EventsType;
const token = params.get('token');

try {
if (
url.pathname !== '/ws/events' ||
!event ||
!token ||
!isJWT(token) ||
!ListEvents.includes(event)
) {
socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n');
socket.destroy();
return;
}

const jwtOpts = this.configService.get<Auth>('AUTHENTICATION').JWT;
const decode = verify(token, jwtOpts.SECRET, {
ignoreExpiration: jwtOpts.EXPIRIN_IN === 0,
}) as JwtPayload;

canActivate = true;

if (!canActivate) {
socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n');
socket.destroy();
return;
}
key = `${decode.instanceName}_${event}`;

wss.handleUpgrade(req, socket, head, (socket) => {
wss.emit('connection', socket, req);
});
} catch (error) {
this.logger.error(error);
socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n');
socket.destroy();
return;
}
});
}
}
Loading

0 comments on commit 1ea62aa

Please sign in to comment.