Skip to content

Commit

Permalink
Merge pull request #651 from arenaxr/mqtt_metrics
Browse files Browse the repository at this point in the history
feat: Batch MQTT worker messages
  • Loading branch information
hi-liang authored Jul 11, 2024
2 parents f28fde6 + c5d2e7f commit 9f60d8e
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 3 deletions.
17 changes: 16 additions & 1 deletion src/systems/core/mqtt.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,15 @@ AFRAME.registerSystem('arena-mqtt', {
// }
// }),
);
worker.registerMessageHandler('s', proxy(this.onSceneMessageArrived.bind(this)), true);
this.onSceneMessageArrived = this.onSceneMessageArrived.bind(this);
worker.registerMessageHandler(
's',
proxy((messages) => {
messages.forEach(this.onSceneMessageArrived);
}),
true
);
worker.registerMessageQueue('s', true);
return worker;
},

Expand Down Expand Up @@ -139,6 +147,7 @@ AFRAME.registerSystem('arena-mqtt', {
* @param {object} message
*/
onSceneMessageArrived(message) {
delete message.workerTimestamp;
const theMessage = message.payloadObj; // This will be given as json

if (!theMessage) {
Expand Down Expand Up @@ -229,4 +238,10 @@ AFRAME.registerSystem('arena-mqtt', {
break;
}
},

tock() {
this.MQTTWorker?.tock('s').then((messages) => {
messages.forEach(this.onSceneMessageArrived);
});
},
});
52 changes: 50 additions & 2 deletions src/systems/core/workers/mqtt-worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import { expose } from 'comlink';
import * as Paho from 'paho-mqtt'; // https://www.npmjs.com/package/paho-mqtt

const MINTOCKINTERVAL = 3 * 1000;
let lastTock = new Date().getTime();

/**
* Main ARENA MQTT webworker client
*/
Expand All @@ -19,6 +22,10 @@ class MQTTWorker {

connectionLostHandlers = [];

messageQueues = {};

messageQueueConf = {};

/**
* @param {object} ARENAConfig
* @param {function} healthCheck
Expand Down Expand Up @@ -114,14 +121,45 @@ class MQTTWorker {
* @param {Paho.Message} message
*/
onMessageArrivedDispatcher(message) {
const now = new Date().getTime();
const topic = message.destinationName;
const topicCategory = topic.split('/')[1];
const handler = this.messageHandlers[topicCategory];
if (handler) {
handler(message);
const trimmedMessage = {
destinationName: message.destinationName,
payloadString: message.payloadString,
workerTimestamp: now,
};
if (this.messageQueues[topicCategory]) {
if (this.messageQueueConf[topicCategory]) {
try {
trimmedMessage.payloadObj = JSON.parse(message.payloadString);
} catch (e) {
// Ignore
}
}
this.messageQueues[topicCategory].push(trimmedMessage);
// Been too long since last tock (lost focus?), flush queue to main thread instead of waiting for pull
if (now - lastTock > MINTOCKINTERVAL && handler) {
const batch = this.messageQueues[topicCategory];
this.messageQueues[topicCategory] = [];
lastTock = now;
console.log(`Worker flushing ${batch.length} messages for ${topicCategory}`);
handler(batch);
}
} else if (handler) {
handler(trimmedMessage);
}
}

tock(topicCategory) {
const batch = this.messageQueues[topicCategory];
this.messageQueues[topicCategory] = [];
const now = new Date().getTime();
lastTock = now;
return batch;
}

/**
* Register a message handler for a given topic category beneath realm (second level).
* @param {string} topicCategory - the topic category to register a handler for
Expand All @@ -144,6 +182,16 @@ class MQTTWorker {
}
}

/**
* Register a message handler for a given topic category beneath realm (second level).
* @param {string} topicCategory - the topic category to register a handler for
* @param {boolean} isJson - whether the payload is expected to be well-formed json
*/
registerMessageQueue(topicCategory, isJson) {
this.messageQueues[topicCategory] = [];
this.messageQueueConf[topicCategory] = isJson;
}

/**
* Publish to given dest topic
* @param {string} topic
Expand Down

0 comments on commit 9f60d8e

Please sign in to comment.