From 29564028f0f91c9b440e14b9b57f766e214b3474 Mon Sep 17 00:00:00 2001 From: Jack Rubacha Date: Thu, 1 Aug 2024 18:35:09 -0400 Subject: [PATCH 01/10] basic proof of concept --- scylla-server-rust/.gitignore | 1 + scylla-server-rust/Cargo.lock | 37 ++++++++++++++ scylla-server-rust/Cargo.toml | 1 + scylla-server-rust/build.rs | 1 + scylla-server-rust/src/controllers/mod.rs | 1 + .../src/controllers/send_config_controller.rs | 49 +++++++++++++++++++ scylla-server-rust/src/error.rs | 10 ++++ scylla-server-rust/src/lib.rs | 1 + scylla-server-rust/src/main.rs | 28 ++++++++--- scylla-server-rust/src/mod.rs | 1 + .../src/processors/mqtt_processor.rs | 42 ++++++++-------- .../src/proto/command_data.proto | 7 +++ 12 files changed, 151 insertions(+), 28 deletions(-) create mode 100644 scylla-server-rust/src/controllers/send_config_controller.rs create mode 100644 scylla-server-rust/src/proto/command_data.proto diff --git a/scylla-server-rust/.gitignore b/scylla-server-rust/.gitignore index 11d3f83a..a9a01cca 100644 --- a/scylla-server-rust/.gitignore +++ b/scylla-server-rust/.gitignore @@ -22,3 +22,4 @@ prisma.rs # protobuf serverdata.rs +command_data.rs diff --git a/scylla-server-rust/Cargo.lock b/scylla-server-rust/Cargo.lock index 51f2f9df..432320e4 100755 --- a/scylla-server-rust/Cargo.lock +++ b/scylla-server-rust/Cargo.lock @@ -288,6 +288,29 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum-extra" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0be6ea09c9b96cb5076af0de2e383bd2bc0c18f827cf1967bdd353e0b910d733" +dependencies = [ + "axum 0.7.5", + "axum-core 0.4.3", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "mime", + "pin-project-lite", + "serde", + "serde_html_form", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "backtrace" version = "0.3.73" @@ -3201,6 +3224,7 @@ name = "scylla-server-rust" version = "0.0.1" dependencies = [ "axum 0.7.5", + "axum-extra", "clap", "console-subscriber", "prisma-client-rust", @@ -3272,6 +3296,19 @@ dependencies = [ "syn 2.0.67", ] +[[package]] +name = "serde_html_form" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8de514ef58196f1fc96dcaef80fe6170a1ce6215df9687a93fe8300e773fefc5" +dependencies = [ + "form_urlencoded", + "indexmap 2.2.6", + "itoa", + "ryu", + "serde", +] + [[package]] name = "serde_json" version = "1.0.117" diff --git a/scylla-server-rust/Cargo.toml b/scylla-server-rust/Cargo.toml index f7d11bd8..c5ec9fd2 100644 --- a/scylla-server-rust/Cargo.toml +++ b/scylla-server-rust/Cargo.toml @@ -22,6 +22,7 @@ rand = "0.8.5" console-subscriber = { version = "0.3.0", optional = true } ringbuffer = "0.15.0" clap = { version = "4.5.11", features = ["derive", "env"] } +axum-extra = { version = "0.9.3", features = ["query"] } [features] top = ["dep:console-subscriber"] diff --git a/scylla-server-rust/build.rs b/scylla-server-rust/build.rs index a57e70ec..44b9825e 100644 --- a/scylla-server-rust/build.rs +++ b/scylla-server-rust/build.rs @@ -8,6 +8,7 @@ fn main() { .includes(["src/proto"]) // Inputs must reside in some of include paths. .input("src/proto/serverdata.proto") + .input("src/proto/command_data.proto") // Specify output directory relative to Cargo output directory. .out_dir("src") .run_from_script(); diff --git a/scylla-server-rust/src/controllers/mod.rs b/scylla-server-rust/src/controllers/mod.rs index 079a3dab..ba50c11a 100644 --- a/scylla-server-rust/src/controllers/mod.rs +++ b/scylla-server-rust/src/controllers/mod.rs @@ -5,3 +5,4 @@ pub mod location_controller; pub mod node_controller; pub mod run_controller; pub mod system_controller; +pub mod send_config_controller; diff --git a/scylla-server-rust/src/controllers/send_config_controller.rs b/scylla-server-rust/src/controllers/send_config_controller.rs new file mode 100644 index 00000000..4b1c0f67 --- /dev/null +++ b/scylla-server-rust/src/controllers/send_config_controller.rs @@ -0,0 +1,49 @@ +use std::sync::Arc; + +use axum::{extract::Path, Extension}; +use axum_extra::extract::Query; +use protobuf::Message; +use rumqttc::v5::AsyncClient; +use serde::Deserialize; +use tracing::{info, warn}; + +use crate::{command_data::CommandData, error::ScyllaError}; + +#[derive(Deserialize, Debug)] +pub struct ConfigRequest { + pub data: Vec, +} + +pub async fn send_config( + Path(key): Path, + data_query: Query, + Extension(client): Extension>>, +) -> Result<(), ScyllaError> { + info!( + "Sending car config with key: {}, and values: {:?}", + key, data_query.0 + ); + let Some(client) = client else { + return Err(ScyllaError::NotProd); + }; + + let mut payload = CommandData::new(); + payload.data = data_query.0.data; + let Ok(bytes) = payload.write_to_bytes() else { + return Err(ScyllaError::ImpossibleEncoding); + }; + + if let Err(err) = client + .publish( + format!("Calypso/Bidir/Command/{}", key), + rumqttc::v5::mqttbytes::QoS::ExactlyOnce, + false, + bytes, + ) + .await + { + warn!("Could not publish instruction: {}", err); + return Err(ScyllaError::CommFailure); + } + Ok(()) +} diff --git a/scylla-server-rust/src/error.rs b/scylla-server-rust/src/error.rs index 221b4f13..216873ca 100644 --- a/scylla-server-rust/src/error.rs +++ b/scylla-server-rust/src/error.rs @@ -10,7 +10,14 @@ use tracing::warn; pub enum ScyllaError { PrismaError(QueryError), + /// A generic not found for a prisma query NotFound, + /// Not available in mock mode, which the system is in + NotProd, + /// An instruction was not encodable + ImpossibleEncoding, + /// Could not communicate to car + CommFailure, } impl From for ScyllaError { @@ -32,6 +39,9 @@ impl IntoResponse for ScyllaError { } ScyllaError::PrismaError(_) => StatusCode::BAD_REQUEST, ScyllaError::NotFound => StatusCode::NOT_FOUND, + ScyllaError::NotProd => StatusCode::SERVICE_UNAVAILABLE, + ScyllaError::ImpossibleEncoding => StatusCode::UNPROCESSABLE_ENTITY, + ScyllaError::CommFailure => StatusCode::BAD_GATEWAY, }; status.into_response() diff --git a/scylla-server-rust/src/lib.rs b/scylla-server-rust/src/lib.rs index d0b47a4c..04385fe4 100644 --- a/scylla-server-rust/src/lib.rs +++ b/scylla-server-rust/src/lib.rs @@ -9,6 +9,7 @@ pub mod transformers; pub mod prisma; pub mod serverdata; +pub mod command_data; /// The type descriptor of the database passed to the middlelayer through axum state pub type Database = std::sync::Arc; diff --git a/scylla-server-rust/src/main.rs b/scylla-server-rust/src/main.rs index 5453cd0c..f61d2112 100755 --- a/scylla-server-rust/src/main.rs +++ b/scylla-server-rust/src/main.rs @@ -7,10 +7,13 @@ use axum::{ }; use clap::Parser; use prisma_client_rust::chrono; +use rumqttc::v5::AsyncClient; use scylla_server_rust::{ controllers::{ self, data_type_controller, driver_controller, location_controller, node_controller, - run_controller, system_controller, + run_controller, + send_config_controller::{self}, + system_controller, }, prisma::PrismaClient, processors::{ @@ -55,7 +58,12 @@ struct ScyllaArgs { siren_host_url: String, /// The time, in seconds between collection for a batch upsert - #[arg(short = 't', long, env = "SCYLLA_BATCH_UPSERT_TIME", default_value = "10")] + #[arg( + short = 't', + long, + env = "SCYLLA_BATCH_UPSERT_TIME", + default_value = "10" + )] batch_upsert_time: u64, } @@ -138,11 +146,12 @@ async fn main() { token.clone(), )); - // if PROD_SCYLLA=false - if !cli.prod { + // if PROD_SCYLLA=false, also procur a client for use in the config state + let client: Option> = if !cli.prod { info!("Running processor in mock mode, no data will be stored"); let recv = MockProcessor::new(io); tokio::spawn(recv.generate_mock()); + None } else { // creates the initial run let curr_run = run_service::create_run(&db, chrono::offset::Utc::now().timestamp_millis()) @@ -153,15 +162,18 @@ async fn main() { // run prod if this isnt present // create and spawn the mqtt processor info!("Running processor in MQTT (production) mode"); - let recv = MqttProcessor::new( + let (recv, opts) = MqttProcessor::new( mqtt_send, new_run_receive, cli.siren_host_url, curr_run.id, io, ); - tokio::spawn(recv.process_mqtt()); - } + let (client, eventloop) = AsyncClient::new(opts, 600); + let client_sharable: Arc = Arc::new(client); + tokio::spawn(recv.process_mqtt(client_sharable.clone(), eventloop)); + Some(client_sharable) + }; let app = Router::new() // DATA ROUTES @@ -186,6 +198,8 @@ async fn main() { ) // SYSTEMS .route("/systems", get(system_controller::get_all_systems)) + // CONFIG + .route("/config/:key", post(send_config_controller::send_config).layer(Extension(client))) // for CORS handling .layer( CorsLayer::new() diff --git a/scylla-server-rust/src/mod.rs b/scylla-server-rust/src/mod.rs index 31b78fdf..b7017c61 100644 --- a/scylla-server-rust/src/mod.rs +++ b/scylla-server-rust/src/mod.rs @@ -1,3 +1,4 @@ // @generated +pub mod command_data; pub mod serverdata; diff --git a/scylla-server-rust/src/processors/mqtt_processor.rs b/scylla-server-rust/src/processors/mqtt_processor.rs index 7f34c0a6..670f5422 100644 --- a/scylla-server-rust/src/processors/mqtt_processor.rs +++ b/scylla-server-rust/src/processors/mqtt_processor.rs @@ -1,12 +1,12 @@ use core::fmt; -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use prisma_client_rust::{bigdecimal::ToPrimitive, chrono, serde_json}; use protobuf::Message; use ringbuffer::RingBuffer; use rumqttc::v5::{ mqttbytes::v5::{LastWill, Packet, Publish}, - AsyncClient, Event, MqttOptions, + AsyncClient, Event, EventLoop, MqttOptions, }; use socketioxide::SocketIo; use tokio::sync::mpsc::{Receiver, Sender}; @@ -22,7 +22,6 @@ pub struct MqttProcessor { new_run_channel: Receiver, curr_run: i32, io: SocketIo, - mqtt_ops: MqttOptions, } impl MqttProcessor { @@ -32,16 +31,16 @@ impl MqttProcessor { /// * `db` - The database to store the data in /// * `io` - The socketio layer to send the data to /// - /// Returns the instance and the event loop, which can be passed into the process_mqtt func to begin recieiving + /// Returns the instance and options to create a client, which is then used in the process_mqtt loop pub fn new( channel: Sender, new_run_channel: Receiver, mqtt_path: String, initial_run: i32, io: SocketIo, - ) -> MqttProcessor { + ) -> (MqttProcessor, MqttOptions) { // create the mqtt client and configure it - let mut create_opts = MqttOptions::new( + let mut mqtt_opts = MqttOptions::new( "ScyllaServer", mqtt_path.split_once(':').expect("Invalid Siren URL").0, mqtt_path @@ -51,7 +50,7 @@ impl MqttProcessor { .parse::() .expect("Invalid Siren port"), ); - create_opts + mqtt_opts .set_last_will(LastWill::new( "Scylla/Status", "Scylla has disconnected!", @@ -65,27 +64,28 @@ impl MqttProcessor { .set_session_expiry_interval(Some(u32::MAX)) .set_topic_alias_max(Some(600)); - MqttProcessor { - channel, - new_run_channel, - curr_run: initial_run, - io, - mqtt_ops: create_opts, - } + // TODO mess with incoming message cap if db, etc. cannot keep up + + ( + MqttProcessor { + channel, + new_run_channel, + curr_run: initial_run, + io, + }, + mqtt_opts, + ) } /// This handles the reception of mqtt messages, will not return - /// * `connect` - The eventloop returned by ::new to connect to - pub async fn process_mqtt(mut self) { + /// * `eventloop` - The eventloop returned by ::new to connect to. The loop isnt sync so this is the best that can be done + /// * `client` - The async mqttt v5 client to use for subscriptions + pub async fn process_mqtt(mut self, client: Arc, mut eventloop: EventLoop) { let mut view_interval = tokio::time::interval(Duration::from_secs(3)); let mut latency_interval = tokio::time::interval(Duration::from_millis(250)); let mut latency_ringbuffer = ringbuffer::AllocRingBuffer::::new(20); - // process over messages, non blocking - // TODO mess with incoming message cap if db, etc. cannot keep up - let (client, mut connect) = AsyncClient::new(self.mqtt_ops.clone(), 600); - debug!("Subscribing to siren"); client .subscribe("#", rumqttc::v5::mqttbytes::QoS::ExactlyOnce) @@ -95,7 +95,7 @@ impl MqttProcessor { loop { #[rustfmt::skip] // rust cannot format this macro for some reason tokio::select! { - msg = connect.poll() => match msg { + msg = eventloop.poll() => match msg { Ok(Event::Incoming(Packet::Publish(msg))) => { trace!("Received mqtt message: {:?}", msg); // parse the message into the data and the node name it falls under diff --git a/scylla-server-rust/src/proto/command_data.proto b/scylla-server-rust/src/proto/command_data.proto new file mode 100644 index 00000000..fe6b7744 --- /dev/null +++ b/scylla-server-rust/src/proto/command_data.proto @@ -0,0 +1,7 @@ +syntax = "proto3"; + +package command_data.v1; + +message CommandData { + repeated float data = 1; +} From 2e68c6179ce81543e88c24a238e7bd77cafee74e Mon Sep 17 00:00:00 2001 From: Jack Rubacha Date: Thu, 1 Aug 2024 19:10:17 -0400 Subject: [PATCH 02/10] misc improvements and cleanup of cursory processing --- scylla-server-rust/src/controllers/mod.rs | 2 +- .../src/controllers/send_config_controller.rs | 6 ++-- scylla-server-rust/src/lib.rs | 2 +- scylla-server-rust/src/main.rs | 5 +++- .../src/processors/db_handler.rs | 30 ++++++++----------- .../src/processors/mqtt_processor.rs | 22 ++++++++++---- 6 files changed, 40 insertions(+), 27 deletions(-) diff --git a/scylla-server-rust/src/controllers/mod.rs b/scylla-server-rust/src/controllers/mod.rs index ba50c11a..2417f623 100644 --- a/scylla-server-rust/src/controllers/mod.rs +++ b/scylla-server-rust/src/controllers/mod.rs @@ -4,5 +4,5 @@ pub mod driver_controller; pub mod location_controller; pub mod node_controller; pub mod run_controller; -pub mod system_controller; pub mod send_config_controller; +pub mod system_controller; diff --git a/scylla-server-rust/src/controllers/send_config_controller.rs b/scylla-server-rust/src/controllers/send_config_controller.rs index 4b1c0f67..8b07a405 100644 --- a/scylla-server-rust/src/controllers/send_config_controller.rs +++ b/scylla-server-rust/src/controllers/send_config_controller.rs @@ -9,6 +9,8 @@ use tracing::{info, warn}; use crate::{command_data::CommandData, error::ScyllaError}; +pub const CALYPSO_BIDIR_CMD_PREFIX: &str = "Calypso/Bidir/Command/"; + #[derive(Deserialize, Debug)] pub struct ConfigRequest { pub data: Vec, @@ -21,7 +23,7 @@ pub async fn send_config( ) -> Result<(), ScyllaError> { info!( "Sending car config with key: {}, and values: {:?}", - key, data_query.0 + key, data_query.0.data ); let Some(client) = client else { return Err(ScyllaError::NotProd); @@ -35,7 +37,7 @@ pub async fn send_config( if let Err(err) = client .publish( - format!("Calypso/Bidir/Command/{}", key), + format!("{}{}", CALYPSO_BIDIR_CMD_PREFIX, key), rumqttc::v5::mqttbytes::QoS::ExactlyOnce, false, bytes, diff --git a/scylla-server-rust/src/lib.rs b/scylla-server-rust/src/lib.rs index 04385fe4..f3bd395c 100644 --- a/scylla-server-rust/src/lib.rs +++ b/scylla-server-rust/src/lib.rs @@ -8,8 +8,8 @@ pub mod transformers; #[allow(warnings)] pub mod prisma; -pub mod serverdata; pub mod command_data; +pub mod serverdata; /// The type descriptor of the database passed to the middlelayer through axum state pub type Database = std::sync::Arc; diff --git a/scylla-server-rust/src/main.rs b/scylla-server-rust/src/main.rs index f61d2112..136cd3d8 100755 --- a/scylla-server-rust/src/main.rs +++ b/scylla-server-rust/src/main.rs @@ -199,7 +199,10 @@ async fn main() { // SYSTEMS .route("/systems", get(system_controller::get_all_systems)) // CONFIG - .route("/config/:key", post(send_config_controller::send_config).layer(Extension(client))) + .route( + "/config/:key", + post(send_config_controller::send_config).layer(Extension(client)), + ) // for CORS handling .layer( CorsLayer::new() diff --git a/scylla-server-rust/src/processors/db_handler.rs b/scylla-server-rust/src/processors/db_handler.rs index c65f09b9..6541d24d 100644 --- a/scylla-server-rust/src/processors/db_handler.rs +++ b/scylla-server-rust/src/processors/db_handler.rs @@ -112,7 +112,7 @@ impl DbHandler { /// It uses the queue from data queue to insert to the database specified /// On cancellation, will await one final queue message to cleanup anything remaining in the channel pub async fn batching_loop( - mut data_queue: Receiver>, + mut batch_queue: Receiver>, database: Database, saturate_batches: bool, cancel_token: CancellationToken, @@ -120,22 +120,18 @@ impl DbHandler { loop { tokio::select! { _ = cancel_token.cancelled() => { - loop { - info!("{} batches remaining!", data_queue.len()); - if let Some(final_msgs) = data_queue.recv().await { - info!( - "A cleanup batch uploaded: {:?}", - data_service::add_many(&database, final_msgs).await - ); - } else { - info!("No more messages to cleanup."); - break; + // cleanup all remaining messages if batches start backing up + while let Some(final_msgs) = batch_queue.recv().await { + info!("{} batches remaining!", batch_queue.len()+1); + info!( + "A cleanup batch uploaded: {:?}", + data_service::add_many(&database, final_msgs).await + ); } - - } - break; + info!("No more messages to cleanup."); + break; }, - Some(msgs) = data_queue.recv() => { + Some(msgs) = batch_queue.recv() => { if saturate_batches { let shared_db = database.clone(); tokio::spawn(async move { @@ -146,8 +142,8 @@ impl DbHandler { } debug!( "DB send: {} of {}", - data_queue.len(), - data_queue.max_capacity() + batch_queue.len(), + batch_queue.max_capacity() ); } } diff --git a/scylla-server-rust/src/processors/mqtt_processor.rs b/scylla-server-rust/src/processors/mqtt_processor.rs index 670f5422..01ac3d7b 100644 --- a/scylla-server-rust/src/processors/mqtt_processor.rs +++ b/scylla-server-rust/src/processors/mqtt_processor.rs @@ -12,7 +12,10 @@ use socketioxide::SocketIo; use tokio::sync::mpsc::{Receiver, Sender}; use tracing::{debug, instrument, trace, warn, Level}; -use crate::{serverdata, services::run_service}; +use crate::{ + controllers::send_config_controller::CALYPSO_BIDIR_CMD_PREFIX, serverdata, + services::run_service, +}; use super::ClientData; use std::borrow::Cow; @@ -161,14 +164,23 @@ impl MqttProcessor { /// returns the ClientData, or the Err of something that can be debug printed #[instrument(skip(self), level = Level::TRACE)] fn parse_msg(&self, msg: Publish) -> Result { - let data = serverdata::ServerData::parse_from_bytes(&msg.payload) - .map_err(|f| format!("Could not parse message topic:{:?} error: {}", msg.topic, f))?; + let topic = std::str::from_utf8(&msg.topic) + .map_err(|f| format!("Could not parse topic: {}, topic: {:?}", f, msg.topic))?; + + // ignore command messages, less confusing in logs than just failing to decode protobuf + if topic.starts_with(CALYPSO_BIDIR_CMD_PREFIX) { + return Err(format!("Skipping command message: {}", topic)); + } - let split = std::str::from_utf8(&msg.topic) - .map_err(|f| format!("Could not parse topic: {}, topic: {:?}", f, msg.topic))? + let split = topic .split_once('/') .ok_or(&format!("Could not parse nesting: {:?}", msg.topic))?; + // look at data after topic as if we dont have a topic the protobuf is useless anyways + let data = serverdata::ServerData::parse_from_bytes(&msg.payload) + .map_err(|f| format!("Could not parse message topic:{:?} error: {}", msg.topic, f))?; + + // get the node and datatype from the topic extracted at the beginning let node = split.0; let data_type = split.1.replace('/', "-"); From b4eda5b28381fcb7cb17124733d6f48ee8262e5d Mon Sep 17 00:00:00 2001 From: Jack Rubacha Date: Fri, 2 Aug 2024 18:44:24 -0400 Subject: [PATCH 03/10] implement reids review suggestions --- ...onfig_controller.rs => car_command_controller.rs} | 9 +++++++++ scylla-server-rust/src/controllers/mod.rs | 2 +- scylla-server-rust/src/main.rs | 12 ++++++------ scylla-server-rust/src/processors/mqtt_processor.rs | 2 +- 4 files changed, 17 insertions(+), 8 deletions(-) rename scylla-server-rust/src/controllers/{send_config_controller.rs => car_command_controller.rs} (61%) diff --git a/scylla-server-rust/src/controllers/send_config_controller.rs b/scylla-server-rust/src/controllers/car_command_controller.rs similarity index 61% rename from scylla-server-rust/src/controllers/send_config_controller.rs rename to scylla-server-rust/src/controllers/car_command_controller.rs index 8b07a405..4b59cfe9 100644 --- a/scylla-server-rust/src/controllers/send_config_controller.rs +++ b/scylla-server-rust/src/controllers/car_command_controller.rs @@ -9,6 +9,7 @@ use tracing::{info, warn}; use crate::{command_data::CommandData, error::ScyllaError}; +/// the prefix for the calypso topic, so topic of cmd is this plus the key appended on pub const CALYPSO_BIDIR_CMD_PREFIX: &str = "Calypso/Bidir/Command/"; #[derive(Deserialize, Debug)] @@ -16,6 +17,10 @@ pub struct ConfigRequest { pub data: Vec, } +/// This controller recieves two peices of info: the key from the path, and the list of float values from the query params +/// The key is used in accordance with the calypso specification: a command_data object is sent over siren to CALYPSO_BIDIR_CMD_PREFIX/ +/// That command_data object is processed and matched via the key to a YAML specification for the encoding and sending of a CAN message +/// TLDR: This triggers a command with the values for calypso to update the CAN packet sent to the bus. pub async fn send_config( Path(key): Path, data_query: Query, @@ -25,16 +30,19 @@ pub async fn send_config( "Sending car config with key: {}, and values: {:?}", key, data_query.0.data ); + // disable scylla if not prod, as there will be None mqtt client let Some(client) = client else { return Err(ScyllaError::NotProd); }; + // create a payload object of the values to be parsed by calypso into a CAN packet let mut payload = CommandData::new(); payload.data = data_query.0.data; let Ok(bytes) = payload.write_to_bytes() else { return Err(ScyllaError::ImpossibleEncoding); }; + // publish the message to the topic that calypso's encoder is susbcribed to if let Err(err) = client .publish( format!("{}{}", CALYPSO_BIDIR_CMD_PREFIX, key), @@ -47,5 +55,6 @@ pub async fn send_config( warn!("Could not publish instruction: {}", err); return Err(ScyllaError::CommFailure); } + Ok(()) } diff --git a/scylla-server-rust/src/controllers/mod.rs b/scylla-server-rust/src/controllers/mod.rs index 2417f623..cf82adb2 100644 --- a/scylla-server-rust/src/controllers/mod.rs +++ b/scylla-server-rust/src/controllers/mod.rs @@ -1,8 +1,8 @@ +pub mod car_command_controller; pub mod data_controller; pub mod data_type_controller; pub mod driver_controller; pub mod location_controller; pub mod node_controller; pub mod run_controller; -pub mod send_config_controller; pub mod system_controller; diff --git a/scylla-server-rust/src/main.rs b/scylla-server-rust/src/main.rs index 136cd3d8..8cd86986 100755 --- a/scylla-server-rust/src/main.rs +++ b/scylla-server-rust/src/main.rs @@ -10,10 +10,10 @@ use prisma_client_rust::chrono; use rumqttc::v5::AsyncClient; use scylla_server_rust::{ controllers::{ - self, data_type_controller, driver_controller, location_controller, node_controller, - run_controller, - send_config_controller::{self}, - system_controller, + self, + car_command_controller::{self}, + data_type_controller, driver_controller, location_controller, node_controller, + run_controller, system_controller, }, prisma::PrismaClient, processors::{ @@ -200,8 +200,8 @@ async fn main() { .route("/systems", get(system_controller::get_all_systems)) // CONFIG .route( - "/config/:key", - post(send_config_controller::send_config).layer(Extension(client)), + "/config/set/:configKey", + post(car_command_controller::send_config).layer(Extension(client)), ) // for CORS handling .layer( diff --git a/scylla-server-rust/src/processors/mqtt_processor.rs b/scylla-server-rust/src/processors/mqtt_processor.rs index 01ac3d7b..df2d214c 100644 --- a/scylla-server-rust/src/processors/mqtt_processor.rs +++ b/scylla-server-rust/src/processors/mqtt_processor.rs @@ -13,7 +13,7 @@ use tokio::sync::mpsc::{Receiver, Sender}; use tracing::{debug, instrument, trace, warn, Level}; use crate::{ - controllers::send_config_controller::CALYPSO_BIDIR_CMD_PREFIX, serverdata, + controllers::car_command_controller::CALYPSO_BIDIR_CMD_PREFIX, serverdata, services::run_service, }; From 51e41c180fe5e8c7dbc0602dd2317e0d2e1050f6 Mon Sep 17 00:00:00 2001 From: Jack Rubacha Date: Fri, 2 Aug 2024 19:07:00 -0400 Subject: [PATCH 04/10] try and fix clippy --- scylla-server-rust/src/services/data_service.rs | 4 ++-- scylla-server-rust/src/services/data_type_service.rs | 4 ++-- scylla-server-rust/src/services/driver_service.rs | 4 ++-- scylla-server-rust/src/services/location_service.rs | 4 ++-- scylla-server-rust/src/services/node_service.rs | 4 ++-- scylla-server-rust/src/services/run_service.rs | 8 ++++---- scylla-server-rust/src/services/system_service.rs | 4 ++-- 7 files changed, 16 insertions(+), 16 deletions(-) diff --git a/scylla-server-rust/src/services/data_service.rs b/scylla-server-rust/src/services/data_service.rs index d3a65be1..729c9f43 100644 --- a/scylla-server-rust/src/services/data_service.rs +++ b/scylla-server-rust/src/services/data_service.rs @@ -13,7 +13,7 @@ prisma::data::select! {public_data { /// * `run_id` - The run id to filter the data /// * `fetch_run` whether to fetch the run assocaited with this data /// * `fetch_data_type` whether to fetch the data type associated with this data -/// returns: A result containing the data or the QueryError propogated by the db +/// returns: A result containing the data or the QueryError propogated by the db pub async fn get_data( db: &Database, data_type_name: String, @@ -35,7 +35,7 @@ pub async fn get_data( /// * `unix_time` - The time im miliseconds since unix epoch of the message /// * `data_type_name` - The name of the data type, note this data type must already exist! /// * `rin_id` - The run id to assign the data point to, note this run must already exist! -/// returns: A result containing the data or the QueryError propogated by the db +/// returns: A result containing the data or the QueryError propogated by the db pub async fn add_data( db: &Database, client_data: ClientData, diff --git a/scylla-server-rust/src/services/data_type_service.rs b/scylla-server-rust/src/services/data_type_service.rs index 8e1aabb5..2cf87278 100644 --- a/scylla-server-rust/src/services/data_type_service.rs +++ b/scylla-server-rust/src/services/data_type_service.rs @@ -9,7 +9,7 @@ prisma::data_type::select! {public_datatype { /// Gets all datatypes /// * `db` - The prisma client to make the call to -/// returns: A result containing the data or the QueryError propogated by the db +/// returns: A result containing the data or the QueryError propogated by the db pub async fn get_all_data_types(db: &Database) -> Result, QueryError> { db.data_type() .find_many(vec![]) @@ -23,7 +23,7 @@ pub async fn get_all_data_types(db: &Database) -> Result Result, QueryError> { db.driver() .find_many(vec![]) @@ -31,7 +31,7 @@ pub async fn get_all_drivers(db: &Database) -> Result, /// * `db` - The prisma client to make the call to /// * `driver_name` - The name of the driver to upsert /// * `run_id` - The id of the run to link to the driver, must already exist! -/// returns: A result containing the data or the QueryError propogated by the db +/// returns: A result containing the data or the QueryError propogated by the db pub async fn upsert_driver( db: &Database, driver_name: String, diff --git a/scylla-server-rust/src/services/location_service.rs b/scylla-server-rust/src/services/location_service.rs index 04cf5796..717045ba 100644 --- a/scylla-server-rust/src/services/location_service.rs +++ b/scylla-server-rust/src/services/location_service.rs @@ -21,7 +21,7 @@ prisma::location::select! {public_location{ /// Gets all locations /// * `db` - The prisma client to make the call to -/// returns: A result containing the data or the QueryError propogated by the db +/// returns: A result containing the data or the QueryError propogated by the db pub async fn get_all_locations(db: &Database) -> Result, QueryError> { db.location() .find_many(vec![]) @@ -37,7 +37,7 @@ pub async fn get_all_locations(db: &Database) -> Result Result, QueryError> { db.node() .find_many(vec![]) @@ -24,7 +24,7 @@ pub async fn get_all_nodes(db: &Database) -> Result, Quer /// Upserts a node, either creating or updating one depending on its existence /// * `db` - The prisma client to make the call to /// * `node_name` - The name of the node linked to the data type -/// returns: A result containing the data or the QueryError propogated by the db +/// returns: A result containing the data or the QueryError propogated by the db pub async fn upsert_node( db: &Database, node_name: String, diff --git a/scylla-server-rust/src/services/run_service.rs b/scylla-server-rust/src/services/run_service.rs index 3e1581fa..5520851e 100644 --- a/scylla-server-rust/src/services/run_service.rs +++ b/scylla-server-rust/src/services/run_service.rs @@ -17,7 +17,7 @@ prisma::run::select! {public_run{ /// Gets all runs /// * `db` - The prisma client to make the call to -/// returns: A result containing the data or the QueryError propogated by the db +/// returns: A result containing the data or the QueryError propogated by the db pub async fn get_all_runs(db: &Database) -> Result, QueryError> { db.run() .find_many(vec![]) @@ -29,7 +29,7 @@ pub async fn get_all_runs(db: &Database) -> Result, QueryE /// Gets a single run by its id /// * `db` - The prisma client to make the call to /// * `run_id` - The id of the run to search for -/// returns: A result containing the data (or None if the `run_id` was not a valid run) or the QueryError propogated by the db +/// returns: A result containing the data (or None if the `run_id` was not a valid run) or the QueryError propogated by the db pub async fn get_run_by_id( db: &Database, run_id: i32, @@ -44,7 +44,7 @@ pub async fn get_run_by_id( /// Creates a run /// * `db` - The prisma client to make the call to /// * `timestamp` - The unix time since epoch in miliseconds when the run starts -/// returns: A result containing the data or the QueryError propogated by the db +/// returns: A result containing the data or the QueryError propogated by the db pub async fn create_run(db: &Database, timestamp: i64) -> Result { db.run() .create( @@ -62,7 +62,7 @@ pub async fn create_run(db: &Database, timestamp: i64) -> Result Result, QueryError> { db.system() .find_many(vec![]) @@ -34,7 +34,7 @@ pub async fn get_all_systems(db: &Database) -> Result, /// * `db` - The prisma client to make the call to /// * `system_name` - The system name name to upsert /// * `run_id` - The id of the run to link to the system, must already exist -/// returns: A result containing the data or the QueryError propogated by the db +/// returns: A result containing the data or the QueryError propogated by the db pub async fn upsert_system( db: &Database, system_name: String, From f278a9f383900222632e2a2f8281a305f45d8818 Mon Sep 17 00:00:00 2001 From: Jack Rubacha Date: Fri, 2 Aug 2024 19:21:50 -0400 Subject: [PATCH 05/10] and again fix clippy --- scylla-server-rust/src/processors/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scylla-server-rust/src/processors/mod.rs b/scylla-server-rust/src/processors/mod.rs index e28de9c8..3df66cf7 100644 --- a/scylla-server-rust/src/processors/mod.rs +++ b/scylla-server-rust/src/processors/mod.rs @@ -7,7 +7,7 @@ pub mod mqtt_processor; /// This has the dual purposes of /// * - representing the packet sent over the socket for live data /// * - representing the struct for the service layer to unpack for insertion -/// Note: node name is only considered for database storage and convenience, it is not serialized in a socket packet +/// Note: node name is only considered for database storage and convenience, it is not serialized in a socket packet #[derive(serde::Serialize, Clone, Debug)] pub struct ClientData { pub run_id: i32, From 6de996b754eef552d5bd224dba7211bf86e18530 Mon Sep 17 00:00:00 2001 From: Jack Rubacha Date: Fri, 2 Aug 2024 19:56:31 -0400 Subject: [PATCH 06/10] make data query optional to reset calypso to default if calypso receives a short or emtpy protobuf command_data, it will fill the extra values with the default value specified in the YAML. so we should allow the client to specify blank which could end up meaning a "Reset to Default" button. --- .../src/controllers/car_command_controller.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/scylla-server-rust/src/controllers/car_command_controller.rs b/scylla-server-rust/src/controllers/car_command_controller.rs index 4b59cfe9..d1646c6f 100644 --- a/scylla-server-rust/src/controllers/car_command_controller.rs +++ b/scylla-server-rust/src/controllers/car_command_controller.rs @@ -14,12 +14,13 @@ pub const CALYPSO_BIDIR_CMD_PREFIX: &str = "Calypso/Bidir/Command/"; #[derive(Deserialize, Debug)] pub struct ConfigRequest { - pub data: Vec, + pub data: Option>, } /// This controller recieves two peices of info: the key from the path, and the list of float values from the query params /// The key is used in accordance with the calypso specification: a command_data object is sent over siren to CALYPSO_BIDIR_CMD_PREFIX/ -/// That command_data object is processed and matched via the key to a YAML specification for the encoding and sending of a CAN message +/// That command_data object is processed and matched via the key to a YAML specification for the encoding and sending of a CAN message. +/// If the data is invalid, too short, etc. the default feild from the YAML is instead sent, as with when the car initially turns on before argos connects. /// TLDR: This triggers a command with the values for calypso to update the CAN packet sent to the bus. pub async fn send_config( Path(key): Path, @@ -37,7 +38,10 @@ pub async fn send_config( // create a payload object of the values to be parsed by calypso into a CAN packet let mut payload = CommandData::new(); - payload.data = data_query.0.data; + // writing an empty protobuf is OK, that indicates to calypso that the default value should be used + if let Some(data) = data_query.0.data { + payload.data = data; + } let Ok(bytes) = payload.write_to_bytes() else { return Err(ScyllaError::ImpossibleEncoding); }; From 67c4a3ba52071d422286fb3cb835175e8cad08cc Mon Sep 17 00:00:00 2001 From: Jack Rubacha Date: Fri, 2 Aug 2024 20:02:51 -0400 Subject: [PATCH 07/10] clippy is trolling me --- scylla-server-rust/src/processors/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scylla-server-rust/src/processors/mod.rs b/scylla-server-rust/src/processors/mod.rs index 3df66cf7..afb3bbef 100644 --- a/scylla-server-rust/src/processors/mod.rs +++ b/scylla-server-rust/src/processors/mod.rs @@ -7,7 +7,7 @@ pub mod mqtt_processor; /// This has the dual purposes of /// * - representing the packet sent over the socket for live data /// * - representing the struct for the service layer to unpack for insertion -/// Note: node name is only considered for database storage and convenience, it is not serialized in a socket packet +/// Note: node name is only considered for database storage and convenience, it is not serialized in a socket packet #[derive(serde::Serialize, Clone, Debug)] pub struct ClientData { pub run_id: i32, From 17b62c0bb095adeca97bbf0bfe975b0d19c1ccf7 Mon Sep 17 00:00:00 2001 From: Jack Rubacha Date: Thu, 8 Aug 2024 20:06:47 -0400 Subject: [PATCH 08/10] fixup error handling for plaintext error message --- .../src/controllers/car_command_controller.rs | 12 +++-- .../src/controllers/run_controller.rs | 2 +- scylla-server-rust/src/error.rs | 49 +++++++++++-------- 3 files changed, 39 insertions(+), 24 deletions(-) diff --git a/scylla-server-rust/src/controllers/car_command_controller.rs b/scylla-server-rust/src/controllers/car_command_controller.rs index d1646c6f..0e9557e9 100644 --- a/scylla-server-rust/src/controllers/car_command_controller.rs +++ b/scylla-server-rust/src/controllers/car_command_controller.rs @@ -33,7 +33,9 @@ pub async fn send_config( ); // disable scylla if not prod, as there will be None mqtt client let Some(client) = client else { - return Err(ScyllaError::NotProd); + return Err(ScyllaError::NotProd( + "Car config sending is disabled in mock mode!".to_string(), + )); }; // create a payload object of the values to be parsed by calypso into a CAN packet @@ -43,7 +45,9 @@ pub async fn send_config( payload.data = data; } let Ok(bytes) = payload.write_to_bytes() else { - return Err(ScyllaError::ImpossibleEncoding); + return Err(ScyllaError::ImpossibleEncoding( + "Payload could not be written!".to_string(), + )); }; // publish the message to the topic that calypso's encoder is susbcribed to @@ -57,7 +61,9 @@ pub async fn send_config( .await { warn!("Could not publish instruction: {}", err); - return Err(ScyllaError::CommFailure); + return Err(ScyllaError::CommFailure( + "Siren publish for instruction failed!".to_string(), + )); } Ok(()) diff --git a/scylla-server-rust/src/controllers/run_controller.rs b/scylla-server-rust/src/controllers/run_controller.rs index 984f6db2..528c4405 100644 --- a/scylla-server-rust/src/controllers/run_controller.rs +++ b/scylla-server-rust/src/controllers/run_controller.rs @@ -25,7 +25,7 @@ pub async fn get_run_by_id( let run_data = run_service::get_run_by_id(&db, run_id).await?; if run_data.is_none() { - return Err(ScyllaError::NotFound); + return Err(ScyllaError::EmptyResult); } let run_data_safe = run_data.unwrap(); diff --git a/scylla-server-rust/src/error.rs b/scylla-server-rust/src/error.rs index 216873ca..a0c09895 100644 --- a/scylla-server-rust/src/error.rs +++ b/scylla-server-rust/src/error.rs @@ -10,40 +10,49 @@ use tracing::warn; pub enum ScyllaError { PrismaError(QueryError), - /// A generic not found for a prisma query - NotFound, /// Not available in mock mode, which the system is in - NotProd, + NotProd(String), /// An instruction was not encodable - ImpossibleEncoding, + ImpossibleEncoding(String), /// Could not communicate to car - CommFailure, + CommFailure(String), + /// A query turned up empty that should not have + EmptyResult, } impl From for ScyllaError { fn from(error: QueryError) -> Self { - warn!("Query error: {:?}", error); - match error { - e if e.is_prisma_error::() => ScyllaError::NotFound, - e => ScyllaError::PrismaError(e), - } + ScyllaError::PrismaError(error) } } // This centralizes all different errors from our app in one place impl IntoResponse for ScyllaError { fn into_response(self) -> Response { - let status = match self { - ScyllaError::PrismaError(error) if error.is_prisma_error::() => { - StatusCode::CONFLICT - } - ScyllaError::PrismaError(_) => StatusCode::BAD_REQUEST, - ScyllaError::NotFound => StatusCode::NOT_FOUND, - ScyllaError::NotProd => StatusCode::SERVICE_UNAVAILABLE, - ScyllaError::ImpossibleEncoding => StatusCode::UNPROCESSABLE_ENTITY, - ScyllaError::CommFailure => StatusCode::BAD_GATEWAY, + let (status, reason) = match self { + ScyllaError::PrismaError(error) if error.is_prisma_error::() => ( + StatusCode::CONFLICT, + format!("Unique Key Violation: {}", error), + ), + ScyllaError::PrismaError(error) if error.is_prisma_error::() => ( + StatusCode::NOT_FOUND, + format!("Record Not Found: {}", error), + ), + ScyllaError::PrismaError(error) => ( + StatusCode::BAD_REQUEST, + format!("Misc query error: {}", error), + ), + ScyllaError::NotProd(reason) => (StatusCode::SERVICE_UNAVAILABLE, reason), + ScyllaError::ImpossibleEncoding(reason) => (StatusCode::UNPROCESSABLE_ENTITY, reason), + ScyllaError::CommFailure(reason) => (StatusCode::BAD_GATEWAY, reason), + ScyllaError::EmptyResult => ( + StatusCode::NOT_FOUND, + "Fetched an empty result that should not be!".to_string(), + ), }; - status.into_response() + warn!("Routing error: {}: {}", status, reason); + + (status, reason).into_response() } } From 4ab7ee66f82f62bc9c85b47573ccdd3c1aaf1619 Mon Sep 17 00:00:00 2001 From: Jack Rubacha Date: Sat, 10 Aug 2024 15:09:53 -0400 Subject: [PATCH 09/10] implement reid fixes --- .../src/controllers/car_command_controller.rs | 25 ++++++++++--------- scylla-server-rust/src/error.rs | 5 ++-- scylla-server-rust/src/main.rs | 2 +- 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/scylla-server-rust/src/controllers/car_command_controller.rs b/scylla-server-rust/src/controllers/car_command_controller.rs index 0e9557e9..a6f440a3 100644 --- a/scylla-server-rust/src/controllers/car_command_controller.rs +++ b/scylla-server-rust/src/controllers/car_command_controller.rs @@ -17,19 +17,20 @@ pub struct ConfigRequest { pub data: Option>, } -/// This controller recieves two peices of info: the key from the path, and the list of float values from the query params -/// The key is used in accordance with the calypso specification: a command_data object is sent over siren to CALYPSO_BIDIR_CMD_PREFIX/ -/// That command_data object is processed and matched via the key to a YAML specification for the encoding and sending of a CAN message. -/// If the data is invalid, too short, etc. the default feild from the YAML is instead sent, as with when the car initially turns on before argos connects. -/// TLDR: This triggers a command with the values for calypso to update the CAN packet sent to the bus. -pub async fn send_config( +/// Sends a configuration to the car over MQTT +/// * `key` - The key of the configuration, as defined in the cangen YAML +/// * `data_query` - The data of the configuration, a URI query list of data=. If empty or too short, filled with cangen YAMl defaults +/// * `client` - The MQTT client to be used to send the data +/// +/// More info: This follows the specification of sending a command_data object over siren to topic CALYPSO_BIDIR_CMD_PREFIX/ +pub async fn send_config_command( Path(key): Path, - data_query: Query, + Query(data_query): Query, Extension(client): Extension>>, ) -> Result<(), ScyllaError> { info!( "Sending car config with key: {}, and values: {:?}", - key, data_query.0.data + key, data_query.data ); // disable scylla if not prod, as there will be None mqtt client let Some(client) = client else { @@ -38,14 +39,14 @@ pub async fn send_config( )); }; - // create a payload object of the values to be parsed by calypso into a CAN packet + // the protobuf calypso converts into CAN let mut payload = CommandData::new(); - // writing an empty protobuf is OK, that indicates to calypso that the default value should be used - if let Some(data) = data_query.0.data { + // empty "data" in the protobuf tells calypso to use the default value + if let Some(data) = data_query.data { payload.data = data; } let Ok(bytes) = payload.write_to_bytes() else { - return Err(ScyllaError::ImpossibleEncoding( + return Err(ScyllaError::InvalidEncoding( "Payload could not be written!".to_string(), )); }; diff --git a/scylla-server-rust/src/error.rs b/scylla-server-rust/src/error.rs index a0c09895..8d660ccd 100644 --- a/scylla-server-rust/src/error.rs +++ b/scylla-server-rust/src/error.rs @@ -9,11 +9,12 @@ use prisma_client_rust::{ use tracing::warn; pub enum ScyllaError { + /// Any prisma query which errors out PrismaError(QueryError), /// Not available in mock mode, which the system is in NotProd(String), /// An instruction was not encodable - ImpossibleEncoding(String), + InvalidEncoding(String), /// Could not communicate to car CommFailure(String), /// A query turned up empty that should not have @@ -43,7 +44,7 @@ impl IntoResponse for ScyllaError { format!("Misc query error: {}", error), ), ScyllaError::NotProd(reason) => (StatusCode::SERVICE_UNAVAILABLE, reason), - ScyllaError::ImpossibleEncoding(reason) => (StatusCode::UNPROCESSABLE_ENTITY, reason), + ScyllaError::InvalidEncoding(reason) => (StatusCode::UNPROCESSABLE_ENTITY, reason), ScyllaError::CommFailure(reason) => (StatusCode::BAD_GATEWAY, reason), ScyllaError::EmptyResult => ( StatusCode::NOT_FOUND, diff --git a/scylla-server-rust/src/main.rs b/scylla-server-rust/src/main.rs index 8cd86986..71c53d49 100755 --- a/scylla-server-rust/src/main.rs +++ b/scylla-server-rust/src/main.rs @@ -201,7 +201,7 @@ async fn main() { // CONFIG .route( "/config/set/:configKey", - post(car_command_controller::send_config).layer(Extension(client)), + post(car_command_controller::send_config_command).layer(Extension(client)), ) // for CORS handling .layer( From fdaf164a3563e0600956e07f5f8d348ab74fc4df Mon Sep 17 00:00:00 2001 From: Jack Rubacha Date: Sat, 10 Aug 2024 20:52:40 -0400 Subject: [PATCH 10/10] remove not prod error --- scylla-server-rust/src/controllers/car_command_controller.rs | 5 ++--- scylla-server-rust/src/error.rs | 3 --- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/scylla-server-rust/src/controllers/car_command_controller.rs b/scylla-server-rust/src/controllers/car_command_controller.rs index a6f440a3..d36ec09a 100644 --- a/scylla-server-rust/src/controllers/car_command_controller.rs +++ b/scylla-server-rust/src/controllers/car_command_controller.rs @@ -34,9 +34,8 @@ pub async fn send_config_command( ); // disable scylla if not prod, as there will be None mqtt client let Some(client) = client else { - return Err(ScyllaError::NotProd( - "Car config sending is disabled in mock mode!".to_string(), - )); + warn!("Cannot use config endpoint in dev mode!"); + return Ok(()) }; // the protobuf calypso converts into CAN diff --git a/scylla-server-rust/src/error.rs b/scylla-server-rust/src/error.rs index 8d660ccd..8b7563ad 100644 --- a/scylla-server-rust/src/error.rs +++ b/scylla-server-rust/src/error.rs @@ -11,8 +11,6 @@ use tracing::warn; pub enum ScyllaError { /// Any prisma query which errors out PrismaError(QueryError), - /// Not available in mock mode, which the system is in - NotProd(String), /// An instruction was not encodable InvalidEncoding(String), /// Could not communicate to car @@ -43,7 +41,6 @@ impl IntoResponse for ScyllaError { StatusCode::BAD_REQUEST, format!("Misc query error: {}", error), ), - ScyllaError::NotProd(reason) => (StatusCode::SERVICE_UNAVAILABLE, reason), ScyllaError::InvalidEncoding(reason) => (StatusCode::UNPROCESSABLE_ENTITY, reason), ScyllaError::CommFailure(reason) => (StatusCode::BAD_GATEWAY, reason), ScyllaError::EmptyResult => (