diff --git a/CHANGELOG.md b/CHANGELOG.md index 31176c19..0bc687d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## 10.0.1 + +* Fix reconnection errors when no connections exist + ## 10.0.0 * Reduced memory footprint and significant write throughput improvements diff --git a/Cargo.toml b/Cargo.toml index 03892aeb..423dc29c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ name = "fred" readme = "README.md" repository = "https://github.com/aembke/fred.rs" rust-version = "1.75" -version = "10.0.0" +version = "10.0.1" [package.metadata.docs.rs] # do not show the glommio version of the docs diff --git a/bin/inf_loop/src/main.rs b/bin/inf_loop/src/main.rs index 2d750992..c386581c 100644 --- a/bin/inf_loop/src/main.rs +++ b/bin/inf_loop/src/main.rs @@ -182,10 +182,10 @@ async fn main() -> Result<(), Error> { let pool = Builder::from_config(config) .with_connection_config(|config| { config.max_command_attempts = 3; - config.unresponsive = UnresponsiveConfig { - interval: Duration::from_secs(1), - max_timeout: Some(Duration::from_secs(5)), - }; + // config.unresponsive = UnresponsiveConfig { + // interval: Duration::from_secs(1), + // max_timeout: Some(Duration::from_secs(5)), + //}; config.connection_timeout = Duration::from_secs(3); config.internal_command_timeout = Duration::from_secs(2); // config.cluster_cache_update_delay = Duration::from_secs(20); diff --git a/src/modules/inner.rs b/src/modules/inner.rs index 1469645a..e60aa32a 100644 --- a/src/modules/inner.rs +++ b/src/modules/inner.rs @@ -783,6 +783,8 @@ impl ClientInner { use tokio::sync::mpsc::error::TrySendError; if let Err(v) = self.command_tx.load().try_send(command) { + trace!("{}: Failed sending command to router.", self.id); + match v { TrySendError::Closed(c) => Err(c), TrySendError::Full(c) => match c { diff --git a/src/router/mod.rs b/src/router/mod.rs index bfc3dcc5..0963d277 100644 --- a/src/router/mod.rs +++ b/src/router/mod.rs @@ -26,13 +26,13 @@ use crate::{ utils as client_utils, }; use futures::future::join_all; +#[cfg(feature = "replicas")] +use futures::future::try_join; use std::{ collections::{HashSet, VecDeque}, + future::pending, hash::{Hash, Hasher}, }; - -#[cfg(feature = "replicas")] -use futures::future::try_join; #[cfg(feature = "transactions")] pub mod transactions; #[cfg(feature = "replicas")] @@ -459,7 +459,7 @@ impl Router { } } - /// Try to read from all sockets concurrently. + /// Try to read from all sockets concurrently in a select loop. #[cfg(feature = "replicas")] pub async fn select_read( &mut self, @@ -475,7 +475,7 @@ impl Router { if let Some(writer) = writer { ReadFuture::new(inner, writer, &mut self.replicas.connections).await } else { - Vec::new() + pending().await } }, Connections::Clustered { @@ -485,7 +485,7 @@ impl Router { } } - /// Try to read from all sockets concurrently. + /// Try to read from all sockets concurrently in a select loop. #[cfg(not(feature = "replicas"))] pub async fn select_read( &mut self, @@ -501,7 +501,7 @@ impl Router { if let Some(writer) = writer { ReadFuture::new(inner, writer).await } else { - Vec::new() + pending().await } }, Connections::Clustered { diff --git a/src/router/responses.rs b/src/router/responses.rs index 6fc7e938..8fb081e2 100644 --- a/src/router/responses.rs +++ b/src/router/responses.rs @@ -309,7 +309,7 @@ pub fn preprocess_frame( /// Handle an error in the reader task that should end the connection. pub fn broadcast_reader_error(inner: &RefCount, server: &Server, error: Option) { - _warn!(inner, "Ending reader task from {} due to {:?}", server, error); + _warn!(inner, "Broadcasting error {:?} from {}", error, server); if utils::read_locked(&inner.state) != ClientState::Disconnecting { inner @@ -320,7 +320,7 @@ pub fn broadcast_reader_error(inner: &RefCount, server: &Server, er #[cfg(feature = "replicas")] pub fn broadcast_replica_error(inner: &RefCount, server: &Server, error: Option) { - _warn!(inner, "Ending replica reader task from {} due to {:?}", server, error); + _warn!(inner, "Broadcasting replica error {:?} from {}", error, server); if utils::read_locked(&inner.state) != ClientState::Disconnecting { inner diff --git a/src/router/types.rs b/src/router/types.rs index 483c8690..b6a5dde1 100644 --- a/src/router/types.rs +++ b/src/router/types.rs @@ -30,6 +30,9 @@ impl Default for ClusterChange { } } +// The following future types are used in the context of a select! loop, so they return Pending when there are no +// available connections to poll. + fn poll_connection( inner: &RefCount, conn: &mut Connection, @@ -96,11 +99,11 @@ impl Future for ReadAllFuture<'_, '_> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { #[cfg(feature = "replicas")] if self.connections.is_empty() && self.replicas.is_empty() { - return Poll::Ready(Vec::new()); + return Poll::Pending; } #[cfg(not(feature = "replicas"))] if self.connections.is_empty() { - return Poll::Ready(Vec::new()); + return Poll::Pending; } let _self = self.get_mut(); diff --git a/src/router/utils.rs b/src/router/utils.rs index d86475e9..3a02bb04 100644 --- a/src/router/utils.rs +++ b/src/router/utils.rs @@ -130,13 +130,13 @@ pub fn defer_reconnection( _debug!(inner, "Skip defer reconnection."); Ok(()) } else { - _debug!(inner, "Defer reconnection to {:?} after {:?}", server, error); // keep track of pending reconnection commands to dedup them before they're sent if let Some(server) = server { router.pending_reconnection.insert(ReconnectServer::One(server.clone())); } else { router.pending_reconnection.insert(ReconnectServer::All); }; + _debug!(inner, "Defer reconnection to {:?} after {:?}", server, error); interfaces::send_to_router(inner, RouterCommand::Reconnect { server: server.cloned(),