Skip to content

Commit

Permalink
misc fixes, reconnect logic fix in siren and timestamp logic fix
Browse files Browse the repository at this point in the history
  • Loading branch information
jr1221 committed Jul 21, 2024
1 parent 5ecc0f5 commit 1c17163
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 87 deletions.
25 changes: 22 additions & 3 deletions scylla-server-rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion scylla-server-rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ axum = "0.7.5"
tower = { version = "0.4.13", features = ["timeout"] }
tower-http = { version = "0.5.2", features = ["cors"] }
socketioxide = { version = "0.14.0", features = ["tracing"] }
rumqttc = "0.24.0"
rumqttc = { git = "https://github.com/bytebeamio/rumqtt", branch = "main"}
tokio-util = { version= "0.7.11", features = ["full"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["ansi", "env-filter"] }
Expand Down
5 changes: 1 addition & 4 deletions scylla-server-rust/prisma-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,4 @@ edition = "2021"
[dependencies]
prisma-client-rust-cli = { git = "https://github.com/Brendonovich/prisma-client-rust", tag = "0.6.11", features = [
"postgresql", "migrations"
], default-features = false }

[profile.release]
strip = true # Automatically strip symbols from the binary.
], default-features = false }
4 changes: 2 additions & 2 deletions scylla-server-rust/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,14 @@ async fn main() {
// run prod if this isnt present
// create and spawn the mqtt processor
info!("Running processor in MQTT (production) mode");
let (recv, eloop) = MqttProcessor::new(
let recv = MqttProcessor::new(
mqtt_receive,
std::env::var("PROD_SIREN_HOST_URL").unwrap_or("localhost:1883".to_string()),
db.clone(),
io,
)
.await;
tokio::spawn(recv.process_mqtt(eloop));
tokio::spawn(recv.process_mqtt());
}

let app = Router::new()
Expand Down
4 changes: 2 additions & 2 deletions scylla-server-rust/src/processors/db_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ impl DbHandler {
}
}

