Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix timer #26

Merged
merged 1 commit into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ impl<H: Handler, P: Poll> Runtime<H, P> {
fn run(mut self) {
loop {
let before_poll = Timestamp::now();
let timeout = self.timeouts.next(before_poll).unwrap_or(WAIT_TIMEOUT);
let timeout = self.timeouts.next_expiring_from(before_poll).unwrap_or(WAIT_TIMEOUT);

for (id, res) in &self.listeners {
self.poller.set_interest(*id, res.interests());
Expand All @@ -463,7 +463,7 @@ impl<H: Handler, P: Poll> Runtime<H, P> {

// Nb. The way this is currently used basically ignores which keys have
// timed out. So as long as *something* timed out, we wake the service.
let timers_fired = self.timeouts.expire(now);
let timers_fired = self.timeouts.remove_expired_by(now);
if timers_fired > 0 {
#[cfg(feature = "log")]
log::trace!(target: "reactor", "Timer has fired");
Expand Down Expand Up @@ -688,7 +688,7 @@ impl<H: Handler, P: Poll> Runtime<H, P> {
#[cfg(feature = "log")]
log::debug!(target: "reactor", "Adding timer {duration:?} from now");

self.timeouts.set_timer(duration, time);
self.timeouts.set_timeout(duration, time);
}
}
Ok(())
Expand Down
104 changes: 63 additions & 41 deletions src/timeouts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,26 +88,23 @@ pub struct Timer {
}

impl Timer {
/// Create a new timeout manager.
///
/// Takes a threshold below which two timeouts cannot overlap.
/// Create a new timer containing no timeouts.
pub fn new() -> Self { Self { timeouts: bset! {} } }

/// Return the number of timeouts being tracked.
pub fn len(&self) -> usize { self.timeouts.len() }
pub fn count(&self) -> usize { self.timeouts.len() }

/// Check whether there are timeouts being tracked.
pub fn is_empty(&self) -> bool { self.timeouts.is_empty() }
pub fn has_timeouts(&self) -> bool { !self.timeouts.is_empty() }

/// Register a new timeout with an associated key and wake-up time from a
/// UNIX time epoch.
pub fn set_timer(&mut self, span: Duration, after: Timestamp) {
let time = after + Timestamp(span.as_millis());
/// Register a new timeout relative to a certain point in time.
pub fn set_timeout(&mut self, timeout: Duration, after: Timestamp) {
let time = after + Timestamp(timeout.as_millis());
self.timeouts.insert(time);
}

/// Get the minimum time duration we should wait for at least one timeout
/// to be reached. Returns `None` if there are no timeouts.
/// Get the first timeout expiring right at or after certain moment of time.
/// Returns `None` if there are no timeouts.
///
/// ```
/// # use std::time::{Duration};
Expand All @@ -116,30 +113,33 @@ impl Timer {
/// let mut tm = Timer::new();
///
/// let now = Timestamp::now();
/// tm.set_timer(Duration::from_secs(16), now);
/// tm.set_timer(Duration::from_secs(8), now);
/// tm.set_timer(Duration::from_secs(64), now);
/// tm.set_timeout(Duration::from_secs(16), now);
/// tm.set_timeout(Duration::from_secs(8), now);
/// tm.set_timeout(Duration::from_secs(64), now);
///
/// let mut now = Timestamp::now();
/// // We need to wait 8 secs to trigger the next timeout (1).
/// assert!(tm.next(now) <= Some(Duration::from_secs(8)));
/// assert!(tm.next_expiring_from(now) <= Some(Duration::from_secs(8)));
///
/// // ... sleep for a sec ...
/// now += Duration::from_secs(1);
///
/// // Now we don't need to wait as long!
/// assert!(tm.next(now).unwrap() <= Duration::from_secs(7));
/// assert!(tm.next_expiring_from(now).unwrap() <= Duration::from_secs(7));
/// ```
pub fn next(&self, after: impl Into<Timestamp>) -> Option<Duration> {
let after = after.into();
self.timeouts
.iter()
.find(|t| **t >= after)
.map(|t| Duration::from_millis((*t - after).as_millis()))
pub fn next_expiring_from(&self, time: impl Into<Timestamp>) -> Option<Duration> {
let time = time.into();
let last = *self.timeouts.first()?;
Some(if last >= time {
Duration::from_millis(last.as_millis() - time.as_millis())
} else {
Duration::from_secs(0)
})
}

/// Returns vector of timers which has fired before certain time.
pub fn expire(&mut self, time: Timestamp) -> usize {
/// Removes timeouts which expire by a certain moment of time (inclusive),
/// returning total number of timeouts which were removed.
pub fn remove_expired_by(&mut self, time: Timestamp) -> usize {
// Since `split_off` returns everything *after* the given key, including the key,
// if a timer is set for exactly the given time, it would remain in the "after"
// set of unexpired keys. This isn't what we want, therefore we add `1` to the
Expand All @@ -162,34 +162,56 @@ mod tests {
let mut tm = Timer::new();

let now = Timestamp::now();
tm.set_timer(Duration::from_secs(8), now);
tm.set_timer(Duration::from_secs(9), now);
tm.set_timer(Duration::from_secs(10), now);
tm.set_timeout(Duration::from_secs(8), now);
tm.set_timeout(Duration::from_secs(9), now);
tm.set_timeout(Duration::from_secs(10), now);

assert_eq!(tm.expire(now + Duration::from_secs(9)), 2);
assert_eq!(tm.len(), 1);
assert_eq!(tm.remove_expired_by(now + Duration::from_secs(9)), 2);
assert_eq!(tm.count(), 1);
}

#[test]
fn test_wake() {
let mut tm = Timer::new();

let now = Timestamp::now();
tm.set_timer(Duration::from_secs(8), now);
tm.set_timer(Duration::from_secs(16), now);
tm.set_timer(Duration::from_secs(64), now);
tm.set_timer(Duration::from_secs(72), now);
tm.set_timeout(Duration::from_secs(8), now);
tm.set_timeout(Duration::from_secs(16), now);
tm.set_timeout(Duration::from_secs(64), now);
tm.set_timeout(Duration::from_secs(72), now);

assert_eq!(tm.remove_expired_by(now), 0);
assert_eq!(tm.count(), 4);

assert_eq!(tm.remove_expired_by(now + Duration::from_secs(9)), 1);
assert_eq!(tm.count(), 3, "one timeout has expired");

assert_eq!(tm.remove_expired_by(now + Duration::from_secs(66)), 2);
assert_eq!(tm.count(), 1, "another two timeouts have expired");

assert_eq!(tm.remove_expired_by(now + Duration::from_secs(96)), 1);
assert!(!tm.has_timeouts(), "all timeouts have expired");
}

#[test]
fn test_next() {
let mut tm = Timer::new();

let mut now = Timestamp::now();
tm.set_timeout(Duration::from_secs(3), now);
assert_eq!(tm.next_expiring_from(now), Some(Duration::from_secs(3)));

assert_eq!(tm.expire(now), 0);
assert_eq!(tm.len(), 4);
now += Duration::from_secs(2);
assert_eq!(tm.next_expiring_from(now), Some(Duration::from_secs(1)));

assert_eq!(tm.expire(now + Duration::from_secs(9)), 1);
assert_eq!(tm.len(), 3, "one timeout has expired");
now += Duration::from_secs(1);
assert_eq!(tm.next_expiring_from(now), Some(Duration::from_secs(0)));

assert_eq!(tm.expire(now + Duration::from_secs(66)), 2);
assert_eq!(tm.len(), 1, "another two timeouts have expired");
now += Duration::from_secs(1);
assert_eq!(tm.next_expiring_from(now), Some(Duration::from_secs(0)));

assert_eq!(tm.expire(now + Duration::from_secs(96)), 1);
assert!(tm.is_empty(), "all timeouts have expired");
assert_eq!(tm.remove_expired_by(now), 1);
assert_eq!(tm.count(), 0);
assert_eq!(tm.next_expiring_from(now), None);
}
}
Loading