Skip to content

Commit

Permalink
10.0.2 (#322)
Browse files Browse the repository at this point in the history
* fix: fix intermittent transaction timeouts
  • Loading branch information
aembke authored Dec 23, 2024
1 parent d3ea5ca commit bdfe4f8
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 2 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 10.0.2

* Fix intermittent transaction timeouts

## 10.0.1

* Fix reconnection errors when no connections exist
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ name = "fred"
readme = "README.md"
repository = "https://github.com/aembke/fred.rs"
rust-version = "1.75"
version = "10.0.1"
version = "10.0.2"

[package.metadata.docs.rs]
# do not show the glommio version of the docs
Expand Down
7 changes: 6 additions & 1 deletion src/router/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,12 @@ pub async fn write_command(
router: &mut Router,
mut command: Command,
) -> Result<(), Error> {
_trace!(inner, "Writing command: {:?} ({})", command, command.debug_id());
_trace!(
inner,
"Writing command: {:?} ({})",
command.kind.to_str_debug(),
command.debug_id()
);
if let Err(err) = command.decr_check_attempted() {
command.respond_to_caller(Err(err));
return Ok(());
Expand Down
1 change: 1 addition & 0 deletions src/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ impl Router {
pub async fn drain_all(&mut self, inner: &RefCount<ClientInner>) -> Result<(), Error> {
let inner = inner.clone();
_trace!(inner, "Draining all connections...");
self.flush().await?;

let primary_ft = async {
match self.connections {
Expand Down
1 change: 1 addition & 0 deletions src/router/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ pub async fn send(
}

_debug!(inner, "Starting transaction {}", id);
// command buffer length checked above
let max_attempts = commands.last().unwrap().attempts_remaining;
let max_redirections = commands.last().unwrap().redirections_remaining;
let mut attempted = 0;
Expand Down
2 changes: 2 additions & 0 deletions tests/integration/centralized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ mod other {
#[cfg(all(feature = "i-keys", feature = "i-lists"))]
centralized_test!(other, should_manually_connect_twice);
#[cfg(all(feature = "transactions", feature = "i-keys"))]
centralized_test!(other, should_mix_trx_and_get);
#[cfg(all(feature = "transactions", feature = "i-keys"))]
centralized_test!(other, should_support_options_with_trx);

//#[cfg(feature = "dns")]
Expand Down
2 changes: 2 additions & 0 deletions tests/integration/clustered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ mod other {
#[cfg(all(feature = "i-keys", feature = "i-lists"))]
cluster_test!(other, should_manually_connect_twice);
#[cfg(all(feature = "transactions", feature = "i-keys"))]
cluster_test!(other, should_mix_trx_and_get);
#[cfg(all(feature = "transactions", feature = "i-keys"))]
cluster_test!(other, should_support_options_with_trx);

//#[cfg(feature = "dns")]
Expand Down
18 changes: 18 additions & 0 deletions tests/integration/other/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use fred::types::Resolve;
use hickory_resolver::{config::*, TokioAsyncResolver};
#[cfg(feature = "dns")]
use std::net::{IpAddr, SocketAddr};
use tokio::task::JoinSet;

#[cfg(all(feature = "i-keys", feature = "i-hashes"))]
fn hash_to_btree(vals: &Map) -> BTreeMap<Key, u16> {
Expand Down Expand Up @@ -904,3 +905,20 @@ pub async fn should_create_non_lazy_replica_connections(client: Client, config:
assert_eq!(client.active_connections().len(), 6);
Ok(())
}

#[cfg(all(feature = "transactions", feature = "i-keys"))]
pub async fn should_mix_trx_and_get(client: Client, _: Config) -> Result<(), Error> {
let mut set = JoinSet::new();
for _ in 0 .. 200 {
let client = client.clone();
set.spawn(async move {
let tx = client.multi();
let _: () = tx.incr("foo").await.unwrap();
let _: () = tx.exec(true).await.unwrap();
let _: () = client.get("bar").await.unwrap();
});
}

set.join_all().await;
Ok(())
}

0 comments on commit bdfe4f8

Please sign in to comment.