use processor instead of receiver, other cleanup
jr1221 committed Jul 17, 2024
1 parent ef3fe7d commit 59e493d
Showing 10 changed files with 63 additions and 60 deletions.
15 changes: 9 additions & 6 deletions scylla-server-rust/README.md
@@ -59,9 +59,12 @@ Have an async function that takes time and is somewhat important for performance

### Deploy this app

Use the docker compose above to build & deploy. Note that the CI prebuilds arm64 and amd64 images upon request in the Actions tab of this repository's GitHub page.
```
docker compose build
docker compose up # use -d to fork to background
```
A database migration is triggered upon every bootup.
See main README.


#### Env variables

- `SOURCE_DATABASE_URL` The TimescaleDB URL
- `PROD_SCYLLA` `false` = use mock data instead of production (MQTT) as the data source
- `RUST_LOG=none,scylla_server_rust` levels of logging for this crate, see above
- `PROD_SIREN_HOST_URL` URL:port of the MQTT server, used when `PROD_SCYLLA` is not `false`
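
How these variables are consumed can be read off the main.rs changes later in this commit; a minimal standalone sketch of just that logic (only `PROD_SCYLLA` and `PROD_SIREN_HOST_URL` appear in the hunks shown below, the other variables are not read there):

```rust
fn main() {
    // PROD_SCYLLA=false selects the mock processor; any other value (or unset) runs MQTT (production) mode.
    let use_mock = std::env::var("PROD_SCYLLA").is_ok_and(|v| v == "false");

    // PROD_SIREN_HOST_URL falls back to a local broker when unset.
    let siren_url = std::env::var("PROD_SIREN_HOST_URL").unwrap_or("localhost:1883".to_string());

    println!("use_mock={use_mock}, siren_url={siren_url}");
}
```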
2 changes: 1 addition & 1 deletion scylla-server-rust/src/lib.rs
@@ -1,6 +1,6 @@
pub mod controllers;
pub mod error;
pub mod reciever;
pub mod processors;
pub mod services;
pub mod transformers;

34 changes: 17 additions & 17 deletions scylla-server-rust/src/main.rs
@@ -7,7 +7,9 @@ use scylla_server_rust::{
run_controller, system_controller,
},
prisma::PrismaClient,
reciever::{db_handler, mock_reciever::MockReciever, mqtt_reciever::MqttReciever, ClientData},
processors::{
db_handler, mock_processor::MockProcessor, mqtt_processor::MqttProcessor, ClientData,
},
Database,
};
use socketioxide::{extract::SocketRef, SocketIo};
@@ -77,43 +79,41 @@ async fn main() {

// if PROD_SCYLLA=false
if std::env::var("PROD_SCYLLA").is_ok_and(|f| f == "false") {
info!("Running reciever in mock mode, no data will be stored");
let recv = MockReciever::new(io);
info!("Running processor in mock mode, no data will be stored");
let recv = MockProcessor::new(io);
tokio::spawn(recv.generate_mock());
} else {
// run prod if this isn't present
// create and spawn the mqtt reciever
info!("Running reciever in MQTT (production) mode");
let (recv, eloop) = MqttReciever::new(
// create and spawn the mqtt processor
info!("Running processor in MQTT (production) mode");
let (recv, eloop) = MqttProcessor::new(
tx,
std::env::var("PROD_SIREN_HOST_URL").unwrap_or("localhost:1883".to_string()),
db.clone(),
io,
)
.await;
tokio::spawn(recv.recieve_mqtt(eloop));
tokio::spawn(recv.process_mqtt(eloop));
}

let app = Router::new()
// get all data with the name dataTypeName and runID as specified
// DATA ROUTES
.route(
"/data/:dataTypeName/:runId",
get(controllers::data_controller::get_data),
)
// get all datatypes
// DATA TYPE ROUTES
.route("/datatypes", get(data_type_controller::get_all_data_types))
// get all drivers
// DRIVERS
.route("/drivers", get(driver_controller::get_all_drivers))
// get all locations
// LOCATIONS
.route("/locations", get(location_controller::get_all_locations))
// get all nodes
// NODES
.route("/nodes", get(node_controller::get_all_nodes))
// runs:
// get all runs
// RUNS
.route("/runs", get(run_controller::get_all_runs))
// get run with id
.route("/runs/:id", get(run_controller::get_run_by_id))
// get all systems
// SYSTEMS
.route("/systems", get(system_controller::get_all_systems))
// for CORS handling
.layer(
@@ -153,7 +153,7 @@ async fn main() {
signal::ctrl_c()
.await
.expect("Could not read cancellation trigger (ctr+c)");
info!("Recieved exit signal, shutting down!");
info!("Received exit signal, shutting down!");
token.cancel();
task_tracker.wait().await;
}
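
The shutdown sequence above appears to pair a cancellation token with a task tracker; their construction is outside this hunk, so the exact types are an assumption — the sketch below uses tokio_util's `CancellationToken` and `TaskTracker`. A minimal version of the pattern, with a worker shaped like the `handling_loop` further down:

```rust
use tokio::signal;
use tokio_util::{sync::CancellationToken, task::TaskTracker};

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let task_tracker = TaskTracker::new();

    // Worker in the same shape as handling_loop: run until cancelled, then clean up.
    let child_token = token.clone();
    task_tracker.spawn(async move {
        loop {
            tokio::select! {
                _ = child_token.cancelled() => {
                    // a final flush of any queued work would happen here
                    break;
                }
                _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
                    // normal processing tick
                }
            }
        }
    });
    task_tracker.close();

    signal::ctrl_c()
        .await
        .expect("Could not read cancellation trigger (ctrl+c)");
    token.cancel();
    task_tracker.wait().await;
}
```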
scylla-server-rust/src/processors/db_handler.rs
@@ -79,13 +79,13 @@ pub struct DbHandler {
/// The list of data types seen by this instance, used for when to upsert
datatype_list: Vec<String>,
/// The broadcast channel which provides serial datapoints for processing
reciever: Receiver<ClientData>,
receiver: Receiver<ClientData>,
/// The database
db: Database,
/// An internal state of an in progress location packet
loc_lock: LocLock,
location_lock: LocLock,
/// Whether the location has been modified this loop
is_loc: bool,
is_location: bool,
/// the queue of data
data_queue: Vec<ClientData>,
/// the time since last batch
@@ -95,14 +95,14 @@
impl DbHandler {
/// Make a new db handler
/// * `recv` - the broadcast receiver over which ClientData will be sent
pub fn new(reciever: Receiver<ClientData>, db: Database) -> DbHandler {
pub fn new(receiver: Receiver<ClientData>, db: Database) -> DbHandler {
DbHandler {
node_list: vec![],
datatype_list: vec![],
reciever,
receiver,
db,
loc_lock: LocLock::new(),
is_loc: false,
location_lock: LocLock::new(),
is_location: false,
data_queue: vec![],
last_time: tokio::time::Instant::now(),
}
@@ -153,7 +153,7 @@ impl DbHandler {
/// If the data is special, i.e. coordinates, driver, etc. it will store it in its special location of the db immediately
/// For all data points it will add them to the data_channel for batch uploading logic when a certain time has elapsed
/// Before this time the data is stored in an internal queue.
/// On cancellation, the messages currently in the queue will be sent as a final flush of any remaining messages recieved before cancellation
/// On cancellation, the messages currently in the queue will be sent as a final flush of any remaining messages received before cancellation
pub async fn handling_loop(
mut self,
data_channel: Sender<Vec<ClientData>>,
Expand All @@ -167,7 +167,7 @@ impl DbHandler {
self.data_queue.clear();
break;
},
Some(msg) = self.reciever.recv() => {
Some(msg) = self.receiver.recv() => {
self.handle_msg(msg, &data_channel).await;
}
}
@@ -178,8 +178,8 @@
async fn handle_msg(&mut self, msg: ClientData, data_channel: &Sender<Vec<ClientData>>) {
debug!(
"Mqtt dispatcher: {} of {}",
self.reciever.len(),
self.reciever.max_capacity()
self.receiver.len(),
self.receiver.max_capacity()
);

// If the time is greater than upload interval, push to batch upload thread and clear queue
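
The batching mentioned here (and in the `handling_loop` doc comment above) is a queue-then-flush pattern: buffer incoming messages and push the whole batch to the upload channel once the interval has elapsed. A standalone sketch of that idea, with `String` standing in for `ClientData` and an illustrative interval:

```rust
use std::time::Duration;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::time::Instant;

// Illustrative upload interval; the real value lives elsewhere in the crate.
const UPLOAD_INTERVAL: Duration = Duration::from_secs(5);

/// Buffer messages from `receiver`, handing the accumulated batch to `data_channel`
/// whenever the interval has elapsed, and flush whatever is left when the channel closes.
async fn batch_loop(mut receiver: Receiver<String>, data_channel: Sender<Vec<String>>) {
    let mut data_queue: Vec<String> = Vec::new();
    let mut last_time = Instant::now();

    while let Some(msg) = receiver.recv().await {
        data_queue.push(msg);
        if last_time.elapsed() > UPLOAD_INTERVAL {
            // push the accumulated batch to the upload task and start a new queue
            let _ = data_channel.send(std::mem::take(&mut data_queue)).await;
            last_time = Instant::now();
        }
    }
    // channel closed: final flush of anything still queued
    let _ = data_channel.send(data_queue).await;
}
```

Field names (`receiver`, `data_queue`, `last_time`, `data_channel`) mirror the struct above; the real loop additionally handles cancellation and the special datatypes shown below.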
@@ -237,13 +237,13 @@ impl DbHandler {
}
"location" => {
debug!("Upserting location name: {:?}", msg.values);
self.loc_lock.add_loc_name(
self.location_lock.add_loc_name(
msg.values
.first()
.unwrap_or(&"PizzaTheHut".to_string())
.to_string(),
);
self.is_loc = true;
self.is_location = true;
}
"system" => {
debug!("Upserting system: {:?}", msg.values);
@@ -262,7 +262,7 @@
}
"GPS-Location" => {
debug!("Upserting location points: {:?}", msg.values);
self.loc_lock.add_points(
self.location_lock.add_points(
msg.values
.first()
.unwrap_or(&"PizzaTheHut".to_string())
@@ -274,25 +274,25 @@ impl DbHandler {
.parse::<f64>()
.unwrap_or_default(),
);
self.is_loc = true;
self.is_location = true;
}
"Radius" => {
debug!("Upserting location radius: {:?}", msg.values);
self.loc_lock.add_radius(
self.location_lock.add_radius(
msg.values
.first()
.unwrap_or(&"PizzaTheHut".to_string())
.parse::<f64>()
.unwrap_or_default(),
);
self.is_loc = true;
self.is_location = true;
}
_ => {}
}
// if location has been modified, push a new location if the loc lock object returns Some
if self.is_loc {
if self.is_location {
trace!("Checking location status...");
if let Some(loc) = self.loc_lock.finalize() {
if let Some(loc) = self.location_lock.finalize() {
debug!("Upserting location: {:?}", loc);
if let Err(err) = location_service::upsert_location(
&self.db,
Expand All @@ -307,7 +307,7 @@ impl DbHandler {
warn!("Location upsert error: {:?}", err);
}
}
self.is_loc = false;
self.is_location = false;
}

// no matter what, batch upload the message
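
`LocLock` itself is not part of this diff. Judging from the calls above (`add_loc_name`, `add_points`, `add_radius`, and `finalize` returning an `Option`), one plausible shape is an accumulator that only yields a location once every piece has been set — the sketch below is a hypothetical reconstruction, not the crate's actual type:

```rust
/// Hypothetical accumulator mirroring the LocLock calls above; the real
/// implementation is not shown in this commit.
#[derive(Default)]
struct LocLock {
    name: Option<String>,
    points: Option<(f64, f64)>,
    radius: Option<f64>,
}

impl LocLock {
    fn add_loc_name(&mut self, name: String) {
        self.name = Some(name);
    }
    fn add_points(&mut self, lat: f64, long: f64) {
        self.points = Some((lat, long));
    }
    fn add_radius(&mut self, radius: f64) {
        self.radius = Some(radius);
    }
    /// Yields a complete location only once name, points, and radius are all present.
    /// (The real type may also reset its state here.)
    fn finalize(&mut self) -> Option<(String, f64, f64, f64)> {
        match (&self.name, self.points, self.radius) {
            (Some(name), Some((lat, long)), Some(radius)) => {
                Some((name.clone(), lat, long, radius))
            }
            _ => None,
        }
    }
}
```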
scylla-server-rust/src/processors/mock_processor.rs
@@ -174,14 +174,14 @@ const BASE_MOCK_STRING_DATA: [MockStringData; 2] = [
},
];

pub struct MockReciever {
pub struct MockProcessor {
curr_run: i32,
io: SocketIo,
}

impl MockReciever {
impl MockProcessor {
pub fn new(io: SocketIo) -> Self {
MockReciever { curr_run: 1, io }
MockProcessor { curr_run: 1, io }
}

pub async fn generate_mock(self) {
scylla-server-rust/src/processors/mod.rs
@@ -1,6 +1,6 @@
pub mod db_handler;
pub mod mock_reciever;
pub mod mqtt_reciever;
pub mod mock_processor;
pub mod mqtt_processor;

/// Represents the client data
/// This has the dual purposes of
scylla-server-rust/src/processors/mqtt_processor.rs
@@ -16,27 +16,27 @@ use crate::{serverdata, services::run_service, Database};
use super::ClientData;
use std::borrow::Cow;

pub struct MqttReciever {
pub struct MqttProcessor {
channel: Sender<ClientData>,
curr_run: i32,
io: SocketIo,
}

impl MqttReciever {
/// Creates a new mqtt reciever and socketio sender
impl MqttProcessor {
/// Creates a new mqtt receiver and socketio/db sender
/// * `channel` - The mpsc channel to send the database data to
/// * `mqtt_path` - The mqtt URI, including port, (without the mqtt://) to subscribe to
/// * `db` - The database to store the data in
/// * `io` - The socketio layer to send the data to
///
/// This is async as it creates the initial run and gets the ID, as well as connecting to and subscribing to Siren
/// Returns the instance and the event loop, which can be passed into the recieve_mqtt func to begin recieiving
/// Returns the instance and the event loop, which can be passed into the process_mqtt func to begin receiving
pub async fn new(
channel: Sender<ClientData>,
mqtt_path: String,
db: Database,
io: SocketIo,
) -> (MqttReciever, EventLoop) {
) -> (MqttProcessor, EventLoop) {
// create the mqtt client and configure it
let mut create_opts = MqttOptions::new(
"ScyllaServer",
@@ -73,7 +73,7 @@ impl MqttReciever {
.expect("Could not subscribe to Siren");

(
MqttReciever {
MqttProcessor {
channel,
curr_run: curr_run.id,
io,
@@ -84,12 +84,12 @@

/// This handles the reception of mqtt messages, will not return
/// * `connect` - The eventloop returned by ::new to connect to
pub async fn recieve_mqtt(self, mut connect: EventLoop) {
pub async fn process_mqtt(self, mut connect: EventLoop) {
// process messages, non-blocking
while let Ok(msg) = connect.poll().await {
// safe parse the message
if let Event::Incoming(Packet::Publish(msg)) = msg {
trace!("Recieved mqtt message: {:?}", msg);
trace!("Received mqtt message: {:?}", msg);
// parse the message into the data and the node name it falls under
let item_data = match self.parse_msg(msg).await {
Ok(msg) => msg,
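
For context, the rumqttc poll loop that `process_mqtt` wraps has roughly this shape in isolation; the client id, host, port, and topic below are placeholders rather than the values this commit configures:

```rust
use rumqttc::{AsyncClient, Event, MqttOptions, Packet, QoS};

#[tokio::main]
async fn main() {
    // Placeholder connection settings, not the commit's configuration.
    let opts = MqttOptions::new("example-client", "localhost", 1883);
    let (client, mut eventloop) = AsyncClient::new(opts, 10);
    client
        .subscribe("#", QoS::ExactlyOnce)
        .await
        .expect("Could not subscribe");

    // Drain the event loop and react only to incoming publishes,
    // the same match used by process_mqtt above.
    while let Ok(event) = eventloop.poll().await {
        if let Event::Incoming(Packet::Publish(msg)) = event {
            println!("topic={} payload_bytes={}", msg.topic, msg.payload.len());
        }
    }
}
```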
2 changes: 1 addition & 1 deletion scylla-server-rust/src/services/data_service.rs
@@ -1,6 +1,6 @@
use prisma_client_rust::{chrono::DateTime, QueryError};

use crate::{prisma, reciever::ClientData, Database};
use crate::{prisma, processors::ClientData, Database};

prisma::data::select! {public_data {
time
2 changes: 1 addition & 1 deletion scylla-server-rust/src/transformers/data_transformer.rs
@@ -1,6 +1,6 @@
use serde::Serialize;

use crate::{reciever::ClientData, services::data_service};
use crate::{processors::ClientData, services::data_service};

/// The struct defining the data format sent to the client
#[derive(Serialize, Debug, PartialEq, Eq, PartialOrd, Ord)]
2 changes: 1 addition & 1 deletion scylla-server-rust/tests/data_service_test.rs
@@ -3,7 +3,7 @@ mod test_utils;

use prisma_client_rust::QueryError;
use scylla_server_rust::{
reciever::ClientData,
processors::ClientData,
services::{data_service, data_type_service, node_service, run_service},
transformers::data_transformer::PublicData,
};