Skip to content

Commit

Permalink
fix timer
Browse files Browse the repository at this point in the history
  • Loading branch information
dr-orlovsky committed Feb 12, 2024
1 parent d43d1df commit 338c983
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 44 deletions.
6 changes: 3 additions & 3 deletions src/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ impl<H: Handler, P: Poll> Runtime<H, P> {
fn run(mut self) {
loop {
let before_poll = Timestamp::now();
let timeout = self.timeouts.next(before_poll).unwrap_or(WAIT_TIMEOUT);
let timeout = self.timeouts.first_expiring_after(before_poll).unwrap_or(WAIT_TIMEOUT);

for (id, res) in &self.listeners {
self.poller.set_interest(*id, res.interests());
Expand All @@ -463,7 +463,7 @@ impl<H: Handler, P: Poll> Runtime<H, P> {

// Nb. The way this is currently used basically ignores which keys have
// timed out. So as long as *something* timed out, we wake the service.
let timers_fired = self.timeouts.expire(now);
let timers_fired = self.timeouts.remove_expired_by(now);
if timers_fired > 0 {
#[cfg(feature = "log")]
log::trace!(target: "reactor", "Timer has fired");
Expand Down Expand Up @@ -688,7 +688,7 @@ impl<H: Handler, P: Poll> Runtime<H, P> {
#[cfg(feature = "log")]
log::debug!(target: "reactor", "Adding timer {duration:?} from now");

self.timeouts.set_timer(duration, time);
self.timeouts.set_timeout(duration, time);
}
}
Ok(())
Expand Down
104 changes: 63 additions & 41 deletions src/timeouts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,26 +88,23 @@ pub struct Timer {
}

impl Timer {
/// Create a new timeout manager.
///
/// Takes a threshold below which two timeouts cannot overlap.
/// Create a new timer containing no timeouts.
pub fn new() -> Self { Self { timeouts: bset! {} } }

/// Return the number of timeouts being tracked.
pub fn len(&self) -> usize { self.timeouts.len() }
pub fn count(&self) -> usize { self.timeouts.len() }

/// Check whether there are timeouts being tracked.
pub fn is_empty(&self) -> bool { self.timeouts.is_empty() }
pub fn has_timeouts(&self) -> bool { !self.timeouts.is_empty() }

/// Register a new timeout with an associated key and wake-up time from a
/// UNIX time epoch.
pub fn set_timer(&mut self, span: Duration, after: Timestamp) {
let time = after + Timestamp(span.as_millis());
/// Register a new timeout relative to a certain point in time.
pub fn set_timeout(&mut self, timeout: Duration, after: Timestamp) {
let time = after + Timestamp(timeout.as_millis());
self.timeouts.insert(time);
}

/// Get the minimum time duration we should wait for at least one timeout
/// to be reached. Returns `None` if there are no timeouts.
/// Get the first timeout expiring right at or after certain moment of time.
/// Returns `None` if there are no timeouts.
///
/// ```
/// # use std::time::{Duration};
Expand All @@ -116,30 +113,33 @@ impl Timer {
/// let mut tm = Timer::new();
///
/// let now = Timestamp::now();
/// tm.set_timer(Duration::from_secs(16), now);
/// tm.set_timer(Duration::from_secs(8), now);
/// tm.set_timer(Duration::from_secs(64), now);
/// tm.set_timeout(Duration::from_secs(16), now);
/// tm.set_timeout(Duration::from_secs(8), now);
/// tm.set_timeout(Duration::from_secs(64), now);
///
/// let mut now = Timestamp::now();
/// // We need to wait 8 secs to trigger the next timeout (1).
/// assert!(tm.next(now) <= Some(Duration::from_secs(8)));
/// assert!(tm.first_expiring_after(now) <= Some(Duration::from_secs(8)));
///
/// // ... sleep for a sec ...
/// now += Duration::from_secs(1);
///
/// // Now we don't need to wait as long!
/// assert!(tm.next(now).unwrap() <= Duration::from_secs(7));
/// assert!(tm.first_expiring_after(now).unwrap() <= Duration::from_secs(7));
/// ```
pub fn next(&self, after: impl Into<Timestamp>) -> Option<Duration> {
let after = after.into();
self.timeouts
.iter()
.find(|t| **t >= after)
.map(|t| Duration::from_millis((*t - after).as_millis()))
pub fn first_expiring_after(&self, time: impl Into<Timestamp>) -> Option<Duration> {
let time = time.into();
let last = *self.timeouts.first()?;
Some(if last >= time {
Duration::from_secs(last.as_secs() - time.as_secs())
} else {
Duration::from_secs(0)
})
}

/// Returns vector of timers which has fired before certain time.
pub fn expire(&mut self, time: Timestamp) -> usize {
/// Removes timeouts which expire by a certain moment of time (inclusive),
/// returning total number of timeouts which were removed.
pub fn remove_expired_by(&mut self, time: Timestamp) -> usize {
// Since `split_off` returns everything *after* the given key, including the key,
// if a timer is set for exactly the given time, it would remain in the "after"
// set of unexpired keys. This isn't what we want, therefore we add `1` to the
Expand All @@ -162,34 +162,56 @@ mod tests {
let mut tm = Timer::new();

let now = Timestamp::now();
tm.set_timer(Duration::from_secs(8), now);
tm.set_timer(Duration::from_secs(9), now);
tm.set_timer(Duration::from_secs(10), now);
tm.set_timeout(Duration::from_secs(8), now);
tm.set_timeout(Duration::from_secs(9), now);
tm.set_timeout(Duration::from_secs(10), now);

assert_eq!(tm.expire(now + Duration::from_secs(9)), 2);
assert_eq!(tm.len(), 1);
assert_eq!(tm.remove_expired_by(now + Duration::from_secs(9)), 2);
assert_eq!(tm.count(), 1);
}

#[test]
fn test_wake() {
let mut tm = Timer::new();

let now = Timestamp::now();
tm.set_timer(Duration::from_secs(8), now);
tm.set_timer(Duration::from_secs(16), now);
tm.set_timer(Duration::from_secs(64), now);
tm.set_timer(Duration::from_secs(72), now);
tm.set_timeout(Duration::from_secs(8), now);
tm.set_timeout(Duration::from_secs(16), now);
tm.set_timeout(Duration::from_secs(64), now);
tm.set_timeout(Duration::from_secs(72), now);

assert_eq!(tm.remove_expired_by(now), 0);
assert_eq!(tm.count(), 4);

assert_eq!(tm.remove_expired_by(now + Duration::from_secs(9)), 1);
assert_eq!(tm.count(), 3, "one timeout has expired");

assert_eq!(tm.remove_expired_by(now + Duration::from_secs(66)), 2);
assert_eq!(tm.count(), 1, "another two timeouts have expired");

assert_eq!(tm.remove_expired_by(now + Duration::from_secs(96)), 1);
assert!(tm.has_timeouts(), "all timeouts have expired");
}

#[test]
fn test_next() {
let mut tm = Timer::new();

let mut now = Timestamp::now();
tm.set_timeout(Duration::from_secs(3), now);
assert_eq!(tm.first_expiring_after(now), Some(Duration::from_secs(3)));

assert_eq!(tm.expire(now), 0);
assert_eq!(tm.len(), 4);
now += Duration::from_secs(2);
assert_eq!(tm.first_expiring_after(now), Some(Duration::from_secs(1)));

assert_eq!(tm.expire(now + Duration::from_secs(9)), 1);
assert_eq!(tm.len(), 3, "one timeout has expired");
now += Duration::from_secs(1);
assert_eq!(tm.first_expiring_after(now), Some(Duration::from_secs(0)));

assert_eq!(tm.expire(now + Duration::from_secs(66)), 2);
assert_eq!(tm.len(), 1, "another two timeouts have expired");
now += Duration::from_secs(1);
assert_eq!(tm.first_expiring_after(now), Some(Duration::from_secs(0)));

assert_eq!(tm.expire(now + Duration::from_secs(96)), 1);
assert!(tm.is_empty(), "all timeouts have expired");
assert_eq!(tm.remove_expired_by(now), 1);
assert_eq!(tm.count(), 0);
assert_eq!(tm.first_expiring_after(now), None);
}
}

0 comments on commit 338c983

Please sign in to comment.