diff --git a/CHANGELOG.md b/CHANGELOG.md index 080cb7f6..ea23a9bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## 6.3.2 + +* Fix a bug with connection errors unexpectedly ending the connection task. + ## 6.3.1 * Update various dependencies diff --git a/Cargo.toml b/Cargo.toml index 3795343a..57109d65 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fred" -version = "6.3.1" +version = "6.3.2" authors = ["Alec Embke "] edition = "2021" description = "An async Redis client built on Tokio." diff --git a/src/router/centralized.rs b/src/router/centralized.rs index ebbb067a..e7610bca 100644 --- a/src/router/centralized.rs +++ b/src/router/centralized.rs @@ -26,7 +26,11 @@ pub async fn send_command( Ok(utils::write_command(inner, writer, command, force_flush).await) } else { _debug!(inner, "Failed to read connection for {}", command.kind.to_str_debug()); - Err((RedisError::new(RedisErrorKind::IO, "Missing connection."), command)) + Ok(Written::Disconnect(( + None, + Some(command), + RedisError::new(RedisErrorKind::IO, "Missing connection."), + ))) } } diff --git a/src/router/clustered.rs b/src/router/clustered.rs index e6cd6941..6b68bf64 100644 --- a/src/router/clustered.rs +++ b/src/router/clustered.rs @@ -106,7 +106,12 @@ pub async fn send_command( server, command.kind.to_str_debug() ); - Err((RedisError::new(RedisErrorKind::IO, "Missing connection."), command)) + + Ok(Written::Disconnect(( + Some(server.clone()), + Some(command), + RedisError::new(RedisErrorKind::IO, "Missing connection."), + ))) } } @@ -162,7 +167,7 @@ pub async fn send_all_cluster_command( if let Written::Disconnect((server, _, err)) = utils::write_command(inner, writer, cmd, true).await { _debug!( inner, - "Exit all nodes command early ({}/{}: {}) from error: {:?}", + "Exit all nodes command early ({}/{}: {:?}) from error: {:?}", idx + 1, num_nodes, server, diff --git a/src/router/commands.rs b/src/router/commands.rs index 33a9af16..ba197912 100644 --- a/src/router/commands.rs +++ b/src/router/commands.rs @@ -139,8 +139,8 @@ async fn write_with_backpressure( continue; }, Ok(Written::Disconnect((server, command, error))) => { - _debug!(inner, "Handle disconnect for {} from {:?}", server, error); - let commands = router.connections.disconnect(inner, Some(&server)).await; + _debug!(inner, "Handle disconnect for {:?} due to {:?}", server, error); + let commands = router.connections.disconnect(inner, server.as_ref()).await; router.buffer_commands(commands); if let Some(command) = command { router.buffer_command(command); diff --git a/src/router/mod.rs b/src/router/mod.rs index 83541af2..f204c9b5 100644 --- a/src/router/mod.rs +++ b/src/router/mod.rs @@ -44,7 +44,7 @@ pub enum Written { /// Indicates that the command was sent to all servers. SentAll, /// Disconnect from the provided server and retry the command later. - Disconnect((Server, Option, RedisError)), + Disconnect((Option, Option, RedisError)), /// Indicates that the result should be ignored since the command will not be retried. Ignore, /// (Cluster only) Synchronize the cached cluster routing table and retry. @@ -294,7 +294,7 @@ impl Connections { if let Some(writer) = writers.remove(server) { _debug!(inner, "Disconnecting from {}", writer.server); let commands = writer.graceful_close().await; - out.extend(commands.into_iter()); + out.extend(commands); } } out @@ -784,7 +784,7 @@ impl Router { match write_result { Written::Disconnect((server, command, error)) => { - let buffer = self.connections.disconnect(&inner, Some(&server)).await; + let buffer = self.connections.disconnect(&inner, server.as_ref()).await; self.buffer_commands(buffer); self.sync_network_timeout_state(); @@ -996,7 +996,7 @@ impl Router { } warn!( - "{}: Disconnect from {} while replaying command: {:?}", + "{}: Disconnect from {:?} while replaying command: {:?}", self.inner.id, server, error ); self.disconnect_all().await; // triggers a reconnect if needed diff --git a/src/router/utils.rs b/src/router/utils.rs index bcd6b312..1383ab7c 100644 --- a/src/router/utils.rs +++ b/src/router/utils.rs @@ -173,10 +173,10 @@ pub async fn write_command( _debug!(inner, "Error sending command {}: {:?}", command.kind.to_str_debug(), e); if command.should_send_write_error(inner) { command.respond_to_caller(Err(e.clone())); - Written::Disconnect((writer.server.clone(), None, e)) + Written::Disconnect((Some(writer.server.clone()), None, e)) } else { inner.notifications.broadcast_error(e.clone()); - Written::Disconnect((writer.server.clone(), Some(command), e)) + Written::Disconnect((Some(writer.server.clone()), Some(command), e)) } } else { Written::Sent((writer.server.clone(), should_flush))