Skip to content

Commit

Permalink
Merge pull request #43 from DCS-gRPC/stats2
Browse files Browse the repository at this point in the history
include events and lock acquisition to stats
  • Loading branch information
rkusa authored Oct 22, 2021
2 parents d7033e8 + 192473f commit b572050
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 15 deletions.
11 changes: 11 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod stream;

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::time::Instant;

use mlua::{prelude::*, LuaSerdeExt};
use mlua::{Function, Value};
Expand Down Expand Up @@ -109,13 +110,17 @@ pub fn stop(_: &Lua, _: ()) -> LuaResult<()> {

#[no_mangle]
pub fn next(lua: &Lua, (env, callback): (i32, Function)) -> LuaResult<bool> {
let start = Instant::now();

if let Some(Server {
ref ipc_mission,
ref ipc_hook,
ref stats,
..
}) = *SERVER.read().unwrap()
{
let _guard = stats.track_block_time(start);

let next = match env {
1 => ipc_mission.try_next(),
2 => ipc_hook.try_next(),
Expand Down Expand Up @@ -172,6 +177,8 @@ pub fn next(lua: &Lua, (env, callback): (i32, Function)) -> LuaResult<bool> {

#[no_mangle]
pub fn event(lua: &Lua, event: Value) -> LuaResult<()> {
let start = Instant::now();

let event: Event = match lua.from_value(event) {
Ok(event) => event,
Err(err) => {
Expand All @@ -186,9 +193,13 @@ pub fn event(lua: &Lua, event: Value) -> LuaResult<()> {
if let Some(Server {
ref ipc_mission,
ref runtime,
ref stats,
..
}) = *SERVER.read().unwrap()
{
let _guard = stats.track_block_time(start);
stats.track_event();

log::debug!("Received event: {:#?}", event);
runtime.block_on(ipc_mission.event(event));
}
Expand Down
8 changes: 4 additions & 4 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl MissionRpc {
I: serde::Serialize + Send + Sync + 'static,
for<'de> O: serde::Deserialize<'de> + Send + Sync + std::fmt::Debug + 'static,
{
let _guard = self.stats.track_queue();
let _guard = self.stats.track_queue_size();
self.ipc
.request(method, Some(request.into_inner()))
.await
Expand All @@ -73,7 +73,7 @@ impl MissionRpc {
where
I: serde::Serialize + Send + Sync + 'static,
{
let _guard = self.stats.track_queue();
let _guard = self.stats.track_queue_size();
self.ipc
.notification(method, Some(request.into_inner()))
.await
Expand All @@ -100,7 +100,7 @@ impl HookRpc {
I: serde::Serialize + Send + Sync + 'static,
for<'de> O: serde::Deserialize<'de> + Send + Sync + std::fmt::Debug + 'static,
{
let _guard = self.stats.track_queue();
let _guard = self.stats.track_queue_size();
self.ipc
.request(method, Some(request.into_inner()))
.await
Expand All @@ -111,7 +111,7 @@ impl HookRpc {
where
I: serde::Serialize + Send + Sync + 'static,
{
let _guard = self.stats.track_queue();
let _guard = self.stats.track_queue_size();
self.ipc
.notification(method, Some(request.into_inner()))
.await
Expand Down
51 changes: 40 additions & 11 deletions src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ struct Inner {
shutdown_signal: ShutdownHandle,
/// Total numer of calls into the MSE.
calls_count: AtomicU32,
/// Total numer of events received from the MSE.
events_count: AtomicU32,
/// Total numer of calls in the queue.
queue_size: AtomicU32,
/// Time spent waiting for MSE calls to complete (since last report).
Expand All @@ -27,21 +29,22 @@ struct Inner {
struct IntervalStats {
/// Highest TPS count of calls into the MSE.
tps_highest: f64,
/// Highest events per second.
eps_highest: f64,
/// Sum of the queue sizes at each tick (neccessary to calculate the average).
queue_size_total: u32,
/// Highest queue size at a tick of the interval.
queue_size_highest: u32,
}

/// This guard is used to track call completion and time spend until completed (completed is
/// equivalent to this guard being dropped).
pub struct TrackCallGuard {
/// This guard is used to keep track of the time the gRPC server blocked DCS.
pub struct TrackBlockTimeGuard {
start: Instant,
stats: Arc<Inner>,
}

/// This guard is used to keep track of calls in the queue.
pub struct TrackQueueGuard {
pub struct TrackQueueSizeGuard {
stats: Arc<Inner>,
}

Expand All @@ -50,23 +53,32 @@ impl Stats {
Stats(Arc::new(Inner {
shutdown_signal,
calls_count: AtomicU32::new(0),
events_count: AtomicU32::new(0),
queue_size: AtomicU32::new(0),
nanoseconds_waited: AtomicUsize::new(0),
interval_stats: Arc::new(Mutex::new(IntervalStats::default())),
}))
}

pub fn track_call(&self) -> TrackCallGuard {
pub fn track_call(&self) {
self.0.calls_count.fetch_add(1, Ordering::Relaxed);
TrackCallGuard {
start: Instant::now(),
}

pub fn track_event(&self) {
self.0.events_count.fetch_add(1, Ordering::Relaxed);
}

pub fn track_block_time(&self, start: Instant) -> TrackBlockTimeGuard {
self.0.calls_count.fetch_add(1, Ordering::Relaxed);
TrackBlockTimeGuard {
start,
stats: self.0.clone(),
}
}

pub fn track_queue(&self) -> TrackQueueGuard {
pub fn track_queue_size(&self) -> TrackQueueSizeGuard {
self.0.queue_size.fetch_add(1, Ordering::Relaxed);
TrackQueueGuard {
TrackQueueSizeGuard {
stats: self.0.clone(),
}
}
Expand All @@ -81,6 +93,7 @@ impl Stats {

loop {
let calls_count_before = self.0.calls_count.load(Ordering::Relaxed);
let events_count_before = self.0.events_count.load(Ordering::Relaxed);
let start = Instant::now();

// wait for either the shutdown signal or the next interval tick, whatever happens first
Expand All @@ -93,6 +106,7 @@ impl Stats {

let mut interval_stats = self.0.interval_stats.lock().await;
let calls_count = self.0.calls_count.load(Ordering::Relaxed);
let events_count = self.0.events_count.load(Ordering::Relaxed);

// update report for elapsed second
let elapsed = start.elapsed().as_secs_f64();
Expand All @@ -103,6 +117,12 @@ impl Stats {
interval_stats.tps_highest = tps;
}

// update highest events per second
let eps = f64::from(events_count - events_count_before) / elapsed;
if eps > interval_stats.eps_highest {
interval_stats.eps_highest = eps;
}

// update queue size
let queue_size = self.0.queue_size.load(Ordering::Relaxed);
interval_stats.queue_size_total += queue_size;
Expand All @@ -118,6 +138,10 @@ impl Stats {
let tps_average =
f64::try_from(calls_count).unwrap_or(f64::MAX) / elapsed.as_secs_f64();

// average events per second
let eps_average =
f64::try_from(events_count).unwrap_or(f64::MAX) / elapsed.as_secs_f64();

// total block time
let block_time_total = Duration::from_nanos(
u64::try_from(self.0.nanoseconds_waited.swap(0, Ordering::Relaxed))
Expand All @@ -137,6 +161,11 @@ impl Stats {
tps_average,
interval_stats.tps_highest
);
log::info!(
"Events per second: average={:.2}, highest={:.2}",
eps_average,
interval_stats.eps_highest
);
log::info!(
"Blocking time: total={:?} (≙ {:.3}%)",
block_time_total,
Expand All @@ -158,7 +187,7 @@ impl Stats {
}
}

impl Drop for TrackCallGuard {
impl Drop for TrackBlockTimeGuard {
fn drop(&mut self) {
self.stats.nanoseconds_waited.fetch_add(
usize::try_from(self.start.elapsed().as_nanos()).unwrap_or(usize::MAX),
Expand All @@ -167,7 +196,7 @@ impl Drop for TrackCallGuard {
}
}

impl Drop for TrackQueueGuard {
impl Drop for TrackQueueSizeGuard {
fn drop(&mut self) {
self.stats.queue_size.fetch_sub(1, Ordering::Relaxed);
}
Expand Down

0 comments on commit b572050

Please sign in to comment.