From 1a56ddb26611db925496366df618abd6824aa355 Mon Sep 17 00:00:00 2001 From: Sean Matthews Date: Wed, 16 Oct 2024 20:40:30 -0400 Subject: [PATCH] Add core model for Websocket Connections TODO [] Decouple from/abstract the AWS implementation, implement a Core Extension for AWS [] Add more helper functions [] Pair with a cleanup function to purge the DB of short lived connection --- packages/core/database/index.js | 18 ++++--- .../database/models/WebsocketConnection.js | 49 +++++++++++++++++++ 2 files changed, 59 insertions(+), 8 deletions(-) create mode 100644 packages/core/database/models/WebsocketConnection.js diff --git a/packages/core/database/index.js b/packages/core/database/index.js index 13e4f967d..17c596292 100644 --- a/packages/core/database/index.js +++ b/packages/core/database/index.js @@ -1,14 +1,15 @@ -const { mongoose} = require('./mongoose'); +const { mongoose } = require('./mongoose'); const { connectToDatabase, disconnectFromDatabase, createObjectId, } = require('./mongo'); -const {IndividualUser} = require('./models/IndividualUser'); -const {OrganizationUser} = require('./models/OrganizationUser'); -const {State} = require('./models/State'); -const {Token} = require('./models/Token'); -const {UserModel} = require('./models/UserModel'); +const { IndividualUser } = require('./models/IndividualUser'); +const { OrganizationUser } = require('./models/OrganizationUser'); +const { State } = require('./models/State'); +const { Token } = require('./models/Token'); +const { UserModel } = require('./models/UserModel'); +const { WebsocketConnection } = require('./models/WebsocketConnection'); module.exports = { mongoose, @@ -19,5 +20,6 @@ module.exports = { OrganizationUser, State, Token, - UserModel -} \ No newline at end of file + UserModel, + WebsocketConnection, +}; diff --git a/packages/core/database/models/WebsocketConnection.js b/packages/core/database/models/WebsocketConnection.js new file mode 100644 index 000000000..9a858f28f --- /dev/null +++ b/packages/core/database/models/WebsocketConnection.js @@ -0,0 +1,49 @@ +const { mongoose } = require('../mongoose'); +const AWS = require('aws-sdk'); + +const schema = new mongoose.Schema({ + connectionId: { type: mongoose.Schema.Types.String }, +}); + +// Add a static method to get active connections +schema.statics.getActiveConnections = async function () { + try { + const connections = await this.find({}, 'connectionId'); + return connections.map((conn) => ({ + connectionId: conn.connectionId, + send: async (data) => { + const apigwManagementApi = new AWS.ApiGatewayManagementApi({ + apiVersion: '2018-11-29', + endpoint: process.env.WEBSOCKET_API_ENDPOINT, + }); + + try { + await apigwManagementApi + .postToConnection({ + ConnectionId: conn.connectionId, + Data: JSON.stringify(data), + }) + .promise(); + } catch (error) { + if (error.statusCode === 410) { + console.log(`Stale connection ${conn.connectionId}`); + await this.deleteOne({ + connectionId: conn.connectionId, + }); + } else { + throw error; + } + } + }, + })); + } catch (error) { + console.error('Error getting active connections:', error); + throw error; + } +}; + +const WebsocketConnection = + mongoose.models.WebsocketConnection || + mongoose.model('WebsocketConnection', schema); + +module.exports = { WebsocketConnection };