-
-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
driver: add support for registering file descriptors with user-specified flags #6089
base: master
Are you sure you want to change the base?
Changes from 8 commits
e069888
7697b3a
9df1554
c031872
8f0f23d
64ae8c3
c63ee40
ec801c9
0f9f229
54c3096
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -240,6 +240,27 @@ impl TcpListener { | |
Ok(TcpListener { io }) | ||
} | ||
|
||
/// Create a new TcpListener with the provided raw epoll flags. | ||
/// | ||
/// These flags replace any epoll flags would normally set when registering the fd. | ||
/// | ||
/// **Note**: This is an [unstable API][unstable]. The public API of this may break in 1.x | ||
/// releases. | ||
/// See [the documentation on unstable features][unstable] for details. | ||
/// | ||
/// [unstable]: crate#unstable-features | ||
#[track_caller] | ||
#[cfg(all(target_os = "linux", tokio_unstable))] | ||
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)), doc(cfg(target_os = "linux")))] | ||
pub fn from_std_with_epoll_flags( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would rather not have these methods on |
||
listener: net::TcpListener, | ||
flags: u32, | ||
) -> io::Result<TcpListener> { | ||
let io = mio::net::TcpListener::from_std(listener); | ||
let io = PollEvented::new_raw(io, flags)?; | ||
Ok(TcpListener { io }) | ||
} | ||
|
||
/// Turns a [`tokio::net::TcpListener`] into a [`std::net::TcpListener`]. | ||
/// | ||
/// The returned [`std::net::TcpListener`] will have nonblocking mode set as | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -114,6 +114,27 @@ impl UnixListener { | |
Ok(UnixListener { io }) | ||
} | ||
|
||
/// Create a new UnixListener with the provided raw epoll flags. | ||
/// | ||
/// These flags replace any epoll flags would normally set when registering the fd. | ||
/// | ||
/// **Note**: This is an [unstable API][unstable]. The public API of this may break in 1.x | ||
/// releases. | ||
/// See [the documentation on unstable features][unstable] for details. | ||
/// | ||
/// [unstable]: crate#unstable-features | ||
#[track_caller] | ||
#[cfg(all(target_os = "linux", tokio_unstable))] | ||
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)), doc(cfg(target_os = "linux")))] | ||
pub fn from_std_with_epoll_flags( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the TCP method is added to TcpSocket, this will need to be removed and either skipped initially or we will need a UnixSocket API. |
||
listener: net::UnixListener, | ||
flags: u32, | ||
) -> io::Result<UnixListener> { | ||
let io = mio::net::UnixListener::from_std(listener); | ||
let io = PollEvented::new_raw(io, flags)?; | ||
Ok(UnixListener { io }) | ||
} | ||
|
||
/// Turns a [`tokio::net::UnixListener`] into a [`std::os::unix::net::UnixListener`]. | ||
/// | ||
/// The returned [`std::os::unix::net::UnixListener`] will have nonblocking mode | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -236,6 +236,54 @@ impl Handle { | |
Ok(scheduled_io) | ||
} | ||
|
||
/// Registers an I/O resource with the reactor, bypassing Mio entirely and using raw epoll_ctl. | ||
/// | ||
/// This is more or less the copy-pasted from the Mio code, and exists so that we can use flags | ||
/// other than the base set of epoll ones we normally use and those representing interests. | ||
/// | ||
/// This is important for supporting things like `EPOLLEXCLUSIVE`, which is very useful for | ||
/// shared-nothing runtimes. | ||
/// | ||
/// The registries token is returned. | ||
#[cfg(all(target_os = "linux", tokio_unstable))] | ||
pub(super) fn add_source_raw( | ||
&self, | ||
source: &mut impl std::os::unix::io::AsRawFd, | ||
flags: u32, | ||
) -> io::Result<Arc<ScheduledIo>> { | ||
use libc::EPOLLET; | ||
use std::os::unix::io::AsRawFd; | ||
|
||
let events = EPOLLET as u32 | flags; | ||
|
||
let scheduled_io = self.registrations.allocate(&mut self.synced.lock())?; | ||
let token = scheduled_io.token(); | ||
|
||
let mut event = libc::epoll_event { | ||
events, | ||
u64: usize::from(token) as u64, | ||
}; | ||
|
||
// TODO: if this returns an err, the `ScheduledIo` leaks... | ||
let res = unsafe { | ||
libc::epoll_ctl( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can't call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What guarantees are made by Mio around it's fd? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know that this behavior is not guaranteed, but we don't really say what behavior is. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Which also I suppose begs the question "if it isn't here so people can do this as an escape hatch, why is it here at all?" There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That being said, there's a deeper question here, which is "do we need Interest::CUSTOM() in Mio, and how would that work?" Personally, I think if we're going to continue to use Mio within tokio, we probably need something like this. |
||
self.registry.as_raw_fd(), | ||
libc::EPOLL_CTL_ADD, | ||
source.as_raw_fd(), | ||
&mut event, | ||
) | ||
}; | ||
|
||
if res == -1 { | ||
return Err(std::io::Error::last_os_error()); | ||
} | ||
|
||
// TODO: move this logic to `RegistrationSet` and use a `CountedLinkedList` | ||
self.metrics.incr_fd_count(); | ||
|
||
Ok(scheduled_io) | ||
} | ||
|
||
/// Deregisters an I/O resource from the reactor. | ||
pub(super) fn deregister_source( | ||
&self, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
#![cfg(all( | ||
target_os = "linux", | ||
feature = "net", | ||
feature = "rt", | ||
feature = "sync", | ||
feature = "macros", | ||
feature = "time", | ||
tokio_unstable, | ||
))] | ||
|
||
use std::sync::Arc; | ||
use std::thread; | ||
use tokio::sync::Barrier; | ||
|
||
const NUM_WORKERS: usize = 8; | ||
const NUM_CONNECTIONS: u64 = 32; | ||
|
||
const FUDGE_MIN: f64 = 0.75; | ||
const FUDGE_MAX: f64 = 1.25; | ||
|
||
#[test] | ||
fn epoll_exclusive() { | ||
let value = count_accepts_with_flags(NUM_WORKERS, NUM_CONNECTIONS, libc::EPOLLEXCLUSIVE as u32); | ||
|
||
let actual_to_expected_ratio = value as f64 / NUM_CONNECTIONS as f64; | ||
|
||
assert!( | ||
actual_to_expected_ratio >= FUDGE_MIN && actual_to_expected_ratio <= FUDGE_MAX, | ||
"expected fuzzy {}, got {}", | ||
NUM_CONNECTIONS, | ||
value | ||
); | ||
} | ||
|
||
fn count_accepts_with_flags(workers: usize, connections: u64, flags: u32) -> u64 { | ||
let barrier = Arc::new(Barrier::new(workers as usize + 1)); | ||
|
||
let mut handles = Vec::with_capacity(workers); | ||
|
||
let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); | ||
|
||
let listener_addr = listener.local_addr().unwrap(); | ||
|
||
for _ in 0..workers { | ||
let local_listener = listener.try_clone().unwrap(); | ||
let local_barrier = barrier.clone(); | ||
|
||
handles.push(thread::spawn(move || { | ||
count_accepts(local_listener, flags | libc::EPOLLIN as u32, local_barrier) | ||
})) | ||
} | ||
|
||
let rt = tokio::runtime::Builder::new_current_thread() | ||
.enable_all() | ||
.build() | ||
.unwrap(); | ||
|
||
rt.block_on(async { | ||
barrier.wait().await; | ||
|
||
for _ in 0..connections { | ||
tokio::net::TcpStream::connect(listener_addr).await.unwrap(); | ||
tokio::time::sleep(std::time::Duration::from_millis(100)).await; | ||
} | ||
|
||
barrier.wait().await; | ||
}); | ||
|
||
let mut num_accepts_total = 0; | ||
|
||
for handle in handles { | ||
num_accepts_total += handle.join().unwrap(); | ||
} | ||
|
||
num_accepts_total | ||
} | ||
|
||
fn count_accepts(std: std::net::TcpListener, flags: u32, barrier: Arc<Barrier>) -> u64 { | ||
let rt = tokio::runtime::Builder::new_current_thread() | ||
.enable_all() | ||
.build() | ||
.unwrap(); | ||
|
||
rt.block_on(async { | ||
std.set_nonblocking(true).unwrap(); | ||
|
||
let listener = tokio::net::TcpListener::from_std_with_epoll_flags(std, flags).unwrap(); | ||
|
||
barrier.wait().await; | ||
|
||
let mut barr_wait = std::pin::pin!(barrier.wait()); | ||
|
||
loop { | ||
tokio::select! { | ||
_ = &mut barr_wait => { | ||
return tokio::runtime::Handle::current().metrics().io_driver_ready_count(); | ||
} | ||
a = listener.accept() => { | ||
a.unwrap(); | ||
} | ||
} | ||
} | ||
}) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add to the documentation that
EPOLLONESHOT
must not be used, and thatEPOLLET
must be set. And please add debug asserts for this.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.