Skip to content

Commit

Permalink
basic proof of concept
Browse files Browse the repository at this point in the history
  • Loading branch information
jr1221 committed Aug 1, 2024
1 parent 22d6146 commit 2956402
Show file tree
Hide file tree
Showing 12 changed files with 151 additions and 28 deletions.
1 change: 1 addition & 0 deletions scylla-server-rust/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ prisma.rs

# protobuf
serverdata.rs
command_data.rs
37 changes: 37 additions & 0 deletions scylla-server-rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions scylla-server-rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ rand = "0.8.5"
console-subscriber = { version = "0.3.0", optional = true }
ringbuffer = "0.15.0"
clap = { version = "4.5.11", features = ["derive", "env"] }
axum-extra = { version = "0.9.3", features = ["query"] }

[features]
top = ["dep:console-subscriber"]
Expand Down
1 change: 1 addition & 0 deletions scylla-server-rust/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ fn main() {
.includes(["src/proto"])
// Inputs must reside in some of include paths.
.input("src/proto/serverdata.proto")
.input("src/proto/command_data.proto")
// Specify output directory relative to Cargo output directory.
.out_dir("src")
.run_from_script();
Expand Down
1 change: 1 addition & 0 deletions scylla-server-rust/src/controllers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ pub mod location_controller;
pub mod node_controller;
pub mod run_controller;
pub mod system_controller;
pub mod send_config_controller;
49 changes: 49 additions & 0 deletions scylla-server-rust/src/controllers/send_config_controller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use std::sync::Arc;

use axum::{extract::Path, Extension};
use axum_extra::extract::Query;
use protobuf::Message;
use rumqttc::v5::AsyncClient;
use serde::Deserialize;
use tracing::{info, warn};

use crate::{command_data::CommandData, error::ScyllaError};

#[derive(Deserialize, Debug)]
pub struct ConfigRequest {
pub data: Vec<f32>,
}

pub async fn send_config(
Path(key): Path<String>,
data_query: Query<ConfigRequest>,
Extension(client): Extension<Option<Arc<AsyncClient>>>,
) -> Result<(), ScyllaError> {
info!(
"Sending car config with key: {}, and values: {:?}",
key, data_query.0
);
let Some(client) = client else {
return Err(ScyllaError::NotProd);
};

let mut payload = CommandData::new();
payload.data = data_query.0.data;
let Ok(bytes) = payload.write_to_bytes() else {
return Err(ScyllaError::ImpossibleEncoding);
};

if let Err(err) = client
.publish(
format!("Calypso/Bidir/Command/{}", key),
rumqttc::v5::mqttbytes::QoS::ExactlyOnce,
false,
bytes,
)
.await
{
warn!("Could not publish instruction: {}", err);
return Err(ScyllaError::CommFailure);
}
Ok(())
}
10 changes: 10 additions & 0 deletions scylla-server-rust/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@ use tracing::warn;

pub enum ScyllaError {
PrismaError(QueryError),
/// A generic not found for a prisma query
NotFound,
/// Not available in mock mode, which the system is in
NotProd,
/// An instruction was not encodable
ImpossibleEncoding,
/// Could not communicate to car
CommFailure,
}

impl From<QueryError> for ScyllaError {
Expand All @@ -32,6 +39,9 @@ impl IntoResponse for ScyllaError {
}
ScyllaError::PrismaError(_) => StatusCode::BAD_REQUEST,
ScyllaError::NotFound => StatusCode::NOT_FOUND,
ScyllaError::NotProd => StatusCode::SERVICE_UNAVAILABLE,
ScyllaError::ImpossibleEncoding => StatusCode::UNPROCESSABLE_ENTITY,
ScyllaError::CommFailure => StatusCode::BAD_GATEWAY,
};

status.into_response()
Expand Down
1 change: 1 addition & 0 deletions scylla-server-rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod transformers;
pub mod prisma;

pub mod serverdata;
pub mod command_data;

/// The type descriptor of the database passed to the middlelayer through axum state
pub type Database = std::sync::Arc<prisma::PrismaClient>;
28 changes: 21 additions & 7 deletions scylla-server-rust/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ use axum::{
};
use clap::Parser;
use prisma_client_rust::chrono;
use rumqttc::v5::AsyncClient;
use scylla_server_rust::{
controllers::{
self, data_type_controller, driver_controller, location_controller, node_controller,
run_controller, system_controller,
run_controller,
send_config_controller::{self},
system_controller,
},
prisma::PrismaClient,
processors::{
Expand Down Expand Up @@ -55,7 +58,12 @@ struct ScyllaArgs {
siren_host_url: String,

/// The time, in seconds between collection for a batch upsert
#[arg(short = 't', long, env = "SCYLLA_BATCH_UPSERT_TIME", default_value = "10")]
#[arg(
short = 't',
long,
env = "SCYLLA_BATCH_UPSERT_TIME",
default_value = "10"
)]
batch_upsert_time: u64,
}

Expand Down Expand Up @@ -138,11 +146,12 @@ async fn main() {
token.clone(),
));

