diff --git a/src/lib.rs b/src/lib.rs index 420ae01..cecd07f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -76,7 +76,7 @@ mod reactor; mod resource; mod timeouts; -pub use resource::{Io, Resource, ResourceId, WriteAtomic, WriteError}; +pub use resource::{Io, Resource, ResourceId, ResourceIdGenerator, WriteAtomic, WriteError}; pub use timeouts::{Timer, Timestamp}; pub use self::reactor::{Action, Controller, Error, Handler, Reactor, Runtime}; diff --git a/src/poller/mod.rs b/src/poller/mod.rs index cd7b2e2..4dd3cbd 100644 --- a/src/poller/mod.rs +++ b/src/poller/mod.rs @@ -154,6 +154,8 @@ where /// Waker type used by the poll provider. type Waker: Waker; + /// Registers a waker object. + fn register_waker(&mut self, fd: &impl AsRawFd); /// Registers a file-descriptor based resource for a poll. fn register(&mut self, fd: &impl AsRawFd, interest: IoType) -> ResourceId; /// Unregisters a file-descriptor based resource from a poll. diff --git a/src/poller/popol.rs b/src/poller/popol.rs index a01224b..d791a06 100644 --- a/src/poller/popol.rs +++ b/src/poller/popol.rs @@ -30,14 +30,14 @@ use std::sync::Arc; use std::time::Duration; use crate::poller::{IoFail, IoType, Poll, Waker, WakerRecv, WakerSend}; -use crate::ResourceId; +use crate::{ResourceId, ResourceIdGenerator}; /// Manager for a set of reactor which are polled for an event loop by the /// re-actor by using [`popol`] library. pub struct Poller { poll: popol::Sources, events: VecDeque>, - id_top: ResourceId, + id_gen: ResourceIdGenerator, } impl Default for Poller { @@ -50,7 +50,7 @@ impl Poller { Self { poll: popol::Sources::new(), events: empty!(), - id_top: ResourceId::ZERO, + id_gen: ResourceIdGenerator::default(), } } @@ -60,7 +60,7 @@ impl Poller { Self { poll: popol::Sources::with_capacity(capacity), events: VecDeque::with_capacity(capacity), - id_top: ResourceId::ZERO, + id_gen: ResourceIdGenerator::default(), } } } @@ -68,9 +68,19 @@ impl Poller { impl Poll for Poller { type Waker = PopolWaker; + fn register_waker(&mut self, fd: &impl AsRawFd) { + let id = ResourceId::WAKER; + if self.poll.get(&id).is_some() { + #[cfg(feature = "log")] + log::error!(target: "popol", "Reactor waker is already registered, terminating"); + panic!("Reactor waker is already registered"); + } + + self.poll.register(id, fd, popol::interest::READ); + } + fn register(&mut self, fd: &impl AsRawFd, interest: IoType) -> ResourceId { - let id = self.id_top; - self.id_top.inc(); + let id = self.id_gen.next(); #[cfg(feature = "log")] log::trace!(target: "popol", "Registering file descriptor {} as resource with id {}", fd.as_raw_fd(), id); diff --git a/src/reactor.rs b/src/reactor.rs index 1457187..61df204 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -279,7 +279,7 @@ impl Reactor { let thread = builder.spawn(move || { #[cfg(feature = "log")] log::debug!(target: "reactor", "Registering waker (fd {})", waker_reader.as_raw_fd()); - let waker_id = poller.register(&waker_reader, IoType::read_only()); + poller.register_waker(&waker_reader); let runtime = Runtime { service, @@ -289,7 +289,6 @@ impl Reactor { listeners: empty!(), transports: empty!(), waker: waker_reader, - waker_id, timeouts: Timer::new(), }; @@ -400,7 +399,6 @@ pub struct Runtime { listeners: HashMap, transports: HashMap, waker: ::Recv, - waker_id: ResourceId, timeouts: Timer, } @@ -414,7 +412,7 @@ impl Runtime { #[cfg(feature = "log")] log::debug!(target: "reactor", "Registering waker (fd {})", waker_reader.as_raw_fd()); - let waker_id = poller.register(&waker_reader, IoType::read_only()); + poller.register_waker(&waker_reader); let controller = Controller { ctl_send, @@ -429,7 +427,6 @@ impl Runtime { listeners: empty!(), transports: empty!(), waker: waker_reader, - waker_id, timeouts: Timer::new(), }) } @@ -512,7 +509,7 @@ impl Runtime { let mut unregister_queue = vec![]; while let Some((id, res)) = self.poller.next() { - if id == self.waker_id { + if id == ResourceId::WAKER { if let Err(err) = res { #[cfg(feature = "log")] log::error!(target: "reactor", "Polling waker has failed: {err}"); diff --git a/src/resource.rs b/src/resource.rs index 2977c63..d94d8a4 100644 --- a/src/resource.rs +++ b/src/resource.rs @@ -38,17 +38,31 @@ pub enum Io { Write, } +#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug, Display)] +#[display(inner)] +pub struct ResourceIdGenerator(u64); + +impl Default for ResourceIdGenerator { + fn default() -> Self { ResourceIdGenerator(1) } +} + +#[allow(dead_code)] // We need this before we've got non-popol implementations +impl ResourceIdGenerator { + pub fn next(&mut self) -> ResourceId { + let id = self.0; + self.0 += 1; + ResourceId(id) + } +} + /// The resource identifier must be globally unique and non-reusable object. Because of this, /// things like [`RawFd`] and socket addresses can't operate like resource identifiers. #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug, Display)] #[display(inner)] pub struct ResourceId(u64); -#[allow(dead_code)] // We need this before we've got non-popol implementations impl ResourceId { - pub(crate) const ZERO: ResourceId = ResourceId(0); - - pub(crate) fn inc(&mut self) { self.0 += 1 } + pub const WAKER: ResourceId = ResourceId(0); } /// A resource which can be managed by the reactor.