#[instrument(level = Level::DEBUG)]
#[instrument(level = Level::DEBUG, skip(msg))]
async fn batch_upload(msg: Vec<ClientData>, db: &Database) {
info!(
"Batch uploaded: {:?}",
Expand Down Expand Up @@ -176,7 +176,7 @@ impl DbHandler {

#[instrument(skip(self), level = Level::TRACE)]
async fn handle_msg(&mut self, msg: ClientData, data_channel: &Sender<Vec<ClientData>>) {
debug!(
trace!(
"Mqtt dispatcher: {} of {}",
self.receiver.len(),
self.receiver.max_capacity()
Expand Down
154 changes: 83 additions & 71 deletions scylla-server-rust/src/processors/mqtt_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use prisma_client_rust::{chrono, serde_json};
use protobuf::Message;
use rumqttc::v5::{
mqttbytes::v5::{LastWill, Packet, Publish},
AsyncClient, Event, EventLoop, MqttOptions,
AsyncClient, Event, MqttOptions,
};
use socketioxide::SocketIo;
use tokio::sync::mpsc::Sender;
Expand All @@ -20,6 +20,7 @@ pub struct MqttProcessor {
channel: Sender<ClientData>,
curr_run: i32,
io: SocketIo,
mqtt_ops: MqttOptions,
}

impl MqttProcessor {
Expand All @@ -36,7 +37,7 @@ impl MqttProcessor {
mqtt_path: String,
db: Database,
io: SocketIo,
) -> (MqttProcessor, EventLoop) {
) -> MqttProcessor {
// create the mqtt client and configure it
let mut create_opts = MqttOptions::new(
"ScyllaServer",
Expand All @@ -48,83 +49,85 @@ impl MqttProcessor {
.parse::<u16>()
.expect("Invalid Siren port"),
);
create_opts.set_keep_alive(Duration::from_secs(20));
create_opts.set_last_will(LastWill::new(
"Scylla/Status",
"Scylla has disconnected!",
rumqttc::v5::mqttbytes::QoS::ExactlyOnce,
true,
None,
));
create_opts.set_clean_start(false);
create_opts
.set_last_will(LastWill::new(
"Scylla/Status",
"Scylla has disconnected!",
rumqttc::v5::mqttbytes::QoS::ExactlyOnce,
true,
None,
))
.set_keep_alive(Duration::from_secs(20))
.set_clean_start(false)
.set_connection_timeout(3)
.set_session_expiry_interval(Some(u32::MAX))
.set_topic_alias_max(Some(600));

// creates the initial run
let curr_run = run_service::create_run(&db, chrono::offset::Utc::now().timestamp_millis())
.await
.expect("Could not create initial run!");
debug!("Configuring current run: {:?}", curr_run);

// TODO mess with incoming message cap if db, etc. cannot keep up
let (client, connect) = AsyncClient::new(create_opts, 1000);

debug!("Subscribing to siren");
client
.try_subscribe("#", rumqttc::v5::mqttbytes::QoS::ExactlyOnce)
.expect("Could not subscribe to Siren");

(
MqttProcessor {
channel,
curr_run: curr_run.id,
io,
},
connect,
)
MqttProcessor {
channel,
curr_run: curr_run.id,
io,
mqtt_ops: create_opts,
}
}

/// This handles the reception of mqtt messages, will not return
/// * `connect` - The eventloop returned by ::new to connect to
pub async fn process_mqtt(self, mut connect: EventLoop) {
pub async fn process_mqtt(self) {
let mut spec_interval = tokio::time::interval(Duration::from_secs(3));
// process over messages, non blocking
// TODO mess with incoming message cap if db, etc. cannot keep up
let (client, mut connect) = AsyncClient::new(self.mqtt_ops.clone(), 600);

debug!("Subscribing to siren");
client
.subscribe("#", rumqttc::v5::mqttbytes::QoS::ExactlyOnce)
.await
.expect("Could not subscribe to Siren");

loop {
#[rustfmt::skip] // rust cannot format this macro for some reason
tokio::select! {
Ok(msg) = connect.poll() => {
// safe parse the message
if let Event::Incoming(Packet::Publish(msg)) = msg {
trace!("Received mqtt message: {:?}", msg);
// parse the message into the data and the node name it falls under
let msg = match self.parse_msg(msg) {
Ok(msg) => msg,
Err(err) => {
warn!("Message parse error: {:?}", err);
continue;
}
};
self.send_db_msg(msg.clone()).await;
self.send_socket_msg(msg).await;
}

},
_ = spec_interval.tick() => {
trace!("Updating viewership data!");
if let Ok(sockets) = self.io.sockets() {
let client_data = ClientData {
name: "Viewers".to_string(),
node: "Internal".to_string(),
unit: "".to_string(),
run_id: self.curr_run,
timestamp: chrono::offset::Utc::now().timestamp_millis(),
values: vec![sockets.len().to_string()]
msg = connect.poll() => match msg {
Ok(Event::Incoming(Packet::Publish(msg))) => {
trace!("Received mqtt message: {:?}", msg);
// parse the message into the data and the node name it falls under
let msg = match self.parse_msg(msg) {
Ok(msg) => msg,
Err(err) => {
warn!("Message parse error: {:?}", err);
continue;
}
};
self.send_socket_msg(client_data).await;

self.send_db_msg(msg.clone()).await;
self.send_socket_msg(msg);
},
Err(msg) => trace!("Received mqtt error: {:?}", msg),
_ => trace!("Received misc mqtt: {:?}", msg),
},
_ = spec_interval.tick() => {
trace!("Updating viewership data!");
if let Ok(sockets) = self.io.sockets() {
let client_data = ClientData {
name: "Viewers".to_string(),
node: "Internal".to_string(),
unit: "".to_string(),
run_id: self.curr_run,
timestamp: chrono::offset::Utc::now().timestamp_millis(),
values: vec![sockets.len().to_string()]
};
self.send_socket_msg(client_data);
} else {
warn!("Could not fetch socket count");
}
}

}
}
}
}

Expand All @@ -145,7 +148,7 @@ impl MqttProcessor {

let data_type = split.1.replace('/', "-");

// extract the unix time, returning the current time instead if needed
// extract the unix time, returning the current time instead if either the "ts" user property isnt present or it isnt parsable
// note the Cow magic involves the return from the map is a borrow, but the unwrap cannot as we dont own it
let unix_time = msg
.properties
Expand All @@ -155,29 +158,38 @@ impl MqttProcessor {
.map(Cow::Borrowed)
.find(|f| f.0 == "ts")
.unwrap_or_else(|| {
debug!("Could not find timestamp in mqtt, using current time");
debug!("Could not find timestamp in mqtt, using system time");
Cow::Owned((
"ts".to_string(),
chrono::offset::Utc::now().timestamp_millis().to_string(),
))
})
.into_owned();
.1
.parse::<i64>()
.unwrap_or_else(|err| {
warn!("Invalid timestamp in mqtt, using system time: {}", err);
chrono::offset::Utc::now().timestamp_millis()
});

// parse time, if invalid time error out
let Ok(time_clean) = unix_time.1.parse::<i64>() else {
return Err(format!("Invalid timestamp: {}", unix_time.1));
};
// ts check for bad sources of time which may return 1970
if time_clean < 963014966000 {
return Err(format!("Timestamp before year 2000: {}", unix_time.1));
}
// if both system time and packet timestamp are before year 2000, the message cannot be recorded
let unix_clean = if unix_time < 963014966000 {
debug!("Timestamp before year 2000: {}", unix_time);
let sys_time = chrono::offset::Utc::now().timestamp_millis();
if sys_time < 963014966000 {
return Err("System has no good time, discarding message!".to_string());
}
sys_time
} else {
unix_time
};

Ok(ClientData {
run_id: self.curr_run,
name: data_type,
unit: data.unit,
values: data.value,
timestamp: time_clean,
timestamp: unix_clean,
node: node.to_string(),
})
}
Expand All @@ -192,7 +204,7 @@ impl MqttProcessor {

/// Sends a message to the socket, printing and IGNORING any error that may occur
/// * `client_data` - The client data to send over the broadcast
async fn send_socket_msg(&self, client_data: ClientData) {
fn send_socket_msg(&self, client_data: ClientData) {
match self.io.emit(
"message",
serde_json::to_string(&client_data).expect("Could not serialize ClientData"),
Expand Down
14 changes: 10 additions & 4 deletions siren-base/mosquitto/mosquitto.conf
Original file line number Diff line number Diff line change
Expand Up @@ -109,21 +109,27 @@ autosave_interval 30

#autosave_on_changes false

#persistence false
# *** diff from tpu
persistence true

#persistence_file mosquitto.db

#persistence_location
# *** diff from tpu
persistence_location /mosquitto/data


# =================================================================
# Logging
# =================================================================
# *** diff from tpu (for docker)
log_dest file /mosquitto/log/mosquitto.log

log_dest stdout

log_type error
log_type warning
log_type notice
log_type information
log_type subscribe
log_type unsubscribe
#log_type information

connection_messages true
Expand Down

0 comments on commit 1c17163

Please sign in to comment.