diff --git a/CHANGELOG.md b/CHANGELOG.md index 5af59217..d0931cc8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +## 4.3.2 + +* Fix https://github.com/aembke/fred.rs/issues/27 +* Fix https://github.com/aembke/fred.rs/issues/26 + ## 4.3.1 * Fix authentication bug with `sentinel-auth` tests diff --git a/Cargo.toml b/Cargo.toml index 90e857f2..7cfad2b2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fred" -version = "4.3.1" +version = "4.3.2" authors = ["Alec Embke "] edition = "2018" description = "An async Redis client for Rust built on Futures and Tokio." diff --git a/examples/dynamic_pool.rs b/examples/dynamic_pool.rs index f1eb0d8c..6af388e6 100644 --- a/examples/dynamic_pool.rs +++ b/examples/dynamic_pool.rs @@ -8,7 +8,7 @@ async fn main() -> Result<(), RedisError> { // the max size isn't a hard limit - it just determines the size of the client array when the pool is initialized let pool = DynamicRedisPool::new(config, None, 5, 10); - let _ = pool.connect(); + let _ = pool.connect().await; let _ = pool.wait_for_connect().await?; // modify the size of the pool at runtime diff --git a/src/modules/pool.rs b/src/modules/pool.rs index 0befb6e8..d05f358b 100644 --- a/src/modules/pool.rs +++ b/src/modules/pool.rs @@ -76,6 +76,7 @@ impl DynamicRedisPool { /// Wait for all the clients to connect to the server. pub async fn wait_for_connect(&self) -> Result<(), RedisError> { + debug!("Connecting via dynamic pool..."); let clients = self.clients(); let futures = clients.iter().map(|client| client.wait_for_connect()); let _ = try_join_all(futures).await?; diff --git a/src/utils.rs b/src/utils.rs index 8acd6d90..0499bbf6 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -404,7 +404,9 @@ pub async fn wait_for_connect(inner: &Arc) -> Result<(), Redis } pub fn send_command(inner: &Arc, command: RedisCommand) -> Result<(), RedisError> { + incr_atomic(&inner.cmd_buffer_len); if let Err(mut e) = inner.command_tx.send(command) { + decr_atomic(&inner.cmd_buffer_len); if let Some(tx) = e.0.tx.take() { if let Err(_) = tx.send(Err(RedisError::new(RedisErrorKind::Unknown, "Failed to send command."))) { _error!(inner, "Failed to send command {:?}.", e.0.extract_key()); @@ -412,7 +414,6 @@ pub fn send_command(inner: &Arc, command: RedisCommand) -> Res } } - incr_atomic(&inner.cmd_buffer_len); Ok(()) } diff --git a/tests/integration/centralized.rs b/tests/integration/centralized.rs index 2349322d..0eeb1e57 100644 --- a/tests/integration/centralized.rs +++ b/tests/integration/centralized.rs @@ -43,6 +43,7 @@ mod other { mod pool { centralized_test!(pool, should_connect_and_ping_static_pool_single_conn); centralized_test!(pool, should_connect_and_ping_static_pool_two_conn); + centralized_test!(pool, should_connect_and_ping_dynamic_pool); #[cfg(feature = "fd-tests")] centralized_test!(pool, should_connect_and_ping_static_pool_many_conn); #[cfg(feature = "fd-tests")] @@ -219,4 +220,4 @@ pub mod geo { pub mod acl { centralized_test!(acl, should_auth_as_test_user); centralized_test!(acl, should_auth_as_test_user_via_config); -} \ No newline at end of file +} diff --git a/tests/integration/clustered.rs b/tests/integration/clustered.rs index 7648e641..2b787ba8 100644 --- a/tests/integration/clustered.rs +++ b/tests/integration/clustered.rs @@ -47,6 +47,7 @@ mod other { mod pool { cluster_test!(pool, should_connect_and_ping_static_pool_single_conn); cluster_test!(pool, should_connect_and_ping_static_pool_two_conn); + cluster_test!(pool, should_connect_and_ping_dynamic_pool); #[cfg(feature = "fd-tests")] cluster_test!(pool, should_connect_and_ping_static_pool_many_conn); #[cfg(feature = "fd-tests")] diff --git a/tests/integration/pool/mod.rs b/tests/integration/pool/mod.rs index 5b58b457..ecaa8aa7 100644 --- a/tests/integration/pool/mod.rs +++ b/tests/integration/pool/mod.rs @@ -1,6 +1,6 @@ use fred::client::RedisClient; use fred::error::RedisError; -use fred::pool::StaticRedisPool; +use fred::pool::{DynamicRedisPool, StaticRedisPool}; use fred::types::RedisConfig; #[cfg(feature = "fd-tests")] @@ -20,6 +20,19 @@ async fn create_and_ping_pool(config: &RedisConfig, count: usize) -> Result<(), Ok(()) } +async fn create_and_ping_dynamic_pool(config: &RedisConfig, count: usize) -> Result<(), RedisError> { + let pool = DynamicRedisPool::new(config.clone(), None, count, count * 2); + let _ = pool.connect().await; + let _ = pool.wait_for_connect().await?; + + for client in pool.clients().into_iter() { + let _ = client.ping().await?; + } + + let _ = pool.quit_pool().await; + Ok(()) +} + pub async fn should_connect_and_ping_static_pool_single_conn( _: RedisClient, config: RedisConfig, @@ -34,6 +47,10 @@ pub async fn should_connect_and_ping_static_pool_two_conn( create_and_ping_pool(&config, 2).await } +pub async fn should_connect_and_ping_dynamic_pool(_: RedisClient, config: RedisConfig) -> Result<(), RedisError> { + create_and_ping_dynamic_pool(&config, 5).await +} + // this may require increasing the number of allowed file descriptors #[cfg(feature = "fd-tests")] pub async fn should_connect_and_ping_static_pool_many_conn(