Skip to content

Commit

Permalink
Add the test analyzer entirely via daemon flags
Browse files Browse the repository at this point in the history
Also consolidate the duplicate AnalysisWriter implementation
  • Loading branch information
wgreenberg committed Oct 8, 2024
1 parent 5056933 commit 3a97d24
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 84 deletions.
22 changes: 15 additions & 7 deletions bin/src/analysis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use tokio_util::task::TaskTracker;

use crate::qmdl_store::RecordingStore;
use crate::server::ServerState;
use crate::dummy_analyzer::TestAnalyzer;

pub struct AnalysisWriter {
writer: BufWriter<File>,
Expand All @@ -34,11 +35,16 @@ pub struct AnalysisWriter {
// lets us simply append new rows to the end without parsing the entire JSON
// object beforehand.
impl AnalysisWriter {
pub async fn new(file: File) -> Result<Self, std::io::Error> {
pub async fn new(file: File, enable_dummy_analyzer: bool) -> Result<Self, std::io::Error> {
let mut harness = Harness::new_with_all_analyzers();
if enable_dummy_analyzer {
harness.add_analyzer(Box::new(TestAnalyzer { count: 0 }));
}

let mut result = Self {
writer: BufWriter::new(file),
harness: Harness::new_with_all_analyzers(),
bytes_written: 0,
harness,
};
let metadata = result.harness.get_metadata();
result.write(&metadata).await?;
Expand All @@ -47,12 +53,12 @@ impl AnalysisWriter {

// Runs the analysis harness on the given container, serializing the results
// to the analysis file and returning the file's new length.
pub async fn analyze(&mut self, container: MessagesContainer) -> Result<usize, std::io::Error> {
pub async fn analyze(&mut self, container: MessagesContainer) -> Result<(usize, bool), std::io::Error> {
let row = self.harness.analyze_qmdl_messages(container);
if !row.is_empty() {
self.write(&row).await?;
}
Ok(self.bytes_written)
Ok((self.bytes_written, row.contains_warnings()))
}

async fn write<T: Serialize>(&mut self, value: &T) -> Result<(), std::io::Error> {
Expand Down Expand Up @@ -102,6 +108,7 @@ async fn clear_running(analysis_status_lock: Arc<RwLock<AnalysisStatus>>) {
async fn perform_analysis(
name: &str,
qmdl_store_lock: Arc<RwLock<RecordingStore>>,
enable_dummy_analyzer: bool,
) -> Result<(), String> {
info!("Opening QMDL and analysis file for {}...", name);
let (analysis_file, qmdl_file, entry_index) = {
Expand All @@ -121,7 +128,7 @@ async fn perform_analysis(
(analysis_file, qmdl_file, entry_index)
};

let mut analysis_writer = AnalysisWriter::new(analysis_file)
let mut analysis_writer = AnalysisWriter::new(analysis_file, enable_dummy_analyzer)
.await
.map_err(|e| format!("{:?}", e))?;
let file_size = qmdl_file
Expand All @@ -140,7 +147,7 @@ async fn perform_analysis(
.await
.expect("failed getting QMDL container")
{
let size_bytes = analysis_writer
let (size_bytes, _) = analysis_writer
.analyze(container)
.await
.map_err(|e| format!("{:?}", e))?;
Expand All @@ -166,6 +173,7 @@ pub fn run_analysis_thread(
mut analysis_rx: Receiver<AnalysisCtrlMessage>,
qmdl_store_lock: Arc<RwLock<RecordingStore>>,
analysis_status_lock: Arc<RwLock<AnalysisStatus>>,
enable_dummy_analyzer: bool,
) {
task_tracker.spawn(async move {
loop {
Expand All @@ -174,7 +182,7 @@ pub fn run_analysis_thread(
let count = queued_len(analysis_status_lock.clone()).await;
for _ in 0..count {
let name = dequeue_to_running(analysis_status_lock.clone()).await;
if let Err(err) = perform_analysis(&name, qmdl_store_lock.clone()).await {
if let Err(err) = perform_analysis(&name, qmdl_store_lock.clone(), enable_dummy_analyzer).await {
error!("failed to analyze {}: {}", name, err);
}
clear_running(analysis_status_lock.clone()).await;
Expand Down
8 changes: 8 additions & 0 deletions bin/src/check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use tokio::fs::{metadata, read_dir, File};
use clap::Parser;
use futures::TryStreamExt;

mod dummy_analyzer;

#[derive(Parser, Debug)]
#[command(version, about)]
struct Args {
Expand All @@ -12,6 +14,9 @@ struct Args {

#[arg(long)]
show_skipped: bool,

#[arg(long)]
enable_dummy_analyzer: bool,
}

async fn analyze_file(harness: &mut Harness, qmdl_path: &str, show_skipped: bool) {
Expand Down Expand Up @@ -55,6 +60,9 @@ async fn main() {
let args = Args::parse();

let mut harness = Harness::new_with_all_analyzers();
if args.enable_dummy_analyzer {
harness.add_analyzer(Box::new(dummy_analyzer::TestAnalyzer { count: 0 }));
}
println!("Analyzers:");
for analyzer in harness.get_metadata().analyzers {
println!(" - {}: {}", analyzer.name, analyzer.description);
Expand Down
12 changes: 8 additions & 4 deletions bin/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ struct ConfigFile {
port: Option<u16>,
debug_mode: Option<bool>,
ui_level: Option<u8>,
enable_dummy_analyzer: Option<bool>,
}

#[derive(Debug)]
Expand All @@ -16,6 +17,7 @@ pub struct Config {
pub port: u16,
pub debug_mode: bool,
pub ui_level: u8,
pub enable_dummy_analyzer: bool,
}

impl Default for Config {
Expand All @@ -25,6 +27,7 @@ impl Default for Config {
port: 8080,
debug_mode: false,
ui_level: 1,
enable_dummy_analyzer: false,
}
}
}
Expand All @@ -34,10 +37,11 @@ pub fn parse_config<P>(path: P) -> Result<Config, RayhunterError> where P: AsRef
if let Ok(config_file) = std::fs::read_to_string(&path) {
let parsed_config: ConfigFile = toml::from_str(&config_file)
.map_err(RayhunterError::ConfigFileParsingError)?;
if let Some(path) = parsed_config.qmdl_store_path { config.qmdl_store_path = path }
if let Some(port) = parsed_config.port { config.port = port }
if let Some(debug_mode) = parsed_config.debug_mode { config.debug_mode = debug_mode }
if let Some(ui_level) = parsed_config.ui_level { config.ui_level = ui_level }
parsed_config.qmdl_store_path.map(|v| config.qmdl_store_path = v);
parsed_config.port.map(|v| config.port = v);
parsed_config.debug_mode.map(|v| config.debug_mode = v);
parsed_config.ui_level.map(|v| config.ui_level = v);
parsed_config.enable_dummy_analyzer.map(|v| config.enable_dummy_analyzer = v);
}
Ok(config)
}
Expand Down
5 changes: 3 additions & 2 deletions bin/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod stats;
mod qmdl_store;
mod diag;
mod framebuffer;
mod dummy_analyzer;

use crate::config::{parse_config, parse_args};
use crate::diag::run_diag_read_thread;
Expand Down Expand Up @@ -223,14 +224,14 @@ async fn main() -> Result<(), RayhunterError> {
.map_err(RayhunterError::DiagInitError)?;

info!("Starting Diag Thread");
run_diag_read_thread(&task_tracker, dev, rx, ui_update_tx.clone(), qmdl_store_lock.clone());
run_diag_read_thread(&task_tracker, dev, rx, ui_update_tx.clone(), qmdl_store_lock.clone(), config.enable_dummy_analyzer);
info!("Starting UI");
update_ui(&task_tracker, &config, ui_shutdown_rx, ui_update_rx);
}
let (server_shutdown_tx, server_shutdown_rx) = oneshot::channel::<()>();
info!("create shutdown thread");
let analysis_status_lock = Arc::new(RwLock::new(AnalysisStatus::default()));
run_analysis_thread(&task_tracker, analysis_rx, qmdl_store_lock.clone(), analysis_status_lock.clone());
run_analysis_thread(&task_tracker, analysis_rx, qmdl_store_lock.clone(), analysis_status_lock.clone(), config.enable_dummy_analyzer);
run_ctrl_c_thread(&task_tracker, tx.clone(), server_shutdown_tx, maybe_ui_shutdown_tx, qmdl_store_lock.clone(), analysis_tx.clone());
run_server(&task_tracker, &config, qmdl_store_lock.clone(), server_shutdown_rx, ui_update_tx, tx, analysis_tx, analysis_status_lock).await;

Expand Down
64 changes: 6 additions & 58 deletions bin/src/diag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,93 +6,41 @@ use axum::extract::{Path, State};
use axum::http::header::CONTENT_TYPE;
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use rayhunter::analysis::analyzer::Harness;
use rayhunter::diag::{DataType, MessagesContainer};
use rayhunter::diag::DataType;
use rayhunter::diag_device::DiagDevice;
use serde::Serialize;
use tokio::sync::RwLock;
use tokio::sync::mpsc::{Receiver, Sender};
use rayhunter::qmdl::QmdlWriter;
use log::{debug, error, info};
use tokio::fs::File;
use tokio::io::BufWriter;
use tokio::io::AsyncWriteExt;
use tokio_util::io::ReaderStream;
use tokio_util::task::TaskTracker;
use futures::{StreamExt, TryStreamExt};

use crate::framebuffer;
use crate::qmdl_store::RecordingStore;
use crate::server::ServerState;
use crate::analysis::AnalysisWriter;

pub enum DiagDeviceCtrlMessage {
StopRecording,
StartRecording((QmdlWriter<File>, File)),
Exit,
}

struct AnalysisWriter {
writer: BufWriter<File>,
harness: Harness,
bytes_written: usize,
}

// We write our analysis results to a file immediately to minimize the amount of
// state Rayhunter has to keep track of in memory. The analysis file's format is
// Newline Delimited JSON
// (https://docs.mulesoft.com/dataweave/latest/dataweave-formats-ndjson), which
// lets us simply append new rows to the end without parsing the entire JSON
// object beforehand.
impl AnalysisWriter {
pub async fn new(file: File) -> Result<Self, std::io::Error> {
let mut result = Self {
writer: BufWriter::new(file),
harness: Harness::new_with_all_analyzers(),
bytes_written: 0,
};
let metadata = result.harness.get_metadata();
result.write(&metadata).await?;
Ok(result)
}

// Runs the analysis harness on the given container, serializing the results
// to the analysis file and returning the file's new length.
pub async fn analyze(&mut self, container: MessagesContainer) -> Result<(usize, bool), std::io::Error> {
let row = self.harness.analyze_qmdl_messages(container);
if !row.is_empty() {
self.write(&row).await?;
}
Ok((self.bytes_written, ! &row.analysis.is_empty()))
}

async fn write<T: Serialize>(&mut self, value: &T) -> Result<(), std::io::Error> {
let mut value_str = serde_json::to_string(value).unwrap();
value_str.push('\n');
self.bytes_written += value_str.len();
self.writer.write_all(value_str.as_bytes()).await?;
self.writer.flush().await?;
Ok(())
}

// Flushes any pending I/O to disk before dropping the writer
pub async fn close(mut self) -> Result<(), std::io::Error> {
self.writer.flush().await?;
Ok(())
}
}

pub fn run_diag_read_thread(
task_tracker: &TaskTracker,
mut dev: DiagDevice,
mut qmdl_file_rx: Receiver<DiagDeviceCtrlMessage>,
ui_update_sender: Sender<framebuffer::DisplayState>,
qmdl_store_lock: Arc<RwLock<RecordingStore>>
qmdl_store_lock: Arc<RwLock<RecordingStore>>,
enable_dummy_analyzer: bool,
) {
task_tracker.spawn(async move {
let (initial_qmdl_file, initial_analysis_file) = qmdl_store_lock.write().await.new_entry().await.expect("failed creating QMDL file entry");
let mut maybe_qmdl_writer: Option<QmdlWriter<File>> = Some(QmdlWriter::new(initial_qmdl_file));
let mut diag_stream = pin!(dev.as_stream().into_stream());
let mut maybe_analysis_writer = Some(AnalysisWriter::new(initial_analysis_file).await
let mut maybe_analysis_writer = Some(AnalysisWriter::new(initial_analysis_file, enable_dummy_analyzer).await
.expect("failed to create analysis writer"));
loop {
tokio::select! {
Expand All @@ -103,7 +51,7 @@ pub fn run_diag_read_thread(
if let Some(analysis_writer) = maybe_analysis_writer {
analysis_writer.close().await.expect("failed to close analysis writer");
}
maybe_analysis_writer = Some(AnalysisWriter::new(new_analysis_file).await
maybe_analysis_writer = Some(AnalysisWriter::new(new_analysis_file, enable_dummy_analyzer).await
.expect("failed to write to analysis file"));
},
Some(DiagDeviceCtrlMessage::StopRecording) => {
Expand Down
45 changes: 45 additions & 0 deletions bin/src/dummy_analyzer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use std::borrow::Cow;

use rayhunter::telcom_parser::lte_rrc::{PCCH_MessageType, PCCH_MessageType_c1, PagingUE_Identity};

use rayhunter::analysis::analyzer::{Analyzer, Event, EventType, Severity};
use rayhunter::analysis::information_element::{InformationElement, LteInformationElement};

/// Dummy analyzer for testing and development. It is not compiled in via a
/// feature flag; instead it is added to the harness at runtime when the
/// `enable_dummy_analyzer` daemon config option / `--enable-dummy-analyzer`
/// CLI flag is set.
pub struct TestAnalyzer{
    // Running count of information elements seen so far; used by
    // analyze_information_element to emit an informational event on
    // every 100th element.
    pub count: i32,
}

impl Analyzer for TestAnalyzer {
    /// Human-readable analyzer name shown in metadata listings.
    fn get_name(&self) -> Cow<str> {
        "Example Analyzer".into()
    }

    /// Description surfaced to users; intentionally warns that this analyzer
    /// should only appear in developer/debug setups.
    fn get_description(&self) -> Cow<str> {
        "Always returns true, if you are seeing this you are either a developer or you are about to have problems.".into()
    }

    /// Emits an informational event on every 100th element processed, and a
    /// low-severity warning whenever an LTE PCCH paging record pages a UE by
    /// its S-TMSI. Returns None for all other input.
    fn analyze_information_element(&mut self, ie: &InformationElement) -> Option<Event> {
        self.count += 1;
        if self.count % 100 == 0 {
            return Some(Event {
                event_type: EventType::Informational,
                message: String::from("multiple of 100 events processed"),
            });
        }

        // Only LTE PCCH (paging channel) messages are of interest here.
        let pcch_msg = match ie {
            InformationElement::LTE(LteInformationElement::PCCH(msg)) => msg,
            _ => return None,
        };
        let paging = match &pcch_msg.message {
            PCCH_MessageType::C1(PCCH_MessageType_c1::Paging(paging)) => paging,
            _ => return None,
        };

        // A paging message without a record list carries nothing to inspect.
        let records = paging.paging_record_list.as_ref()?;
        let tmsi_paged = records
            .0
            .iter()
            .any(|record| matches!(record.ue_identity, PagingUE_Identity::S_TMSI(_)));
        if tmsi_paged {
            Some(Event {
                event_type: EventType::QualitativeWarning { severity: Severity::Low },
                message: String::from("TMSI was provided to cell"),
            })
        } else {
            None
        }
    }
}
3 changes: 0 additions & 3 deletions lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,3 @@ tokio = { version = "1.35.1", features = ["full"] }
futures-core = "0.3.30"
futures = "0.3.30"
serde = { version = "1.0.197", features = ["derive"] }

[features]
debug = []
23 changes: 13 additions & 10 deletions lib/src/analysis/analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,6 @@ use crate::{diag::MessagesContainer, gsmtap_parser};

use super::{imsi_provided::ImsiProvidedAnalyzer, information_element::InformationElement, lte_downgrade::LteSib6And7DowngradeAnalyzer, null_cipher::NullCipherAnalyzer};

#[cfg(feature="debug")]
use log::warn;

#[cfg(feature="debug")]
use super::test_analyzer::TestAnalyzer;

/// Qualitative measure of how severe a Warning event type is.
/// The levels should break down like this:
/// * Low: if combined with a large number of other Warnings, user should investigate
Expand Down Expand Up @@ -92,6 +86,19 @@ impl AnalysisRow {
pub fn is_empty(&self) -> bool {
self.skipped_message_reasons.is_empty() && self.analysis.is_empty()
}

/// Returns true if any analyzer in this row reported a qualitative warning
/// event (informational events do not count).
pub fn contains_warnings(&self) -> bool {
    self.analysis
        .iter()
        .flat_map(|analysis| analysis.events.iter())
        .flatten() // each entry is an Option<Event>; skip the Nones
        .any(|event| matches!(event.event_type, EventType::QualitativeWarning { .. }))
}
}

pub struct Harness {
Expand All @@ -109,10 +116,6 @@ impl Harness {
harness.add_analyzer(Box::new(ImsiProvidedAnalyzer{}));
harness.add_analyzer(Box::new(NullCipherAnalyzer{}));

#[cfg(feature="debug")] {
warn!("Loading test analyzers!");
harness.add_analyzer(Box::new(TestAnalyzer{count:0}));
}
harness
}

Expand Down
3 changes: 3 additions & 0 deletions lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,6 @@ pub mod gsmtap;
pub mod gsmtap_parser;
pub mod pcap;
pub mod analysis;

// re-export telcom_parser, since we use its types in our API
pub use telcom_parser;

0 comments on commit 3a97d24

Please sign in to comment.