diff --git a/src/lib.rs b/src/lib.rs index 31c7ab1c..97006032 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,6 +12,7 @@ mod stream; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; +use std::time::Instant; use mlua::{prelude::*, LuaSerdeExt}; use mlua::{Function, Value}; @@ -109,6 +110,8 @@ pub fn stop(_: &Lua, _: ()) -> LuaResult<()> { #[no_mangle] pub fn next(lua: &Lua, (env, callback): (i32, Function)) -> LuaResult { + let start = Instant::now(); + if let Some(Server { ref ipc_mission, ref ipc_hook, @@ -116,6 +119,8 @@ pub fn next(lua: &Lua, (env, callback): (i32, Function)) -> LuaResult { .. }) = *SERVER.read().unwrap() { + let _guard = stats.track_block_time(start); + let next = match env { 1 => ipc_mission.try_next(), 2 => ipc_hook.try_next(), @@ -172,6 +177,8 @@ pub fn next(lua: &Lua, (env, callback): (i32, Function)) -> LuaResult { #[no_mangle] pub fn event(lua: &Lua, event: Value) -> LuaResult<()> { + let start = Instant::now(); + let event: Event = match lua.from_value(event) { Ok(event) => event, Err(err) => { @@ -186,9 +193,13 @@ pub fn event(lua: &Lua, event: Value) -> LuaResult<()> { if let Some(Server { ref ipc_mission, ref runtime, + ref stats, .. }) = *SERVER.read().unwrap() { + let _guard = stats.track_block_time(start); + stats.track_event(); + log::debug!("Received event: {:#?}", event); runtime.block_on(ipc_mission.event(event)); } diff --git a/src/rpc.rs b/src/rpc.rs index 2c0e77ab..9d0d066b 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -62,7 +62,7 @@ impl MissionRpc { I: serde::Serialize + Send + Sync + 'static, for<'de> O: serde::Deserialize<'de> + Send + Sync + std::fmt::Debug + 'static, { - let _guard = self.stats.track_queue(); + let _guard = self.stats.track_queue_size(); self.ipc .request(method, Some(request.into_inner())) .await @@ -73,7 +73,7 @@ impl MissionRpc { where I: serde::Serialize + Send + Sync + 'static, { - let _guard = self.stats.track_queue(); + let _guard = self.stats.track_queue_size(); self.ipc .notification(method, Some(request.into_inner())) .await @@ -100,7 +100,7 @@ impl HookRpc { I: serde::Serialize + Send + Sync + 'static, for<'de> O: serde::Deserialize<'de> + Send + Sync + std::fmt::Debug + 'static, { - let _guard = self.stats.track_queue(); + let _guard = self.stats.track_queue_size(); self.ipc .request(method, Some(request.into_inner())) .await @@ -111,7 +111,7 @@ impl HookRpc { where I: serde::Serialize + Send + Sync + 'static, { - let _guard = self.stats.track_queue(); + let _guard = self.stats.track_queue_size(); self.ipc .notification(method, Some(request.into_inner())) .await diff --git a/src/stats.rs b/src/stats.rs index 7184f39c..67acad2f 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -15,6 +15,8 @@ struct Inner { shutdown_signal: ShutdownHandle, /// Total numer of calls into the MSE. calls_count: AtomicU32, + /// Total numer of events received from the MSE. + events_count: AtomicU32, /// Total numer of calls in the queue. queue_size: AtomicU32, /// Time spent waiting for MSE calls to complete (since last report). @@ -27,21 +29,22 @@ struct Inner { struct IntervalStats { /// Highest TPS count of calls into the MSE. tps_highest: f64, + /// Highest events per second. + eps_highest: f64, /// Sum of the queue sizes at each tick (neccessary to calculate the average). queue_size_total: u32, /// Highest queue size at a tick of the interval. queue_size_highest: u32, } -/// This guard is used to track call completion and time spend until completed (completed is -/// equivalent to this guard being dropped). -pub struct TrackCallGuard { +/// This guard is used to keep track of the time the gRPC server blocked DCS. +pub struct TrackBlockTimeGuard { start: Instant, stats: Arc, } /// This guard is used to keep track of calls in the queue. -pub struct TrackQueueGuard { +pub struct TrackQueueSizeGuard { stats: Arc, } @@ -50,23 +53,32 @@ impl Stats { Stats(Arc::new(Inner { shutdown_signal, calls_count: AtomicU32::new(0), + events_count: AtomicU32::new(0), queue_size: AtomicU32::new(0), nanoseconds_waited: AtomicUsize::new(0), interval_stats: Arc::new(Mutex::new(IntervalStats::default())), })) } - pub fn track_call(&self) -> TrackCallGuard { + pub fn track_call(&self) { self.0.calls_count.fetch_add(1, Ordering::Relaxed); - TrackCallGuard { - start: Instant::now(), + } + + pub fn track_event(&self) { + self.0.events_count.fetch_add(1, Ordering::Relaxed); + } + + pub fn track_block_time(&self, start: Instant) -> TrackBlockTimeGuard { + self.0.calls_count.fetch_add(1, Ordering::Relaxed); + TrackBlockTimeGuard { + start, stats: self.0.clone(), } } - pub fn track_queue(&self) -> TrackQueueGuard { + pub fn track_queue_size(&self) -> TrackQueueSizeGuard { self.0.queue_size.fetch_add(1, Ordering::Relaxed); - TrackQueueGuard { + TrackQueueSizeGuard { stats: self.0.clone(), } } @@ -81,6 +93,7 @@ impl Stats { loop { let calls_count_before = self.0.calls_count.load(Ordering::Relaxed); + let events_count_before = self.0.events_count.load(Ordering::Relaxed); let start = Instant::now(); // wait for either the shutdown signal or the next interval tick, whatever happens first @@ -93,6 +106,7 @@ impl Stats { let mut interval_stats = self.0.interval_stats.lock().await; let calls_count = self.0.calls_count.load(Ordering::Relaxed); + let events_count = self.0.events_count.load(Ordering::Relaxed); // update report for elapsed second let elapsed = start.elapsed().as_secs_f64(); @@ -103,6 +117,12 @@ impl Stats { interval_stats.tps_highest = tps; } + // update highest events per second + let eps = f64::from(events_count - events_count_before) / elapsed; + if eps > interval_stats.eps_highest { + interval_stats.eps_highest = eps; + } + // update queue size let queue_size = self.0.queue_size.load(Ordering::Relaxed); interval_stats.queue_size_total += queue_size; @@ -118,6 +138,10 @@ impl Stats { let tps_average = f64::try_from(calls_count).unwrap_or(f64::MAX) / elapsed.as_secs_f64(); + // average events per second + let eps_average = + f64::try_from(events_count).unwrap_or(f64::MAX) / elapsed.as_secs_f64(); + // total block time let block_time_total = Duration::from_nanos( u64::try_from(self.0.nanoseconds_waited.swap(0, Ordering::Relaxed)) @@ -137,6 +161,11 @@ impl Stats { tps_average, interval_stats.tps_highest ); + log::info!( + "Events per second: average={:.2}, highest={:.2}", + eps_average, + interval_stats.eps_highest + ); log::info!( "Blocking time: total={:?} (≙ {:.3}%)", block_time_total, @@ -158,7 +187,7 @@ impl Stats { } } -impl Drop for TrackCallGuard { +impl Drop for TrackBlockTimeGuard { fn drop(&mut self) { self.stats.nanoseconds_waited.fetch_add( usize::try_from(self.start.elapsed().as_nanos()).unwrap_or(usize::MAX), @@ -167,7 +196,7 @@ impl Drop for TrackCallGuard { } } -impl Drop for TrackQueueGuard { +impl Drop for TrackQueueSizeGuard { fn drop(&mut self) { self.stats.queue_size.fetch_sub(1, Ordering::Relaxed); }