Skip to content

Commit

Permalink
new mechanism for resource id generation
Browse files Browse the repository at this point in the history
  • Loading branch information
dr-orlovsky committed Jan 4, 2024
1 parent 93b289f commit e337475
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 17 deletions.
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ mod reactor;
mod resource;
mod timeouts;

pub use resource::{Io, Resource, ResourceId, WriteAtomic, WriteError};
pub use resource::{Io, Resource, ResourceId, ResourceIdGenerator, WriteAtomic, WriteError};
pub use timeouts::{Timer, Timestamp};

pub use self::reactor::{Action, Controller, Error, Handler, Reactor, Runtime};
2 changes: 2 additions & 0 deletions src/poller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ where
/// Waker type used by the poll provider.
type Waker: Waker;

/// Registers a waker object.
fn register_waker(&mut self, fd: &impl AsRawFd);
/// Registers a file-descriptor based resource for a poll.
fn register(&mut self, fd: &impl AsRawFd, interest: IoType) -> ResourceId;
/// Unregisters a file-descriptor based resource from a poll.
Expand Down
22 changes: 16 additions & 6 deletions src/poller/popol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ use std::sync::Arc;
use std::time::Duration;

use crate::poller::{IoFail, IoType, Poll, Waker, WakerRecv, WakerSend};
use crate::ResourceId;
use crate::{ResourceId, ResourceIdGenerator};

/// Manager for a set of reactor which are polled for an event loop by the
/// re-actor by using [`popol`] library.
pub struct Poller {
poll: popol::Sources<ResourceId>,
events: VecDeque<popol::Event<ResourceId>>,
id_top: ResourceId,
id_gen: ResourceIdGenerator,
}

impl Default for Poller {
Expand All @@ -50,7 +50,7 @@ impl Poller {
Self {
poll: popol::Sources::new(),
events: empty!(),
id_top: ResourceId::ZERO,
id_gen: ResourceIdGenerator::default(),
}
}

Expand All @@ -60,17 +60,27 @@ impl Poller {
Self {
poll: popol::Sources::with_capacity(capacity),
events: VecDeque::with_capacity(capacity),
id_top: ResourceId::ZERO,
id_gen: ResourceIdGenerator::default(),
}
}
}

impl Poll for Poller {
type Waker = PopolWaker;

fn register_waker(&mut self, fd: &impl AsRawFd) {
let id = ResourceId::WAKER;
if self.poll.get(&id).is_some() {
#[cfg(feature = "log")]
log::error!(target: "popol", "Reactor waker is already registered, terminating");
panic!("Reactor waker is already registered");
}

self.poll.register(id, fd, popol::interest::READ);
}

fn register(&mut self, fd: &impl AsRawFd, interest: IoType) -> ResourceId {
let id = self.id_top;
self.id_top.inc();
let id = self.id_gen.next();

#[cfg(feature = "log")]
log::trace!(target: "popol", "Registering file descriptor {} as resource with id {}", fd.as_raw_fd(), id);
Expand Down
9 changes: 3 additions & 6 deletions src/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ impl<C, P: Poll> Reactor<C, P> {
let thread = builder.spawn(move || {
#[cfg(feature = "log")]
log::debug!(target: "reactor", "Registering waker (fd {})", waker_reader.as_raw_fd());
let waker_id = poller.register(&waker_reader, IoType::read_only());
poller.register_waker(&waker_reader);

let runtime = Runtime {
service,
Expand All @@ -289,7 +289,6 @@ impl<C, P: Poll> Reactor<C, P> {
listeners: empty!(),
transports: empty!(),
waker: waker_reader,
waker_id,
timeouts: Timer::new(),
};

Expand Down Expand Up @@ -400,7 +399,6 @@ pub struct Runtime<H: Handler, P: Poll> {
listeners: HashMap<ResourceId, H::Listener>,
transports: HashMap<ResourceId, H::Transport>,
waker: <P::Waker as Waker>::Recv,
waker_id: ResourceId,
timeouts: Timer,
}

Expand All @@ -414,7 +412,7 @@ impl<H: Handler, P: Poll> Runtime<H, P> {

#[cfg(feature = "log")]
log::debug!(target: "reactor", "Registering waker (fd {})", waker_reader.as_raw_fd());
let waker_id = poller.register(&waker_reader, IoType::read_only());
poller.register_waker(&waker_reader);

let controller = Controller {
ctl_send,
Expand All @@ -429,7 +427,6 @@ impl<H: Handler, P: Poll> Runtime<H, P> {
listeners: empty!(),
transports: empty!(),
waker: waker_reader,
waker_id,
timeouts: Timer::new(),
})
}
Expand Down Expand Up @@ -512,7 +509,7 @@ impl<H: Handler, P: Poll> Runtime<H, P> {

let mut unregister_queue = vec![];
while let Some((id, res)) = self.poller.next() {
if id == self.waker_id {
if id == ResourceId::WAKER {
if let Err(err) = res {
#[cfg(feature = "log")]
log::error!(target: "reactor", "Polling waker has failed: {err}");
Expand Down
22 changes: 18 additions & 4 deletions src/resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,31 @@ pub enum Io {
Write,
}

#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug, Display)]
#[display(inner)]
pub struct ResourceIdGenerator(u64);

impl Default for ResourceIdGenerator {
fn default() -> Self { ResourceIdGenerator(1) }
}

#[allow(dead_code)] // We need this before we've got non-popol implementations
impl ResourceIdGenerator {
pub fn next(&mut self) -> ResourceId {
let id = self.0;
self.0 += 1;
ResourceId(id)
}
}

/// The resource identifier must be globally unique and non-reusable object. Because of this,
/// things like [`RawFd`] and socket addresses can't operate like resource identifiers.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug, Display)]
#[display(inner)]
pub struct ResourceId(u64);

#[allow(dead_code)] // We need this before we've got non-popol implementations
impl ResourceId {
pub(crate) const ZERO: ResourceId = ResourceId(0);

pub(crate) fn inc(&mut self) { self.0 += 1 }
pub const WAKER: ResourceId = ResourceId(0);
}

/// A resource which can be managed by the reactor.
Expand Down

0 comments on commit e337475

Please sign in to comment.