Skip to content

Commit

Permalink
If applied, this commit will update the latency log registration.
Browse files Browse the repository at this point in the history
  • Loading branch information
UellingtonDamasceno committed Feb 8, 2024
1 parent abd3172 commit f9b5e19
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 36 deletions.
4 changes: 2 additions & 2 deletions src/main/java/com/device/fot/virtual/app/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@ public static void main(String[] args) {
MessageLogController.getInstance().setCanSaveData(true);
}

if(CLI.hasParam("-ll", args)){
//if(CLI.hasParam("-ll", args)){
LatencyLogController.getInstance().createAndUpdateFileName(deviceId + "_latency_log.csv");
LatencyLogController.getInstance().start();
LatencyLogController.getInstance().setCanSaveData(true);
}
// }

List<Sensor> sensors = readSensors("sensors.json", deviceId)
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,7 @@ public void startUpdateBroker(BrokerSettings brokerSettings, double timeout, boo

newClient.subscribe(ExtendedTATUWrapper.getConnectionTopicResponse());
newClient.publish(connectionTopic, new MqttMessage(message.getBytes()));

this.brokerSettings = brokerSettings;

this.timeoutCounter.start();

} catch (MqttException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ public void messageArrived(String topic, MqttMessage mqttMessage) throws Excepti
MqttMessage mqttResponse = new MqttMessage();
FoTSensor sensor;

System.out.println("============================");
System.out.println("MQTT_MESSAGE: " + new String(mqttMessage.getPayload()));
System.out.println("MESSAGE ID: "+ mqttMessage.getId());
System.out.println("TOPIC: " + topic);
System.out.println("MY_MESSAGE: " + tatuMessage);

Expand Down Expand Up @@ -96,29 +96,7 @@ public void messageArrived(String topic, MqttMessage mqttMessage) throws Excepti

@Override
public void deliveryComplete(IMqttDeliveryToken imdt) {
MqttMessage deliveredMessage;
try {
deliveredMessage = imdt.getMessage();
if(deliveredMessage == null || deliveredMessage.getPayload().length == 0){
return;
}
String messageContent = new String(deliveredMessage.getPayload());
long customTimestamp = TATUWrapper.getMessageTimestamp(messageContent);
if(customTimestamp == 0){
System.out.println("The message"+ messageContent +" don't have timestamp");
}

long latency = System.currentTimeMillis() - customTimestamp;
LatencyLogController.getInstance().putLatency(latency);
} catch(MqttException me) {
System.out.println("reason "+me.getReasonCode());
System.out.println("msg "+me.getMessage());
System.out.println("loc "+me.getLocalizedMessage());
System.out.println("excep "+me);
Logger.getLogger(DefaultFlowCallback.class.getName()).log(Level.SEVERE, null, me);
} catch (InterruptedException ex) {
Logger.getLogger(DefaultFlowCallback.class.getName()).log(Level.SEVERE, null, ex);
}
LatencyLogController.getInstance().calculateLatancy(imdt.getResponse().getMessageId() - 5);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,44 @@
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.json.JSONObject;

public class LatencyLogController extends PersistenceController<Long> {

private static LatencyLogController latencyLogController = new LatencyLogController();
private static final LatencyLogController latencyLogController = new LatencyLogController();

private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("dd-MM-yyyy HH:mm:ss.SSS");
private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
private final Map<Integer, String> messages;

private LatencyLogController() {
super("latency_log.csv");
this.messages = new HashMap<>();
}

public synchronized static LatencyLogController getInstance() {
return latencyLogController;
}

public void putLatency(Long latency) throws InterruptedException {
if (canSaveData) {
buffer.put(latency);
public void putNewMessage(int id, String message) {
this.messages.put(id, message);
}

public void calculateLatancy(int messageId) {
if (!this.messages.containsKey(messageId)) {
System.out.println("Não tem mensagem id: "+messageId);
return;
}
String messageContent = this.messages.remove(messageId);
long customTimestamp = new JSONObject(messageContent).getJSONObject("HEADER").getLong("TIMESTAMP");

if (customTimestamp <= 0) {
System.out.println("The message" + messageContent + " don't have timestamp");
}

long latency = System.currentTimeMillis() - customTimestamp;
this.buffer.add(latency);
}

private String buildLogLatencyLine(Long latency) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ protected void write(List<String> lines) {
lines.forEach(line -> {
try {
w.write(line);
System.out.println(this.fileName + "::" + line);
w.newLine();
} catch (IOException ex) {
Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, null, ex);
Expand Down
16 changes: 13 additions & 3 deletions src/main/java/com/device/fot/virtual/model/FoTSensor.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.device.fot.virtual.model;

import com.device.fot.virtual.controller.LatencyLogController;
import com.device.fot.virtual.controller.MessageLogController;
import java.util.LinkedList;
import java.util.Random;
Expand All @@ -10,6 +11,7 @@

import extended.tatu.wrapper.model.Sensor;
import extended.tatu.wrapper.util.TATUWrapper;
import java.util.concurrent.atomic.AtomicInteger;

/**
*
Expand All @@ -25,9 +27,11 @@ public class FoTSensor extends Sensor implements Runnable {

private String flowThreadName;
private Random random;

private int lastValue;

private final static AtomicInteger messageId = new AtomicInteger(0);

public FoTSensor(String deviceId, Sensor sensor) {
this(deviceId,
sensor.getId(),
Expand Down Expand Up @@ -97,6 +101,7 @@ public void startFlow(int newFlowCollect, int newFlowPublish) {
this.collectionTime = newFlowCollect;
this.publishingTime = newFlowPublish;
if (thread == null || !thread.isAlive()) {
System.out.println("Sensor type: " + type + " starting flow.");
this.thread = new Thread(this);
this.thread.setName(flowThreadName);
this.thread.start();
Expand Down Expand Up @@ -150,12 +155,17 @@ public void run() {
String topic = TATUWrapper.buildTATUResponseTopic(deviceId);
this.flow = true;
this.running = true;
MqttMessage mqttMessage;
while (thread.isAlive() && this.running && this.flow) {
try {
var data = this.getDataFlow();
msg = TATUWrapper.buildFlowMessageResponse(deviceId, id, publishingTime, collectionTime,
data.getValues().toArray());
this.publisher.publish(topic, new MqttMessage(msg.getBytes()));
mqttMessage = new MqttMessage(msg.getBytes());
mqttMessage.setId(messageId.getAndIncrement());
mqttMessage.setQos(2);
LatencyLogController.getInstance().putNewMessage(mqttMessage.getId(), msg);
this.publisher.publish(topic, mqttMessage);
MessageLogController.getInstance().putData(data);
} catch (InterruptedException | MqttException ex) {
this.running = false;
Expand All @@ -181,5 +191,5 @@ public String toString() {
sb.append('}');
return sb.toString();
}

}

0 comments on commit f9b5e19

Please sign in to comment.