Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scylla Calypso #173

Merged
merged 10 commits into from
Aug 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions scylla-server-rust/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ prisma.rs

# protobuf
serverdata.rs
command_data.rs
37 changes: 37 additions & 0 deletions scylla-server-rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions scylla-server-rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ rand = "0.8.5"
console-subscriber = { version = "0.3.0", optional = true }
ringbuffer = "0.15.0"
clap = { version = "4.5.11", features = ["derive", "env"] }
axum-extra = { version = "0.9.3", features = ["query"] }

[features]
top = ["dep:console-subscriber"]
Expand Down
1 change: 1 addition & 0 deletions scylla-server-rust/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ fn main() {
.includes(["src/proto"])
// Inputs must reside in some of include paths.
.input("src/proto/serverdata.proto")
.input("src/proto/command_data.proto")
// Specify output directory relative to Cargo output directory.
.out_dir("src")
.run_from_script();
Expand Down
70 changes: 70 additions & 0 deletions scylla-server-rust/src/controllers/car_command_controller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use std::sync::Arc;

use axum::{extract::Path, Extension};
use axum_extra::extract::Query;
use protobuf::Message;
use rumqttc::v5::AsyncClient;
use serde::Deserialize;
use tracing::{info, warn};

use crate::{command_data::CommandData, error::ScyllaError};

/// the prefix for the calypso topic, so topic of cmd is this plus the key appended on
pub const CALYPSO_BIDIR_CMD_PREFIX: &str = "Calypso/Bidir/Command/";

#[derive(Deserialize, Debug)]
pub struct ConfigRequest {
pub data: Option<Vec<f32>>,
}

/// Sends a configuration to the car over MQTT
/// * `key` - The key of the configuration, as defined in the cangen YAML
/// * `data_query` - The data of the configuration, a URI query list of data=<f32>. If empty or too short, filled with cangen YAMl defaults
/// * `client` - The MQTT client to be used to send the data
///
/// More info: This follows the specification of sending a command_data object over siren to topic CALYPSO_BIDIR_CMD_PREFIX/<key>
pub async fn send_config_command(
Path(key): Path<String>,
Query(data_query): Query<ConfigRequest>,
Extension(client): Extension<Option<Arc<AsyncClient>>>,
) -> Result<(), ScyllaError> {
info!(
"Sending car config with key: {}, and values: {:?}",
key, data_query.data
);
// disable scylla if not prod, as there will be None mqtt client
let Some(client) = client else {
warn!("Cannot use config endpoint in dev mode!");
return Ok(())
};

// the protobuf calypso converts into CAN
let mut payload = CommandData::new();
// empty "data" in the protobuf tells calypso to use the default value
if let Some(data) = data_query.data {
payload.data = data;
}
let Ok(bytes) = payload.write_to_bytes() else {
return Err(ScyllaError::InvalidEncoding(
"Payload could not be written!".to_string(),
));
};

// publish the message to the topic that calypso's encoder is susbcribed to
if let Err(err) = client
.publish(
format!("{}{}", CALYPSO_BIDIR_CMD_PREFIX, key),
rumqttc::v5::mqttbytes::QoS::ExactlyOnce,
false,
bytes,
)
.await
{
warn!("Could not publish instruction: {}", err);
return Err(ScyllaError::CommFailure(
"Siren publish for instruction failed!".to_string(),
));
}

Ok(())
}
1 change: 1 addition & 0 deletions scylla-server-rust/src/controllers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod car_command_controller;
pub mod data_controller;
pub mod data_type_controller;
pub mod driver_controller;
Expand Down
2 changes: 1 addition & 1 deletion scylla-server-rust/src/controllers/run_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub async fn get_run_by_id(
let run_data = run_service::get_run_by_id(&db, run_id).await?;

if run_data.is_none() {
return Err(ScyllaError::NotFound);
return Err(ScyllaError::EmptyResult);
RChandler234 marked this conversation as resolved.
Show resolved Hide resolved
}

let run_data_safe = run_data.unwrap();
Expand Down
43 changes: 30 additions & 13 deletions scylla-server-rust/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,48 @@ use prisma_client_rust::{
use tracing::warn;

pub enum ScyllaError {
/// Any prisma query which errors out
PrismaError(QueryError),
NotFound,
/// An instruction was not encodable
InvalidEncoding(String),
/// Could not communicate to car
CommFailure(String),
/// A query turned up empty that should not have
EmptyResult,
RChandler234 marked this conversation as resolved.
Show resolved Hide resolved
}

impl From<QueryError> for ScyllaError {
fn from(error: QueryError) -> Self {
warn!("Query error: {:?}", error);
match error {
e if e.is_prisma_error::<RecordNotFound>() => ScyllaError::NotFound,
e => ScyllaError::PrismaError(e),
}
ScyllaError::PrismaError(error)
}
}

// This centralizes all different errors from our app in one place
impl IntoResponse for ScyllaError {
fn into_response(self) -> Response {
let status = match self {
ScyllaError::PrismaError(error) if error.is_prisma_error::<UniqueKeyViolation>() => {
StatusCode::CONFLICT
}
ScyllaError::PrismaError(_) => StatusCode::BAD_REQUEST,
ScyllaError::NotFound => StatusCode::NOT_FOUND,
let (status, reason) = match self {
ScyllaError::PrismaError(error) if error.is_prisma_error::<UniqueKeyViolation>() => (
StatusCode::CONFLICT,
format!("Unique Key Violation: {}", error),
),
ScyllaError::PrismaError(error) if error.is_prisma_error::<RecordNotFound>() => (
StatusCode::NOT_FOUND,
format!("Record Not Found: {}", error),
),
ScyllaError::PrismaError(error) => (
StatusCode::BAD_REQUEST,
format!("Misc query error: {}", error),
),
ScyllaError::InvalidEncoding(reason) => (StatusCode::UNPROCESSABLE_ENTITY, reason),
ScyllaError::CommFailure(reason) => (StatusCode::BAD_GATEWAY, reason),
ScyllaError::EmptyResult => (
StatusCode::NOT_FOUND,
"Fetched an empty result that should not be!".to_string(),
),
};

status.into_response()
warn!("Routing error: {}: {}", status, reason);

(status, reason).into_response()
}
}
1 change: 1 addition & 0 deletions scylla-server-rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub mod transformers;
#[allow(warnings)]
pub mod prisma;

pub mod command_data;
pub mod serverdata;

/// The type descriptor of the database passed to the middlelayer through axum state
Expand Down
31 changes: 24 additions & 7 deletions scylla-server-rust/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ use axum::{
};
use clap::Parser;
use prisma_client_rust::chrono;
use rumqttc::v5::AsyncClient;
use scylla_server_rust::{
controllers::{
self, data_type_controller, driver_controller, location_controller, node_controller,
self,
car_command_controller::{self},
data_type_controller, driver_controller, location_controller, node_controller,
run_controller, system_controller,
},
prisma::PrismaClient,
Expand Down Expand Up @@ -55,7 +58,12 @@ struct ScyllaArgs {
siren_host_url: String,

/// The time, in seconds between collection for a batch upsert
#[arg(short = 't', long, env = "SCYLLA_BATCH_UPSERT_TIME", default_value = "10")]
#[arg(
short = 't',
long,
env = "SCYLLA_BATCH_UPSERT_TIME",
default_value = "10"
)]
batch_upsert_time: u64,
}

Expand Down Expand Up @@ -138,11 +146,12 @@ async fn main() {
token.clone(),
));

// if PROD_SCYLLA=false
if !cli.prod {
// if PROD_SCYLLA=false, also procur a client for use in the config state
let client: Option<Arc<AsyncClient>> = if !cli.prod {
info!("Running processor in mock mode, no data will be stored");
let recv = MockProcessor::new(io);
tokio::spawn(recv.generate_mock());
None
} else {
// creates the initial run
let curr_run = run_service::create_run(&db, chrono::offset::Utc::now().timestamp_millis())
Expand All @@ -153,15 +162,18 @@ async fn main() {
// run prod if this isnt present
// create and spawn the mqtt processor
info!("Running processor in MQTT (production) mode");
let recv = MqttProcessor::new(
let (recv, opts) = MqttProcessor::new(
mqtt_send,
new_run_receive,
cli.siren_host_url,
curr_run.id,
io,
);
tokio::spawn(recv.process_mqtt());
}
let (client, eventloop) = AsyncClient::new(opts, 600);
let client_sharable: Arc<AsyncClient> = Arc::new(client);
tokio::spawn(recv.process_mqtt(client_sharable.clone(), eventloop));
Some(client_sharable)
};

let app = Router::new()
// DATA ROUTES
Expand All @@ -186,6 +198,11 @@ async fn main() {
)
// SYSTEMS
.route("/systems", get(system_controller::get_all_systems))
// CONFIG
.route(
"/config/set/:configKey",
post(car_command_controller::send_config_command).layer(Extension(client)),
)
// for CORS handling
.layer(
CorsLayer::new()
Expand Down
1 change: 1 addition & 0 deletions scylla-server-rust/src/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// @generated

pub mod command_data;
pub mod serverdata;
30 changes: 13 additions & 17 deletions scylla-server-rust/src/processors/db_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,30 +112,26 @@ impl DbHandler {
/// It uses the queue from data queue to insert to the database specified
/// On cancellation, will await one final queue message to cleanup anything remaining in the channel
pub async fn batching_loop(
mut data_queue: Receiver<Vec<ClientData>>,
mut batch_queue: Receiver<Vec<ClientData>>,
database: Database,
saturate_batches: bool,
cancel_token: CancellationToken,
) {
loop {
tokio::select! {
_ = cancel_token.cancelled() => {
loop {
info!("{} batches remaining!", data_queue.len());
if let Some(final_msgs) = data_queue.recv().await {
info!(
"A cleanup batch uploaded: {:?}",
data_service::add_many(&database, final_msgs).await
);
} else {
info!("No more messages to cleanup.");
break;
// cleanup all remaining messages if batches start backing up
while let Some(final_msgs) = batch_queue.recv().await {
info!("{} batches remaining!", batch_queue.len()+1);
info!(
"A cleanup batch uploaded: {:?}",
data_service::add_many(&database, final_msgs).await
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's this code doing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That code batch uploads all remaining batches before exiting, such that scylla will not exit until all batch uploads are complete.

basically I noticed I used a loop and the proper rust syntax is a while and it annoyed me so much I switched it in this PR even though this PR is unrelated. my b.

}

}
break;
info!("No more messages to cleanup.");
break;
},
Some(msgs) = data_queue.recv() => {
Some(msgs) = batch_queue.recv() => {
if saturate_batches {
let shared_db = database.clone();
tokio::spawn(async move {
Expand All @@ -146,8 +142,8 @@ impl DbHandler {
}
debug!(
"DB send: {} of {}",
data_queue.len(),
data_queue.max_capacity()
batch_queue.len(),
batch_queue.max_capacity()
);
}
}
Expand Down
2 changes: 1 addition & 1 deletion scylla-server-rust/src/processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub mod mqtt_processor;
/// This has the dual purposes of
/// * - representing the packet sent over the socket for live data
/// * - representing the struct for the service layer to unpack for insertion
/// Note: node name is only considered for database storage and convenience, it is not serialized in a socket packet
/// Note: node name is only considered for database storage and convenience, it is not serialized in a socket packet
#[derive(serde::Serialize, Clone, Debug)]
pub struct ClientData {
pub run_id: i32,
Expand Down
Loading
Loading