Skip to content

Commit

Permalink
Added support for Processed BSM's to mongo and Kafka Connect
Browse files Browse the repository at this point in the history
  • Loading branch information
John-Wiens committed Sep 6, 2024
1 parent 905d5d1 commit a193d52
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 0 deletions.
3 changes: 3 additions & 0 deletions docker/connect/connect_start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ declare -A OdeRawEncodedBSMJson=([name]="topic.OdeRawEncodedBSMJson" [collection
[convert_timestamp]=false [timefield]="" [use_key]=false [key]="" [add_timestamp]=true)
declare -A OdeBsmJson=([name]="topic.OdeBsmJson" [collection]="OdeBsmJson"
[convert_timestamp]=false [timefield]="" [use_key]=false [key]="" [add_timestamp]=true)
declare -A ProcessedBsm=([name]="topic.ProcessedBsm" [collection]="ProcessedBsm"
[convert_timestamp]=false [timefield]="" [use_key]=false [key]="" [add_timestamp]=true)

# Record Map Data
declare -A OdeMapJson=([name]="topic.DeduplicatedOdeMapJson" [collection]="OdeMapJson"
Expand Down Expand Up @@ -214,6 +216,7 @@ function createSink() {

createSink OdeRawEncodedBSMJson
createSink OdeBsmJson
createSink ProcessedBsm

createSink OdeMapJson
createSink ProcessedMap
Expand Down
38 changes: 38 additions & 0 deletions docker/mongo/b_create_indexes.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ const collections = [
// GeoJson Converter Data
{name: "ProcessedMap", ttlField: "recordGeneratedAt", timeField: "properties.timeStamp", intersectionField: "properties.intersectionId"},
{name: "ProcessedSpat", ttlField: "recordGeneratedAt", timeField: "utcTimeStamp", intersectionField: "intersectionId"},
{name: "ProcessedBsm", ttlField: "recordGeneratedAt", timeField: "timeStamp", geoSpatialField: "features.geometry.coordinates"},

// Conflict Monitor Events
{ name: "CmStopLineStopEvent", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID" },
Expand Down Expand Up @@ -145,6 +146,7 @@ do {
createTimeIntersectionIndex(collection);
createTimeRsuIpIndex(collection);
createTimeIndex(collection);
createGeoSpatialIndex(collection);
}else{
missing_collection_count++;
console.log("Collection " + collection.name + " does not exist yet");
Expand Down Expand Up @@ -318,6 +320,38 @@ function createTimeIntersectionIndex(collection){
}
}

function createGeoSpatialIndex(collection){
if(geoSpatialIndexExists(collection)){
return;
}

if(collection.hasOwnProperty("timeField") && collection.timeField != null && collection.hasOwnProperty("geoSpatialField") && collection.geoSpatialField != null){
const collectionName = collection.name;
const timeField = collection.timeField;
const geoSpatialField = collection.geoSpatialField;
console.log("Creating GeoSpatial index for " + collectionName);

var indexJson = {};
indexJson[geoSpatialField] = "2dsphere";
indexJson[timeField] = -1;


try {
db[collectionName].createIndex(indexJson);
console.log("Created time geospatial index for " + collectionName + " using the field: " + timeField + " as the timestamp and : " + geoSpatialField + " as the GeoSpatial Field");
} catch (err) {
db.runCommand({
"collMod": collectionName,
"index": {
keyPattern: indexJson
}
});
console.log("Updated time geospatial index for " + collectionName + " using the field: " + timeField + " as the timestamp and : " + geoSpatialField + " as the GeoSpatial Field");
}
}

}

function ttlIndexExists(collection) {
return db[collection.name].getIndexes().find((idx) => idx.hasOwnProperty("expireAfterSeconds")) !== undefined;
}
Expand All @@ -333,3 +367,7 @@ function timeRsuIpIndexExists(collection){
function timeIndexExists(collection){
return db[collection.name].getIndexes().find((idx) => idx.name == collection.timeField + "_-1") !== undefined;
}

function geoSpatialIndexExists(collection){
return db[collection.name].getIndexes().find((idx) => idx.name == collection.geoSpatialField + "_2dsphere_timeStamp_-1") !== undefined;
}

0 comments on commit a193d52

Please sign in to comment.