diff --git a/src/lib.rs b/src/lib.rs index a730b5d..66cf9d5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -81,4 +81,4 @@ pub use resource::{ }; pub use timeouts::{Timer, Timestamp}; -pub use self::reactor::{Action, Controller, Error, Handler, Reactor, Runtime}; +pub use self::reactor::{Action, Controller, Error, Handler, Reactor, Runtime, Sender}; diff --git a/src/reactor.rs b/src/reactor.rs index 1206fe7..935b874 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -23,7 +23,7 @@ #![allow(unused_variables)] // because we need them for feature-gated logger -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::fmt::{Debug, Display, Formatter}; use std::os::unix::io::{AsRawFd, RawFd}; use std::thread::JoinHandle; @@ -31,6 +31,7 @@ use std::time::Duration; use std::{io, thread}; use crossbeam_channel as chan; +use crossbeam_channel::SendError; use crate::poller::{IoType, Poll, Waker, WakerRecv, WakerSend}; use crate::resource::WriteError; @@ -294,6 +295,7 @@ impl Reactor { transports: empty!(), waker: waker_reader, timeouts: Timer::new(), + actions: empty!(), }; #[cfg(feature = "log")] @@ -317,8 +319,9 @@ impl Reactor { pub fn join(self) -> thread::Result<()> { self.thread.join() } } -enum Ctl { +pub enum Ctl { Cmd(C), + Send(ResourceId, Vec), Shutdown, } @@ -388,6 +391,23 @@ impl Controller { } } +/// Sender, holding [`Controller`], aware of a specific resource which was the source of the reactor +/// I/O event. +pub struct Sender { + controller: Controller, + resource_id: ResourceId, +} + +impl Sender { + /// Sends data to a source of the I/O event, generated inside the reactor and passed to this + /// [`Sender`] instance. + pub fn send(&self, data: impl ToOwned>) -> Result<(), SendError>> + where C: 'static { + self.controller.ctl_send.send(Ctl::Send(self.resource_id, data.to_owned()))?; + Ok(()) + } +} + /// Internal [`Reactor`] runtime which is run in a dedicated thread. /// /// Use this structure direactly only if you'd like to have the full control over the reactor @@ -404,6 +424,7 @@ pub struct Runtime { transports: HashMap, waker: ::Recv, timeouts: Timer, + actions: VecDeque>, } impl Runtime { @@ -432,11 +453,12 @@ impl Runtime { transports: empty!(), waker: waker_reader, timeouts: Timer::new(), + actions: empty!(), }) } /// Provides a copy of a [`Controller`] object which exposes an API to the reactor and a service - /// running inside of its thread. + /// running inside its thread. /// /// See [`Handler::Command`] for the details. pub fn controller(&self) -> Controller::Send> { @@ -496,6 +518,9 @@ impl Runtime { panic!("control channel is broken") } Ok(Ctl::Shutdown) => return self.handle_shutdown(), + Ok(Ctl::Send(id, data)) => { + self.actions.push_back(Action::Send(id, data)); + } Ok(Ctl::Cmd(cmd)) => self.service.handle_command(cmd), } } @@ -580,7 +605,7 @@ impl Runtime { } fn handle_actions(&mut self, time: Timestamp) { - while let Some(action) = self.service.next() { + while let Some(action) = self.actions.pop_front().or_else(|| self.service.next()) { #[cfg(feature = "log")] log::trace!(target: "reactor", "Handling action {action} from the service");