Skip to content

Commit

Permalink
misc improvements and cleanup of cursory processing
Browse files Browse the repository at this point in the history
  • Loading branch information
jr1221 committed Aug 1, 2024
1 parent 2956402 commit 2e68c61
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 27 deletions.
2 changes: 1 addition & 1 deletion scylla-server-rust/src/controllers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ pub mod driver_controller;
pub mod location_controller;
pub mod node_controller;
pub mod run_controller;
pub mod system_controller;
pub mod send_config_controller;
pub mod system_controller;
6 changes: 4 additions & 2 deletions scylla-server-rust/src/controllers/send_config_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use tracing::{info, warn};

use crate::{command_data::CommandData, error::ScyllaError};

pub const CALYPSO_BIDIR_CMD_PREFIX: &str = "Calypso/Bidir/Command/";

#[derive(Deserialize, Debug)]
pub struct ConfigRequest {
pub data: Vec<f32>,
Expand All @@ -21,7 +23,7 @@ pub async fn send_config(
) -> Result<(), ScyllaError> {
info!(
"Sending car config with key: {}, and values: {:?}",
key, data_query.0
key, data_query.0.data
);
let Some(client) = client else {
return Err(ScyllaError::NotProd);
Expand All @@ -35,7 +37,7 @@ pub async fn send_config(

if let Err(err) = client
.publish(
format!("Calypso/Bidir/Command/{}", key),
format!("{}{}", CALYPSO_BIDIR_CMD_PREFIX, key),
rumqttc::v5::mqttbytes::QoS::ExactlyOnce,
false,
bytes,
Expand Down
2 changes: 1 addition & 1 deletion scylla-server-rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ pub mod transformers;
#[allow(warnings)]
pub mod prisma;

pub mod serverdata;
pub mod command_data;
pub mod serverdata;

/// The type descriptor of the database passed to the middlelayer through axum state
pub type Database = std::sync::Arc<prisma::PrismaClient>;
5 changes: 4 additions & 1 deletion scylla-server-rust/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,10 @@ async fn main() {
// SYSTEMS
.route("/systems", get(system_controller::get_all_systems))
// CONFIG
.route("/config/:key", post(send_config_controller::send_config).layer(Extension(client)))
.route(
"/config/:key",
post(send_config_controller::send_config).layer(Extension(client)),
)
// for CORS handling
.layer(
CorsLayer::new()
Expand Down
30 changes: 13 additions & 17 deletions scylla-server-rust/src/processors/db_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,30 +112,26 @@ impl DbHandler {
/// It uses the queue from data queue to insert to the database specified
/// On cancellation, will await one final queue message to cleanup anything remaining in the channel
pub async fn batching_loop(
mut data_queue: Receiver<Vec<ClientData>>,
mut batch_queue: Receiver<Vec<ClientData>>,
database: Database,
saturate_batches: bool,
cancel_token: CancellationToken,
) {
loop {
tokio::select! {
_ = cancel_token.cancelled() => {
loop {
info!("{} batches remaining!", data_queue.len());
if let Some(final_msgs) = data_queue.recv().await {
info!(
"A cleanup batch uploaded: {:?}",
data_service::add_many(&database, final_msgs).await
);
} else {
info!("No more messages to cleanup.");
break;
// cleanup all remaining messages if batches start backing up
while let Some(final_msgs) = batch_queue.recv().await {
info!("{} batches remaining!", batch_queue.len()+1);
info!(
"A cleanup batch uploaded: {:?}",
data_service::add_many(&database, final_msgs).await
);
}

}
break;
info!("No more messages to cleanup.");
break;
},
Some(msgs) = data_queue.recv() => {
Some(msgs) = batch_queue.recv() => {
if saturate_batches {
let shared_db = database.clone();
tokio::spawn(async move {
Expand All @@ -146,8 +142,8 @@ impl DbHandler {
}
debug!(
"DB send: {} of {}",
data_queue.len(),
data_queue.max_capacity()
batch_queue.len(),
batch_queue.max_capacity()
);
}
}
Expand Down
22 changes: 17 additions & 5 deletions scylla-server-rust/src/processors/mqtt_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ use socketioxide::SocketIo;
use tokio::sync::mpsc::{Receiver, Sender};
use tracing::{debug, instrument, trace, warn, Level};

use crate::{serverdata, services::run_service};
use crate::{
controllers::send_config_controller::CALYPSO_BIDIR_CMD_PREFIX, serverdata,
services::run_service,
};

use super::ClientData;
use std::borrow::Cow;
Expand Down Expand Up @@ -161,14 +164,23 @@ impl MqttProcessor {
/// returns the ClientData, or the Err of something that can be debug printed
#[instrument(skip(self), level = Level::TRACE)]
fn parse_msg(&self, msg: Publish) -> Result<ClientData, impl fmt::Debug> {
let data = serverdata::ServerData::parse_from_bytes(&msg.payload)
.map_err(|f| format!("Could not parse message topic:{:?} error: {}", msg.topic, f))?;
let topic = std::str::from_utf8(&msg.topic)
.map_err(|f| format!("Could not parse topic: {}, topic: {:?}", f, msg.topic))?;

// ignore command messages, less confusing in logs than just failing to decode protobuf
if topic.starts_with(CALYPSO_BIDIR_CMD_PREFIX) {
return Err(format!("Skipping command message: {}", topic));
}

let split = std::str::from_utf8(&msg.topic)
.map_err(|f| format!("Could not parse topic: {}, topic: {:?}", f, msg.topic))?
let split = topic
.split_once('/')
.ok_or(&format!("Could not parse nesting: {:?}", msg.topic))?;

// look at data after topic as if we dont have a topic the protobuf is useless anyways
let data = serverdata::ServerData::parse_from_bytes(&msg.payload)
.map_err(|f| format!("Could not parse message topic:{:?} error: {}", msg.topic, f))?;

// get the node and datatype from the topic extracted at the beginning
let node = split.0;

let data_type = split.1.replace('/', "-");
Expand Down

0 comments on commit 2e68c61

Please sign in to comment.