Skip to content

Commit

Permalink
10.0.1 (#317)
Browse files Browse the repository at this point in the history
* fix: failed reconnections with empty connection map
  • Loading branch information
aembke authored Dec 5, 2024
1 parent fdedd5e commit d3ea5ca
Show file tree
Hide file tree
Showing 8 changed files with 26 additions and 17 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 10.0.1

* Fix reconnection errors when no connections exist

## 10.0.0

* Reduced memory footprint and significant write throughput improvements
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ name = "fred"
readme = "README.md"
repository = "https://github.com/aembke/fred.rs"
rust-version = "1.75"
version = "10.0.0"
version = "10.0.1"

[package.metadata.docs.rs]
# do not show the glommio version of the docs
Expand Down
8 changes: 4 additions & 4 deletions bin/inf_loop/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,10 @@ async fn main() -> Result<(), Error> {
let pool = Builder::from_config(config)
.with_connection_config(|config| {
config.max_command_attempts = 3;
config.unresponsive = UnresponsiveConfig {
interval: Duration::from_secs(1),
max_timeout: Some(Duration::from_secs(5)),
};
// config.unresponsive = UnresponsiveConfig {
// interval: Duration::from_secs(1),
// max_timeout: Some(Duration::from_secs(5)),
//};
config.connection_timeout = Duration::from_secs(3);
config.internal_command_timeout = Duration::from_secs(2);
// config.cluster_cache_update_delay = Duration::from_secs(20);
Expand Down
2 changes: 2 additions & 0 deletions src/modules/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,8 @@ impl ClientInner {
use tokio::sync::mpsc::error::TrySendError;

if let Err(v) = self.command_tx.load().try_send(command) {
trace!("{}: Failed sending command to router.", self.id);

match v {
TrySendError::Closed(c) => Err(c),
TrySendError::Full(c) => match c {
Expand Down
14 changes: 7 additions & 7 deletions src/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ use crate::{
utils as client_utils,
};
use futures::future::join_all;
#[cfg(feature = "replicas")]
use futures::future::try_join;
use std::{
collections::{HashSet, VecDeque},
future::pending,
hash::{Hash, Hasher},
};

#[cfg(feature = "replicas")]
use futures::future::try_join;
#[cfg(feature = "transactions")]
pub mod transactions;
#[cfg(feature = "replicas")]
Expand Down Expand Up @@ -459,7 +459,7 @@ impl Router {
}
}

/// Try to read from all sockets concurrently.
/// Try to read from all sockets concurrently in a select loop.
#[cfg(feature = "replicas")]
pub async fn select_read(
&mut self,
Expand All @@ -475,7 +475,7 @@ impl Router {
if let Some(writer) = writer {
ReadFuture::new(inner, writer, &mut self.replicas.connections).await
} else {
Vec::new()
pending().await
}
},
Connections::Clustered {
Expand All @@ -485,7 +485,7 @@ impl Router {
}
}

/// Try to read from all sockets concurrently.
/// Try to read from all sockets concurrently in a select loop.
#[cfg(not(feature = "replicas"))]
pub async fn select_read(
&mut self,
Expand All @@ -501,7 +501,7 @@ impl Router {
if let Some(writer) = writer {
ReadFuture::new(inner, writer).await
} else {
Vec::new()
pending().await
}
},
Connections::Clustered {
Expand Down
4 changes: 2 additions & 2 deletions src/router/responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ pub fn preprocess_frame(

/// Handle an error in the reader task that should end the connection.
pub fn broadcast_reader_error(inner: &RefCount<ClientInner>, server: &Server, error: Option<Error>) {
_warn!(inner, "Ending reader task from {} due to {:?}", server, error);
_warn!(inner, "Broadcasting error {:?} from {}", error, server);

if utils::read_locked(&inner.state) != ClientState::Disconnecting {
inner
Expand All @@ -320,7 +320,7 @@ pub fn broadcast_reader_error(inner: &RefCount<ClientInner>, server: &Server, er

#[cfg(feature = "replicas")]
pub fn broadcast_replica_error(inner: &RefCount<ClientInner>, server: &Server, error: Option<Error>) {
_warn!(inner, "Ending replica reader task from {} due to {:?}", server, error);
_warn!(inner, "Broadcasting replica error {:?} from {}", error, server);

if utils::read_locked(&inner.state) != ClientState::Disconnecting {
inner
Expand Down
7 changes: 5 additions & 2 deletions src/router/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ impl Default for ClusterChange {
}
}

// The following future types are used in the context of a select! loop, so they return Pending when there are no
// available connections to poll.

fn poll_connection(
inner: &RefCount<ClientInner>,
conn: &mut Connection,
Expand Down Expand Up @@ -96,11 +99,11 @@ impl Future for ReadAllFuture<'_, '_> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
#[cfg(feature = "replicas")]
if self.connections.is_empty() && self.replicas.is_empty() {
return Poll::Ready(Vec::new());
return Poll::Pending;
}
#[cfg(not(feature = "replicas"))]
if self.connections.is_empty() {
return Poll::Ready(Vec::new());
return Poll::Pending;
}

let _self = self.get_mut();
Expand Down
2 changes: 1 addition & 1 deletion src/router/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,13 @@ pub fn defer_reconnection(
_debug!(inner, "Skip defer reconnection.");
Ok(())
} else {
_debug!(inner, "Defer reconnection to {:?} after {:?}", server, error);
// keep track of pending reconnection commands to dedup them before they're sent
if let Some(server) = server {
router.pending_reconnection.insert(ReconnectServer::One(server.clone()));
} else {
router.pending_reconnection.insert(ReconnectServer::All);
};
_debug!(inner, "Defer reconnection to {:?} after {:?}", server, error);

interfaces::send_to_router(inner, RouterCommand::Reconnect {
server: server.cloned(),
Expand Down

0 comments on commit d3ea5ca

Please sign in to comment.