Skip to content

Commit

Permalink
Sender type able to send data directly to the source of an I/O event
Browse files Browse the repository at this point in the history
  • Loading branch information
dr-orlovsky committed Jul 21, 2024
1 parent 773fff9 commit d89c3f9
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 5 deletions.
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,4 @@ pub use resource::{
};
pub use timeouts::{Timer, Timestamp};

pub use self::reactor::{Action, Controller, Error, Handler, Reactor, Runtime};
pub use self::reactor::{Action, Controller, Error, Handler, Reactor, Runtime, Sender};
33 changes: 29 additions & 4 deletions src/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@

#![allow(unused_variables)] // because we need them for feature-gated logger

use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::fmt::{Debug, Display, Formatter};
use std::os::unix::io::{AsRawFd, RawFd};
use std::thread::JoinHandle;
use std::time::Duration;
use std::{io, thread};

use crossbeam_channel as chan;
use crossbeam_channel::SendError;

use crate::poller::{IoType, Poll, Waker, WakerRecv, WakerSend};
use crate::resource::WriteError;
Expand Down Expand Up @@ -294,6 +295,7 @@ impl<C, P: Poll> Reactor<C, P> {
transports: empty!(),
waker: waker_reader,
timeouts: Timer::new(),
actions: empty!(),
};

#[cfg(feature = "log")]
Expand All @@ -317,8 +319,9 @@ impl<C, P: Poll> Reactor<C, P> {
pub fn join(self) -> thread::Result<()> { self.thread.join() }
}

enum Ctl<C> {
pub enum Ctl<C> {
Cmd(C),
Send(ResourceId, Vec<u8>),
Shutdown,
}

Expand Down Expand Up @@ -388,6 +391,23 @@ impl<C, W: WakerSend> Controller<C, W> {
}
}

/// Sender, holding [`Controller`], aware of a specific resource which was the source of the reactor
/// I/O event.
pub struct Sender<C, W: WakerSend> {
controller: Controller<C, W>,
resource_id: ResourceId,
}

impl<C, W: WakerSend> Sender<C, W> {
/// Sends data to a source of the I/O event, generated inside the reactor and passed to this
/// [`Sender`] instance.
pub fn send(&self, data: impl ToOwned<Owned = Vec<u8>>) -> Result<(), SendError<Ctl<C>>>
where C: 'static {
self.controller.ctl_send.send(Ctl::Send(self.resource_id, data.to_owned()))?;
Ok(())
}
}

/// Internal [`Reactor`] runtime which is run in a dedicated thread.
///
/// Use this structure direactly only if you'd like to have the full control over the reactor
Expand All @@ -404,6 +424,7 @@ pub struct Runtime<H: Handler, P: Poll> {
transports: HashMap<ResourceId, H::Transport>,
waker: <P::Waker as Waker>::Recv,
timeouts: Timer,
actions: VecDeque<Action<H::Listener, H::Transport>>,
}

impl<H: Handler, P: Poll> Runtime<H, P> {
Expand Down Expand Up @@ -432,11 +453,12 @@ impl<H: Handler, P: Poll> Runtime<H, P> {
transports: empty!(),
waker: waker_reader,
timeouts: Timer::new(),
actions: empty!(),
})
}

/// Provides a copy of a [`Controller`] object which exposes an API to the reactor and a service
/// running inside of its thread.
/// running inside its thread.
///
/// See [`Handler::Command`] for the details.
pub fn controller(&self) -> Controller<H::Command, <P::Waker as Waker>::Send> {
Expand Down Expand Up @@ -496,6 +518,9 @@ impl<H: Handler, P: Poll> Runtime<H, P> {
panic!("control channel is broken")
}
Ok(Ctl::Shutdown) => return self.handle_shutdown(),
Ok(Ctl::Send(id, data)) => {
self.actions.push_back(Action::Send(id, data));
}
Ok(Ctl::Cmd(cmd)) => self.service.handle_command(cmd),
}
}
Expand Down Expand Up @@ -580,7 +605,7 @@ impl<H: Handler, P: Poll> Runtime<H, P> {
}

fn handle_actions(&mut self, time: Timestamp) {
while let Some(action) = self.service.next() {
while let Some(action) = self.actions.pop_front().or_else(|| self.service.next()) {
#[cfg(feature = "log")]
log::trace!(target: "reactor", "Handling action {action} from the service");

Expand Down

0 comments on commit d89c3f9

Please sign in to comment.