From be90143992ab98471ab01c995d81525d36e67819 Mon Sep 17 00:00:00 2001 From: Nereuxofficial <37740907+Nereuxofficial@users.noreply.github.com> Date: Thu, 14 Sep 2023 15:51:21 +0200 Subject: [PATCH] fix: Fixed Build, updated dependencies --- Cargo.toml | 4 +-- lib/src/lib.rs | 4 +-- lib/src/mqtt/mod.rs | 11 ++++++-- lib/src/process_monitor/mod.rs | 2 -- src/main.rs | 46 ++++++++++++++++++---------------- 5 files changed, 37 insertions(+), 30 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b72b0f4..6e065d3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,10 +26,10 @@ mqtt-protocol = "0.11.2" rand = "0.8.5" rand_xoshiro = "0.6.0" # Command line interface -clap = { version = "4.3.24", features = ["derive"] } +clap = { version = "4.4.3", features = ["derive"] } # For serialization serde = { version = "1.0.186", features = ["derive"] } -toml = "0.7.6" +toml = "0.8.0" # For serialization of raw bytes serde_with = {version="3.1.0", features = ["hex"]} diff --git a/lib/src/lib.rs b/lib/src/lib.rs index a248bcb..a2fa652 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -21,7 +21,7 @@ use crate::markov::MAX_PACKETS; use crate::packets::PacketQueue; use clap::{Parser, Subcommand}; -use rand::{thread_rng, Rng}; +use rand::Rng; use serde::{Deserialize, Serialize}; use std::str::FromStr; @@ -32,10 +32,8 @@ mod packet_pool; pub mod packets; pub mod process_monitor; pub mod runtime; -// TODO: Clean up main // TODO: Try fuzzing a basic mongoose server? // TODO: Fuzz mosquitto compiled with sanitizers -// TODO: Lib-split to allow benchmarking /// Struct to serialize threads once they are done(aka the broker has crashed). #[derive(Serialize, Deserialize, Debug)] diff --git a/lib/src/mqtt/mod.rs b/lib/src/mqtt/mod.rs index 5a1e0ae..a070637 100644 --- a/lib/src/mqtt/mod.rs +++ b/lib/src/mqtt/mod.rs @@ -1,4 +1,5 @@ use crate::markov::ByteStream; +use crate::network::connect_to_broker; use crate::packets::{PacketQueue, Packets}; use std::sync::Arc; use std::time::Duration; @@ -37,13 +38,19 @@ pub(crate) fn generate_pingreq_packet() -> [u8; 2] { [192, 0] } -pub async fn test_connection(stream: &mut impl ByteStream) -> color_eyre::Result<()> { +pub async fn test_conn_from_address(address: &str) -> color_eyre::Result<()> { + let mut stream = connect_to_broker(address).await?; + test_connection(&mut stream).await?; + Ok(()) +} + +async fn test_connection(stream: &mut impl ByteStream) -> color_eyre::Result<()> { stream .write_all(generate_connect_packet().as_slice()) .await?; let mut buf = [0; 1024]; let _ = timeout(Duration::from_secs(1), stream.read(&mut buf)).await; - debug!("Received Packet hex encoded: {:?}", hex::encode(buf)); + debug!("Connection established"); Ok(()) } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] diff --git a/lib/src/process_monitor/mod.rs b/lib/src/process_monitor/mod.rs index d976210..d776d8f 100644 --- a/lib/src/process_monitor/mod.rs +++ b/lib/src/process_monitor/mod.rs @@ -7,10 +7,8 @@ use tokio::sync::oneshot::channel; use tokio::time::{sleep, timeout}; use tracing::{debug, info}; -// TODO: How do the tasks ask if the server has exited? And better yet, how do they get the message back? // TODO: Also, how do the tasks know when it has caused new stdout/stderr output? // TODO: Allow the user to specify where to write the stdout/stderr of the monitored process. Maybe gzip compress it? -// TODO: Ask threads what their last packets were and dump it. /// Start the broker process and monitor it. If it crashes, we stop our execution. pub async fn start_supervised_process( sender: Sender<()>, diff --git a/src/main.rs b/src/main.rs index edf8008..35c1bc5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,6 @@ use clap::{Parser, Subcommand}; use futures::future::join_all; -use lib::mqtt::test_connection; -use lib::network::connect_to_broker; +use lib::mqtt::test_conn_from_address; use lib::packets::PacketQueue; use lib::process_monitor::start_supervised_process; use lib::runtime::{iterations_tracker, run_thread}; @@ -60,8 +59,7 @@ async fn main() -> color_eyre::Result<()> { } start_supervised_process(sender, cli.broker_command).await?; let address = cli.target.clone(); - let mut stream = connect_to_broker(&cli.target).await?; - test_connection(&mut stream).await?; + test_conn_from_address(&address).await?; info!("Connection established, starting fuzzing!"); let mut rng = thread_rng(); let _ = fs::create_dir("./threads").await; @@ -95,20 +93,7 @@ async fn main() -> color_eyre::Result<()> { let mut files = fs::read_dir("./threads") .await .expect("Failed to find threads folder. Cannot Replay"); - let mut filtered_files = vec![]; - trace!( - "Found files: {:?} in folder {:?}", - files, - std::env::current_dir() - ); - - while let Some(entry) = files.next_entry().await? { - let path = entry.path(); - let path_str = path.to_string_lossy().to_string(); - if path_str.starts_with("./threads/fuzzing_") && path_str.ends_with(".txt") { - filtered_files.push(path_str); - } - } + let mut filtered_files = collect_threads().await; trace!("Found {} files", filtered_files.len()); let (sender, receiver) = tokio::sync::broadcast::channel::<()>(1); let mut subscribers = vec![]; @@ -116,9 +101,7 @@ async fn main() -> color_eyre::Result<()> { subscribers.push(sender.subscribe()); } start_supervised_process(sender, cli.broker_command).await?; - let mut stream = connect_to_broker(&cli.target).await?; - test_connection(&mut stream).await?; - debug!("Connection established"); + test_conn_from_address(&cli.target).await?; debug!("Starting replay with {} seeds", filtered_files.len()); let mut threads = vec![]; let unused_it_channel = mpsc_channel::(filtered_files.len()).0; @@ -153,3 +136,24 @@ async fn main() -> color_eyre::Result<()> { } Ok(()) } + +async fn collect_threads() -> Vec { + let mut files = fs::read_dir("./threads") + .await + .expect("Failed to find threads folder. Cannot Replay"); + let mut filtered_files = vec![]; + trace!( + "Found files: {:?} in folder {:?}", + files, + std::env::current_dir() + ); + + while let Ok(Some(entry)) = files.next_entry().await { + let path = entry.path(); + let path_str = path.to_string_lossy().to_string(); + if path_str.starts_with("./threads/fuzzing_") && path_str.ends_with(".txt") { + filtered_files.push(path_str); + } + } + filtered_files +}