From e269739f6a609b6e3db0386220291d7a989d7c74 Mon Sep 17 00:00:00 2001 From: jokemanfire Date: Sun, 28 Apr 2024 18:06:54 +0800 Subject: [PATCH] Ensure the fd will be release completely This problem is caused by the reference count not being able to return to 0 properly. Signed-off-by: jokemanfire --- src/sync/client.rs | 36 +++++++++++++++++++++--------------- src/sync/sys/unix/net.rs | 7 +++++-- 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/src/sync/client.rs b/src/sync/client.rs index dfeb15f7..8f70f2fb 100644 --- a/src/sync/client.rs +++ b/src/sync/client.rs @@ -62,7 +62,7 @@ impl Client { fn new_client(pipe_client: ClientConnection) -> Result { let client = Arc::new(pipe_client); - + let weak_client = Arc::downgrade(&client); let (sender_tx, rx): (Sender, Receiver) = mpsc::channel(); let recver_map_orig = Arc::new(Mutex::new(HashMap::new())); @@ -98,20 +98,28 @@ impl Client { trace!("Sender quit"); }); - //Recver + //Reciver let receiver_connection = connection; - let receiver_client = client.clone(); + //this thread should use weak arc for ClientConnection, otherwise the thread will occupy a reference count of ClientConnection's arc, + //ClientConnection's drop will be not call until the thread finished. It means if all the external references are finished, + //this thread should be release. + let receiver_client = weak_client.clone(); thread::spawn(move || { loop { - match receiver_client.ready() { - Ok(None) => { - continue; - } - Ok(_) => {} - Err(e) => { - error!("pipeConnection ready error {:?}", e); - break; + //The count of ClientConnection's Arc will be add one , and back to original value when this code ends. + if let Some(receiver_client) = receiver_client.upgrade(){ + match receiver_client.ready() { + Ok(None) => { + continue; + } + Ok(_) => {} + Err(e) => { + error!("pipeConnection ready error {:?}", e); + break; + } } + }else{ + break; } match read_message(&receiver_connection) { @@ -140,10 +148,6 @@ impl Client { }; } - let _ = receiver_client - .close_receiver() - .map_err(|e| warn!("failed to close with error: {:?}", e)); - trace!("Receiver quit"); }); @@ -191,7 +195,9 @@ impl Client { impl Drop for ClientConnection { fn drop(&mut self) { + //close all fd , make sure all fd have been release self.close().unwrap(); + self.close_receiver().unwrap(); trace!("Client is dropped"); } } diff --git a/src/sync/sys/unix/net.rs b/src/sync/sys/unix/net.rs index 3fdf47b8..6245fe6e 100644 --- a/src/sync/sys/unix/net.rs +++ b/src/sync/sys/unix/net.rs @@ -28,6 +28,9 @@ use crate::common::{self, client_connect, SOCK_CLOEXEC}; use crate::common::set_fd_close_exec; use nix::sys::socket::{self}; +//The libc::poll's max wait time +const POLL_MAX_TIME: i32 = 10; + pub struct PipeListener { fd: RawFd, monitor_fd: (RawFd, RawFd), @@ -104,7 +107,7 @@ impl PipeListener { libc::poll( pollers as *mut _ as *mut libc::pollfd, pollers.len() as _, - -1, + POLL_MAX_TIME, ) }; @@ -278,7 +281,7 @@ impl ClientConnection { libc::poll( pollers as *mut _ as *mut libc::pollfd, pollers.len() as _, - -1, + POLL_MAX_TIME, ) };