Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scylla Calypso #173

Merged
merged 10 commits into from
Aug 11, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions scylla-server-rust/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ prisma.rs

# protobuf
serverdata.rs
command_data.rs
37 changes: 37 additions & 0 deletions scylla-server-rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions scylla-server-rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ rand = "0.8.5"
console-subscriber = { version = "0.3.0", optional = true }
ringbuffer = "0.15.0"
clap = { version = "4.5.11", features = ["derive", "env"] }
axum-extra = { version = "0.9.3", features = ["query"] }

[features]
top = ["dep:console-subscriber"]
Expand Down
1 change: 1 addition & 0 deletions scylla-server-rust/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ fn main() {
.includes(["src/proto"])
// Inputs must reside in some of include paths.
.input("src/proto/serverdata.proto")
.input("src/proto/command_data.proto")
// Specify output directory relative to Cargo output directory.
.out_dir("src")
.run_from_script();
Expand Down
1 change: 1 addition & 0 deletions scylla-server-rust/src/controllers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ pub mod driver_controller;
pub mod location_controller;
pub mod node_controller;
pub mod run_controller;
pub mod send_config_controller;
jr1221 marked this conversation as resolved.
Show resolved Hide resolved
pub mod system_controller;
51 changes: 51 additions & 0 deletions scylla-server-rust/src/controllers/send_config_controller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use std::sync::Arc;

use axum::{extract::Path, Extension};
use axum_extra::extract::Query;
use protobuf::Message;
use rumqttc::v5::AsyncClient;
use serde::Deserialize;
use tracing::{info, warn};

use crate::{command_data::CommandData, error::ScyllaError};

pub const CALYPSO_BIDIR_CMD_PREFIX: &str = "Calypso/Bidir/Command/";

#[derive(Deserialize, Debug)]
pub struct ConfigRequest {
pub data: Vec<f32>,
}

pub async fn send_config(
jr1221 marked this conversation as resolved.
Show resolved Hide resolved
Path(key): Path<String>,
data_query: Query<ConfigRequest>,
Extension(client): Extension<Option<Arc<AsyncClient>>>,
) -> Result<(), ScyllaError> {
info!(
"Sending car config with key: {}, and values: {:?}",
key, data_query.0.data
);
let Some(client) = client else {
return Err(ScyllaError::NotProd);
};

let mut payload = CommandData::new();
payload.data = data_query.0.data;
let Ok(bytes) = payload.write_to_bytes() else {
return Err(ScyllaError::ImpossibleEncoding);
};

if let Err(err) = client
.publish(
format!("{}{}", CALYPSO_BIDIR_CMD_PREFIX, key),
rumqttc::v5::mqttbytes::QoS::ExactlyOnce,
false,
bytes,
)
.await
{
warn!("Could not publish instruction: {}", err);
return Err(ScyllaError::CommFailure);
}
Ok(())
}
10 changes: 10 additions & 0 deletions scylla-server-rust/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@ use tracing::warn;

pub enum ScyllaError {
PrismaError(QueryError),
/// A generic not found for a prisma query
NotFound,
/// Not available in mock mode, which the system is in
NotProd,
/// An instruction was not encodable
ImpossibleEncoding,
/// Could not communicate to car
CommFailure,
}

impl From<QueryError> for ScyllaError {
Expand All @@ -32,6 +39,9 @@ impl IntoResponse for ScyllaError {
}
ScyllaError::PrismaError(_) => StatusCode::BAD_REQUEST,
ScyllaError::NotFound => StatusCode::NOT_FOUND,
ScyllaError::NotProd => StatusCode::SERVICE_UNAVAILABLE,
ScyllaError::ImpossibleEncoding => StatusCode::UNPROCESSABLE_ENTITY,
ScyllaError::CommFailure => StatusCode::BAD_GATEWAY,
};

status.into_response()
Expand Down
1 change: 1 addition & 0 deletions scylla-server-rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub mod transformers;
#[allow(warnings)]
pub mod prisma;

pub mod command_data;
pub mod serverdata;

/// The type descriptor of the database passed to the middlelayer through axum state
Expand Down
31 changes: 24 additions & 7 deletions scylla-server-rust/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ use axum::{
};
use clap::Parser;
use prisma_client_rust::chrono;
use rumqttc::v5::AsyncClient;
use scylla_server_rust::{
controllers::{
self, data_type_controller, driver_controller, location_controller, node_controller,
run_controller, system_controller,
run_controller,
send_config_controller::{self},
system_controller,
},
prisma::PrismaClient,
processors::{
Expand Down Expand Up @@ -55,7 +58,12 @@ struct ScyllaArgs {
siren_host_url: String,

/// The time, in seconds between collection for a batch upsert
#[arg(short = 't', long, env = "SCYLLA_BATCH_UPSERT_TIME", default_value = "10")]
#[arg(
short = 't',
long,
env = "SCYLLA_BATCH_UPSERT_TIME",
default_value = "10"
)]
batch_upsert_time: u64,
}

Expand Down Expand Up @@ -138,11 +146,12 @@ async fn main() {
token.clone(),
));

