From 2e68c6179ce81543e88c24a238e7bd77cafee74e Mon Sep 17 00:00:00 2001 From: Jack Rubacha Date: Thu, 1 Aug 2024 19:10:17 -0400 Subject: [PATCH] misc improvements and cleanup of cursory processing --- scylla-server-rust/src/controllers/mod.rs | 2 +- .../src/controllers/send_config_controller.rs | 6 ++-- scylla-server-rust/src/lib.rs | 2 +- scylla-server-rust/src/main.rs | 5 +++- .../src/processors/db_handler.rs | 30 ++++++++----------- .../src/processors/mqtt_processor.rs | 22 ++++++++++---- 6 files changed, 40 insertions(+), 27 deletions(-) diff --git a/scylla-server-rust/src/controllers/mod.rs b/scylla-server-rust/src/controllers/mod.rs index ba50c11a..2417f623 100644 --- a/scylla-server-rust/src/controllers/mod.rs +++ b/scylla-server-rust/src/controllers/mod.rs @@ -4,5 +4,5 @@ pub mod driver_controller; pub mod location_controller; pub mod node_controller; pub mod run_controller; -pub mod system_controller; pub mod send_config_controller; +pub mod system_controller; diff --git a/scylla-server-rust/src/controllers/send_config_controller.rs b/scylla-server-rust/src/controllers/send_config_controller.rs index 4b1c0f67..8b07a405 100644 --- a/scylla-server-rust/src/controllers/send_config_controller.rs +++ b/scylla-server-rust/src/controllers/send_config_controller.rs @@ -9,6 +9,8 @@ use tracing::{info, warn}; use crate::{command_data::CommandData, error::ScyllaError}; +pub const CALYPSO_BIDIR_CMD_PREFIX: &str = "Calypso/Bidir/Command/"; + #[derive(Deserialize, Debug)] pub struct ConfigRequest { pub data: Vec, @@ -21,7 +23,7 @@ pub async fn send_config( ) -> Result<(), ScyllaError> { info!( "Sending car config with key: {}, and values: {:?}", - key, data_query.0 + key, data_query.0.data ); let Some(client) = client else { return Err(ScyllaError::NotProd); @@ -35,7 +37,7 @@ pub async fn send_config( if let Err(err) = client .publish( - format!("Calypso/Bidir/Command/{}", key), + format!("{}{}", CALYPSO_BIDIR_CMD_PREFIX, key), rumqttc::v5::mqttbytes::QoS::ExactlyOnce, false, bytes, diff --git a/scylla-server-rust/src/lib.rs b/scylla-server-rust/src/lib.rs index 04385fe4..f3bd395c 100644 --- a/scylla-server-rust/src/lib.rs +++ b/scylla-server-rust/src/lib.rs @@ -8,8 +8,8 @@ pub mod transformers; #[allow(warnings)] pub mod prisma; -pub mod serverdata; pub mod command_data; +pub mod serverdata; /// The type descriptor of the database passed to the middlelayer through axum state pub type Database = std::sync::Arc; diff --git a/scylla-server-rust/src/main.rs b/scylla-server-rust/src/main.rs index f61d2112..136cd3d8 100755 --- a/scylla-server-rust/src/main.rs +++ b/scylla-server-rust/src/main.rs @@ -199,7 +199,10 @@ async fn main() { // SYSTEMS .route("/systems", get(system_controller::get_all_systems)) // CONFIG - .route("/config/:key", post(send_config_controller::send_config).layer(Extension(client))) + .route( + "/config/:key", + post(send_config_controller::send_config).layer(Extension(client)), + ) // for CORS handling .layer( CorsLayer::new() diff --git a/scylla-server-rust/src/processors/db_handler.rs b/scylla-server-rust/src/processors/db_handler.rs index c65f09b9..6541d24d 100644 --- a/scylla-server-rust/src/processors/db_handler.rs +++ b/scylla-server-rust/src/processors/db_handler.rs @@ -112,7 +112,7 @@ impl DbHandler { /// It uses the queue from data queue to insert to the database specified /// On cancellation, will await one final queue message to cleanup anything remaining in the channel pub async fn batching_loop( - mut data_queue: Receiver>, + mut batch_queue: Receiver>, database: Database, saturate_batches: bool, cancel_token: CancellationToken, @@ -120,22 +120,18 @@ impl DbHandler { loop { tokio::select! { _ = cancel_token.cancelled() => { - loop { - info!("{} batches remaining!", data_queue.len()); - if let Some(final_msgs) = data_queue.recv().await { - info!( - "A cleanup batch uploaded: {:?}", - data_service::add_many(&database, final_msgs).await - ); - } else { - info!("No more messages to cleanup."); - break; + // cleanup all remaining messages if batches start backing up + while let Some(final_msgs) = batch_queue.recv().await { + info!("{} batches remaining!", batch_queue.len()+1); + info!( + "A cleanup batch uploaded: {:?}", + data_service::add_many(&database, final_msgs).await + ); } - - } - break; + info!("No more messages to cleanup."); + break; }, - Some(msgs) = data_queue.recv() => { + Some(msgs) = batch_queue.recv() => { if saturate_batches { let shared_db = database.clone(); tokio::spawn(async move { @@ -146,8 +142,8 @@ impl DbHandler { } debug!( "DB send: {} of {}", - data_queue.len(), - data_queue.max_capacity() + batch_queue.len(), + batch_queue.max_capacity() ); } } diff --git a/scylla-server-rust/src/processors/mqtt_processor.rs b/scylla-server-rust/src/processors/mqtt_processor.rs index 670f5422..01ac3d7b 100644 --- a/scylla-server-rust/src/processors/mqtt_processor.rs +++ b/scylla-server-rust/src/processors/mqtt_processor.rs @@ -12,7 +12,10 @@ use socketioxide::SocketIo; use tokio::sync::mpsc::{Receiver, Sender}; use tracing::{debug, instrument, trace, warn, Level}; -use crate::{serverdata, services::run_service}; +use crate::{ + controllers::send_config_controller::CALYPSO_BIDIR_CMD_PREFIX, serverdata, + services::run_service, +}; use super::ClientData; use std::borrow::Cow; @@ -161,14 +164,23 @@ impl MqttProcessor { /// returns the ClientData, or the Err of something that can be debug printed #[instrument(skip(self), level = Level::TRACE)] fn parse_msg(&self, msg: Publish) -> Result { - let data = serverdata::ServerData::parse_from_bytes(&msg.payload) - .map_err(|f| format!("Could not parse message topic:{:?} error: {}", msg.topic, f))?; + let topic = std::str::from_utf8(&msg.topic) + .map_err(|f| format!("Could not parse topic: {}, topic: {:?}", f, msg.topic))?; + + // ignore command messages, less confusing in logs than just failing to decode protobuf + if topic.starts_with(CALYPSO_BIDIR_CMD_PREFIX) { + return Err(format!("Skipping command message: {}", topic)); + } - let split = std::str::from_utf8(&msg.topic) - .map_err(|f| format!("Could not parse topic: {}, topic: {:?}", f, msg.topic))? + let split = topic .split_once('/') .ok_or(&format!("Could not parse nesting: {:?}", msg.topic))?; + // look at data after topic as if we dont have a topic the protobuf is useless anyways + let data = serverdata::ServerData::parse_from_bytes(&msg.payload) + .map_err(|f| format!("Could not parse message topic:{:?} error: {}", msg.topic, f))?; + + // get the node and datatype from the topic extracted at the beginning let node = split.0; let data_type = split.1.replace('/', "-");