Skip to content

Commit

Permalink
feat: Made timeout configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
Nereuxofficial committed Sep 7, 2023
1 parent ce391c7 commit 4b8140a
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 11 deletions.
2 changes: 1 addition & 1 deletion lib/benches/markov_models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tokio::sync::RwLock;

async fn exec_markov_fast(m: Mode) {
let stream = duplex(10 * 1024).0;
let mut machine = StateMachine::new(stream);
let mut machine = StateMachine::new(stream, 100);
let mut rng = Xoshiro256PlusPlus::from_seed([5; 32]);
machine
.execute(m, &mut rng, &Arc::new(RwLock::new(PacketQueue::default())))
Expand Down
7 changes: 5 additions & 2 deletions lib/src/markov/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ where
pub previous_packets: Vec<Packets>,
// The current stream, TlsStream TcpStream or WebsocketStream
stream: B,
timeout: u16,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, Copy)]
pub enum Mode {
Expand Down Expand Up @@ -119,12 +120,13 @@ impl<B> StateMachine<B>
where
B: ByteStream,
{
pub fn new(stream: B) -> Self {
pub fn new(stream: B, timeout: u16) -> Self {
Self {
stream,
state: Default::default(),
packets: Packets::new(),
previous_packets: Vec::new(),
timeout,
}
}
pub async fn execute(
Expand Down Expand Up @@ -224,7 +226,8 @@ where
}
State::SEND => {
self.previous_packets.push(self.packets.clone());
let res = send_packets(&mut self.stream, &self.packets, packet_queue).await;
let res =
send_packets(&mut self.stream, &self.packets, packet_queue, self.timeout).await;
if let Err(e) = res {
match e {
SendError::Timeout => {}
Expand Down
14 changes: 9 additions & 5 deletions lib/src/mqtt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,23 +60,23 @@ pub(crate) async fn send_packets(
stream: &mut impl ByteStream,
packets: &Packets,
packet_queue: &Arc<RwLock<PacketQueue>>,
timeout: u16,
) -> Result<(), SendError> {
for packet in packets.inner.iter().filter(|p| !p.is_empty()) {
send_packet(stream, packet.as_slice(), packets, packet_queue).await?;
send_packet(stream, packet.as_slice(), packets, packet_queue, timeout).await?;
}
Ok(())
}

const PACKET_TIMEOUT: u64 = 30;

pub(crate) async fn send_packet(
stream: &mut impl ByteStream,
packet: &[u8],
packets: &Packets,
packet_queue: &Arc<RwLock<PacketQueue>>,
timeout_ms: u16,
) -> Result<(), SendError> {
let write_result = timeout(
Duration::from_millis(PACKET_TIMEOUT),
Duration::from_millis(timeout_ms as u64),
stream.write_all(packet),
)
.await;
Expand All @@ -91,7 +91,11 @@ pub(crate) async fn send_packet(
}
}
let mut buf = [0; 1024];
let res = timeout(Duration::from_millis(PACKET_TIMEOUT), stream.read(&mut buf)).await;
let res = timeout(
Duration::from_millis(timeout_ms as u64),
stream.read(&mut buf),
)
.await;
match res {
Ok(Ok(p)) => {
known_packet(&buf[..p], packets, packet_queue).await;
Expand Down
4 changes: 2 additions & 2 deletions lib/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub async fn run_thread(
iterations: u64,
packet_queue: Arc<RwLock<PacketQueue>>,
it_sender_clone: Sender<u64>,
timeout: u16,
) {
let task_handle = task::spawn(async move {
let mut last_packets = Vec::new();
Expand All @@ -43,12 +44,11 @@ pub async fn run_thread(
continue;
}
let new_tcpstream = new_stream.unwrap();
let mut state_machine = StateMachine::new(new_tcpstream);
let mut state_machine = StateMachine::new(new_tcpstream, timeout);
let mode = rng.gen();
state_machine.execute(mode, &mut rng, &packet_queue).await;
last_packets = state_machine.previous_packets.clone();
// We receive a message once the broker is stopped
// TODO: Also save last packets upon crash
if !receiver_clone.is_empty() {
break;
}
Expand Down
4 changes: 3 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ struct Cli {
broker_command: String,
// TODO: Make the timeout configurable
#[arg(long, default_value = "200")]
timeout: u64,
timeout: u16,
}

#[derive(Subcommand, Debug)]
Expand Down Expand Up @@ -77,6 +77,7 @@ async fn main() -> color_eyre::Result<()> {
u64::MAX,
packet_queue.clone(),
it_sender_clone,
cli.timeout,
));
}
// Track it/s
Expand Down Expand Up @@ -132,6 +133,7 @@ async fn main() -> color_eyre::Result<()> {
u64::from_str(&seed_and_iterations.iterations).unwrap(),
packet_queue.clone(),
unused_it_channel.clone(),
cli.timeout,
));
}
if *sequential {
Expand Down

0 comments on commit 4b8140a

Please sign in to comment.