From a193d5236ee8a755e209efb2598268eb1623bbbe Mon Sep 17 00:00:00 2001 From: John-Wiens Date: Fri, 6 Sep 2024 10:08:24 -0600 Subject: [PATCH] Added support for Processed BSM's to mongo and Kafka Connect --- docker/connect/connect_start.sh | 3 +++ docker/mongo/b_create_indexes.js | 38 ++++++++++++++++++++++++++++++++ 2 files changed, 41 insertions(+) diff --git a/docker/connect/connect_start.sh b/docker/connect/connect_start.sh index f7a6e75b..9849f9a4 100644 --- a/docker/connect/connect_start.sh +++ b/docker/connect/connect_start.sh @@ -16,6 +16,8 @@ declare -A OdeRawEncodedBSMJson=([name]="topic.OdeRawEncodedBSMJson" [collection [convert_timestamp]=false [timefield]="" [use_key]=false [key]="" [add_timestamp]=true) declare -A OdeBsmJson=([name]="topic.OdeBsmJson" [collection]="OdeBsmJson" [convert_timestamp]=false [timefield]="" [use_key]=false [key]="" [add_timestamp]=true) +declare -A ProcessedBsm=([name]="topic.ProcessedBsm" [collection]="ProcessedBsm" + [convert_timestamp]=false [timefield]="" [use_key]=false [key]="" [add_timestamp]=true) # Record Map Data declare -A OdeMapJson=([name]="topic.DeduplicatedOdeMapJson" [collection]="OdeMapJson" @@ -214,6 +216,7 @@ function createSink() { createSink OdeRawEncodedBSMJson createSink OdeBsmJson +createSink ProcessedBsm createSink OdeMapJson createSink ProcessedMap diff --git a/docker/mongo/b_create_indexes.js b/docker/mongo/b_create_indexes.js index fb294f5f..b861d9f7 100644 --- a/docker/mongo/b_create_indexes.js +++ b/docker/mongo/b_create_indexes.js @@ -66,6 +66,7 @@ const collections = [ // GeoJson Converter Data {name: "ProcessedMap", ttlField: "recordGeneratedAt", timeField: "properties.timeStamp", intersectionField: "properties.intersectionId"}, {name: "ProcessedSpat", ttlField: "recordGeneratedAt", timeField: "utcTimeStamp", intersectionField: "intersectionId"}, + {name: "ProcessedBsm", ttlField: "recordGeneratedAt", timeField: "timeStamp", geoSpatialField: "features.geometry.coordinates"}, // Conflict Monitor Events { name: "CmStopLineStopEvent", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID" }, @@ -145,6 +146,7 @@ do { createTimeIntersectionIndex(collection); createTimeRsuIpIndex(collection); createTimeIndex(collection); + createGeoSpatialIndex(collection); }else{ missing_collection_count++; console.log("Collection " + collection.name + " does not exist yet"); @@ -318,6 +320,38 @@ function createTimeIntersectionIndex(collection){ } } +function createGeoSpatialIndex(collection){ + if(geoSpatialIndexExists(collection)){ + return; + } + + if(collection.hasOwnProperty("timeField") && collection.timeField != null && collection.hasOwnProperty("geoSpatialField") && collection.geoSpatialField != null){ + const collectionName = collection.name; + const timeField = collection.timeField; + const geoSpatialField = collection.geoSpatialField; + console.log("Creating GeoSpatial index for " + collectionName); + + var indexJson = {}; + indexJson[geoSpatialField] = "2dsphere"; + indexJson[timeField] = -1; + + + try { + db[collectionName].createIndex(indexJson); + console.log("Created time geospatial index for " + collectionName + " using the field: " + timeField + " as the timestamp and : " + geoSpatialField + " as the GeoSpatial Field"); + } catch (err) { + db.runCommand({ + "collMod": collectionName, + "index": { + keyPattern: indexJson + } + }); + console.log("Updated time geospatial index for " + collectionName + " using the field: " + timeField + " as the timestamp and : " + geoSpatialField + " as the GeoSpatial Field"); + } + } + +} + function ttlIndexExists(collection) { return db[collection.name].getIndexes().find((idx) => idx.hasOwnProperty("expireAfterSeconds")) !== undefined; } @@ -333,3 +367,7 @@ function timeRsuIpIndexExists(collection){ function timeIndexExists(collection){ return db[collection.name].getIndexes().find((idx) => idx.name == collection.timeField + "_-1") !== undefined; } + +function geoSpatialIndexExists(collection){ + return db[collection.name].getIndexes().find((idx) => idx.name == collection.geoSpatialField + "_2dsphere_timeStamp_-1") !== undefined; +}