forked from vert-x3/vertx-examples
-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.js
101 lines (73 loc) · 3.6 KB
/
server.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
var MqttServer = require("vertx-mqtt-server-js/mqtt_server");
var Buffer = require("vertx-js/buffer");
// Example MQTT server: logs every client interaction, acknowledges
// subscriptions and incoming publishes, and sends one greeting message to the
// first topic of each SUBSCRIBE request.
// `vertx` is not defined in this file; presumably it is the global injected by
// the Vert.x JavaScript runtime.
var mqttServer = MqttServer.create(vertx);

// The endpoint handler runs for each client connection request; `endpoint`
// represents that remote client.
mqttServer.endpointHandler(function (endpoint) {

  // shows main connect info
  console.log("MQTT client [" + endpoint.clientIdentifier() + "] request to connect, clean session = " + endpoint.isCleanSession());

  // auth and will are optional, so only log them when present
  var auth = endpoint.auth();
  if (auth != null) {
    // NOTE(review): this prints the client's password in clear text. Kept
    // because the example is meant to show all connect info; do not copy into
    // production code.
    console.log("[username = " + auth.userName() + ", password = " + auth.password() + "]");
  }

  var will = endpoint.will();
  if (will != null) {
    console.log("[will flag = " + will.isWillFlag() + " topic = " + will.willTopic() + " msg = " + will.willMessage() + " QoS = " + will.willQos() + " isRetain = " + will.isWillRetain() + "]");
  }

  console.log("[keep alive timeout = " + endpoint.keepAliveTimeSeconds() + "]");

  // accept connection from the remote client
  // (the argument is the "session present" flag in the Vert.x MQTT API)
  endpoint.accept(false);

  // handlers for the QoS 1 and QoS 2 acknowledgements of messages published by
  // this server; registered once per connection rather than on every SUBSCRIBE
  endpoint.publishAcknowledgeHandler(function (messageId) {
    console.log("Received ack for message = " + messageId);
  }).publishReceivedHandler(function (messageId) {
    // QoS 2: answer the client's "received" with a "release"
    endpoint.publishRelease(messageId);
  }).publishCompletionHandler(function (messageId) {
    console.log("Received ack for message = " + messageId);
  });

  // handling requests for subscriptions
  endpoint.subscribeHandler(function (subscribe) {
    var subscriptions = subscribe.topicSubscriptions();

    // grant every subscription exactly the QoS the client asked for
    var grantedQosLevels = [];
    Array.prototype.forEach.call(subscriptions, function (s) {
      console.log("Subscription for " + s.topicName() + " with QoS " + s.qualityOfService());
      grantedQosLevels.push(s.qualityOfService());
    });

    // ack the subscriptions request
    endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQosLevels);

    // just as example, publish a message on the first topic with requested QoS;
    // guarded so an empty subscription list does not throw inside the handler
    if (subscriptions.length > 0) {
      var firstSubscription = subscriptions[0];
      // the trailing flags are isDup = false, isRetain = false
      endpoint.publish(firstSubscription.topicName(), Buffer.buffer("Hello from the Vert.x MQTT server"), firstSubscription.qualityOfService(), false, false);
    }
  });

  // handling requests for unsubscriptions
  endpoint.unsubscribeHandler(function (unsubscribe) {
    Array.prototype.forEach.call(unsubscribe.topics(), function (t) {
      console.log("Unsubscription for " + t);
    });

    // ack the unsubscription request
    endpoint.unsubscribeAcknowledge(unsubscribe.messageId());
  });

  // handling ping from client
  endpoint.pingHandler(function (v) {
    console.log("Ping received from client");
  });

  // handling disconnect message
  endpoint.disconnectHandler(function (v) {
    console.log("Received disconnect from client");
  });

  // handling closing connection
  endpoint.closeHandler(function (v) {
    console.log("Connection closed");
  });

  // handling incoming published messages
  endpoint.publishHandler(function (message) {
    var qos = message.qosLevel();
    console.log("Just received message on [" + message.topicName() + "] payload [" + message.payload() + "] with QoS [" + qos + "]");

    // QoS 0 needs no reply; QoS 1 is acked directly; QoS 2 starts the
    // received/release/complete exchange finished in publishReleaseHandler
    if (qos === 'AT_LEAST_ONCE') {
      endpoint.publishAcknowledge(message.messageId());
    } else if (qos === 'EXACTLY_ONCE') {
      endpoint.publishReceived(message.messageId());
    }
  }).publishReleaseHandler(function (messageId) {
    endpoint.publishComplete(messageId);
  });

}).listen(1883, "0.0.0.0", function (ar, ar_err) {
  // ar_err is null/undefined on success, otherwise the failure cause
  if (ar_err == null) {
    console.log("MQTT server is listening on port " + mqttServer.actualPort());
  } else {
    console.error("Error on starting the server: " + ar_err.getMessage());
  }
});