From d4111baba88920ea0c2ae9b9e7720a9b6c823d36 Mon Sep 17 00:00:00 2001 From: dswij Date: Sat, 27 Apr 2024 16:32:41 +0800 Subject: [PATCH 1/5] fix: insufficient buf size when reading windows named pipe message --- src/sys/windows/named_pipe.rs | 59 +++++++++++++++++++++++++++++------ 1 file changed, 49 insertions(+), 10 deletions(-) diff --git a/src/sys/windows/named_pipe.rs b/src/sys/windows/named_pipe.rs index eb35d3797..00eea8e2b 100644 --- a/src/sys/windows/named_pipe.rs +++ b/src/sys/windows/named_pipe.rs @@ -8,13 +8,13 @@ use std::{fmt, mem, slice}; use windows_sys::Win32::Foundation::{ ERROR_BROKEN_PIPE, ERROR_IO_INCOMPLETE, ERROR_IO_PENDING, ERROR_NO_DATA, ERROR_PIPE_CONNECTED, - ERROR_PIPE_LISTENING, HANDLE, INVALID_HANDLE_VALUE, + ERROR_PIPE_LISTENING, ERROR_MORE_DATA, HANDLE, INVALID_HANDLE_VALUE, }; use windows_sys::Win32::Storage::FileSystem::{ ReadFile, WriteFile, FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED, PIPE_ACCESS_DUPLEX, }; use windows_sys::Win32::System::Pipes::{ - ConnectNamedPipe, CreateNamedPipeW, DisconnectNamedPipe, PIPE_TYPE_BYTE, + ConnectNamedPipe, CreateNamedPipeW, DisconnectNamedPipe, PeekNamedPipe, PIPE_TYPE_BYTE, PIPE_UNLIMITED_INSTANCES, }; use windows_sys::Win32::System::IO::{ @@ -307,6 +307,18 @@ impl Inner { Ok(transferred as usize) } } + + /// Calls the `PeekNamedPipe` function to get the remaining size of message in NamedPipe + #[inline] + unsafe fn remaining_size(&self) -> io::Result { + let mut remaining = 0; + let r = PeekNamedPipe(self.handle.raw(), std::ptr::null_mut(), 0, std::ptr::null_mut(), std::ptr::null_mut(), &mut remaining); + if r == 0 { + Err(io::Error::last_os_error()) + } else { + Ok(remaining as usize) + } + } } #[test] @@ -349,6 +361,7 @@ enum State { Pending(Vec, usize), Ok(Vec, usize), Err(io::Error), + InsufficientBufferSize(Vec, usize), } // Odd tokens are for named pipes @@ -535,7 +548,7 @@ impl<'a> Read for &'a NamedPipe { } // We previously read something into `data`, try to copy out some - // data. If we copy out all the data schedule a new read and + // data. If we copy out all the data, schedule a new read // otherwise store the buffer to get read later. State::Ok(data, cur) => { let n = { @@ -552,6 +565,13 @@ impl<'a> Read for &'a NamedPipe { Ok(n) } + // Schedule another read with a bigger buffer + e @ State::InsufficientBufferSize(..) => { + state.read = e; + Inner::schedule_read(&self.inner, &mut state, None); + Err(would_block()) + } + // Looks like an in-flight read hit an error, return that here while // we schedule a new one. State::Err(e) => { @@ -703,19 +723,24 @@ impl Inner { /// scheduled. fn schedule_read(me: &Arc, io: &mut Io, events: Option<&mut Vec>) -> bool { // Check to see if a read is already scheduled/completed - match io.read { - State::None => {} - _ => return true, - } + let mut buf = match mem::replace(&mut io.read, State::None) { + State::None => me.get_buffer(), + State::InsufficientBufferSize(mut buf, rem) => { + buf.reserve(rem); + buf + } + e @ _ => { + io.read = e; + return true; + } + }; // Allocate a buffer and schedule the read. - let mut buf = me.get_buffer(); let e = unsafe { let overlapped = me.read.as_ptr() as *mut _; let slice = slice::from_raw_parts_mut(buf.as_mut_ptr(), buf.capacity()); me.read_overlapped(slice, overlapped) }; - match e { // See `NamedPipe::connect` above for the rationale behind `forget` Ok(_) => { @@ -874,9 +899,23 @@ fn read_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec>) { match me.result(status.overlapped()) { Ok(n) => { debug_assert_eq!(status.bytes_transferred() as usize, n); - buf.set_len(status.bytes_transferred() as usize); + // Extend the len depending on the initial len is necessary + // when we call `ReadFile` again after resizing + // our internal buffer + buf.set_len(buf.len() + status.bytes_transferred() as usize); io.read = State::Ok(buf, 0); } + Err(e) if e.raw_os_error() == Some(ERROR_MORE_DATA as i32) => { + match me.remaining_size() { + Ok(rem) => { + buf.set_len(status.bytes_transferred() as usize); + io.read = State::InsufficientBufferSize(buf, rem); + } + Err(e) => { + io.read = State::Err(e); + } + } + } Err(e) => { debug_assert_eq!(status.bytes_transferred(), 0); io.read = State::Err(e); From 6d816220388e6b2799d6694763ff1f1baf18051f Mon Sep 17 00:00:00 2001 From: dswij Date: Sun, 28 Apr 2024 10:53:55 +0800 Subject: [PATCH 2/5] handling `ERROR_MORE_DATA` does not notify read event --- src/sys/windows/named_pipe.rs | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/src/sys/windows/named_pipe.rs b/src/sys/windows/named_pipe.rs index 00eea8e2b..8eca67a8a 100644 --- a/src/sys/windows/named_pipe.rs +++ b/src/sys/windows/named_pipe.rs @@ -7,8 +7,8 @@ use std::sync::{Arc, Mutex}; use std::{fmt, mem, slice}; use windows_sys::Win32::Foundation::{ - ERROR_BROKEN_PIPE, ERROR_IO_INCOMPLETE, ERROR_IO_PENDING, ERROR_NO_DATA, ERROR_PIPE_CONNECTED, - ERROR_PIPE_LISTENING, ERROR_MORE_DATA, HANDLE, INVALID_HANDLE_VALUE, + ERROR_BROKEN_PIPE, ERROR_IO_INCOMPLETE, ERROR_IO_PENDING, ERROR_MORE_DATA, ERROR_NO_DATA, + ERROR_PIPE_CONNECTED, ERROR_PIPE_LISTENING, HANDLE, INVALID_HANDLE_VALUE, }; use windows_sys::Win32::Storage::FileSystem::{ ReadFile, WriteFile, FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED, PIPE_ACCESS_DUPLEX, @@ -312,7 +312,14 @@ impl Inner { #[inline] unsafe fn remaining_size(&self) -> io::Result { let mut remaining = 0; - let r = PeekNamedPipe(self.handle.raw(), std::ptr::null_mut(), 0, std::ptr::null_mut(), std::ptr::null_mut(), &mut remaining); + let r = PeekNamedPipe( + self.handle.raw(), + std::ptr::null_mut(), + 0, + std::ptr::null_mut(), + std::ptr::null_mut(), + &mut remaining, + ); if r == 0 { Err(io::Error::last_os_error()) } else { @@ -565,12 +572,9 @@ impl<'a> Read for &'a NamedPipe { Ok(n) } - // Schedule another read with a bigger buffer - e @ State::InsufficientBufferSize(..) => { - state.read = e; - Inner::schedule_read(&self.inner, &mut state, None); - Err(would_block()) - } + // We scheduled another read with a bigger buffer after the first read (see `read_done`) + // This is not possible in theory, just like `State::None` case, but return would block for now. + State::InsufficientBufferSize(..) => Err(would_block()), // Looks like an in-flight read hit an error, return that here while // we schedule a new one. @@ -723,10 +727,11 @@ impl Inner { /// scheduled. fn schedule_read(me: &Arc, io: &mut Io, events: Option<&mut Vec>) -> bool { // Check to see if a read is already scheduled/completed + let mut buf = match mem::replace(&mut io.read, State::None) { State::None => me.get_buffer(), State::InsufficientBufferSize(mut buf, rem) => { - buf.reserve(rem); + buf.reserve_exact(rem); buf } e @ _ => { @@ -739,7 +744,7 @@ impl Inner { let e = unsafe { let overlapped = me.read.as_ptr() as *mut _; let slice = slice::from_raw_parts_mut(buf.as_mut_ptr(), buf.capacity()); - me.read_overlapped(slice, overlapped) + me.read_overlapped(&mut slice[buf.len()..], overlapped) }; match e { // See `NamedPipe::connect` above for the rationale behind `forget` @@ -910,6 +915,8 @@ fn read_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec>) { Ok(rem) => { buf.set_len(status.bytes_transferred() as usize); io.read = State::InsufficientBufferSize(buf, rem); + Inner::schedule_read(&me, &mut io, None); + return; } Err(e) => { io.read = State::Err(e); From ce98957dd73d3a55a7be51b893e63835fb651504 Mon Sep 17 00:00:00 2001 From: dswij Date: Sun, 28 Apr 2024 10:54:33 +0800 Subject: [PATCH 3/5] test: test handling for ERROR_MORE_DATA --- tests/win_named_pipe.rs | 84 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 82 insertions(+), 2 deletions(-) diff --git a/tests/win_named_pipe.rs b/tests/win_named_pipe.rs index e79d2fba4..044934849 100644 --- a/tests/win_named_pipe.rs +++ b/tests/win_named_pipe.rs @@ -1,15 +1,24 @@ #![cfg(all(windows, feature = "os-poll", feature = "os-ext"))] +use std::ffi::OsStr; use std::fs::OpenOptions; use std::io::{self, Read, Write}; +use std::os::windows::ffi::OsStrExt; use std::os::windows::fs::OpenOptionsExt; -use std::os::windows::io::{FromRawHandle, IntoRawHandle}; +use std::os::windows::io::{FromRawHandle, IntoRawHandle, RawHandle}; use std::time::Duration; use mio::windows::NamedPipe; use mio::{Events, Interest, Poll, Token}; use rand::Rng; -use windows_sys::Win32::{Foundation::ERROR_NO_DATA, Storage::FileSystem::FILE_FLAG_OVERLAPPED}; +use windows_sys::Win32::Foundation::ERROR_NO_DATA; +use windows_sys::Win32::Storage::FileSystem::{ + CreateFileW, FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED, OPEN_EXISTING, + PIPE_ACCESS_DUPLEX, +}; +use windows_sys::Win32::System::Pipes::{ + CreateNamedPipeW, PIPE_READMODE_MESSAGE, PIPE_TYPE_MESSAGE, PIPE_UNLIMITED_INSTANCES, +}; fn _assert_kinds() { fn _assert_send() {} @@ -43,6 +52,38 @@ fn client(name: &str) -> NamedPipe { unsafe { NamedPipe::from_raw_handle(file.into_raw_handle()) } } +fn pipe_msg_mode() -> (NamedPipe, NamedPipe) { + let num: u64 = rand::thread_rng().gen(); + let name = format!(r"\\.\pipe\my-pipe-{}", num); + let name: Vec<_> = OsStr::new(&name).encode_wide().chain(Some(0)).collect(); + unsafe { + let h = CreateNamedPipeW( + name.as_ptr(), + PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE | FILE_FLAG_OVERLAPPED, + PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE, + PIPE_UNLIMITED_INSTANCES, + 65536, + 65536, + 0, + std::ptr::null_mut(), + ); + + let server = NamedPipe::from_raw_handle(h as RawHandle); + + let h = CreateFileW( + name.as_ptr(), + PIPE_ACCESS_DUPLEX, + 0, + std::ptr::null_mut(), + OPEN_EXISTING, + FILE_FLAG_OVERLAPPED, + 0, + ); + let client = NamedPipe::from_raw_handle(h as RawHandle); + (server, client) + } +} + fn pipe() -> (NamedPipe, NamedPipe) { let (pipe, name) = server(); (pipe, client(&name)) @@ -108,6 +149,45 @@ fn write_then_read() { assert_eq!(&buf[..4], b"1234"); } +#[test] +fn read_sz_greater_than_default_buf_size() { + let (mut server, mut client) = pipe_msg_mode(); + let mut poll = t!(Poll::new()); + t!(poll.registry().register( + &mut server, + Token(0), + Interest::READABLE | Interest::WRITABLE, + )); + t!(poll.registry().register( + &mut client, + Token(1), + Interest::READABLE | Interest::WRITABLE, + )); + + let mut events = Events::with_capacity(128); + let msg = (0..4106) + .map(|e| e.to_string()) + .collect::>() + .join(""); + + t!(poll.poll(&mut events, None)); + assert_eq!(t!(client.write(msg.as_bytes())), 15314); + + loop { + t!(poll.poll(&mut events, None)); + let events = events.iter().collect::>(); + if let Some(event) = events.iter().find(|e| e.token() == Token(0)) { + if event.is_readable() { + break; + } + } + } + + let mut buf = [0; 15314]; + assert_eq!(t!(server.read(&mut buf)), 15314); + assert_eq!(&buf[..15314], msg.as_bytes()); +} + #[test] fn connect_before_client() { let (mut server, name) = server(); From daa2379cea4b14d96e7448f01dd84c515c15529c Mon Sep 17 00:00:00 2001 From: dswij Date: Tue, 7 May 2024 13:14:42 +0800 Subject: [PATCH 4/5] test: add multiple read case for pipe mode --- tests/win_named_pipe.rs | 49 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/tests/win_named_pipe.rs b/tests/win_named_pipe.rs index 044934849..a2c07e829 100644 --- a/tests/win_named_pipe.rs +++ b/tests/win_named_pipe.rs @@ -2,7 +2,8 @@ use std::ffi::OsStr; use std::fs::OpenOptions; -use std::io::{self, Read, Write}; +use std::io::{self, ErrorKind, Read, Write}; +use std::iter; use std::os::windows::ffi::OsStrExt; use std::os::windows::fs::OpenOptionsExt; use std::os::windows::io::{FromRawHandle, IntoRawHandle, RawHandle}; @@ -188,6 +189,52 @@ fn read_sz_greater_than_default_buf_size() { assert_eq!(&buf[..15314], msg.as_bytes()); } +#[test] +fn multi_read_sz_greater_than_default_buf_size() { + let (mut server, mut client) = pipe_msg_mode(); + let mut poll = t!(Poll::new()); + t!(poll.registry().register( + &mut server, + Token(0), + Interest::READABLE | Interest::WRITABLE, + )); + + std::thread::spawn(move || { + let msgs = vec!["hello".repeat(10), "world".repeat(100), "mio".repeat(1000)]; + + let mut poll = t!(Poll::new()); + t!(poll.registry().register( + &mut client, + Token(1), + Interest::READABLE | Interest::WRITABLE, + )); + let mut events = Events::with_capacity(128); + for msg in msgs.iter() { + t!(poll.poll(&mut events, None)); + t!(client.write(msg.as_bytes())); + } + }); + + let mut events = Events::with_capacity(128); + let msgs = vec!["hello".repeat(10), "world".repeat(100), "mio".repeat(1000)]; + for m in msgs.into_iter() { + let m = m.as_bytes(); + loop { + t!(poll.poll(&mut events, None)); + let events = events.iter().collect::>(); + if let Some(event) = events.iter().find(|e| e.token() == Token(0)) { + let mut buf = [0; 3000]; + let Ok(read) = server.read(&mut buf) else { + continue; + }; + assert_eq!(read, m.len()); + assert_eq!(buf[..read], *m); + break; + } + } + } +} + #[test] fn connect_before_client() { let (mut server, name) = server(); From 43e3bae18df9850cd0be7109041da84aeecb5b8d Mon Sep 17 00:00:00 2001 From: dswij Date: Sat, 25 May 2024 13:48:12 +0800 Subject: [PATCH 5/5] add more checks --- src/sys/windows/named_pipe.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/sys/windows/named_pipe.rs b/src/sys/windows/named_pipe.rs index 8eca67a8a..7eedf4d66 100644 --- a/src/sys/windows/named_pipe.rs +++ b/src/sys/windows/named_pipe.rs @@ -27,6 +27,8 @@ use crate::sys::windows::{Event, Handle, Overlapped}; use crate::Registry; use crate::{Interest, Token}; +const MAX_BUFFER_SZ: usize = 65536; + /// Non-blocking windows named pipe. /// /// This structure internally contains a `HANDLE` which represents the named @@ -731,7 +733,8 @@ impl Inner { let mut buf = match mem::replace(&mut io.read, State::None) { State::None => me.get_buffer(), State::InsufficientBufferSize(mut buf, rem) => { - buf.reserve_exact(rem); + let sz_rem = std::cmp::min(rem, MAX_BUFFER_SZ); + buf.reserve_exact(sz_rem); buf } e @ _ => { @@ -911,9 +914,12 @@ fn read_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec>) { io.read = State::Ok(buf, 0); } Err(e) if e.raw_os_error() == Some(ERROR_MORE_DATA as i32) => { + buf.set_len(status.bytes_transferred() as usize); match me.remaining_size() { + Ok(rem) if rem == 0 => { + io.read = State::Ok(buf, 0); + } Ok(rem) => { - buf.set_len(status.bytes_transferred() as usize); io.read = State::InsufficientBufferSize(buf, rem); Inner::schedule_read(&me, &mut io, None); return;