From bf6852261ba98e3c1d254b756a9e286e4607a8c4 Mon Sep 17 00:00:00 2001 From: Nereuxofficial <37740907+Nereuxofficial@users.noreply.github.com> Date: Thu, 7 Sep 2023 12:26:11 +0200 Subject: [PATCH] Added Benchmarking, bin-lib split and refactorings (#1) --- .github/workflows/bench.yml | 64 ++++++++++ .github/workflows/ci.yml | 10 +- .gitignore | 2 + Cargo.toml | 32 +++-- Readme.md | 2 +- chain.yaml | 35 ------ lib/.gitignore | 6 + lib/Cargo.toml | 60 ++++++++++ lib/benches/markov_models.rs | 37 ++++++ lib/src/lib.rs | 58 ++++++++++ {src => lib/src}/markov/mod.rs | 41 ++++--- {src => lib/src}/markov/mutations.rs | 4 +- {src => lib/src}/mqtt/mod.rs | 28 ++--- lib/src/network.rs | 109 +++++++++++++++++ {src => lib/src}/packet_pool.rs | 5 +- lib/src/packets.rs | 66 +++++++++++ {src => lib/src}/process_monitor/mod.rs | 18 ++- {src => lib/src}/runtime/mod.rs | 35 +++--- src/main.rs | 148 +++--------------------- 19 files changed, 528 insertions(+), 232 deletions(-) create mode 100644 .github/workflows/bench.yml delete mode 100644 chain.yaml create mode 100644 lib/.gitignore create mode 100644 lib/Cargo.toml create mode 100644 lib/benches/markov_models.rs create mode 100644 lib/src/lib.rs rename {src => lib/src}/markov/mod.rs (88%) rename {src => lib/src}/markov/mutations.rs (97%) rename {src => lib/src}/mqtt/mod.rs (89%) create mode 100644 lib/src/network.rs rename {src => lib/src}/packet_pool.rs (97%) create mode 100644 lib/src/packets.rs rename {src => lib/src}/process_monitor/mod.rs (79%) rename {src => lib/src}/runtime/mod.rs (80%) diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml new file mode 100644 index 0000000..480bd1b --- /dev/null +++ b/.github/workflows/bench.yml @@ -0,0 +1,64 @@ +name: Benchmarks +## Adapted from this: https://github.com/infinyon/fluvio/blob/master/.github/workflows/benchmarks.yml +concurrency: + group: benchmark-${{ github.ref }} + cancel-in-progress: true + +on: + pull_request: + branches: + - main + - dev + push: + branches: + - main + - dev + +permissions: + contents: write + +jobs: + markov_model: + name: Markov Model Benchmarks + runs-on: ubuntu-latest + steps: + - name: Checkout Code + uses: actions/checkout@v4 + + - name: Get Rust toolchain + uses: dtolnay/rust-toolchain@v1 + with: + toolchain: nightly + profile: minimal + + - name: Cache Rust Cargo files + uses: Swatinem/rust-cache@v2 + with: + # Additional non workspace directories, separated by newlines + key: benches-${{ runner.os }}-unit_markov_model-rust + + - name: Cache Benchmark data + uses: actions/cache@v3 + if: github.ref == 'refs/heads/main' + with: + path: ./benches_cache + key: benches-${{ runner.os }}-unit_markov_model-rust + + - name: Run Benchmarks + run: cd lib; cargo bench -- --output-format bencher | tee markov_model_bench.txt + + - name: Store benchmark result + uses: benchmark-action/github-action-benchmark@v1 + if: github.ref == 'refs/heads/main' + with: + # What benchmark tool the output.txt came from + tool: 'cargo' + # Where the output from the benchmark tool is stored + output-file-path: markov_model_bench.txt + # GitHub API token to make a commit comment + github-token: ${{ secrets.GITHUB_TOKEN }} + # Leave a job summary with benchmark result comparison + summary-always: true + # Where the previous data file is stored + external-data-json-path: ./benches_cache/benchmark-data.json + alert-comment-cc-users: '@Nereuxofficial' diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4d93876..41b6a6b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -9,7 +9,7 @@ on: env: CARGO_TERM_COLOR: always - +# TODO: Add benchmarking in CI(see https://github.com/benchmark-action/github-action-benchmark/pull/138) jobs: build: strategy: @@ -41,7 +41,7 @@ jobs: run: cargo test -- --test-threads=1 - name: Run cargo doc if: ${{ runner.os == 'Linux' }} - run: cargo doc --no-deps --document-private-items --all-features + run: cargo doc --no-deps --document-private-items - name: Run build --release --all-targets run: cargo build --release --all-targets @@ -62,7 +62,7 @@ jobs: components: rustfmt - name: Run cargo fmt --all -- --check - run: cargo fmt --all -- --check + run: cd lib && cargo fmt --all -- --check clippy: runs-on: ubuntu-20.04 @@ -79,5 +79,5 @@ jobs: override: true components: clippy - - name: Run cargo clippy --package rusty-FUME --all-targets - run: cargo clippy --package rusty-fume --all-targets + - name: Run cargo clippy --package lib --all-targets + run: cargo clippy --package lib --all-targets diff --git a/.gitignore b/.gitignore index 991239a..889353d 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,5 @@ Cargo.lock .vscode *.log .env +crashes +threads \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index fedd9f4..b72b0f4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,30 +1,48 @@ [package] name = "rusty-fume" -version = "0.1.0" +version = "0.8.0" authors = ["Nereuxofficial <37740907+Nereuxofficial@users.noreply.github.com>"] license = "GPLv3" readme = "README.md" edition = "2021" +[workspace] + [dependencies] +lib = {path = "lib"} +# Convenient error handling color-eyre = "0.6.2" +# Logging tracing = "0.1.37" tracing-subscriber = "0.3.17" -dotenvy = "0.15.7" +# Futures tokio = { version = "1.32.0", features = ["full"] } -markov = "1.1.0" -rand = "0.8.5" +futures = "0.3.28" +# Hex en/decoding hex = "0.4.3" +# MQTT Packet generation mqtt-protocol = "0.11.2" +# Random number generation with xoshiro for faster PRNG +rand = "0.8.5" rand_xoshiro = "0.6.0" +# Command line interface clap = { version = "4.3.24", features = ["derive"] } +# For serialization serde = { version = "1.0.186", features = ["derive"] } toml = "0.7.6" -futures = "0.3.28" -tokio-uring = "0.4.0" -console-subscriber = "0.1.10" +# For serialization of raw bytes serde_with = {version="3.1.0", features = ["hex"]} +# Tokio Console Support +console-subscriber = "0.1.10" [profile.release] debug = true codegen-units = 1 + +[features] +default = ["tcp"] +tcp = [] +# TODO: Add quic, ws support +quic = [] +websocket = [] +tls = ["lib/tls"] diff --git a/Readme.md b/Readme.md index 2b8e882..fc788fe 100644 --- a/Readme.md +++ b/Readme.md @@ -36,7 +36,7 @@ Pull requests are welcome. For major changes, please open an issue first to disc Currently, the Windows build is failing in the ci, however i've only tested this on Linux so far. Maybe it works on Windows, maybe it doesn't. I don't know. Pull Requests to fix this if necessary are welcome. ## Trophies -All bugs found with this software. If you find a bug using rusty-FUME, please open an issue and i'll add it to the list. +All bugs found with this software. If you find a bug using rusty-FUME, please open an issue and I'll add it to the list once it is patched. - [FlashMQ Null pointer dereference](https://github.com/halfgaar/FlashMQ/commit/eb3acf88771af3eeddf086e4c9dc51d703456eee) diff --git a/chain.yaml b/chain.yaml deleted file mode 100644 index 946678a..0000000 --- a/chain.yaml +++ /dev/null @@ -1,35 +0,0 @@ ---- -map: - ? - ~ - : I: 2 - ? - like - : "them,": 1 - green: 1 - ? - I - : do: 2 - am.: 2 - am: 1 - ? - am. - : I: 1 - ~: 1 - ? - do - : not: 2 - ? - "them," - : Sam: 1 - ? - not - : like: 2 - ? - and - : ham.: 1 - ? - green - : eggs: 1 - ? - eggs - : and: 1 - ? - ham. - : ~: 1 - ? - Sam - : I: 2 - ? - Sam. - : Sam: 1 - ? - am - : Sam.: 1 -order: 1 diff --git a/lib/.gitignore b/lib/.gitignore new file mode 100644 index 0000000..991239a --- /dev/null +++ b/lib/.gitignore @@ -0,0 +1,6 @@ +/target +Cargo.lock +.idea +.vscode +*.log +.env diff --git a/lib/Cargo.toml b/lib/Cargo.toml new file mode 100644 index 0000000..6448e5f --- /dev/null +++ b/lib/Cargo.toml @@ -0,0 +1,60 @@ +[package] +name = "lib" +version = "0.8.0" +edition = "2021" + +[lib] +bench=false + +[dependencies] +# Convenient error handling +color-eyre = "0.6.2" +# Logging +tracing = "0.1.37" +tracing-subscriber = "0.3.17" +# Futures +tokio = { version = "1.32.0", features = ["full"] } +futures = "0.3.28" +# Hex en/decoding +hex = "0.4.3" +# MQTT Packet generation +mqtt-protocol = "0.11.2" +# Random number generation with xoshiro for faster PRNG +rand = "0.8.5" +rand_xoshiro = "0.6.0" +# Command line interface +clap = { version = "4.3.24", features = ["derive"] } +# For serialization +serde = { version = "1.0.186", features = ["derive"] } +toml = "0.7.6" +# For serialization of raw bytes +serde_with = {version="3.1.0", features = ["hex"]} + +# Tokio Console Support +console-subscriber = "0.1.10" +# For Websocket support +tokio-tungstenite = "0.20.0" +# For TLS support +tokio-rustls = { version="0.24.1", features = ["dangerous_configuration"], optional = true } +rustls = { version="0.21.6", features = ["dangerous_configuration"], optional = true } + + + +[dev-dependencies] +criterion = { version = "0.5.1", features=["html_reports", "async_tokio"]} + +[[bench]] +name = "markov_models" +harness = false + +[profile.release] +debug = true +codegen-units = 1 + +[features] +default = ["tcp"] +tcp = [] +# TODO: Add quic, ws support +quic = [] +websocket = [] +tls = ["dep:tokio-rustls", "dep:rustls"] diff --git a/lib/benches/markov_models.rs b/lib/benches/markov_models.rs new file mode 100644 index 0000000..82a7440 --- /dev/null +++ b/lib/benches/markov_models.rs @@ -0,0 +1,37 @@ +use criterion::BenchmarkId; +use criterion::Criterion; +use criterion::{criterion_group, criterion_main}; +use rand::SeedableRng; +use rand_xoshiro::Xoshiro256PlusPlus; +use std::sync::Arc; +use tokio::io::duplex; +// This is a struct that tells Criterion.rs to use the "futures" crate's current-thread executor +use lib::markov::{Mode, StateMachine}; +use lib::packets::PacketQueue; +use tokio::runtime::Runtime; +use tokio::sync::RwLock; + +async fn exec_markov_fast(m: Mode) { + let stream = duplex(10 * 1024).0; + let mut machine = StateMachine::new(stream, 100); + let mut rng = Xoshiro256PlusPlus::from_seed([5; 32]); + machine + .execute(m, &mut rng, &Arc::new(RwLock::new(PacketQueue::default()))) + .await; +} + +fn markov_model_execution(c: &mut Criterion) { + for mode in [Mode::MutationGuided, Mode::GenerationGuided] { + c.bench_with_input( + BenchmarkId::new("execute_markov_model", mode), + &mode, + |b, &m| { + b.to_async(Runtime::new().unwrap()) + .iter(|| exec_markov_fast(m)); + }, + ); + } +} + +criterion_group!(benches, markov_model_execution); +criterion_main!(benches); diff --git a/lib/src/lib.rs b/lib/src/lib.rs new file mode 100644 index 0000000..a248bcb --- /dev/null +++ b/lib/src/lib.rs @@ -0,0 +1,58 @@ +//! # rusty-FUME +//! rusty-FUME is a fuzzer for the MQTT protocol. It is based on [FUME-Fuzzing-MQTT Brokers](https://github.com/PBearson/FUME-Fuzzing-MQTT-Brokers) +//! and uses markov chains to generate new packet chains. If it discovers a new response behaviour the chain is added to the fuzzing queue. +//! We use [tokio](https://tokio.rs/) for async networking. +//! ## The state machine +//! We implement a State machine with a markov chain. All probabilities are configurable for this process(except the ones with only one option). +//! The state machine is defined as follows for the Mutation Guided Fuzzing: +//! - S0: Initial State: Either goto CONNECT state or select a packet from the queue and go to MUTATION state +//! - CONNECT: Add connect to the current chain and go to ADDING State +//! - ADDING: Either add a new packet(configurable probability for each one) to the chain or go to MUTATION state +//! - MUTATION: Mutate, delete, inject or SEND the current chain +//! - SEND: Send the current chain and either go to Sf or MUTATION state +//! - Sf: Final State +//! And this way for Generation Guided Fuzzing: +//! - S0: Initial State: Goto ADD(CONNECT) state +//! - CONNECT: Add connect to the current chain and go to S1 +//! - S1: Either add a new packet or go to S2 +//! - S2: Inject/Delete/Mutate the current chain or go to SEND +//! - SEND: Send the current chain and either go to Sf or S2 +//! Once they get to S2 they behave the same way. +use crate::markov::MAX_PACKETS; +use crate::packets::PacketQueue; +use clap::{Parser, Subcommand}; +use rand::{thread_rng, Rng}; +use serde::{Deserialize, Serialize}; +use std::str::FromStr; + +pub mod markov; +pub mod mqtt; +pub mod network; +mod packet_pool; +pub mod packets; +pub mod process_monitor; +pub mod runtime; +// TODO: Clean up main +// TODO: Try fuzzing a basic mongoose server? +// TODO: Fuzz mosquitto compiled with sanitizers +// TODO: Lib-split to allow benchmarking + +/// Struct to serialize threads once they are done(aka the broker has crashed). +#[derive(Serialize, Deserialize, Debug)] +pub struct SeedAndIterations { + pub seed: String, + pub iterations: String, +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::packets::Packets; + #[test] + fn test_serialize_packet_queue() { + let mut packet_queue = PacketQueue::default(); + packet_queue.inner.insert(vec![0x10], Packets::default()); + let serialized = toml::to_string(&packet_queue).unwrap(); + println!("{}", serialized); + } +} diff --git a/src/markov/mod.rs b/lib/src/markov/mod.rs similarity index 88% rename from src/markov/mod.rs rename to lib/src/markov/mod.rs index 41db56c..6b8b150 100644 --- a/src/markov/mod.rs +++ b/lib/src/markov/mod.rs @@ -23,13 +23,13 @@ use crate::mqtt::{ generate_publish_packet, generate_subscribe_packet, generate_unsubscribe_packet, send_packets, SendError, }; -use crate::{PacketQueue, Packets}; +use crate::packets::{PacketQueue, Packets}; use rand::distributions::Standard; use rand::prelude::Distribution; use rand::Rng; use rand_xoshiro::Xoshiro256PlusPlus; use std::default::Default; -use std::fmt::Debug; +use std::fmt::{Debug, Display}; use std::sync::Arc; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::RwLock; @@ -75,9 +75,9 @@ impl Distribution for Standard { } } -pub trait ByteStream: AsyncReadExt + AsyncWriteExt + Unpin {} +pub trait ByteStream: AsyncReadExt + AsyncWriteExt + Unpin + Debug + Send {} -impl ByteStream for T where T: AsyncReadExt + AsyncWriteExt + Unpin {} +impl ByteStream for T where T: AsyncReadExt + AsyncWriteExt + Unpin + Debug + Send {} pub struct StateMachine where B: ByteStream, @@ -90,6 +90,7 @@ where pub previous_packets: Vec, // The current stream, TlsStream TcpStream or WebsocketStream stream: B, + timeout: u16, } #[derive(Debug, Clone, PartialEq, Eq, Hash, Copy)] pub enum Mode { @@ -97,6 +98,15 @@ pub enum Mode { GenerationGuided, } +impl Display for Mode { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + MutationGuided => write!(f, "Mutation Guided"), + GenerationGuided => write!(f, "Generation Guided"), + } + } +} + impl Distribution for Standard { fn sample(&self, rng: &mut R) -> Mode { match rng.gen_range(0..10) { @@ -110,15 +120,16 @@ impl StateMachine where B: ByteStream, { - pub(crate) fn new(stream: B) -> Self { + pub fn new(stream: B, timeout: u16) -> Self { Self { stream, state: Default::default(), packets: Packets::new(), previous_packets: Vec::new(), + timeout, } } - pub(crate) async fn execute( + pub async fn execute( &mut self, mode: Mode, rng: &mut Xoshiro256PlusPlus, @@ -164,25 +175,22 @@ where State::ADD(packet_type) => { match packet_type { PacketType::CONNECT => { - self.packets.append(&mut generate_connect_packet().to_vec()); + self.packets.append(&generate_connect_packet()); } PacketType::PUBLISH => { - self.packets.append(&mut generate_publish_packet().to_vec()); + self.packets.append(&generate_publish_packet()); } PacketType::SUBSCRIBE => { - self.packets - .append(&mut generate_subscribe_packet().to_vec()); + self.packets.append(&generate_subscribe_packet()); } PacketType::UNSUBSCRIBE => { - self.packets - .append(&mut generate_unsubscribe_packet().to_vec()); + self.packets.append(&generate_unsubscribe_packet()); } PacketType::PINGREQ => { - self.packets.append(&mut generate_pingreq_packet().to_vec()); + self.packets.append(&generate_pingreq_packet()); } PacketType::DISCONNECT => { - self.packets - .append(&mut generate_disconnect_packet().to_vec()); + self.packets.append(&generate_disconnect_packet()); } _ => unreachable!(), } @@ -218,7 +226,8 @@ where } State::SEND => { self.previous_packets.push(self.packets.clone()); - let res = send_packets(&mut self.stream, &self.packets, packet_queue).await; + let res = + send_packets(&mut self.stream, &self.packets, packet_queue, self.timeout).await; if let Err(e) = res { match e { SendError::Timeout => {} diff --git a/src/markov/mutations.rs b/lib/src/markov/mutations.rs similarity index 97% rename from src/markov/mutations.rs rename to lib/src/markov/mutations.rs index 58e9b49..5dd6df6 100644 --- a/src/markov/mutations.rs +++ b/lib/src/markov/mutations.rs @@ -1,4 +1,4 @@ -use crate::Packets; +use crate::packets::Packets; use rand::distributions::Standard; use rand::prelude::Distribution; use rand::Rng; @@ -36,7 +36,7 @@ impl Distribution for Standard { fn inject_bof(packet: &mut Vec, rng: &mut Xoshiro256PlusPlus) { let idx = rng.gen_range(0..packet.len()); // To fight big packets - let byte_length = 10000 / packet.len(); + let byte_length = 350 / packet.len(); let mut bytes = vec![0; byte_length]; rng.fill(&mut bytes[..]); packet.splice(idx..idx, bytes); diff --git a/src/mqtt/mod.rs b/lib/src/mqtt/mod.rs similarity index 89% rename from src/mqtt/mod.rs rename to lib/src/mqtt/mod.rs index 4265b7a..5a1e0ae 100644 --- a/src/mqtt/mod.rs +++ b/lib/src/mqtt/mod.rs @@ -1,15 +1,11 @@ use crate::markov::ByteStream; -use crate::{PacketQueue, Packets}; +use crate::packets::{PacketQueue, Packets}; use std::sync::Arc; use std::time::Duration; -use tokio::io::AsyncWriteExt; use tokio::sync::RwLock; use tokio::time::timeout; use tracing::{debug, info, trace}; -// TODO: Maybe we can begin the packet queue with some more interesting packets that triggered bugs in the past from CVEs -pub(crate) fn generate_auth_packet() -> Vec { - unimplemented!("Auth packet not implemented yet. Switch to MQTT V5") -} + pub(crate) fn generate_connect_packet() -> [u8; 62] { [ 16, 60, 0, 4, 77, 81, 84, 84, 4, 4, 0, 0, 0, 17, 72, 101, 108, 108, 111, 32, 77, 81, 84, @@ -41,13 +37,13 @@ pub(crate) fn generate_pingreq_packet() -> [u8; 2] { [192, 0] } -pub(crate) async fn test_connection(stream: &mut impl ByteStream) -> color_eyre::Result<()> { +pub async fn test_connection(stream: &mut impl ByteStream) -> color_eyre::Result<()> { stream .write_all(generate_connect_packet().as_slice()) .await?; let mut buf = [0; 1024]; let _ = timeout(Duration::from_secs(1), stream.read(&mut buf)).await; - debug!("Received Packet hex encoded: {:?}", hex::encode(&buf)); + debug!("Received Packet hex encoded: {:?}", hex::encode(buf)); Ok(()) } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] @@ -64,23 +60,23 @@ pub(crate) async fn send_packets( stream: &mut impl ByteStream, packets: &Packets, packet_queue: &Arc>, + timeout: u16, ) -> Result<(), SendError> { for packet in packets.inner.iter().filter(|p| !p.is_empty()) { - send_packet(stream, packet.as_slice(), packets, packet_queue).await?; + send_packet(stream, packet.as_slice(), packets, packet_queue, timeout).await?; } Ok(()) } -const PACKET_TIMEOUT: u64 = 30; - pub(crate) async fn send_packet( stream: &mut impl ByteStream, packet: &[u8], packets: &Packets, packet_queue: &Arc>, + timeout_ms: u16, ) -> Result<(), SendError> { let write_result = timeout( - Duration::from_millis(PACKET_TIMEOUT), + Duration::from_millis(timeout_ms as u64), stream.write_all(packet), ) .await; @@ -95,7 +91,11 @@ pub(crate) async fn send_packet( } } let mut buf = [0; 1024]; - let res = timeout(Duration::from_millis(PACKET_TIMEOUT), stream.read(&mut buf)).await; + let res = timeout( + Duration::from_millis(timeout_ms as u64), + stream.read(&mut buf), + ) + .await; match res { Ok(Ok(p)) => { known_packet(&buf[..p], packets, packet_queue).await; @@ -120,7 +120,7 @@ async fn known_packet( packet_queue: &Arc>, ) -> bool { // TODO: decode the packet and extract user id, payload, topic etc. because those don't matter to see if it is a known packet - let mut queue_lock = packet_queue.read().await; + let queue_lock = packet_queue.read().await; let response_packet = response_packet.to_vec(); if !queue_lock.inner.contains_key(&response_packet) { info!("New behavior discovered, adding it to the queue",); diff --git a/lib/src/network.rs b/lib/src/network.rs new file mode 100644 index 0000000..d1e6e0b --- /dev/null +++ b/lib/src/network.rs @@ -0,0 +1,109 @@ +use color_eyre::Result; +// Avoid double protocols +// We have a lot of features which are mutually exclusive, so we need compile time errors if someone tries to use them together. +// These are the features: tcp, websocket, tls, quic +#[cfg(all( + feature = "tcp", + any(feature = "websocket", feature = "tls", feature = "quic") +))] +compile_error!( + "You can only use one protocol at a time. Please disable tcp with --no-default-features" +); +#[cfg(all(feature = "websocket", any(feature = "tls", feature = "quic")))] +compile_error!( + "You can only use one protocol at a time. Please disable websocket with --no-default-features" +); +#[cfg(all(feature = "tls", feature = "quic"))] +compile_error!( + "You can only use one protocol at a time. Please disable tls with --no-default-features" +); + +#[cfg(feature = "tcp")] +pub use tcp::*; +#[cfg(feature = "tcp")] +mod tcp { + use super::*; + use tokio::net::{TcpStream, ToSocketAddrs}; + + pub async fn connect_to_broker(address: impl ToSocketAddrs) -> Result { + let tcpstream = TcpStream::connect(address).await?; + Ok(tcpstream) + } +} + +#[cfg(feature = "websocket")] +pub use websocket::*; + +#[cfg(feature = "websocket")] +mod websocket { + compile_error!("Websocket is not implemented yet"); + /* + use super::*; + use tokio::net::TcpStream; + use tokio_tungstenite::tungstenite::client::IntoClientRequest; + use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; + + pub async fn connect_to_broker( + address: impl IntoClientRequest + Unpin, + ) -> Result>> { + let (ws_stream, _) = tokio_tungstenite::connect_async(address).await?; + Ok(ws_stream) + } + */ +} + +#[cfg(feature = "tls")] +pub use tls::*; + +#[cfg(feature = "tls")] +mod tls { + use super::*; + use crate::markov::ByteStream; + use futures::FutureExt; + use rustls::client::{ServerCertVerified, ServerCertVerifier}; + use rustls::{Certificate, ClientConfig, ClientConnection, Error, ServerName}; + use std::net::IpAddr; + use std::str::FromStr; + use std::sync::Arc; + use std::time::SystemTime; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + use tokio::net::{TcpStream, ToSocketAddrs}; + use tokio_rustls::client::TlsStream; + use tokio_rustls::TlsConnector; + + struct NoCertificateVerifier; + + impl ServerCertVerifier for NoCertificateVerifier { + fn verify_server_cert( + &self, + end_entity: &Certificate, + intermediates: &[Certificate], + server_name: &ServerName, + scts: &mut dyn Iterator, + ocsp_response: &[u8], + now: SystemTime, + ) -> std::result::Result { + Ok(ServerCertVerified::assertion()) + } + } + /// Connects to the broker using TLS ignoring the server certificate since that would just be + /// wasted iterations + pub async fn connect_to_broker(address: &str) -> Result> { + let mut socket = TcpStream::connect(address).await?; + let mut config = ClientConfig::builder() + .with_safe_defaults() + .with_custom_certificate_verifier(Arc::new(NoCertificateVerifier)) + .with_no_client_auth(); + let connector = TlsConnector::from(Arc::new(config)); + let stream = connector + .connect( + // Trim everything including the port beginning ':' + ServerName::IpAddress( + IpAddr::from_str(address.split(':').next().unwrap()).unwrap(), + ), + socket, + ) + .await?; + Ok(stream) + } +} diff --git a/src/packet_pool.rs b/lib/src/packet_pool.rs similarity index 97% rename from src/packet_pool.rs rename to lib/src/packet_pool.rs index 37137ca..5b73b2f 100644 --- a/src/packet_pool.rs +++ b/lib/src/packet_pool.rs @@ -1,8 +1,8 @@ //! Here are packets from previously discovered CVEs use crate::MAX_PACKETS; - /// https://www.cvedetails.com/cve/CVE-2021-34432/ +#[allow(unused)] const CVE_2021_34432: &[&[u8]; MAX_PACKETS] = &[ &[ 16, 60, 0, 4, 77, 81, 84, 84, 4, 4, 0, 0, 0, 17, 72, 101, 108, 108, 111, 32, 77, 81, 84, @@ -19,6 +19,7 @@ const CVE_2021_34432: &[&[u8]; MAX_PACKETS] = &[ &[], &[], ]; +#[allow(unused)] const OTHER_MOSQUITTO_CVE: &[&[u8]; MAX_PACKETS] = &[ &[ 16, 96, 0, 4, 77, 81, 84, 84, 5, 192, 93, 85, 34, 21, 0, 15, 98, 99, 82, 85, 100, 109, 83, @@ -43,7 +44,7 @@ const OTHER_MOSQUITTO_CVE: &[&[u8]; MAX_PACKETS] = &[ ]; mod tests { use super::*; - use crate::{PacketQueue, Packets}; + use crate::packets::{PacketQueue, Packets}; use std::fs::write; #[test] diff --git a/lib/src/packets.rs b/lib/src/packets.rs new file mode 100644 index 0000000..adc39e0 --- /dev/null +++ b/lib/src/packets.rs @@ -0,0 +1,66 @@ +use crate::markov::MAX_PACKETS; +use serde::{Deserialize, Serialize}; +use serde_with::formats::CommaSeparator; +use serde_with::serde_as; +use serde_with::StringWithSeparator; +use std::cmp::min; +use std::collections::BTreeMap; +use std::fmt::Display; +use std::path::Path; +use tokio::fs::File; +use tokio::io::AsyncReadExt; +#[serde_as] +#[derive(Debug, PartialEq, Eq, Clone, Hash, Default, Serialize, Deserialize)] +pub struct Packets { + #[serde_as(as = "[StringWithSeparator::; MAX_PACKETS]")] + pub(crate) inner: [Vec; MAX_PACKETS], +} +impl Display for Packets { + // Hex dump the packets + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut s = String::new(); + for i in 0..MAX_PACKETS { + if !self.inner[i].is_empty() { + s.push_str(hex::encode(&self.inner[i]).as_str()); + s.push('\n'); + } + } + write!(f, "{}", s) + } +} +impl Packets { + pub fn append(&mut self, packet: &[u8]) { + // Search the first free slot and insert it there + let size = self.size(); + if size < MAX_PACKETS { + self.inner[size] = packet.to_vec(); + } + } + pub fn is_full(&self) -> bool { + self.inner.iter().all(|x| !x.is_empty()) + } + pub fn size(&self) -> usize { + min(1, self.inner.iter().filter(|x| !x.is_empty()).count()) + } + pub fn new() -> Self { + Self { + inner: Default::default(), + } + } +} + +#[serde_as] +#[derive(Debug, PartialEq, Eq, Clone, Hash, Default, Serialize, Deserialize)] +pub struct PacketQueue { + #[serde_as(as = "BTreeMap, _>")] + pub(crate) inner: BTreeMap, Packets>, +} + +impl PacketQueue { + pub async fn read_from_file(path: impl AsRef) -> color_eyre::Result { + let mut content = String::new(); + File::open(path).await?.read_to_string(&mut content).await?; + let queue = toml::from_str(&content)?; + Ok(queue) + } +} diff --git a/src/process_monitor/mod.rs b/lib/src/process_monitor/mod.rs similarity index 79% rename from src/process_monitor/mod.rs rename to lib/src/process_monitor/mod.rs index a74447e..d976210 100644 --- a/src/process_monitor/mod.rs +++ b/lib/src/process_monitor/mod.rs @@ -1,7 +1,9 @@ use std::time::Duration; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::Command; +use tokio::signal; use tokio::sync::broadcast::Sender; +use tokio::sync::oneshot::channel; use tokio::time::{sleep, timeout}; use tracing::{debug, info}; @@ -20,12 +22,20 @@ pub async fn start_supervised_process( .stderr(std::process::Stdio::piped()) .spawn() .expect("failed to execute process"); - assert!(child.id().is_some()); + debug_assert!(child.id().is_some()); debug!("Started broker process"); // No broker should take longer than 2 seconds to start. But we could make this configurable. - sleep(tokio::time::Duration::from_secs(2)).await; + sleep(tokio::time::Duration::from_secs(5)).await; + // Buffers for stdout and stderr let mut stdout_reader = BufReader::new(child.stdout.take().unwrap()).lines(); let mut stderr_reader = BufReader::new(child.stderr.take().unwrap()).lines(); + // For handling crtlc + let (tx, mut rx) = channel(); + tokio::spawn(async move { + signal::ctrl_c().await.unwrap(); + info!("Crtl C received, stopping..."); + tx.send(()).expect("Could not send to crtlc_receiver"); + }); tokio::spawn(async move { let mut last_stdout: String = String::new(); let mut last_stderr: String = String::new(); @@ -47,6 +57,10 @@ pub async fn start_supervised_process( info!("Stdout: {:?}", last_stdout); info!("Stderr: {:?}", last_stderr); break; + } else if rx.try_recv().is_ok() { + child.kill().await.unwrap(); + sender.send(()).unwrap(); + break; } } }); diff --git a/src/runtime/mod.rs b/lib/src/runtime/mod.rs similarity index 80% rename from src/runtime/mod.rs rename to lib/src/runtime/mod.rs index ce78e8d..6d33b92 100644 --- a/src/runtime/mod.rs +++ b/lib/src/runtime/mod.rs @@ -1,40 +1,40 @@ use crate::markov::StateMachine; -use crate::{PacketQueue, SeedAndIterations}; +use crate::network::connect_to_broker; +use crate::packets::PacketQueue; +use crate::SeedAndIterations; use rand::{Rng, SeedableRng}; use rand_xoshiro::Xoshiro256PlusPlus; use std::sync::Arc; use std::time::Duration; -use tokio::net::{TcpStream, ToSocketAddrs}; use tokio::sync::broadcast::Receiver; use tokio::sync::mpsc::Receiver as MpscReceiver; use tokio::sync::mpsc::Sender; use tokio::sync::RwLock; use tokio::time::sleep; use tokio::{fs, task}; -use tracing::{debug, error, info}; +use tracing::*; -// TODO: Change address to allow other kinds of Streams /// Runs a task that connects to the broker and fuzzes it -pub(crate) async fn run_thread( +pub async fn run_thread( seed: u64, receiver_clone: Receiver<()>, - address: impl ToSocketAddrs + Clone + Send + Sync + 'static, + address: String, iterations: u64, packet_queue: Arc>, it_sender_clone: Sender, + timeout: u16, ) { let task_handle = task::spawn(async move { - let start_time = std::time::Instant::now(); let mut last_packets = Vec::new(); let mut counter: u64 = 0; let mut rng = Xoshiro256PlusPlus::seed_from_u64(seed); while counter < iterations { - let new_tcpstream = TcpStream::connect(address.clone()).await; - if new_tcpstream.is_err() { + let new_stream = connect_to_broker(&address.clone()).await; + if new_stream.is_err() { // Workaround for connections not being closed fast enough. See https://stackoverflow.com/questions/76238841/cant-assign-requested-address-in-request error!( "Error connecting to broker: {:?}. See recommendations", - new_tcpstream + new_stream ); if !receiver_clone.is_empty() { break; @@ -43,13 +43,12 @@ pub(crate) async fn run_thread( sleep(Duration::from_millis(100)).await; continue; } - let new_tcpstream = new_tcpstream.unwrap(); - let mut state_machine = StateMachine::new(new_tcpstream); + let new_tcpstream = new_stream.unwrap(); + let mut state_machine = StateMachine::new(new_tcpstream, timeout); let mode = rng.gen(); state_machine.execute(mode, &mut rng, &packet_queue).await; last_packets = state_machine.previous_packets.clone(); // We receive a message once the broker is stopped - // TODO: Also save last packets upon crash if !receiver_clone.is_empty() { break; } @@ -75,7 +74,7 @@ pub(crate) async fn run_thread( } // Dump the packet we crashed on let _ = fs::create_dir("crashes").await; - let res = fs::write( + let _ = fs::write( format!("crashes/crash_{}.txt", seed), format!("{:?}", last_packets), ) @@ -92,11 +91,15 @@ pub async fn iterations_tracker(threads: usize, mut it_receiver: MpscReceiver iteration_buffer[i] = v, + None => break, + } } let sum: u64 = iteration_buffer.iter().sum(); let elapsed = start.elapsed().as_millis(); - let it_per_second = (sum - last_iterations) as f64 / elapsed as f64 * 1000f64; + let it_per_second = (sum.saturating_sub(last_iterations)) as f64 / elapsed as f64 * 1000f64; info!("{} it/s", it_per_second); last_iterations = sum; } diff --git a/src/main.rs b/src/main.rs index 7fdc5c8..edf8008 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,117 +1,19 @@ -//! # rusty-FUME -//! rusty-FUME is a fuzzer for the MQTT protocol. It is based on [FUME-Fuzzing-MQTT Brokers](https://github.com/PBearson/FUME-Fuzzing-MQTT-Brokers) -//! and uses markov chains to generate new packet chains. If it discovers a new response behaviour the chain is added to the fuzzing queue. -//! We use [tokio](https://tokio.rs/) for async networking. -//! ## The state machine -//! We implement a State machine with a markov chain. All probabilities are configurable for this process(except the ones with only one option). -//! The state machine is defined as follows for the Mutation Guided Fuzzing: -//! - S0: Initial State: Either goto CONNECT state or select a packet from the queue and go to MUTATION state -//! - CONNECT: Add connect to the current chain and go to ADDING State -//! - ADDING: Either add a new packet(configurable probability for each one) to the chain or go to MUTATION state -//! - MUTATION: Mutate, delete, inject or SEND the current chain -//! - SEND: Send the current chain and either go to Sf or MUTATION state -//! - Sf: Final State -//! And this way for Generation Guided Fuzzing: -//! - S0: Initial State: Goto ADD(CONNECT) state -//! - CONNECT: Add connect to the current chain and go to S1 -//! - S1: Either add a new packet or go to S2 -//! - S2: Inject/Delete/Mutate the current chain or go to SEND -//! - SEND: Send the current chain and either go to Sf or S2 -//! Once they get to S2 they behave the same way. -use std::cmp::min; -use std::collections::BTreeMap; -use std::fmt::Display; -use std::path::Path; -use std::sync::Arc; -// TODO: Pick a mqtt packet generation/decoding library that is customizable for the purpose of this project and also supports v3,v4 and v5. -// FIXME: Fix ranges... -// TODO: crtl_c handling -// TODO: Try fuzzing a basic mongoose server? -// TODO: Fuzz mosquitto compiled with sanitizers -use crate::markov::MAX_PACKETS; -use crate::mqtt::test_connection; -use crate::process_monitor::start_supervised_process; -use crate::runtime::{iterations_tracker, run_thread}; use clap::{Parser, Subcommand}; use futures::future::join_all; +use lib::mqtt::test_connection; +use lib::network::connect_to_broker; +use lib::packets::PacketQueue; +use lib::process_monitor::start_supervised_process; +use lib::runtime::{iterations_tracker, run_thread}; +use lib::SeedAndIterations; use rand::{thread_rng, Rng}; -use serde::{Deserialize, Serialize}; -use serde_with::formats::CommaSeparator; -use serde_with::serde_as; -use serde_with::StringWithSeparator; use std::str::FromStr; -use tokio::fs::File; -use tokio::io::AsyncReadExt; -use tokio::net::TcpStream; +use std::sync::Arc; use tokio::sync::mpsc::channel as mpsc_channel; use tokio::sync::RwLock; -use tokio::time::sleep; use tokio::{fs, task}; use tracing::{debug, info, trace}; -mod markov; -pub mod mqtt; -mod packet_pool; -mod process_monitor; -mod runtime; - -// TODO: All threads should also dump their last packets for fast replaying -#[serde_as] -#[derive(Debug, PartialEq, Eq, Clone, Hash, Default, Serialize, Deserialize)] -pub struct Packets { - #[serde_as(as = "[StringWithSeparator::; MAX_PACKETS]")] - inner: [Vec; MAX_PACKETS], -} -impl Display for Packets { - // Hex dump the packets - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let mut s = String::new(); - for i in 0..MAX_PACKETS { - if !self.inner[i].is_empty() { - s.push_str(hex::encode(&self.inner[i]).as_str()); - s.push('\n'); - } - } - write!(f, "{}", s) - } -} -impl Packets { - pub fn append(&mut self, packet: &mut Vec) { - // Search the first free slot and insert it there - let size = self.size(); - if size < MAX_PACKETS { - self.inner[size] = packet.clone(); - } - } - pub fn is_full(&self) -> bool { - self.inner.iter().all(|x| !x.is_empty()) - } - pub fn size(&self) -> usize { - min(1, self.inner.iter().filter(|x| !x.is_empty()).count()) - } - pub fn new() -> Self { - Self { - inner: Default::default(), - } - } -} - -#[serde_as] -#[derive(Debug, PartialEq, Eq, Clone, Hash, Default, Serialize, Deserialize)] -struct PacketQueue { - #[serde_as(as = "BTreeMap, _>")] - inner: BTreeMap, Packets>, -} - -impl PacketQueue { - async fn read_from_file(path: impl AsRef) -> color_eyre::Result { - let mut content = String::new(); - File::open(path).await?.read_to_string(&mut content).await?; - let queue = toml::from_str(&content)?; - Ok(queue) - } -} - #[derive(Parser, Debug)] #[command(author, version, about)] struct Cli { @@ -122,8 +24,8 @@ struct Cli { #[arg(short, long)] broker_command: String, // TODO: Make the timeout configurable - #[arg(short, long, default_value = "200")] - timeout: u64, + #[arg(long, default_value = "200")] + timeout: u16, } #[derive(Subcommand, Debug)] @@ -138,18 +40,10 @@ enum SubCommands { sequential: bool, }, } -/// Struct to serialize threads once they are done(aka the broker has crashed). -#[derive(Serialize, Deserialize, Debug)] -struct SeedAndIterations { - pub seed: String, - pub iterations: String, -} - #[tokio::main] async fn main() -> color_eyre::Result<()> { console_subscriber::init(); color_eyre::install()?; - dotenvy::dotenv().ok(); let cli = Cli::parse(); let packet_queue = Arc::new(RwLock::new( PacketQueue::read_from_file("./packet_pool.toml").await?, @@ -157,7 +51,7 @@ async fn main() -> color_eyre::Result<()> { match &cli.subcommand { SubCommands::Fuzz { threads } => { // The channel used for iteration counting - let (it_sender, mut it_receiver) = mpsc_channel::(*threads as usize); + let (it_sender, it_receiver) = mpsc_channel::(*threads as usize); // This receiver is necessary to dump the packets once the broker is stopped let (sender, _) = tokio::sync::broadcast::channel(1); let mut subscribers = vec![]; @@ -166,8 +60,8 @@ async fn main() -> color_eyre::Result<()> { } start_supervised_process(sender, cli.broker_command).await?; let address = cli.target.clone(); - let mut tcpstream = TcpStream::connect(&address).await?; - test_connection(&mut tcpstream).await?; + let mut stream = connect_to_broker(&cli.target).await?; + test_connection(&mut stream).await?; info!("Connection established, starting fuzzing!"); let mut rng = thread_rng(); let _ = fs::create_dir("./threads").await; @@ -183,6 +77,7 @@ async fn main() -> color_eyre::Result<()> { u64::MAX, packet_queue.clone(), it_sender_clone, + cli.timeout, )); } // Track it/s @@ -221,8 +116,8 @@ async fn main() -> color_eyre::Result<()> { subscribers.push(sender.subscribe()); } start_supervised_process(sender, cli.broker_command).await?; - let mut tcpstream = TcpStream::connect(&cli.target).await?; - test_connection(&mut tcpstream).await?; + let mut stream = connect_to_broker(&cli.target).await?; + test_connection(&mut stream).await?; debug!("Connection established"); debug!("Starting replay with {} seeds", filtered_files.len()); let mut threads = vec![]; @@ -238,6 +133,7 @@ async fn main() -> color_eyre::Result<()> { u64::from_str(&seed_and_iterations.iterations).unwrap(), packet_queue.clone(), unused_it_channel.clone(), + cli.timeout, )); } if *sequential { @@ -257,15 +153,3 @@ async fn main() -> color_eyre::Result<()> { } Ok(()) } - -#[cfg(test)] -mod tests { - use super::*; - #[test] - fn test_serialize_packet_queue() { - let mut packet_queue = PacketQueue::default(); - packet_queue.inner.insert(vec![0x10], Packets::default()); - let serialized = toml::to_string(&packet_queue).unwrap(); - println!("{}", serialized); - } -}