From 773fff93b1b383cf99d9eab852e0cd605b90d8e2 Mon Sep 17 00:00:00 2001 From: Dr Maxim Orlovsky Date: Sun, 21 Jul 2024 23:15:48 +0200 Subject: [PATCH 1/2] pass controller in handle_event calls --- src/reactor.rs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/reactor.rs b/src/reactor.rs index a7f8bb1..1206fe7 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -120,7 +120,7 @@ pub trait Handler: Send + Iterator::Event, time: Timestamp, + sender: Controller, ); /// Method called by the reactor upon I/O event on a transport resource. @@ -156,6 +157,7 @@ pub trait Handler: Send + Iterator::Event, time: Timestamp, + sender: Controller, ); /// Method called by the reactor when a given resource was successfully registered and provided @@ -528,10 +530,11 @@ impl Runtime { #[cfg(feature = "log")] log::trace!(target: "reactor", "Got `{io}` event from listener {id}"); + let sender = self.controller(); let listener = self.listeners.get_mut(&id).expect("resource disappeared"); for io in io { if let Some(event) = listener.handle_io(io) { - self.service.handle_listener_event(id, event, time); + self.service.handle_listener_event(id, event, time, sender.clone()); } } } @@ -549,10 +552,12 @@ impl Runtime { #[cfg(feature = "log")] log::trace!(target: "reactor", "Got `{io}` event from transport {id}"); + let sender = self.controller(); let transport = self.transports.get_mut(&id).expect("resource disappeared"); for io in io { if let Some(event) = transport.handle_io(io) { - self.service.handle_transport_event(id, event, time); + let sender = sender.clone(); + self.service.handle_transport_event(id, event, time, sender); } } } @@ -790,6 +795,7 @@ mod test { _d: ResourceId, _event: ::Event, _time: Timestamp, + _sender: Controller, ) { unreachable!() } @@ -798,6 +804,7 @@ mod test { _id: ResourceId, _event: ::Event, _time: Timestamp, + _sender: Controller, ) { unreachable!() } From d89c3f91bcbc905dc1b2227fc8ae2a34c82988c7 Mon Sep 17 00:00:00 2001 From: Dr Maxim Orlovsky Date: Sun, 21 Jul 2024 23:27:45 +0200 Subject: [PATCH 2/2] Sender type able to send data directly to the source of an I/O event --- src/lib.rs | 2 +- src/reactor.rs | 33 +++++++++++++++++++++++++++++---- 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index a730b5d..66cf9d5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -81,4 +81,4 @@ pub use resource::{ }; pub use timeouts::{Timer, Timestamp}; -pub use self::reactor::{Action, Controller, Error, Handler, Reactor, Runtime}; +pub use self::reactor::{Action, Controller, Error, Handler, Reactor, Runtime, Sender}; diff --git a/src/reactor.rs b/src/reactor.rs index 1206fe7..935b874 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -23,7 +23,7 @@ #![allow(unused_variables)] // because we need them for feature-gated logger -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::fmt::{Debug, Display, Formatter}; use std::os::unix::io::{AsRawFd, RawFd}; use std::thread::JoinHandle; @@ -31,6 +31,7 @@ use std::time::Duration; use std::{io, thread}; use crossbeam_channel as chan; +use crossbeam_channel::SendError; use crate::poller::{IoType, Poll, Waker, WakerRecv, WakerSend}; use crate::resource::WriteError; @@ -294,6 +295,7 @@ impl Reactor { transports: empty!(), waker: waker_reader, timeouts: Timer::new(), + actions: empty!(), }; #[cfg(feature = "log")] @@ -317,8 +319,9 @@ impl Reactor { pub fn join(self) -> thread::Result<()> { self.thread.join() } } -enum Ctl { +pub enum Ctl { Cmd(C), + Send(ResourceId, Vec), Shutdown, } @@ -388,6 +391,23 @@ impl Controller { } } +/// Sender, holding [`Controller`], aware of a specific resource which was the source of the reactor +/// I/O event. +pub struct Sender { + controller: Controller, + resource_id: ResourceId, +} + +impl Sender { + /// Sends data to a source of the I/O event, generated inside the reactor and passed to this + /// [`Sender`] instance. + pub fn send(&self, data: impl ToOwned>) -> Result<(), SendError>> + where C: 'static { + self.controller.ctl_send.send(Ctl::Send(self.resource_id, data.to_owned()))?; + Ok(()) + } +} + /// Internal [`Reactor`] runtime which is run in a dedicated thread. /// /// Use this structure direactly only if you'd like to have the full control over the reactor @@ -404,6 +424,7 @@ pub struct Runtime { transports: HashMap, waker: ::Recv, timeouts: Timer, + actions: VecDeque>, } impl Runtime { @@ -432,11 +453,12 @@ impl Runtime { transports: empty!(), waker: waker_reader, timeouts: Timer::new(), + actions: empty!(), }) } /// Provides a copy of a [`Controller`] object which exposes an API to the reactor and a service - /// running inside of its thread. + /// running inside its thread. /// /// See [`Handler::Command`] for the details. pub fn controller(&self) -> Controller::Send> { @@ -496,6 +518,9 @@ impl Runtime { panic!("control channel is broken") } Ok(Ctl::Shutdown) => return self.handle_shutdown(), + Ok(Ctl::Send(id, data)) => { + self.actions.push_back(Action::Send(id, data)); + } Ok(Ctl::Cmd(cmd)) => self.service.handle_command(cmd), } } @@ -580,7 +605,7 @@ impl Runtime { } fn handle_actions(&mut self, time: Timestamp) { - while let Some(action) = self.service.next() { + while let Some(action) = self.actions.pop_front().or_else(|| self.service.next()) { #[cfg(feature = "log")] log::trace!(target: "reactor", "Handling action {action} from the service");