// if PROD_SCYLLA=false
if !cli.prod {
// if PROD_SCYLLA=false, also procur a client for use in the config state
let client: Option<Arc<AsyncClient>> = if !cli.prod {
info!("Running processor in mock mode, no data will be stored");
let recv = MockProcessor::new(io);
tokio::spawn(recv.generate_mock());
None
} else {
// creates the initial run
let curr_run = run_service::create_run(&db, chrono::offset::Utc::now().timestamp_millis())
Expand All @@ -153,15 +162,18 @@ async fn main() {
// run prod if this isnt present
// create and spawn the mqtt processor
info!("Running processor in MQTT (production) mode");
let recv = MqttProcessor::new(
let (recv, opts) = MqttProcessor::new(
mqtt_send,
new_run_receive,
cli.siren_host_url,
curr_run.id,
io,
);
tokio::spawn(recv.process_mqtt());
}
let (client, eventloop) = AsyncClient::new(opts, 600);
let client_sharable: Arc<AsyncClient> = Arc::new(client);
tokio::spawn(recv.process_mqtt(client_sharable.clone(), eventloop));
Some(client_sharable)
};

let app = Router::new()
// DATA ROUTES
Expand All @@ -186,6 +198,8 @@ async fn main() {
)
// SYSTEMS
.route("/systems", get(system_controller::get_all_systems))
// CONFIG
.route("/config/:key", post(send_config_controller::send_config).layer(Extension(client)))
// for CORS handling
.layer(
CorsLayer::new()
Expand Down
1 change: 1 addition & 0 deletions scylla-server-rust/src/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// @generated

pub mod command_data;
pub mod serverdata;
42 changes: 21 additions & 21 deletions scylla-server-rust/src/processors/mqtt_processor.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use core::fmt;
use std::time::Duration;
use std::{sync::Arc, time::Duration};

use prisma_client_rust::{bigdecimal::ToPrimitive, chrono, serde_json};
use protobuf::Message;
use ringbuffer::RingBuffer;
use rumqttc::v5::{
mqttbytes::v5::{LastWill, Packet, Publish},
AsyncClient, Event, MqttOptions,
AsyncClient, Event, EventLoop, MqttOptions,
};
use socketioxide::SocketIo;
use tokio::sync::mpsc::{Receiver, Sender};
Expand All @@ -22,7 +22,6 @@ pub struct MqttProcessor {
new_run_channel: Receiver<run_service::public_run::Data>,
curr_run: i32,
io: SocketIo,
mqtt_ops: MqttOptions,
}

impl MqttProcessor {
Expand All @@ -32,16 +31,16 @@ impl MqttProcessor {
/// * `db` - The database to store the data in
/// * `io` - The socketio layer to send the data to
///
/// Returns the instance and the event loop, which can be passed into the process_mqtt func to begin recieiving
/// Returns the instance and options to create a client, which is then used in the process_mqtt loop
pub fn new(
channel: Sender<ClientData>,
new_run_channel: Receiver<run_service::public_run::Data>,
mqtt_path: String,
initial_run: i32,
io: SocketIo,
) -> MqttProcessor {
) -> (MqttProcessor, MqttOptions) {
// create the mqtt client and configure it
let mut create_opts = MqttOptions::new(
let mut mqtt_opts = MqttOptions::new(
"ScyllaServer",
mqtt_path.split_once(':').expect("Invalid Siren URL").0,
mqtt_path
Expand All @@ -51,7 +50,7 @@ impl MqttProcessor {
.parse::<u16>()
.expect("Invalid Siren port"),
);
create_opts
mqtt_opts
.set_last_will(LastWill::new(
"Scylla/Status",
"Scylla has disconnected!",
Expand All @@ -65,27 +64,28 @@ impl MqttProcessor {
.set_session_expiry_interval(Some(u32::MAX))
.set_topic_alias_max(Some(600));

MqttProcessor {
channel,
new_run_channel,
curr_run: initial_run,
io,
mqtt_ops: create_opts,
}
// TODO mess with incoming message cap if db, etc. cannot keep up

(
MqttProcessor {
channel,
new_run_channel,
curr_run: initial_run,
io,
},
mqtt_opts,
)
}

/// This handles the reception of mqtt messages, will not return
/// * `connect` - The eventloop returned by ::new to connect to
pub async fn process_mqtt(mut self) {
/// * `eventloop` - The eventloop returned by ::new to connect to. The loop isnt sync so this is the best that can be done
/// * `client` - The async mqttt v5 client to use for subscriptions
pub async fn process_mqtt(mut self, client: Arc<AsyncClient>, mut eventloop: EventLoop) {
let mut view_interval = tokio::time::interval(Duration::from_secs(3));

let mut latency_interval = tokio::time::interval(Duration::from_millis(250));
let mut latency_ringbuffer = ringbuffer::AllocRingBuffer::<i64>::new(20);

// process over messages, non blocking
// TODO mess with incoming message cap if db, etc. cannot keep up
let (client, mut connect) = AsyncClient::new(self.mqtt_ops.clone(), 600);

debug!("Subscribing to siren");
client
.subscribe("#", rumqttc::v5::mqttbytes::QoS::ExactlyOnce)
Expand All @@ -95,7 +95,7 @@ impl MqttProcessor {
loop {
#[rustfmt::skip] // rust cannot format this macro for some reason
tokio::select! {
msg = connect.poll() => match msg {
msg = eventloop.poll() => match msg {
Ok(Event::Incoming(Packet::Publish(msg))) => {
trace!("Received mqtt message: {:?}", msg);
// parse the message into the data and the node name it falls under
Expand Down
7 changes: 7 additions & 0 deletions scylla-server-rust/src/proto/command_data.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
syntax = "proto3";

package command_data.v1;

message CommandData {
repeated float data = 1;
}

0 comments on commit 2956402

Please sign in to comment.