// if PROD_SCYLLA=false
if !cli.prod {
// if PROD_SCYLLA=false, also procur a client for use in the config state
let client: Option<Arc<AsyncClient>> = if !cli.prod {
info!("Running processor in mock mode, no data will be stored");
let recv = MockProcessor::new(io);
tokio::spawn(recv.generate_mock());
None
} else {
// creates the initial run
let curr_run = run_service::create_run(&db, chrono::offset::Utc::now().timestamp_millis())
Expand All @@ -153,15 +162,18 @@ async fn main() {
// run prod if this isnt present
// create and spawn the mqtt processor
info!("Running processor in MQTT (production) mode");
let recv = MqttProcessor::new(
let (recv, opts) = MqttProcessor::new(
mqtt_send,
new_run_receive,
cli.siren_host_url,
curr_run.id,
io,
);
tokio::spawn(recv.process_mqtt());
}
let (client, eventloop) = AsyncClient::new(opts, 600);
let client_sharable: Arc<AsyncClient> = Arc::new(client);
tokio::spawn(recv.process_mqtt(client_sharable.clone(), eventloop));
Some(client_sharable)
};

let app = Router::new()
// DATA ROUTES
Expand All @@ -186,6 +198,11 @@ async fn main() {
)
// SYSTEMS
.route("/systems", get(system_controller::get_all_systems))
// CONFIG
.route(
"/config/:key",
post(send_config_controller::send_config).layer(Extension(client)),
jr1221 marked this conversation as resolved.
Show resolved Hide resolved
)
// for CORS handling
.layer(
CorsLayer::new()
Expand Down
1 change: 1 addition & 0 deletions scylla-server-rust/src/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// @generated

pub mod command_data;
pub mod serverdata;
30 changes: 13 additions & 17 deletions scylla-server-rust/src/processors/db_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,30 +112,26 @@ impl DbHandler {
/// It uses the queue from data queue to insert to the database specified
/// On cancellation, will await one final queue message to cleanup anything remaining in the channel
pub async fn batching_loop(
mut data_queue: Receiver<Vec<ClientData>>,
mut batch_queue: Receiver<Vec<ClientData>>,
database: Database,
saturate_batches: bool,
cancel_token: CancellationToken,
) {
loop {
tokio::select! {
_ = cancel_token.cancelled() => {
loop {
info!("{} batches remaining!", data_queue.len());
if let Some(final_msgs) = data_queue.recv().await {
info!(
"A cleanup batch uploaded: {:?}",
data_service::add_many(&database, final_msgs).await
);
} else {
info!("No more messages to cleanup.");
break;
// cleanup all remaining messages if batches start backing up
while let Some(final_msgs) = batch_queue.recv().await {
info!("{} batches remaining!", batch_queue.len()+1);
info!(
"A cleanup batch uploaded: {:?}",
data_service::add_many(&database, final_msgs).await
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's this code doing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That code batch uploads all remaining batches before exiting, such that scylla will not exit until all batch uploads are complete.

basically I noticed I used a loop and the proper rust syntax is a while and it annoyed me so much I switched it in this PR even though this PR is unrelated. my b.

}

}
break;
info!("No more messages to cleanup.");
break;
},
Some(msgs) = data_queue.recv() => {
Some(msgs) = batch_queue.recv() => {
if saturate_batches {
let shared_db = database.clone();
tokio::spawn(async move {
Expand All @@ -146,8 +142,8 @@ impl DbHandler {
}
debug!(
"DB send: {} of {}",
data_queue.len(),
data_queue.max_capacity()
batch_queue.len(),
batch_queue.max_capacity()
);
}
}
Expand Down
Loading
Loading