Skip to content

Commit

Permalink
Merge pull request #105 from usdot-jpo-ode/tim-dedup-update
Browse files Browse the repository at this point in the history
Tim dedup update
  • Loading branch information
John-Wiens authored Aug 21, 2024
2 parents f5c775d + 3b43198 commit 4ca677a
Show file tree
Hide file tree
Showing 29 changed files with 957 additions and 359 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,14 +191,6 @@ public boolean detectConflict(LaneConnection otherConnection) {

if (connectingPath.intersects(otherConnection.connectingPath)) {
if (this.signalGroup == otherConnection.signalGroup) {
// System.out.println("Conflict Detected between " + this.ingress.getLaneID() + ","
// + this.egress.getLaneID() + " and " + otherConnection.ingress.getLaneID() + ","
// + otherConnection.egress.getLaneID());
// System.out.println("First Path");
// printConnectingPath();
// System.out.println("Second Path");
// otherConnection.printConnectingPath();

return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,21 +57,31 @@ public class DeduplicatorProperties implements EnvironmentAware {
private String kafkaTopicProcessedMap;
private String kafkaTopicDeduplicatedProcessedMap;
private boolean enableProcessedMapDeduplication;
private String kafkaStateStoreProcessedMapName = "ProcessedMap-store";

// Processed Map WKT Configuration
private String kafkaTopicProcessedMapWKT;
private String kafkaTopicDeduplicatedProcessedMapWKT;
private boolean enableProcessedMapWktDeduplication;
private String kafkaStateStoreProcessedMapWKTName = "ProcessedMapWKT-store";

// Ode Map Json Configuration
private String kafkaTopicOdeMapJson;
private String kafkaTopicDeduplicatedOdeMapJson;
private boolean enableOdeMapDeduplication;
private String kafkaStateStoreOdeMapJsonName = "OdeMapJson-store";

// Ode Tim Json Configuration
private String kafkaTopicOdeTimJson;
private String kafkaTopicDeduplicatedOdeTimJson;
private boolean enableOdeTimDeduplication;
private String kafkaStateStoreOdeTimJsonName = "OdeTimJson-store";

// Ode Raw Encoded Tim Json Configuration
private String kafkaTopicOdeRawEncodedTimJson;
private String kafkaTopicDeduplicatedOdeRawEncodedTimJson;
private boolean enableOdeRawEncodedTimDeduplication;
private String kafkaStateStoreOdeRawEncodedTimJsonName = "OdeRawEncodedTimJson-store";

//Ode BsmJson Configuration
private String kafkaTopicOdeBsmJson;
Expand All @@ -80,6 +90,7 @@ public class DeduplicatorProperties implements EnvironmentAware {
private long odeBsmMaximumTimeDelta;
private double odeBsmMaximumPositionDelta;
private double odeBsmAlwaysIncludeAtSpeed;
private String kafkaStateStoreOdeBsmJsonName = "OdeBsmJson-store";

// Confluent Properties
private boolean confluentCloudEnabled = false;
Expand Down Expand Up @@ -344,6 +355,20 @@ public void setEnableOdeTimDeduplication(boolean enableOdeTimDeduplication) {
this.enableOdeTimDeduplication = enableOdeTimDeduplication;
}

@Value("${kafkaTopicOdeRawEncodedTimJson}")
public void setKafkaTopicOdeRawEncodedTimJson(String kafkaTopicOdeRawEncodedTimJson) {
this.kafkaTopicOdeRawEncodedTimJson = kafkaTopicOdeRawEncodedTimJson;
}

@Value("${kafkaTopicDeduplicatedOdeRawEncodedTimJson}")
public void setKafkaTopicDeduplicatedOdeRawEncodedTimJson(String kafkaTopicDeduplicatedOdeRawEncodedTimJson) {
this.kafkaTopicDeduplicatedOdeRawEncodedTimJson = kafkaTopicDeduplicatedOdeRawEncodedTimJson;
}

@Value("${enableOdeRawEncodedTimDeduplication}")
public void setEnableOdeRawEncodedTimDeduplication(boolean enableOdeRawEncodedTimDeduplication) {
this.enableOdeRawEncodedTimDeduplication = enableOdeRawEncodedTimDeduplication;
}

@Value("${kafkaTopicOdeBsmJson}")
public void setkafkaTopicOdeBsmJson(String kafkaTopicOdeBsmJson) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
import us.dot.its.jpo.deduplicator.DeduplicatorProperties;
import us.dot.its.jpo.deduplicator.deduplicator.topologies.BsmDeduplicatorTopology;
import us.dot.its.jpo.deduplicator.deduplicator.topologies.MapDeduplicatorTopology;
import us.dot.its.jpo.deduplicator.deduplicator.topologies.TimDeduplicatorTopology;
import us.dot.its.jpo.deduplicator.deduplicator.topologies.OdeRawEncodedTimDeduplicatorTopology;
import us.dot.its.jpo.deduplicator.deduplicator.topologies.ProcessedMapDeduplicatorTopology;
import us.dot.its.jpo.deduplicator.deduplicator.topologies.ProcessedMapWktDeduplicatorTopology;
import us.dot.its.jpo.deduplicator.deduplicator.topologies.TimDeduplicatorTopology;

@Controller
@DependsOn("createKafkaTopics")
Expand All @@ -46,40 +47,44 @@ public DeduplicatorServiceController(final DeduplicatorProperties props,

if(props.isEnableProcessedMapDeduplication()){
ProcessedMapDeduplicatorTopology processedMapDeduplicatorTopology = new ProcessedMapDeduplicatorTopology(
props.getKafkaTopicProcessedMap(),
props.getKafkaTopicDeduplicatedProcessedMap(),
props,
props.createStreamProperties("ProcessedMapDeduplicator")
);
processedMapDeduplicatorTopology.start();
}

if(props.isEnableProcessedMapWktDeduplication()){
ProcessedMapWktDeduplicatorTopology processedMapWktDeduplicatorTopology = new ProcessedMapWktDeduplicatorTopology(
props.getKafkaTopicProcessedMapWKT(),
props.getKafkaTopicDeduplicatedProcessedMapWKT(),
props,
props.createStreamProperties("ProcessedMapWKTdeduplicator")
);
processedMapWktDeduplicatorTopology.start();
}

if(props.isEnableProcessedMapDeduplication()){
MapDeduplicatorTopology mapDeduplicatorTopology = new MapDeduplicatorTopology(
props.getKafkaTopicOdeMapJson(),
props.getKafkaTopicDeduplicatedOdeMapJson(),
props,
props.createStreamProperties("MapDeduplicator")
);
mapDeduplicatorTopology.start();
}

if(props.isEnableOdeTimDeduplication()){
TimDeduplicatorTopology timDeduplicatorTopology = new TimDeduplicatorTopology(
props.getKafkaTopicOdeTimJson(),
props.getKafkaTopicDeduplicatedOdeTimJson(),
props,
props.createStreamProperties("TimDeduplicator")
);
timDeduplicatorTopology.start();
}

if(props.isEnableOdeRawEncodedTimDeduplication()){
OdeRawEncodedTimDeduplicatorTopology odeRawEncodedTimDeduplicatorTopology = new OdeRawEncodedTimDeduplicatorTopology(
props,
props.createStreamProperties("OdeRawEncodedTimDeduplicator")
);
odeRawEncodedTimDeduplicatorTopology.start();
}

if(props.isEnableOdeBsmDeduplication()){
BsmDeduplicatorTopology bsmDeduplicatorTopology = new BsmDeduplicatorTopology(props);
bsmDeduplicatorTopology.start();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package us.dot.its.jpo.deduplicator.deduplicator.processors;

import java.time.Duration;
import java.time.Instant;

import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.KeyValue;

public abstract class DeduplicationProcessor<T> implements Processor<String, T, String, T>{

private ProcessorContext<String, T> context;
private KeyValueStore<String, T> store;
public String storeName;

@Override
public void init(ProcessorContext<String, T> context) {
this.context = context;
store = context.getStateStore(storeName);
this.context.schedule(Duration.ofHours(1), PunctuationType.WALL_CLOCK_TIME, this::cleanupOldKeys);
}

@Override
public void process(Record<String, T> record) {

// Don't do anything if key is bad
if(record.key().equals("")){
return;
}

T lastRecord = store.get(record.key());
if(lastRecord == null){
store.put(record.key(), record.value());
context.forward(record);
return;
}

if(!isDuplicate(lastRecord, record.value())){
store.put(record.key(), record.value());
context.forward(record);
return;
}
}

private void cleanupOldKeys(final long timestamp) {
try (KeyValueIterator<String, T> iterator = store.all()) {
while (iterator.hasNext()) {

KeyValue<String, T> record = iterator.next();
// Delete any record more than 2 hours old.
if(Instant.ofEpochMilli(timestamp).minusSeconds(2 * 60 * 60).isAfter(getMessageTime(record.value))){
store.delete(record.key);
}
}
}
}

// returns an instant representing the time of the message
public abstract Instant getMessageTime(T message);

// returns if two messages are duplicates of one another
public abstract boolean isDuplicate(T lastMessage, T newMessage);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package us.dot.its.jpo.deduplicator.deduplicator.processors;


import java.time.Duration;
import java.time.Instant;
import java.time.format.DateTimeFormatter;

import org.geotools.referencing.GeodeticCalculator;

import us.dot.its.jpo.deduplicator.DeduplicatorProperties;
import us.dot.its.jpo.ode.model.OdeBsmData;
import us.dot.its.jpo.ode.model.OdeBsmMetadata;
import us.dot.its.jpo.ode.plugin.j2735.J2735Bsm;
import us.dot.its.jpo.ode.plugin.j2735.J2735BsmCoreData;

public class OdeBsmJsonProcessor extends DeduplicationProcessor<OdeBsmData>{

DateTimeFormatter formatter = DateTimeFormatter.ISO_INSTANT;
DeduplicatorProperties props;
GeodeticCalculator calculator;

public OdeBsmJsonProcessor(String storeName, DeduplicatorProperties props){
this.storeName = storeName;
this.props = props;
calculator = new GeodeticCalculator();
}


@Override
public Instant getMessageTime(OdeBsmData message) {
try {
String time = ((OdeBsmMetadata)message.getMetadata()).getOdeReceivedAt();
return Instant.from(formatter.parse(time));
} catch (Exception e) {
System.out.println("Failed to Parse Time");
return Instant.ofEpochMilli(0);
}
}

@Override
public boolean isDuplicate(OdeBsmData lastMessage, OdeBsmData newMessage) {
Instant newValueTime = getMessageTime(newMessage);
Instant oldValueTime = getMessageTime(lastMessage);

// If the messages are more than a certain time apart, forward the new message on
if(newValueTime.minus(Duration.ofMillis(props.getOdeBsmMaximumTimeDelta())).isAfter(oldValueTime)){
return false;
}

J2735BsmCoreData oldCore = ((J2735Bsm)lastMessage.getPayload().getData()).getCoreData();
J2735BsmCoreData newCore = ((J2735Bsm)newMessage.getPayload().getData()).getCoreData();


// If the Vehicle is moving, forward the message on
if(newCore.getSpeed().doubleValue() > props.getOdeBsmAlwaysIncludeAtSpeed()){
return false;
}


double distance = calculateGeodeticDistance(
newCore.getPosition().getLatitude().doubleValue(),
newCore.getPosition().getLongitude().doubleValue(),
oldCore.getPosition().getLatitude().doubleValue(),
oldCore.getPosition().getLongitude().doubleValue()
);

// If the position delta between the messages is suitable large, forward the message on
if(distance > props.getOdeBsmMaximumPositionDelta()){
return false;
}

return true;
}

public double calculateGeodeticDistance(double lat1, double lon1, double lat2, double lon2) {
calculator.setStartingGeographicPoint(lon1, lat1);
calculator.setDestinationGeographicPoint(lon2, lat2);
return calculator.getOrthodromicDistance();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package us.dot.its.jpo.deduplicator.deduplicator.processors;

import java.time.Duration;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.Objects;

import us.dot.its.jpo.deduplicator.DeduplicatorProperties;
import us.dot.its.jpo.ode.model.OdeMapData;
import us.dot.its.jpo.ode.model.OdeMapMetadata;
import us.dot.its.jpo.ode.model.OdeMapPayload;

public class OdeMapJsonProcessor extends DeduplicationProcessor<OdeMapData>{

DateTimeFormatter formatter = DateTimeFormatter.ISO_INSTANT;

DeduplicatorProperties props;

public OdeMapJsonProcessor(DeduplicatorProperties props){
this.props = props;
this.storeName = props.getKafkaStateStoreOdeMapJsonName();
}


@Override
public Instant getMessageTime(OdeMapData message) {
try {
String time = ((OdeMapMetadata)message.getMetadata()).getOdeReceivedAt();
return Instant.from(formatter.parse(time));
} catch (Exception e) {
return Instant.ofEpochMilli(0);
}
}

@Override
public boolean isDuplicate(OdeMapData lastMessage, OdeMapData newMessage) {

Instant newValueTime = getMessageTime(newMessage);
Instant oldValueTime = getMessageTime(lastMessage);

if(newValueTime.minus(Duration.ofHours(1)).isAfter(oldValueTime)){
return false;

}else{
OdeMapPayload oldPayload = (OdeMapPayload)lastMessage.getPayload();
OdeMapPayload newPayload = (OdeMapPayload)newMessage.getPayload();

Integer oldTimestamp = oldPayload.getMap().getTimeStamp();
Integer newTimestamp = newPayload.getMap().getTimeStamp();


newPayload.getMap().setTimeStamp(oldTimestamp);

int oldHash = hashMapMessage(lastMessage);
int newhash = hashMapMessage(newMessage);

if(oldHash != newhash){
newPayload.getMap().setTimeStamp(newTimestamp);
return false;
}
}
return true;
}

public int hashMapMessage(OdeMapData map){
OdeMapPayload payload = (OdeMapPayload)map.getPayload();
return Objects.hash(payload.toJson());

}
}
Loading

0 comments on commit 4ca677a

Please sign in to comment.