Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sender: new parameter to I/O event callbacks allowing to talk to the source #29

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,4 @@ pub use resource::{
};
pub use timeouts::{Timer, Timestamp};

pub use self::reactor::{Action, Controller, Error, Handler, Reactor, Runtime};
pub use self::reactor::{Action, Controller, Error, Handler, Reactor, Runtime, Sender};
46 changes: 39 additions & 7 deletions src/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@

#![allow(unused_variables)] // because we need them for feature-gated logger

use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::fmt::{Debug, Display, Formatter};
use std::os::unix::io::{AsRawFd, RawFd};
use std::thread::JoinHandle;
use std::time::Duration;
use std::{io, thread};

use crossbeam_channel as chan;
use crossbeam_channel::SendError;

use crate::poller::{IoType, Poll, Waker, WakerRecv, WakerSend};
use crate::resource::WriteError;
Expand Down Expand Up @@ -120,7 +121,7 @@ pub trait Handler: Send + Iterator<Item = Action<Self::Listener, Self::Transport
/// connections, database connections etc are all fall into this category.
type Transport: Resource;

/// A command which may be sent to the [`Handler`] from outside of the [`Reactor`], including
/// A command which may be sent to the [`Handler`] from outside the [`Reactor`], including
/// other threads.
///
/// The handler object is owned by the reactor runtime and executes always in the context of the
Expand Down Expand Up @@ -148,6 +149,7 @@ pub trait Handler: Send + Iterator<Item = Action<Self::Listener, Self::Transport
id: ResourceId,
event: <Self::Listener as Resource>::Event,
time: Timestamp,
sender: Controller<Self::Command, impl WakerSend>,
);

/// Method called by the reactor upon I/O event on a transport resource.
Expand All @@ -156,6 +158,7 @@ pub trait Handler: Send + Iterator<Item = Action<Self::Listener, Self::Transport
id: ResourceId,
event: <Self::Transport as Resource>::Event,
time: Timestamp,
sender: Controller<Self::Command, impl WakerSend>,
);

/// Method called by the reactor when a given resource was successfully registered and provided
Expand Down Expand Up @@ -292,6 +295,7 @@ impl<C, P: Poll> Reactor<C, P> {
transports: empty!(),
waker: waker_reader,
timeouts: Timer::new(),
actions: empty!(),
};

#[cfg(feature = "log")]
Expand All @@ -315,8 +319,9 @@ impl<C, P: Poll> Reactor<C, P> {
pub fn join(self) -> thread::Result<()> { self.thread.join() }
}

enum Ctl<C> {
pub enum Ctl<C> {
Cmd(C),
Send(ResourceId, Vec<u8>),
Shutdown,
}

Expand Down Expand Up @@ -386,6 +391,23 @@ impl<C, W: WakerSend> Controller<C, W> {
}
}

/// Sender, holding [`Controller`], aware of a specific resource which was the source of the reactor
/// I/O event.
pub struct Sender<C, W: WakerSend> {
controller: Controller<C, W>,
resource_id: ResourceId,
}

impl<C, W: WakerSend> Sender<C, W> {
/// Sends data to a source of the I/O event, generated inside the reactor and passed to this
/// [`Sender`] instance.
pub fn send(&self, data: impl ToOwned<Owned = Vec<u8>>) -> Result<(), SendError<Ctl<C>>>
where C: 'static {
self.controller.ctl_send.send(Ctl::Send(self.resource_id, data.to_owned()))?;
Ok(())
}
}

/// Internal [`Reactor`] runtime which is run in a dedicated thread.
///
/// Use this structure direactly only if you'd like to have the full control over the reactor
Expand All @@ -402,6 +424,7 @@ pub struct Runtime<H: Handler, P: Poll> {
transports: HashMap<ResourceId, H::Transport>,
waker: <P::Waker as Waker>::Recv,
timeouts: Timer,
actions: VecDeque<Action<H::Listener, H::Transport>>,
}

impl<H: Handler, P: Poll> Runtime<H, P> {
Expand Down Expand Up @@ -430,11 +453,12 @@ impl<H: Handler, P: Poll> Runtime<H, P> {
transports: empty!(),
waker: waker_reader,
timeouts: Timer::new(),
actions: empty!(),
})
}

/// Provides a copy of a [`Controller`] object which exposes an API to the reactor and a service
/// running inside of its thread.
/// running inside its thread.
///
/// See [`Handler::Command`] for the details.
pub fn controller(&self) -> Controller<H::Command, <P::Waker as Waker>::Send> {
Expand Down Expand Up @@ -494,6 +518,9 @@ impl<H: Handler, P: Poll> Runtime<H, P> {
panic!("control channel is broken")
}
Ok(Ctl::Shutdown) => return self.handle_shutdown(),
Ok(Ctl::Send(id, data)) => {
self.actions.push_back(Action::Send(id, data));
}
Ok(Ctl::Cmd(cmd)) => self.service.handle_command(cmd),
}
}
Expand Down Expand Up @@ -528,10 +555,11 @@ impl<H: Handler, P: Poll> Runtime<H, P> {
#[cfg(feature = "log")]
log::trace!(target: "reactor", "Got `{io}` event from listener {id}");

let sender = self.controller();
let listener = self.listeners.get_mut(&id).expect("resource disappeared");
for io in io {
if let Some(event) = listener.handle_io(io) {
self.service.handle_listener_event(id, event, time);
self.service.handle_listener_event(id, event, time, sender.clone());
}
}
}
Expand All @@ -549,10 +577,12 @@ impl<H: Handler, P: Poll> Runtime<H, P> {
#[cfg(feature = "log")]
log::trace!(target: "reactor", "Got `{io}` event from transport {id}");

let sender = self.controller();
let transport = self.transports.get_mut(&id).expect("resource disappeared");
for io in io {
if let Some(event) = transport.handle_io(io) {
self.service.handle_transport_event(id, event, time);
let sender = sender.clone();
self.service.handle_transport_event(id, event, time, sender);
}
}
}
Expand All @@ -575,7 +605,7 @@ impl<H: Handler, P: Poll> Runtime<H, P> {
}

fn handle_actions(&mut self, time: Timestamp) {
while let Some(action) = self.service.next() {
while let Some(action) = self.actions.pop_front().or_else(|| self.service.next()) {
#[cfg(feature = "log")]
log::trace!(target: "reactor", "Handling action {action} from the service");

Expand Down Expand Up @@ -790,6 +820,7 @@ mod test {
_d: ResourceId,
_event: <Self::Listener as Resource>::Event,
_time: Timestamp,
_sender: Controller<Cmd, impl WakerSend>,
) {
unreachable!()
}
Expand All @@ -798,6 +829,7 @@ mod test {
_id: ResourceId,
_event: <Self::Transport as Resource>::Event,
_time: Timestamp,
_sender: Controller<Cmd, impl WakerSend>,
) {
unreachable!()
}
Expand Down
Loading