diff --git a/src/reactor.rs b/src/reactor.rs index 21dafa3..ac7eebf 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -444,7 +444,7 @@ impl Runtime { fn run(mut self) { loop { let before_poll = Timestamp::now(); - let timeout = self.timeouts.next(before_poll).unwrap_or(WAIT_TIMEOUT); + let timeout = self.timeouts.next_expiring_from(before_poll).unwrap_or(WAIT_TIMEOUT); for (id, res) in &self.listeners { self.poller.set_interest(*id, res.interests()); @@ -463,7 +463,7 @@ impl Runtime { // Nb. The way this is currently used basically ignores which keys have // timed out. So as long as *something* timed out, we wake the service. - let timers_fired = self.timeouts.expire(now); + let timers_fired = self.timeouts.remove_expired_by(now); if timers_fired > 0 { #[cfg(feature = "log")] log::trace!(target: "reactor", "Timer has fired"); @@ -688,7 +688,7 @@ impl Runtime { #[cfg(feature = "log")] log::debug!(target: "reactor", "Adding timer {duration:?} from now"); - self.timeouts.set_timer(duration, time); + self.timeouts.set_timeout(duration, time); } } Ok(()) diff --git a/src/timeouts.rs b/src/timeouts.rs index 97c257e..cc91c3b 100644 --- a/src/timeouts.rs +++ b/src/timeouts.rs @@ -88,26 +88,23 @@ pub struct Timer { } impl Timer { - /// Create a new timeout manager. - /// - /// Takes a threshold below which two timeouts cannot overlap. + /// Create a new timer containing no timeouts. pub fn new() -> Self { Self { timeouts: bset! {} } } /// Return the number of timeouts being tracked. - pub fn len(&self) -> usize { self.timeouts.len() } + pub fn count(&self) -> usize { self.timeouts.len() } /// Check whether there are timeouts being tracked. - pub fn is_empty(&self) -> bool { self.timeouts.is_empty() } + pub fn has_timeouts(&self) -> bool { !self.timeouts.is_empty() } - /// Register a new timeout with an associated key and wake-up time from a - /// UNIX time epoch. - pub fn set_timer(&mut self, span: Duration, after: Timestamp) { - let time = after + Timestamp(span.as_millis()); + /// Register a new timeout relative to a certain point in time. + pub fn set_timeout(&mut self, timeout: Duration, after: Timestamp) { + let time = after + Timestamp(timeout.as_millis()); self.timeouts.insert(time); } - /// Get the minimum time duration we should wait for at least one timeout - /// to be reached. Returns `None` if there are no timeouts. + /// Get the first timeout expiring right at or after certain moment of time. + /// Returns `None` if there are no timeouts. /// /// ``` /// # use std::time::{Duration}; @@ -116,30 +113,33 @@ impl Timer { /// let mut tm = Timer::new(); /// /// let now = Timestamp::now(); - /// tm.set_timer(Duration::from_secs(16), now); - /// tm.set_timer(Duration::from_secs(8), now); - /// tm.set_timer(Duration::from_secs(64), now); + /// tm.set_timeout(Duration::from_secs(16), now); + /// tm.set_timeout(Duration::from_secs(8), now); + /// tm.set_timeout(Duration::from_secs(64), now); /// /// let mut now = Timestamp::now(); /// // We need to wait 8 secs to trigger the next timeout (1). - /// assert!(tm.next(now) <= Some(Duration::from_secs(8))); + /// assert!(tm.next_expiring_from(now) <= Some(Duration::from_secs(8))); /// /// // ... sleep for a sec ... /// now += Duration::from_secs(1); /// /// // Now we don't need to wait as long! - /// assert!(tm.next(now).unwrap() <= Duration::from_secs(7)); + /// assert!(tm.next_expiring_from(now).unwrap() <= Duration::from_secs(7)); /// ``` - pub fn next(&self, after: impl Into) -> Option { - let after = after.into(); - self.timeouts - .iter() - .find(|t| **t >= after) - .map(|t| Duration::from_millis((*t - after).as_millis())) + pub fn next_expiring_from(&self, time: impl Into) -> Option { + let time = time.into(); + let last = *self.timeouts.first()?; + Some(if last >= time { + Duration::from_secs(last.as_secs() - time.as_secs()) + } else { + Duration::from_secs(0) + }) } - /// Returns vector of timers which has fired before certain time. - pub fn expire(&mut self, time: Timestamp) -> usize { + /// Removes timeouts which expire by a certain moment of time (inclusive), + /// returning total number of timeouts which were removed. + pub fn remove_expired_by(&mut self, time: Timestamp) -> usize { // Since `split_off` returns everything *after* the given key, including the key, // if a timer is set for exactly the given time, it would remain in the "after" // set of unexpired keys. This isn't what we want, therefore we add `1` to the @@ -162,12 +162,12 @@ mod tests { let mut tm = Timer::new(); let now = Timestamp::now(); - tm.set_timer(Duration::from_secs(8), now); - tm.set_timer(Duration::from_secs(9), now); - tm.set_timer(Duration::from_secs(10), now); + tm.set_timeout(Duration::from_secs(8), now); + tm.set_timeout(Duration::from_secs(9), now); + tm.set_timeout(Duration::from_secs(10), now); - assert_eq!(tm.expire(now + Duration::from_secs(9)), 2); - assert_eq!(tm.len(), 1); + assert_eq!(tm.remove_expired_by(now + Duration::from_secs(9)), 2); + assert_eq!(tm.count(), 1); } #[test] @@ -175,21 +175,43 @@ mod tests { let mut tm = Timer::new(); let now = Timestamp::now(); - tm.set_timer(Duration::from_secs(8), now); - tm.set_timer(Duration::from_secs(16), now); - tm.set_timer(Duration::from_secs(64), now); - tm.set_timer(Duration::from_secs(72), now); + tm.set_timeout(Duration::from_secs(8), now); + tm.set_timeout(Duration::from_secs(16), now); + tm.set_timeout(Duration::from_secs(64), now); + tm.set_timeout(Duration::from_secs(72), now); + + assert_eq!(tm.remove_expired_by(now), 0); + assert_eq!(tm.count(), 4); + + assert_eq!(tm.remove_expired_by(now + Duration::from_secs(9)), 1); + assert_eq!(tm.count(), 3, "one timeout has expired"); + + assert_eq!(tm.remove_expired_by(now + Duration::from_secs(66)), 2); + assert_eq!(tm.count(), 1, "another two timeouts have expired"); + + assert_eq!(tm.remove_expired_by(now + Duration::from_secs(96)), 1); + assert!(tm.has_timeouts(), "all timeouts have expired"); + } + + #[test] + fn test_next() { + let mut tm = Timer::new(); + + let mut now = Timestamp::now(); + tm.set_timeout(Duration::from_secs(3), now); + assert_eq!(tm.next_expiring_from(now), Some(Duration::from_secs(3))); - assert_eq!(tm.expire(now), 0); - assert_eq!(tm.len(), 4); + now += Duration::from_secs(2); + assert_eq!(tm.next_expiring_from(now), Some(Duration::from_secs(1))); - assert_eq!(tm.expire(now + Duration::from_secs(9)), 1); - assert_eq!(tm.len(), 3, "one timeout has expired"); + now += Duration::from_secs(1); + assert_eq!(tm.next_expiring_from(now), Some(Duration::from_secs(0))); - assert_eq!(tm.expire(now + Duration::from_secs(66)), 2); - assert_eq!(tm.len(), 1, "another two timeouts have expired"); + now += Duration::from_secs(1); + assert_eq!(tm.next_expiring_from(now), Some(Duration::from_secs(0))); - assert_eq!(tm.expire(now + Duration::from_secs(96)), 1); - assert!(tm.is_empty(), "all timeouts have expired"); + assert_eq!(tm.remove_expired_by(now), 1); + assert_eq!(tm.count(), 0); + assert_eq!(tm.next_expiring_from(now), None); } }