From 31f8f56dbb74cb8f750d361529383036e6f66083 Mon Sep 17 00:00:00 2001
From: Alec Embke
Date: Fri, 3 Nov 2023 16:24:00 -0700
Subject: [PATCH] 7.1.0 (#187)

* fix: don't panic when jitter is 0
* fix: support percent encoding in urls
* feat: add from tuple for RedisValue / MultipleKeys
* feat: make CLIENT ID checks optional

---------

Co-authored-by: 3Ti <3Ti.Site@gmail.com>
Co-authored-by: dupu
---
 Cargo.toml                      |  24 +++---
 examples/README.md              |   2 +
 examples/events.rs              | 106 +++++++++++++++++++++++++
 examples/keyspace.rs            | 136 ++++++++++++++++++++++++++++++++
 examples/pool.rs                |  20 ++--
 examples/scan.rs                |  70 ++++++++++++----
 src/clients/pipeline.rs         |   5 +-
 src/commands/impls/lua.rs       |   6 +-
 src/commands/impls/sentinel.rs  |   2 +-
 src/commands/impls/strings.rs   |  10 ---
 src/lib.rs                      |   1 +
 src/protocol/connection.rs      |   4 +-
 src/protocol/types.rs           |  27 ++++---
 src/router/reader.rs            |   1 +
 src/trace/disabled.rs           |   4 +-
 src/types/args.rs               |  54 +++++++------
 src/types/config.rs             |   6 +-
 src/types/from_tuple.rs         |  36 +++++++++
 src/types/mod.rs                |   1 +
 src/types/multiple.rs           |   2 +-
 src/utils.rs                    |  53 +++++++++++--
 tests/integration/client/mod.rs |   1 +
 tests/integration/sets/mod.rs   |  12 ++-
 23 files changed, 469 insertions(+), 114 deletions(-)
 create mode 100644 examples/events.rs
 create mode 100644 examples/keyspace.rs
 create mode 100644 src/types/from_tuple.rs

diff --git a/Cargo.toml b/Cargo.toml
index 9384a33a..68190f9c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "fred"
-version = "7.0.0"
+version = "7.1.0"
 authors = ["Alec Embke"]
 edition = "2021"
 description = "An async Redis client built on Tokio."
@@ -38,23 +38,24 @@ features = [
 rustdoc-args = ["--cfg", "docsrs"]

 [dependencies]
-arc-swap = "1.5"
-tokio = { version = "1.19.0", features = ["net", "sync", "rt", "rt-multi-thread", "macros"] }
-tokio-util = { version = "0.7.1", features = ["codec"] }
-bytes = "1.1"
+arc-swap = "1.6"
+tokio = { version = "1.34", features = ["net", "sync", "rt", "rt-multi-thread", "macros"] }
+tokio-util = { version = "0.7", features = ["codec"] }
+bytes = "1.5"
 bytes-utils = "0.1"
-futures = "0.3"
+futures = { version = "0.3", features = ["std"] }
 parking_lot = "0.12"
 lazy_static = "1.4"
 redis-protocol = { version = "4.1", features = ["decode-mut"] }
 log = "0.4"
 float-cmp = "0.9"
-url = "2.3"
-tokio-stream = "0.1.1"
+url = "2.4"
+tokio-stream = "0.1"
 sha-1 = { version = "0.10", optional = true }
 rand = "0.8"
 semver = "1.0"
 socket2 = "0.5"
+urlencoding = "2.1"
 async-trait = { version = "0.1" }
 rustls = { version = "0.21", optional = true }
 native-tls = { version = "0.2", optional = true }
@@ -72,8 +73,9 @@ trust-dns-resolver = { version = "0.23", optional = true }
 base64 = "0.21"
 subprocess = "0.2"
 pretty_env_logger = "0.5"
-bollard = "0.14"
+bollard = "0.15"
 serde = "1.0"
+tokio-stream = { version = "0.1", features = ["sync"] }

 [lib]
 doc = true
@@ -108,6 +110,10 @@ required-features = ["client-tracking"]
 name = "lua"
 required-features = ["sha-1"]

+[[example]]
+name = "events"
+required-features = ["tokio-stream/sync"]
+
 [features]
 default = ["ignore-auth-error", "pool-prefer-active"]
 serde-json = ["serde_json"]
diff --git a/examples/README.md b/examples/README.md
index adf48d4d..38ba08ad 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -18,6 +18,8 @@ Examples
 * [Custom](./custom.rs) - Send custom commands or operate on RESP frames.
 * [DNS](./dns.rs) - Customize the DNS resolution logic.
 * [Client Tracking](./client_tracking.rs) - Implement [client side caching](https://redis.io/docs/manual/client-side-caching/).
+* [Events](./events.rs) - Respond to connection events with the `EventInterface`.
+* [Keyspace Notifications](./keyspace.rs) - Use the [keyspace notifications](https://redis.io/docs/manual/keyspace-notifications/) interface.
 * [Misc](./misc.rs) - Miscellaneous or advanced features.

 Or see the [tests](../tests/integration) for more examples.
\ No newline at end of file
diff --git a/examples/events.rs b/examples/events.rs
new file mode 100644
index 00000000..20d2b8a1
--- /dev/null
+++ b/examples/events.rs
@@ -0,0 +1,106 @@
+#![allow(clippy::disallowed_names)]
+#![allow(clippy::let_underscore_future)]
+
+use fred::prelude::*;
+use futures::StreamExt;
+// requires tokio_stream 0.1.3 or later
+use tokio_stream::wrappers::BroadcastStream;
+
+/// There are two interfaces for interacting with connection events on the `EventInterface`.
+///
+/// * The `on_*` functions are generally easier to use but require spawning a new tokio task. They also currently
+///   only support synchronous functions.
+/// * The `*_rx` functions are somewhat more complicated to use but allow the caller to control the underlying
+///   channel receiver directly. Additionally, these functions do not require spawning a new tokio task.
+///
+/// See the source for `on_any` for an example of how one might handle multiple receivers in one task. A short
+/// sketch also follows this diff.
+///
+/// The best approach depends on how many tasks the caller is willing to create. The `setup_pool` function shows
+/// how one might combine multiple receiver streams in a `RedisPool` to minimize the overhead of new tokio tasks
+/// for each underlying client.
+#[tokio::main]
+async fn main() -> Result<(), RedisError> {
+  let client = Builder::default_centralized().build()?;
+
+  // use the on_* functions
+  let reconnect_task = client.on_reconnect(|server| {
+    println!("Reconnected to {}", server);
+    Ok(())
+  });
+  let error_task = client.on_error(|error| {
+    println!("Connection error: {:?}", error);
+    Ok(())
+  });
+
+  // use the *_rx functions to do the same thing shown above. although not shown here, callers have more freedom to
+  // reduce the number of spawned tokio tasks with this interface.
+  let mut reconnect_rx = client.reconnect_rx();
+  let reconnect_task_2 = tokio::spawn(async move {
+    while let Ok(server) = reconnect_rx.recv().await {
+      println!("Reconnected to {}", server);
+    }
+  });
+
+  let mut error_rx = client.error_rx();
+  let error_task_2 = tokio::spawn(async move {
+    while let Ok(error) = error_rx.recv().await {
+      println!("Connection error: {:?}", error);
+    }
+  });
+
+  client.connect();
+  client.wait_for_connect().await?;
+
+  // ...
+
+  client.quit().await?;
+  reconnect_task.await??;
+  error_task.await??;
+  reconnect_task_2.await?;
+  error_task_2.await?;
+  Ok(())
+}
+
+/// Shows how to combine multiple event streams from multiple clients into one tokio task.
+#[allow(dead_code)]
+async fn setup_pool() -> Result<(), RedisError> {
+  let pool = Builder::default_centralized().build_pool(5)?;
+
+  // `select_all` does most of the work here but requires that the channel receivers implement `Stream`.
+  // unfortunately `tokio::sync::broadcast::Receiver` does not do this, so we use
+  // `tokio_stream::wrappers::BroadcastStream`.
+  let error_rxs: Vec<_> = pool
+    .clients()
+    .iter()
+    .map(|client| BroadcastStream::new(client.error_rx()))
+    .collect();
+  let reconnect_rxs: Vec<_> = pool
+    .clients()
+    .iter()
+    .map(|client| BroadcastStream::new(client.reconnect_rx()))
+    .collect();
+  let mut error_rx = futures::stream::select_all(error_rxs);
+  let mut reconnect_rx = futures::stream::select_all(reconnect_rxs);
+
+  let all_events_task = tokio::spawn(async move {
+    loop {
+      tokio::select! {
+        Some(Ok(error)) = error_rx.next() => {
+          println!("Error: {:?}", error);
+        }
+        Some(Ok(server)) = reconnect_rx.next() => {
+          println!("Reconnected to {}", server);
+        }
+      }
+    }
+  });
+
+  pool.connect();
+  pool.wait_for_connect().await?;
+
+  // ...
+
+  pool.quit().await?;
+  all_events_task.abort();
+  Ok(())
+}
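Note: the sketch below is not part of this patch. It shows how the `on_any` helper referenced in the doc comment above could replace several separate listener tasks. It assumes the 7.x form of `on_any` that accepts error, reconnect, and cluster-change callbacks and returns a single task handle; check the `EventInterface` docs for the exact signature and bounds.

    // one task for all three event types (callback order assumed: error, reconnect, cluster change)
    let all_events_task = client.on_any(
      |error| {
        println!("Connection error: {:?}", error);
        Ok(())
      },
      |server| {
        println!("Reconnected to {}", server);
        Ok(())
      },
      |changes| {
        println!("Cluster changes: {:?}", changes);
        Ok(())
      },
    );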
diff --git a/examples/keyspace.rs b/examples/keyspace.rs
new file mode 100644
index 00000000..6f6ab56b
--- /dev/null
+++ b/examples/keyspace.rs
@@ -0,0 +1,136 @@
+#![allow(clippy::disallowed_names)]
+#![allow(clippy::let_underscore_future)]
+
+use fred::prelude::*;
+use std::time::Duration;
+use tokio::time::sleep;
+
+async fn fake_traffic(client: &RedisClient, amount: usize) -> Result<(), RedisError> {
+  // use a new client since the provided client is subscribed to keyspace events
+  let client = client.clone_new();
+  client.connect();
+  client.wait_for_connect().await?;
+
+  for idx in 0 .. amount {
+    let key: RedisKey = format!("foo-{}", idx).into();
+
+    client.set(&key, 1, None, None, false).await?;
+    client.incr(&key).await?;
+    client.del(&key).await?;
+  }
+
+  client.quit().await?;
+  Ok(())
+}
+
+/// Examples showing how to set up keyspace notifications with clustered or centralized/sentinel deployments.
+///
+/// The most complicated part of this process involves safely handling reconnections. Keyspace events rely on the
+/// pubsub interface, and clients are required to subscribe or resubscribe whenever a new connection is created.
+/// These examples show how to manually handle reconnections, but the caller can also use the `SubscriberClient`
+/// interface to remove some of the boilerplate.
+///
+/// If callers do not need the keyspace subscriptions to survive reconnects then the process is more
+/// straightforward.
+///
+/// Both examples assume that the server has been configured to emit keyspace events (via
+/// `notify-keyspace-events`); see the note after this diff.
+#[tokio::main]
+async fn main() -> Result<(), RedisError> {
+  clustered_keyspace_events().await?;
+  centralized_keyspace_events().await?;
+  Ok(())
+}
+
+async fn centralized_keyspace_events() -> Result<(), RedisError> {
+  let subscriber = Builder::default_centralized().build()?;
+
+  let reconnect_subscriber = subscriber.clone();
+  // resubscribe to the foo- prefix whenever we reconnect to a server
+  let reconnect_task = tokio::spawn(async move {
+    let mut reconnect_rx = reconnect_subscriber.reconnect_rx();
+
+    while let Ok(server) = reconnect_rx.recv().await {
+      println!("Reconnected to {}. Subscribing to keyspace events...", server);
+      reconnect_subscriber.psubscribe("__key*__:foo*").await?;
+    }
+
+    Ok::<_, RedisError>(())
+  });
+
+  // connect after setting up the reconnection logic
+  subscriber.connect();
+  subscriber.wait_for_connect().await?;
+
+  let mut keyspace_rx = subscriber.on_keyspace_event();
+  // set up a task that listens for keyspace events
+  let keyspace_task = tokio::spawn(async move {
+    while let Ok(event) = keyspace_rx.recv().await {
+      println!(
+        "Recv: {} on {} in db {}",
+        event.operation,
+        event.key.as_str_lossy(),
+        event.db
+      );
+    }
+
+    Ok::<_, RedisError>(())
+  });
+
+  // generate fake traffic and wait a second
+  fake_traffic(&subscriber, 1_000).await?;
+  sleep(Duration::from_secs(1)).await;
+  subscriber.quit().await?;
+  keyspace_task.await??;
+  reconnect_task.await??;
+
+  Ok(())
+}
+
+async fn clustered_keyspace_events() -> Result<(), RedisError> {
+  let subscriber = Builder::default_clustered().build()?;
+
+  let reconnect_subscriber = subscriber.clone();
+  // resubscribe to the foo- prefix whenever we reconnect to a server
+  let reconnect_task = tokio::spawn(async move {
+    let mut reconnect_rx = reconnect_subscriber.reconnect_rx();
+
+    // in 7.x the reconnection interface added a `Server` struct to reconnect events to make this easier.
+    while let Ok(server) = reconnect_rx.recv().await {
+      println!("Reconnected to {}. Subscribing to keyspace events...", server);
+      reconnect_subscriber
+        .with_cluster_node(server)
+        .psubscribe("__key*__:foo*")
+        .await?;
+    }
+
+    Ok::<_, RedisError>(())
+  });
+
+  // connect after setting up the reconnection logic
+  subscriber.connect();
+  subscriber.wait_for_connect().await?;
+
+  let mut keyspace_rx = subscriber.on_keyspace_event();
+  // set up a task that listens for keyspace events
+  let keyspace_task = tokio::spawn(async move {
+    while let Ok(event) = keyspace_rx.recv().await {
+      println!(
+        "Recv: {} on {} in db {}",
+        event.operation,
+        event.key.as_str_lossy(),
+        event.db
+      );
+    }
+
+    Ok::<_, RedisError>(())
+  });
+
+  // generate fake traffic and wait a second
+  fake_traffic(&subscriber, 1_000).await?;
+  sleep(Duration::from_secs(1)).await;
+  subscriber.quit().await?;
+  keyspace_task.await??;
+  reconnect_task.await??;
+
+  Ok(())
+}
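Note (not part of the patch): both examples above assume `notify-keyspace-events` is enabled on the server, e.g. `notify-keyspace-events KEA` in redis.conf (`K` = keyspace events, `E` = keyevent events, `A` = all event classes). A minimal sketch of enabling it at runtime instead, assuming the client exposes CONFIG SET as `config_set`:

    // enable keyspace + keyevent notifications for all event classes while testing
    let _: () = subscriber.config_set("notify-keyspace-events", "KEA").await?;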
diff --git a/examples/pool.rs b/examples/pool.rs
index eb5c8251..cf89d18f 100644
--- a/examples/pool.rs
+++ b/examples/pool.rs
@@ -6,9 +6,14 @@ use fred::prelude::*;
 #[tokio::main]
 async fn main() -> Result<(), RedisError> {
   let pool = Builder::default_centralized().build_pool(5)?;
-  let _ = pool.connect();
+  pool.connect();
   pool.wait_for_connect().await?;

+  // use the pool like other clients
+  pool.get("foo").await?;
+  pool.set("foo", "bar", None, None, false).await?;
+  pool.get("foo").await?;
+
   // interact with specific clients via next(), last(), or clients()
   let pipeline = pool.next().pipeline();
   pipeline.incr("foo").await?;
@@ -17,19 +22,8 @@
   for client in pool.clients() {
     println!("{} connected to {:?}", client.id(), client.active_connections().await?);
-
-    // set up event listeners on each client
-    client.on_error(|error| {
-      println!("Connection error: {:?}", error);
-      Ok(())
-    });
   }

-  // or use the pool like any other RedisClient
-  pool.get("foo").await?;
-  pool.set("foo", "bar", None, None, false).await?;
-  pool.get("foo").await?;
-
-  let _ = pool.quit().await;
+  pool.quit().await?;
   Ok(())
 }
diff --git a/examples/scan.rs b/examples/scan.rs
index 800745e2..99a08326 100644
--- a/examples/scan.rs
+++ b/examples/scan.rs
@@ -1,39 +1,33 @@
 #![allow(clippy::disallowed_names)]
 #![allow(clippy::let_underscore_future)]
+#![allow(dead_code)]

 use fred::{prelude::*, types::Scanner};
-use futures::stream::StreamExt;
-
-static COUNT: usize = 50;
+use futures::stream::TryStreamExt;

 async fn create_fake_data(client: &RedisClient) -> Result<(), RedisError> {
-  for idx in 0 .. COUNT {
+  for idx in 0 .. 50 {
     client.set(format!("foo-{}", idx), idx, None, None, false).await?;
   }
   Ok(())
 }

 async fn delete_fake_data(client: &RedisClient) -> Result<(), RedisError> {
-  for idx in 0 .. COUNT {
-    client.del(format!("foo-{}", idx)).await?;
-  }
+  let keys: Vec<_> = (0 .. 50).into_iter().map(|i| format!("foo-{}", i)).collect();
+  client.del(keys).await?;
   Ok(())
 }

 #[tokio::main]
 async fn main() -> Result<(), RedisError> {
   let client = RedisClient::default();
-  let _ = client.connect();
+  client.connect();
   client.wait_for_connect().await?;
   create_fake_data(&client).await?;

-  // build up a buffer of (key, value) pairs from pages (~10 keys per page)
-  let mut buffer = Vec::with_capacity(COUNT);
+  // scan all keys in the keyspace, returning 10 keys per page
   let mut scan_stream = client.scan("foo*", Some(10), None);
-
-  while let Some(result) = scan_stream.next().await {
-    let mut page = result.expect("SCAN failed with error");
-
+  while let Some(mut page) = scan_stream.try_next().await? {
     if let Some(keys) = page.take_results() {
       // create a client from the scan result, reusing the existing connection(s)
       let client = page.create_client();
@@ -41,12 +35,10 @@ async fn main() -> Result<(), RedisError> {
       for key in keys.into_iter() {
         let value: RedisValue = client.get(&key).await?;
         println!("Scanned {} -> {:?}", key.as_str_lossy(), value);
-        buffer.push((key, value));
       }
     }

-    // **important:** move on to the next page now that we're done reading the values. or move this before we call
-    // `get` on each key to scan results in the background as quickly as possible.
+    // **important:** move on to the next page now that we're done reading the values
    let _ = page.next();
  }

@@ -54,3 +46,47 @@
   client.quit().await?;
   Ok(())
 }
+
+/// Example showing how to print the memory usage of all keys in a cluster with a `RedisPool`.
+///
+/// When using a clustered deployment the keyspace will be spread across multiple nodes. However, the cursor in
+/// each SCAN command is used to iterate over keys within a single node. There are several ways to concurrently
+/// scan all keys on all nodes:
+///
+/// 1. Use `scan_cluster`.
+/// 2. Use `split_cluster` and `scan`.
+/// 3. Use `with_cluster_node` and `scan`.
+///
+/// The best option depends on several factors, but `scan_cluster` is often the easiest approach for most use
+/// cases. A sketch of option 2 follows this diff.
+async fn pool_scan_cluster_memory_example(pool: RedisPool) -> Result<(), RedisError> {
+  // the majority of the client traffic in this scenario comes from the MEMORY USAGE call on each key, so we'll
+  // use a pool to round-robin these commands among multiple clients. a clustered client with `auto_pipeline: true`
+  // can scan all nodes in the cluster concurrently, so we use a single client rather than a pool to issue the
+  // SCAN calls.
+  let mut total_size = 0;
+  // if the pattern contains a hash tag then callers can use `scan` instead of `scan_cluster`
+  let mut scanner = pool.next().scan_cluster("*", Some(100), None);
+
+  while let Some(mut page) = scanner.try_next().await? {
+    if let Some(keys) = page.take_results() {
+      // pipeline the `MEMORY USAGE` calls
+      let pipeline = pool.next().pipeline();
+      for key in keys.iter() {
+        let _: () = pipeline.memory_usage(key, Some(0)).await?;
+      }
+      let sizes: Vec<Option<u64>> = pipeline.all().await?;
+      assert_eq!(keys.len(), sizes.len());
+
+      for (idx, key) in keys.into_iter().enumerate() {
+        let size = sizes[idx].unwrap_or(0);
+        println!("{}: {}", key.as_str_lossy(), size);
+        total_size += size;
+      }
+    }
+
+    let _ = page.next();
+  }
+
+  println!("Total size: {}", total_size);
+  Ok(())
+}
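Note (not part of the patch): for comparison with option 2 in the list above, a rough sketch of scanning each primary node with a separate client. It assumes `split_cluster` yields one centralized client per primary node; `scan_each_node` is a hypothetical helper name.

    async fn scan_each_node(client: RedisClient) -> Result<(), RedisError> {
      for node in client.split_cluster()? {
        node.connect();
        node.wait_for_connect().await?;

        // each client's cursor iterates over the keyspace of a single primary node
        let mut scan_stream = node.scan("foo*", Some(10), None);
        while let Some(mut page) = scan_stream.try_next().await? {
          if let Some(keys) = page.take_results() {
            println!("Scanned {} keys from one node", keys.len());
          }
          let _ = page.next();
        }
        node.quit().await?;
      }
      Ok(())
    }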
diff --git a/src/clients/pipeline.rs b/src/clients/pipeline.rs
index 5b06a5df..827a369e 100644
--- a/src/clients/pipeline.rs
+++ b/src/clients/pipeline.rs
@@ -274,10 +274,7 @@ async fn try_send_all(
   };

   if let Resp3Frame::Array { data, .. } = frame {
-    data
-      .into_iter()
-      .map(protocol_utils::frame_to_results)
-      .collect()
+    data.into_iter().map(protocol_utils::frame_to_results).collect()
   } else {
     vec![protocol_utils::frame_to_results(frame)]
   }
diff --git a/src/commands/impls/lua.rs b/src/commands/impls/lua.rs
index dfd06699..b1b834bd 100644
--- a/src/commands/impls/lua.rs
+++ b/src/commands/impls/lua.rs
@@ -1,4 +1,6 @@
 use super::*;
+#[cfg(feature = "sha-1")]
+use crate::util::sha1_hash;
 use crate::{
   error::*,
   modules::inner::RedisClientInner,
@@ -12,8 +14,6 @@ use crate::{
   types::*,
   utils,
 };
-#[cfg(feature = "sha-1")]
-use crate::util::sha1_hash;
 use bytes::Bytes;
 use bytes_utils::Str;
 use std::{convert::TryInto, str, sync::Arc};
@@ -59,7 +59,7 @@ pub async fn script_load<C: ClientLike>(client: &C, script: Str) -> Result<RedisValue, RedisError>
 pub async fn script_load_cluster<C: ClientLike>(client: &C, script: Str) -> Result<RedisValue, RedisError> {
   if !client.inner().config.server.is_clustered() {
     return script_load(client, script).await;
diff --git a/src/commands/impls/sentinel.rs b/src/commands/impls/sentinel.rs
index 207e9397..2b08e951 100644
--- a/src/commands/impls/sentinel.rs
+++ b/src/commands/impls/sentinel.rs
@@ -1,6 +1,7 @@
 use super::*;
 use crate::{
   error::RedisError,
+  protocol::{command::RedisCommandKind, utils as protocol_utils},
   router::sentinel::{
     CKQUORUM,
     CONFIG,
@@ -19,7 +20,6 @@
     SET,
     SIMULATE_FAILURE,
   },
-  protocol::{command::RedisCommandKind, utils as protocol_utils},
   types::*,
   utils,
 };
diff --git a/src/commands/impls/strings.rs b/src/commands/impls/strings.rs
index 1dac6def..8b137891 100644
--- a/src/commands/impls/strings.rs
+++ b/src/commands/impls/strings.rs
@@ -1,11 +1 @@
-
-
-
-
-
-
-
-
-
-
diff --git a/src/lib.rs b/src/lib.rs
index 117e1887..379827bd 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -123,6 +123,7 @@ pub mod prelude {
     PerformanceConfig,
     ReconnectPolicy,
     RedisConfig,
+    RedisKey,
     RedisValue,
     RedisValueKind,
     ServerConfig,
diff --git a/src/protocol/connection.rs b/src/protocol/connection.rs
index a2b07304..b849a570 100644
--- a/src/protocol/connection.rs
+++ b/src/protocol/connection.rs
@@ -620,10 +620,10 @@ impl RedisTransport {
   /// Read and cache the connection ID.
   pub async fn cache_connection_id(&mut self, inner: &Arc<RedisClientInner>) -> Result<(), RedisError> {
     let command = (RedisCommandKind::ClientID, vec![]).into();
-    let result = self.request_response(command, inner.is_resp3()).await?;
+    let result = self.request_response(command, inner.is_resp3()).await;
     _debug!(inner, "Read client ID: {:?}", result);
     self.id = match result {
-      Resp3Frame::Number { data, .. } => Some(data),
+      Ok(Resp3Frame::Number { data, .. }) => Some(data),
       _ => None,
     };
diff --git a/src/protocol/types.rs b/src/protocol/types.rs
index 7f554970..125b0a0b 100644
--- a/src/protocol/types.rs
+++ b/src/protocol/types.rs
@@ -1,15 +1,3 @@
-use super::utils as protocol_utils;
-use crate::{
-  error::{RedisError, RedisErrorKind},
-  modules::inner::RedisClientInner,
-  protocol::{cluster, utils::server_to_parts},
-  types::*,
-  utils,
-};
-use bytes_utils::Str;
-use rand::Rng;
-pub use redis_protocol::{redis_keyslot, resp2::types::NULL, types::CRLF};
-use redis_protocol::{resp2::types::Frame as Resp2Frame, resp2_frame_to_resp3, resp3::types::Frame as Resp3Frame};
 use std::{
   cmp::Ordering,
   collections::{BTreeMap, BTreeSet, HashMap},
@@ -19,8 +7,23 @@ use std::{
   net::{SocketAddr, ToSocketAddrs},
   sync::Arc,
 };
+
+use bytes_utils::Str;
+use rand::Rng;
+#[allow(unused_imports)]
+pub use redis_protocol::{redis_keyslot, resp2::types::NULL, types::CRLF};
+use redis_protocol::{resp2::types::Frame as Resp2Frame, resp2_frame_to_resp3, resp3::types::Frame as Resp3Frame};
 use tokio::sync::mpsc::UnboundedSender;

+use super::utils as protocol_utils;
+use crate::{
+  error::{RedisError, RedisErrorKind},
+  modules::inner::RedisClientInner,
+  protocol::{cluster, utils::server_to_parts},
+  types::*,
+  utils,
+};
+
 pub const REDIS_CLUSTER_SLOTS: u16 = 16384;

 #[cfg(any(feature = "enable-rustls", feature = "enable-native-tls"))]
diff --git a/src/router/reader.rs b/src/router/reader.rs
index e69de29b..8b137891 100644
--- a/src/router/reader.rs
+++ b/src/router/reader.rs
@@ -0,0 +1 @@
+
diff --git a/src/trace/disabled.rs b/src/trace/disabled.rs
index f4f18641..de433a7a 100644
--- a/src/trace/disabled.rs
+++ b/src/trace/disabled.rs
@@ -15,9 +15,7 @@ pub struct Span {}

 #[cfg(not(feature = "full-tracing"))]
 impl Span {
-  pub fn enter(&self) {
-
-  }
+  pub fn enter(&self) {}

   pub fn record<Q: ?Sized, V>(&self, _field: &Q, _value: &V) -> &Self {
     self
diff --git a/src/types/args.rs b/src/types/args.rs
index bdcb7f9a..113ae289 100644
--- a/src/types/args.rs
+++ b/src/types/args.rs
@@ -1,14 +1,3 @@
-use crate::{
-  error::{RedisError, RedisErrorKind},
-  interfaces::{ClientLike, Resp3Frame},
-  protocol::{connection::OK, utils as protocol_utils},
-  types::{FromRedis, FromRedisKey, GeoPosition, XReadResponse, XReadValue, QUEUED},
-  utils,
-};
-use bytes::Bytes;
-use bytes_utils::Str;
-use float_cmp::approx_eq;
-use redis_protocol::resp2::types::NULL;
 use std::{
   borrow::Cow,
   collections::{BTreeMap, HashMap, HashSet, VecDeque},
@@ -21,31 +10,42 @@ use std::{
   str,
 };

-use crate::types::{Function, GeoRadiusInfo, Server};
+use bytes::Bytes;
+use bytes_utils::Str;
+use float_cmp::approx_eq;
+use redis_protocol::resp2::types::NULL;
 #[cfg(feature = "serde-json")]
 use serde_json::Value;

+use crate::{
+  error::{RedisError, RedisErrorKind},
+  interfaces::{ClientLike, Resp3Frame},
+  protocol::{connection::OK, utils as protocol_utils},
+  types::{FromRedis, FromRedisKey, Function, GeoPosition, GeoRadiusInfo, Server, XReadResponse, XReadValue, QUEUED},
+  utils,
+};
+
 static_str!(TRUE_STR, "true");
 static_str!(FALSE_STR, "false");

 macro_rules! impl_string_or_number(
-  ($t:ty) => {
-    impl From<$t> for StringOrNumber {
-      fn from(val: $t) -> Self {
-        StringOrNumber::Number(val as i64)
-      }
+  ($t:ty) => {
+    impl From<$t> for StringOrNumber {
+      fn from(val: $t) -> Self {
+        StringOrNumber::Number(val as i64)
+      }
+    }
   }
-  }
 );
 macro_rules! impl_from_str_for_redis_key(
-  ($t:ty) => {
-    impl From<$t> for RedisKey {
-      fn from(val: $t) -> Self {
-        RedisKey { key: val.to_string().into() }
-      }
+  ($t:ty) => {
+    impl From<$t> for RedisKey {
+      fn from(val: $t) -> Self {
+        RedisKey { key: val.to_string().into() }
+      }
+    }
   }
-  }
 );

 /// An argument representing a string or number.
@@ -1436,6 +1436,12 @@ impl From for RedisValue {
   }
 }

+impl From<&Box<[u8]>> for RedisValue {
+  fn from(b: &Box<[u8]>) -> Self {
+    RedisValue::Bytes(Bytes::copy_from_slice(b))
+  }
+}
+
 impl From<Box<[u8]>> for RedisValue {
   fn from(b: Box<[u8]>) -> Self {
     RedisValue::Bytes(b.into())
diff --git a/src/types/config.rs b/src/types/config.rs
index 40c7aec6..b8fb1004 100644
--- a/src/types/config.rs
+++ b/src/types/config.rs
@@ -666,7 +666,7 @@ impl RedisConfig {
     let (url, host, port, _tls) = utils::parse_url(url, Some(6379))?;
     let server = ServerConfig::new_centralized(host, port);
     let database = utils::parse_url_db(&url)?;
-    let (username, password) = utils::parse_url_credentials(&url);
+    let (username, password) = utils::parse_url_credentials(&url)?;

     Ok(RedisConfig {
       server,
@@ -704,7 +704,7 @@
     let mut cluster_nodes = utils::parse_url_other_nodes(&url)?;
     cluster_nodes.push(Server::new(host, port));
     let server = ServerConfig::Clustered { hosts: cluster_nodes };
-    let (username, password) = utils::parse_url_credentials(&url);
+    let (username, password) = utils::parse_url_credentials(&url)?;

     Ok(RedisConfig {
       server,
@@ -754,7 +754,7 @@
     let mut other_nodes = utils::parse_url_other_nodes(&url)?;
     other_nodes.push(Server::new(host, port));
     let service_name = utils::parse_url_sentinel_service_name(&url)?;
-    let (username, password) = utils::parse_url_credentials(&url);
+    let (username, password) = utils::parse_url_credentials(&url)?;
     let database = utils::parse_url_db(&url)?;

     let server = ServerConfig::Sentinel {
       hosts: other_nodes,
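Note (not part of the patch): with the percent-decoding change above, reserved characters in URL credentials can now be expressed by percent-encoding them. A short sketch using `RedisConfig::from_url` (the password value is made up):

    // "p@ss/word" percent-encoded as "p%40ss%2Fword"
    let config = RedisConfig::from_url("redis://default:p%40ss%2Fword@localhost:6379")?;
    assert_eq!(config.password.as_deref(), Some("p@ss/word"));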
diff --git a/src/types/from_tuple.rs b/src/types/from_tuple.rs
new file mode 100644
index 00000000..eaf203c7
--- /dev/null
+++ b/src/types/from_tuple.rs
@@ -0,0 +1,36 @@
+use crate::types::{MultipleKeys, RedisKey, RedisValue};
+
+macro_rules! tuple2val {
+  ($($id:tt $ty:ident);+) => {
+impl<$($ty: Into<RedisValue>),+> From<($($ty),+)> for RedisValue {
+  fn from(value: ($($ty),+)) -> Self {
+    RedisValue::Array(vec![$(value.$id.into()),+])
+  }
+}
+
+impl<$($ty: Into<RedisKey>),+> From<($($ty),+)> for MultipleKeys {
+  fn from(value: ($($ty),+)) -> Self {
+    Self { keys: vec![$(value.$id.into()),+] }
+  }
+}
+  };
+}
+
+tuple2val!(0 A0; 1 A1);
+tuple2val!(0 A0; 1 A1; 2 A2);
+tuple2val!(0 A0; 1 A1; 2 A2; 3 A3);
+tuple2val!(0 A0; 1 A1; 2 A2; 3 A3; 4 A4);
+tuple2val!(0 A0; 1 A1; 2 A2; 3 A3; 4 A4; 5 A5);
+tuple2val!(0 A0; 1 A1; 2 A2; 3 A3; 4 A4; 5 A5; 6 A6);
+tuple2val!(0 A0; 1 A1; 2 A2; 3 A3; 4 A4; 5 A5; 6 A6; 7 A7);
+tuple2val!(0 A0; 1 A1; 2 A2; 3 A3; 4 A4; 5 A5; 6 A6; 7 A7; 8 A8);
+tuple2val!(0 A0; 1 A1; 2 A2; 3 A3; 4 A4; 5 A5; 6 A6; 7 A7; 8 A8; 9 A9);
+tuple2val!(0 A0; 1 A1; 2 A2; 3 A3; 4 A4; 5 A5; 6 A6; 7 A7; 8 A8; 9 A9; 10 A10);
+tuple2val!(0 A0; 1 A1; 2 A2; 3 A3; 4 A4; 5 A5; 6 A6; 7 A7; 8 A8; 9 A9; 10 A10; 11 A11);
+tuple2val!(0 A0; 1 A1; 2 A2; 3 A3; 4 A4; 5 A5; 6 A6; 7 A7; 8 A8; 9 A9; 10 A10; 11 A11; 12 A12);
+tuple2val!(0 A0; 1 A1; 2 A2; 3 A3; 4 A4; 5 A5; 6 A6; 7 A7; 8 A8; 9 A9; 10 A10; 11 A11; 12 A12; 13 A13);
+tuple2val!(0 A0; 1 A1; 2 A2; 3 A3; 4 A4; 5 A5; 6 A6; 7 A7; 8 A8; 9 A9; 10 A10; 11 A11; 12 A12; 13 A13; 14 A14);
+tuple2val!(0 A0; 1 A1; 2 A2; 3 A3; 4 A4; 5 A5; 6 A6; 7 A7; 8 A8; 9 A9; 10 A10; 11 A11; 12 A12; 13 A13; 14 A14; 15
+A15);
+tuple2val!(0 A0; 1 A1; 2 A2; 3 A3; 4 A4; 5 A5; 6 A6; 7 A7; 8 A8; 9 A9; 10 A10; 11 A11; 12 A12; 13 A13; 14
+A14; 15 A15; 16 A16);
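Note (not part of the patch): the impls above convert tuples with heterogeneous element types into a `RedisValue::Array` or a `MultipleKeys`, which makes mixed argument lists more ergonomic. For example:

    use fred::types::{MultipleKeys, RedisValue};

    // a RedisValue::Array containing an integer, a string, and a float
    let value = RedisValue::from((1, "foo", 4.5));
    // two keys, usable anywhere a command takes Into<MultipleKeys>
    let keys = MultipleKeys::from(("foo", "bar"));
    // e.g. client.del(("foo", "bar")).await?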
diff --git a/src/types/mod.rs b/src/types/mod.rs
index b03bb287..0c85738e 100644
--- a/src/types/mod.rs
+++ b/src/types/mod.rs
@@ -8,6 +8,7 @@ mod builder;
 mod client;
 mod cluster;
 mod config;
+mod from_tuple;
 mod geo;
 mod lists;
 mod misc;
diff --git a/src/types/multiple.rs b/src/types/multiple.rs
index 3e4257bc..16708c8b 100644
--- a/src/types/multiple.rs
+++ b/src/types/multiple.rs
@@ -7,7 +7,7 @@ use std::{collections::VecDeque, iter::FromIterator};
 /// `Into<MultipleKeys>`.** This is mostly useful for `EVAL` and `EVALSHA`.
 #[derive(Clone, Debug, Eq, PartialEq)]
 pub struct MultipleKeys {
-  keys: Vec<RedisKey>,
+  pub(crate) keys: Vec<RedisKey>,
 }

 impl MultipleKeys {
diff --git a/src/utils.rs b/src/utils.rs
index a65befa9..d71cc3a4 100644
--- a/src/utils.rs
+++ b/src/utils.rs
@@ -42,6 +42,7 @@ use tokio::{
   time::sleep,
 };
 use url::Url;
+use urlencoding::decode as percent_decode;

 #[cfg(any(feature = "enable-native-tls", feature = "enable-rustls"))]
 use crate::protocol::tls::{TlsConfig, TlsConnector};
@@ -565,7 +566,11 @@ where
 }

 pub fn add_jitter(delay: u64, jitter: u32) -> u64 {
-  delay.saturating_add(rand::thread_rng().gen_range(0 .. jitter as u64))
+  if jitter == 0 {
+    delay
+  } else {
+    delay.saturating_add(rand::thread_rng().gen_range(0 .. jitter as u64))
+  }
 }

 pub fn into_redis_map<I, K, V>(mut iter: I) -> Result<HashMap<RedisKey, RedisValue>, RedisError>
 where
@@ -728,15 +733,21 @@ pub fn parse_url_db(url: &Url) -> Result<Option<u8>, RedisError> {
   Ok(Some(parts[0].parse()?))
 }

-pub fn parse_url_credentials(url: &Url) -> (Option<String>, Option<String>) {
+pub fn parse_url_credentials(url: &Url) -> Result<(Option<String>, Option<String>), RedisError> {
   let username = if url.username().is_empty() {
     None
   } else {
-    Some(url.username().to_owned())
+    let username = percent_decode(url.username())?;
+    Some(username.into_owned())
+  };
+  let password = percent_decode(url.password().unwrap_or_default())?;
+  let password = if password.is_empty() {
+    None
+  } else {
+    Some(password.into_owned())
   };
-  let password = url.password().map(|s| s.to_owned());

-  (username, password)
+  Ok((username, password))
 }

 pub fn parse_url_other_nodes(url: &Url) -> Result<Vec<Server>, RedisError> {
@@ -832,6 +843,11 @@
     RedisValue::Array(v)
   }

+  #[test]
+  fn should_not_panic_with_zero_jitter() {
+    assert_eq!(add_jitter(10, 0), 10);
+  }
+
   #[test]
   fn should_flatten_xread_example() {
     // 127.0.0.1:6379> xread count 2 streams foo bar 1643479648480-0 1643479834990-0
@@ -877,4 +893,31 @@
     assert_eq!(flatten_nested_array_values(actual, 1), expected);
   }
+
+  #[test]
+  fn should_parse_url_credentials_no_creds() {
+    let url = Url::parse("redis://localhost:6379").unwrap();
+    let (username, password) = parse_url_credentials(&url).unwrap();
+
+    assert_eq!(username, None);
+    assert_eq!(password, None);
+  }
+
+  #[test]
+  fn should_parse_url_credentials_with_creds() {
+    let url = Url::parse("redis://default:abc123@localhost:6379").unwrap();
+    let (username, password) = parse_url_credentials(&url).unwrap();
+
+    assert_eq!(username.unwrap(), "default");
+    assert_eq!(password.unwrap(), "abc123");
+  }
+
+  #[test]
+  fn should_parse_url_credentials_with_percent_encoded_creds() {
+    let url = Url::parse("redis://default:abc%2F123@localhost:6379").unwrap();
+    let (username, password) = parse_url_credentials(&url).unwrap();
+
+    assert_eq!(username.unwrap(), "default");
+    assert_eq!(password.unwrap(), "abc/123");
+  }
 }
diff --git a/tests/integration/client/mod.rs b/tests/integration/client/mod.rs
index e69de29b..8b137891 100644
--- a/tests/integration/client/mod.rs
+++ b/tests/integration/client/mod.rs
@@ -0,0 +1 @@
+
diff --git a/tests/integration/sets/mod.rs b/tests/integration/sets/mod.rs
index 203f4d6b..bbc08920 100644
--- a/tests/integration/sets/mod.rs
+++ b/tests/integration/sets/mod.rs
@@ -9,13 +9,11 @@ fn vec_to_set(data: Vec<RedisValue>) -> HashSet<RedisValue> {
   out
 }

-/*
-#[cfg(feature = "index-map")]
-fn sets_eq(lhs: &IndexSet<RedisValue>, rhs: &HashSet<RedisValue>) -> bool {
-  let lhs: HashSet<RedisValue> = lhs.iter().map(|v| v.clone()).collect();
-  &lhs == rhs
-}
- */
+// #[cfg(feature = "index-map")]
+// fn sets_eq(lhs: &IndexSet<RedisValue>, rhs: &HashSet<RedisValue>) -> bool {
+//   let lhs: HashSet<RedisValue> = lhs.iter().map(|v| v.clone()).collect();
+//   &lhs == rhs
+// }

 fn sets_eq(lhs: &HashSet<RedisValue>, rhs: &HashSet<RedisValue>) -> bool {
   lhs == rhs