Skip to content

Commit

Permalink
Scylla Calypso (#173)
Browse files Browse the repository at this point in the history
* basic proof of concept

* misc improvements and cleanup of cursory processing

* implement reids review suggestions

* try and fix clippy

* and again fix clippy

* make data query optional to reset calypso to default

if calypso receives a short or emtpy protobuf command_data, it will fill the extra values with the default value specified in the YAML. so we should allow the client to specify blank which could end up meaning a "Reset to Default" button.

* clippy is trolling me

* fixup error handling for plaintext error message

* implement reid fixes

* remove not prod error
  • Loading branch information
jr1221 authored Aug 11, 2024
1 parent 22d6146 commit f0c12af
Show file tree
Hide file tree
Showing 22 changed files with 243 additions and 81 deletions.
1 change: 1 addition & 0 deletions scylla-server-rust/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ prisma.rs

# protobuf
serverdata.rs
command_data.rs
37 changes: 37 additions & 0 deletions scylla-server-rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions scylla-server-rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ rand = "0.8.5"
console-subscriber = { version = "0.3.0", optional = true }
ringbuffer = "0.15.0"
clap = { version = "4.5.11", features = ["derive", "env"] }
axum-extra = { version = "0.9.3", features = ["query"] }

[features]
top = ["dep:console-subscriber"]
Expand Down
1 change: 1 addition & 0 deletions scylla-server-rust/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ fn main() {
.includes(["src/proto"])
// Inputs must reside in some of include paths.
.input("src/proto/serverdata.proto")
.input("src/proto/command_data.proto")
// Specify output directory relative to Cargo output directory.
.out_dir("src")
.run_from_script();
Expand Down
70 changes: 70 additions & 0 deletions scylla-server-rust/src/controllers/car_command_controller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use std::sync::Arc;

use axum::{extract::Path, Extension};
use axum_extra::extract::Query;
use protobuf::Message;
use rumqttc::v5::AsyncClient;
use serde::Deserialize;
use tracing::{info, warn};

use crate::{command_data::CommandData, error::ScyllaError};

/// the prefix for the calypso topic, so topic of cmd is this plus the key appended on
pub const CALYPSO_BIDIR_CMD_PREFIX: &str = "Calypso/Bidir/Command/";

#[derive(Deserialize, Debug)]
pub struct ConfigRequest {
pub data: Option<Vec<f32>>,
}

/// Sends a configuration to the car over MQTT
/// * `key` - The key of the configuration, as defined in the cangen YAML
/// * `data_query` - The data of the configuration, a URI query list of data=<f32>. If empty or too short, filled with cangen YAMl defaults
/// * `client` - The MQTT client to be used to send the data
///
/// More info: This follows the specification of sending a command_data object over siren to topic CALYPSO_BIDIR_CMD_PREFIX/<key>
pub async fn send_config_command(
Path(key): Path<String>,
Query(data_query): Query<ConfigRequest>,
Extension(client): Extension<Option<Arc<AsyncClient>>>,
) -> Result<(), ScyllaError> {
info!(
"Sending car config with key: {}, and values: {:?}",
key, data_query.data
);
// disable scylla if not prod, as there will be None mqtt client
let Some(client) = client else {
warn!("Cannot use config endpoint in dev mode!");
return Ok(())
};

// the protobuf calypso converts into CAN
let mut payload = CommandData::new();
// empty "data" in the protobuf tells calypso to use the default value
if let Some(data) = data_query.data {
payload.data = data;
}
let Ok(bytes) = payload.write_to_bytes() else {
return Err(ScyllaError::InvalidEncoding(
"Payload could not be written!".to_string(),
));
};

// publish the message to the topic that calypso's encoder is susbcribed to
if let Err(err) = client
.publish(
format!("{}{}", CALYPSO_BIDIR_CMD_PREFIX, key),
rumqttc::v5::mqttbytes::QoS::ExactlyOnce,
false,
bytes,
)
.await
{
warn!("Could not publish instruction: {}", err);
return Err(ScyllaError::CommFailure(
"Siren publish for instruction failed!".to_string(),
));
}

Ok(())
}
1 change: 1 addition & 0 deletions scylla-server-rust/src/controllers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod car_command_controller;
pub mod data_controller;
pub mod data_type_controller;
pub mod driver_controller;
Expand Down
2 changes: 1 addition & 1 deletion scylla-server-rust/src/controllers/run_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub async fn get_run_by_id(
let run_data = run_service::get_run_by_id(&db, run_id).await?;

if run_data.is_none() {
return Err(ScyllaError::NotFound);
return Err(ScyllaError::EmptyResult);
}

let run_data_safe = run_data.unwrap();
Expand Down
43 changes: 30 additions & 13 deletions scylla-server-rust/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,48 @@ use prisma_client_rust::{
use tracing::warn;

pub enum ScyllaError {
/// Any prisma query which errors out
PrismaError(QueryError),
NotFound,
/// An instruction was not encodable
InvalidEncoding(String),
/// Could not communicate to car
CommFailure(String),
/// A query turned up empty that should not have
EmptyResult,
}

impl From<QueryError> for ScyllaError {
fn from(error: QueryError) -> Self {
warn!("Query error: {:?}", error);
match error {
e if e.is_prisma_error::<RecordNotFound>() => ScyllaError::NotFound,
e => ScyllaError::PrismaError(e),
}
ScyllaError::PrismaError(error)
}
}

// This centralizes all different errors from our app in one place
impl IntoResponse for ScyllaError {
fn into_response(self) -> Response {
let status = match self {
ScyllaError::PrismaError(error) if error.is_prisma_error::<UniqueKeyViolation>() => {
StatusCode::CONFLICT
}
ScyllaError::PrismaError(_) => StatusCode::BAD_REQUEST,
ScyllaError::NotFound => StatusCode::NOT_FOUND,
let (status, reason) = match self {
ScyllaError::PrismaError(error) if error.is_prisma_error::<UniqueKeyViolation>() => (
StatusCode::CONFLICT,
format!("Unique Key Violation: {}", error),
),
ScyllaError::PrismaError(error) if error.is_prisma_error::<RecordNotFound>() => (
StatusCode::NOT_FOUND,
format!("Record Not Found: {}", error),
),
ScyllaError::PrismaError(error) => (
StatusCode::BAD_REQUEST,
format!("Misc query error: {}", error),
),
ScyllaError::InvalidEncoding(reason) => (StatusCode::UNPROCESSABLE_ENTITY, reason),
ScyllaError::CommFailure(reason) => (StatusCode::BAD_GATEWAY, reason),
ScyllaError::EmptyResult => (
StatusCode::NOT_FOUND,
"Fetched an empty result that should not be!".to_string(),
),
};

status.into_response()
warn!("Routing error: {}: {}", status, reason);

(status, reason).into_response()
}
}
1 change: 1 addition & 0 deletions scylla-server-rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub mod transformers;
#[allow(warnings)]
pub mod prisma;

pub mod command_data;
pub mod serverdata;

/// The type descriptor of the database passed to the middlelayer through axum state
Expand Down
31 changes: 24 additions & 7 deletions scylla-server-rust/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ use axum::{
};
use clap::Parser;
use prisma_client_rust::chrono;
use rumqttc::v5::AsyncClient;
use scylla_server_rust::{
controllers::{
self, data_type_controller, driver_controller, location_controller, node_controller,
self,
car_command_controller::{self},
data_type_controller, driver_controller, location_controller, node_controller,
run_controller, system_controller,
},
prisma::PrismaClient,
Expand Down Expand Up @@ -55,7 +58,12 @@ struct ScyllaArgs {
siren_host_url: String,

/// The time, in seconds between collection for a batch upsert
#[arg(short = 't', long, env = "SCYLLA_BATCH_UPSERT_TIME", default_value = "10")]
#[arg(
short = 't',
long,
env = "SCYLLA_BATCH_UPSERT_TIME",
default_value = "10"
)]
batch_upsert_time: u64,
}

Expand Down Expand Up @@ -138,11 +146,12 @@ async fn main() {
token.clone(),
));

// if PROD_SCYLLA=false
if !cli.prod {
// if PROD_SCYLLA=false, also procur a client for use in the config state
let client: Option<Arc<AsyncClient>> = if !cli.prod {
info!("Running processor in mock mode, no data will be stored");
let recv = MockProcessor::new(io);
tokio::spawn(recv.generate_mock());
None
} else {
// creates the initial run
let curr_run = run_service::create_run(&db, chrono::offset::Utc::now().timestamp_millis())
Expand All @@ -153,15 +162,18 @@ async fn main() {
// run prod if this isnt present
// create and spawn the mqtt processor
info!("Running processor in MQTT (production) mode");
let recv = MqttProcessor::new(
let (recv, opts) = MqttProcessor::new(
mqtt_send,
new_run_receive,
cli.siren_host_url,
curr_run.id,
io,
);
tokio::spawn(recv.process_mqtt());
}
let (client, eventloop) = AsyncClient::new(opts, 600);
let client_sharable: Arc<AsyncClient> = Arc::new(client);
tokio::spawn(recv.process_mqtt(client_sharable.clone(), eventloop));
Some(client_sharable)
};

let app = Router::new()
// DATA ROUTES
Expand All @@ -186,6 +198,11 @@ async fn main() {
)
// SYSTEMS
.route("/systems", get(system_controller::get_all_systems))
// CONFIG
.route(
"/config/set/:configKey",
post(car_command_controller::send_config_command).layer(Extension(client)),
)
// for CORS handling
.layer(
CorsLayer::new()
Expand Down
1 change: 1 addition & 0 deletions scylla-server-rust/src/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// @generated

pub mod command_data;
pub mod serverdata;
30 changes: 13 additions & 17 deletions scylla-server-rust/src/processors/db_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,30 +112,26 @@ impl DbHandler {
/// It uses the queue from data queue to insert to the database specified
/// On cancellation, will await one final queue message to cleanup anything remaining in the channel
pub async fn batching_loop(
mut data_queue: Receiver<Vec<ClientData>>,
mut batch_queue: Receiver<Vec<ClientData>>,
database: Database,
saturate_batches: bool,
cancel_token: CancellationToken,
) {
loop {
tokio::select! {
_ = cancel_token.cancelled() => {
loop {
info!("{} batches remaining!", data_queue.len());
if let Some(final_msgs) = data_queue.recv().await {
info!(
"A cleanup batch uploaded: {:?}",
data_service::add_many(&database, final_msgs).await
);
} else {
info!("No more messages to cleanup.");
break;
// cleanup all remaining messages if batches start backing up
while let Some(final_msgs) = batch_queue.recv().await {
info!("{} batches remaining!", batch_queue.len()+1);
info!(
"A cleanup batch uploaded: {:?}",
data_service::add_many(&database, final_msgs).await
);
}

}
break;
info!("No more messages to cleanup.");
break;
},
Some(msgs) = data_queue.recv() => {
Some(msgs) = batch_queue.recv() => {
if saturate_batches {
let shared_db = database.clone();
tokio::spawn(async move {
Expand All @@ -146,8 +142,8 @@ impl DbHandler {
}
debug!(
"DB send: {} of {}",
data_queue.len(),
data_queue.max_capacity()
batch_queue.len(),
batch_queue.max_capacity()
);
}
}
Expand Down
2 changes: 1 addition & 1 deletion scylla-server-rust/src/processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub mod mqtt_processor;
/// This has the dual purposes of
/// * - representing the packet sent over the socket for live data
/// * - representing the struct for the service layer to unpack for insertion
/// Note: node name is only considered for database storage and convenience, it is not serialized in a socket packet
/// Note: node name is only considered for database storage and convenience, it is not serialized in a socket packet
#[derive(serde::Serialize, Clone, Debug)]
pub struct ClientData {
pub run_id: i32,
Expand Down
Loading

0 comments on commit f0c12af

Please sign in to comment.