diff --git a/CHANGELOG.md b/CHANGELOG.md index 9277efd5..e1745f68 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## 6.2.1 + +* Fix cluster failover with paused nodes. + ## 6.2.0 * Add `Pipeline::try_all` diff --git a/Cargo.toml b/Cargo.toml index 2b19c872..403bc0ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fred" -version = "6.2.0" +version = "6.2.1" authors = ["Alec Embke "] edition = "2021" description = "An async Redis client built on Tokio." diff --git a/src/protocol/connection.rs b/src/protocol/connection.rs index b075b284..146a00d0 100644 --- a/src/protocol/connection.rs +++ b/src/protocol/connection.rs @@ -644,8 +644,13 @@ impl RedisTransport { /// Send `QUIT` and close the connection. pub async fn disconnect(&mut self, inner: &Arc) -> Result<(), RedisError> { + let timeout = globals().default_connection_timeout_ms(); let command: RedisCommand = RedisCommandKind::Quit.into(); - let _ = self.request_response(command, inner.is_resp3()).await?; + let quit_ft = self.request_response(command, inner.is_resp3()); + + if let Err(e) = client_utils::apply_timeout(quit_ft, timeout).await { + _warn!(inner, "Error calling QUIT on backchannel: {:?}", e); + } let _ = self.transport.close().await; Ok(()) diff --git a/src/router/clustered.rs b/src/router/clustered.rs index 2be5d374..e6cd6941 100644 --- a/src/router/clustered.rs +++ b/src/router/clustered.rs @@ -1,5 +1,6 @@ use crate::{ error::{RedisError, RedisErrorKind}, + globals::globals, interfaces, interfaces::Resp3Frame, modules::inner::RedisClientInner, @@ -13,6 +14,7 @@ use crate::{ }, router::{responses, types::ClusterChange, utils, Connections, Written}, types::ClusterStateChange, + utils as client_utils, }; use std::{ collections::{BTreeSet, HashMap}, @@ -552,6 +554,8 @@ pub async fn cluster_slots_backchannel( inner: &Arc, cache: Option<&ClusterRouting>, ) -> Result { + let timeout = globals().default_connection_timeout_ms(); + let (response, host) = { let command: RedisCommand = RedisCommandKind::ClusterSlots.into(); @@ -561,8 +565,8 @@ pub async fn cluster_slots_backchannel( if let Some(ref mut transport) = backchannel.transport { let default_host = transport.default_host.clone(); - transport - .request_response(command, inner.is_resp3()) + _trace!(inner, "Sending backchannel CLUSTER SLOTS to {}", transport.server); + client_utils::apply_timeout(transport.request_response(command, inner.is_resp3()), timeout) .await .ok() .map(|frame| (frame, default_host)) @@ -582,7 +586,8 @@ pub async fn cluster_slots_backchannel( if frame.is_error() { // try connecting to any of the nodes, then try again let mut transport = connect_any(inner, old_cache).await?; - let frame = transport.request_response(command, inner.is_resp3()).await?; + let frame = + client_utils::apply_timeout(transport.request_response(command, inner.is_resp3()), timeout).await?; let host = transport.default_host.clone(); inner.update_backchannel(transport).await; @@ -594,7 +599,7 @@ pub async fn cluster_slots_backchannel( } else { // try connecting to any of the nodes, then try again let mut transport = connect_any(inner, old_cache).await?; - let frame = transport.request_response(command, inner.is_resp3()).await?; + let frame = client_utils::apply_timeout(transport.request_response(command, inner.is_resp3()), timeout).await?; let host = transport.default_host.clone(); inner.update_backchannel(transport).await; diff --git a/src/router/responses.rs b/src/router/responses.rs index ee2ba905..adcebc2c 100644 --- a/src/router/responses.rs +++ b/src/router/responses.rs @@ -310,7 +310,7 @@ pub fn check_special_errors(inner: &Arc, frame: &Resp3Frame) - /// Handle an error in the reader task that should end the connection. pub fn broadcast_reader_error(inner: &Arc, server: &Server, error: Option) { - _debug!(inner, "Ending reader task from {} due to {:?}", server, error); + _warn!(inner, "Ending reader task from {} due to {:?}", server, error); if inner.should_reconnect() { inner.send_reconnect(Some(server.clone()), false, None); @@ -329,7 +329,7 @@ pub fn broadcast_replica_error(inner: &Arc, server: &Server, e #[cfg(feature = "replicas")] pub fn broadcast_replica_error(inner: &Arc, server: &Server, error: Option) { - _debug!(inner, "Ending replica reader task from {} due to {:?}", server, error); + _warn!(inner, "Ending replica reader task from {} due to {:?}", server, error); if inner.should_reconnect() { inner.send_replica_reconnect(server);