diff --git a/CHANGELOG.md b/CHANGELOG.md index f724a135..7fcf37c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## 9.4.0 + +* Change scanning functions to automatically continue when the current page is dropped +* Add `scan_buffered` and `scan_cluster_buffered` interfaces +* Add `specialize-into-bytes` feature flag + ## 9.3.0 * Add `SETNX`, `ECHO`, `TYPE`, `EXPIRETIME`, and `PEXPIRETIME` commands diff --git a/Cargo.toml b/Cargo.toml index 0aacfdfc..ac9c01a9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ name = "fred" readme = "README.md" repository = "https://github.com/aembke/fred.rs" rust-version = "1.75" -version = "9.3.0" +version = "9.4.0" [package.metadata.docs.rs] # do not show the glommio version of the docs @@ -33,7 +33,8 @@ features = [ "enable-rustls", "enable-native-tls", "full-tracing", - "credential-provider" + "credential-provider", + "specialize-into-bytes" ] rustdoc-args = ["--cfg", "docsrs"] @@ -45,6 +46,7 @@ test = true [features] default = ["transactions", "i-std"] +specialize-into-bytes = [] blocking-encoding = ["tokio/rt-multi-thread"] custom-reconnect-errors = [] default-nil-types = [] @@ -64,6 +66,12 @@ credential-provider = [] # Enable experimental support for the Glommio runtime. glommio = ["dep:glommio", "futures-io", "pin-project", "fred-macros/enabled", "oneshot", "futures-lite"] +# Enable experimental support for the Monoio runtime. +monoio = ["dep:monoio", "monoio-codec", "fred-macros/enabled", "oneshot", "futures-lite", "local-sync"] +monoio-native-tls = ["dep:monoio-native-tls"] +monoio-rustls = ["dep:monoio-rustls"] +# [WIP] Enable experimental support for the Smol runtime. 
+smol = [] # Enables rustls with the rustls/aws_lc_rs crypto backend enable-rustls = [ @@ -177,6 +185,15 @@ semver = "1.0" serde_json = { version = "1", optional = true } sha-1 = { version = "0.10", optional = true } socket2 = "0.5" +tracing = { version = "0.1", optional = true } +tracing-futures = { version = "0.2", optional = true } +url = "2.4" +urlencoding = "2.1" +# DNS Features +trust-dns-resolver = { version = "0.23", optional = true, features = ["tokio"] } +hickory-resolver = { version = "0.24.1", optional = true, features = ["tokio"] } +fred-macros = "0.1" +# Tokio Dependencies tokio = { version = "1.34", features = [ "net", "sync", @@ -188,18 +205,18 @@ tokio-native-tls = { version = "0.3", optional = true } tokio-rustls = { version = "0.26", optional = true, default-features = false } tokio-stream = "0.1" tokio-util = { version = "0.7", features = ["codec"] } -tracing = { version = "0.1", optional = true } -tracing-futures = { version = "0.2", optional = true } -trust-dns-resolver = { version = "0.23", optional = true, features = ["tokio"] } -hickory-resolver = { version = "0.24.1", optional = true, features = ["tokio"] } -url = "2.4" -urlencoding = "2.1" -fred-macros = "0.1" +# Glommio Dependencies glommio = { version = "0.9.0", optional = true } futures-io = { version = "0.3", optional = true } pin-project = { version = "1.1.5", optional = true } oneshot = { version = "0.1.8", optional = true, features = ["async"] } futures-lite = { version = "2.3", optional = true } +# Monoio Dependencies +monoio = { version = "0.2.4", optional = true, features = ["bytes"] } +monoio-codec = { version = "0.3.4", optional = true } +monoio-native-tls = { version = "0.4.0", optional = true } +monoio-rustls = { version = "0.4.0", optional = true } +local-sync = { version = "0.1.1", optional = true } [dev-dependencies] axum = { version = "0.7", features = ["macros"] } diff --git a/README.md b/README.md index 1b3f8051..11e73685 100644 --- a/README.md +++ b/README.md @@ 
-58,31 +58,32 @@ See the build features for more information. ## Client Features -| Name | Default | Description | -|---------------------------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `transactions` | x | Enable a [Transaction](https://redis.io/docs/interact/transactions/) interface. | -| `enable-native-tls` | | Enable TLS support via [native-tls](https://crates.io/crates/native-tls). | -| `enable-rustls` | | Enable TLS support via [rustls](https://crates.io/crates/rustls) with the default crypto backend features. | -| `enable-rustls-ring` | | Enable TLS support via [rustls](https://crates.io/crates/rustls) and the ring crypto backend. | -| `vendored-openssl` | | Enable the `native-tls/vendored` feature. | -| `metrics` | | Enable the metrics interface to track overall latency, network latency, and request/response sizes. | -| `full-tracing` | | Enable full [tracing](./src/trace/README.md) support. This can emit a lot of data. | -| `partial-tracing` | | Enable partial [tracing](./src/trace/README.md) support, only emitting traces for top level commands and network latency. | -| `blocking-encoding` | | Use a blocking task for encoding or decoding frames. This can be useful for clients that send or receive large payloads, but requires a multi-thread Tokio runtime. | -| `custom-reconnect-errors` | | Enable an interface for callers to customize the types of errors that should automatically trigger reconnection logic. | -| `monitor` | | Enable an interface for running the `MONITOR` command. | -| `sentinel-client` | | Enable an interface for communicating directly with Sentinel nodes. This is not necessary to use normal Redis clients behind a sentinel layer. 
| -| `sentinel-auth` | | Enable an interface for using different authentication credentials to sentinel nodes. | -| `subscriber-client` | | Enable a subscriber client interface that manages channel subscription state for callers. | -| `serde-json` | | Enable an interface to automatically convert Redis types to JSON via `serde-json`. | -| `mocks` | | Enable a mocking layer interface that can be used to intercept and process commands in tests. | -| `dns` | | Enable an interface that allows callers to override the DNS lookup logic. | -| `replicas` | | Enable an interface that routes commands to replica nodes. | -| `default-nil-types` | | Enable a looser parsing interface for `nil` values. | -| `sha-1` | | Enable an interface for hashing Lua scripts. | -| `unix-sockets` | | Enable Unix socket support. | -| `glommio` | | Enable experimental [Glommio](https://github.com/DataDog/glommio) support. See the [Glommio Runtime](https://github.com/aembke/fred.rs/blob/main/src/glommio/README.md) docs for more information. When enabled the client will no longer work with Tokio runtimes. | -| `credential-provider` | | Enable an interface that can dynamically load auth credentials at runtime. | +| Name | Default | Description | +|---------------------------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `transactions` | x | Enable a [Transaction](https://redis.io/docs/interact/transactions/) interface. | +| `enable-native-tls` | | Enable TLS support via [native-tls](https://crates.io/crates/native-tls). | +| `enable-rustls` | | Enable TLS support via [rustls](https://crates.io/crates/rustls) with the default crypto backend features. | +| `enable-rustls-ring` | | Enable TLS support via [rustls](https://crates.io/crates/rustls) and the ring crypto backend. 
| +| `vendored-openssl` | | Enable the `native-tls/vendored` feature. | +| `metrics` | | Enable the metrics interface to track overall latency, network latency, and request/response sizes. | +| `full-tracing` | | Enable full [tracing](./src/trace/README.md) support. This can emit a lot of data. | +| `partial-tracing` | | Enable partial [tracing](./src/trace/README.md) support, only emitting traces for top level commands and network latency. | +| `blocking-encoding` | | Use a blocking task for encoding or decoding frames. This can be useful for clients that send or receive large payloads, but requires a multi-thread Tokio runtime. | +| `custom-reconnect-errors` | | Enable an interface for callers to customize the types of errors that should automatically trigger reconnection logic. | +| `monitor` | | Enable an interface for running the `MONITOR` command. | +| `sentinel-client` | | Enable an interface for communicating directly with Sentinel nodes. This is not necessary to use normal Redis clients behind a sentinel layer. | +| `sentinel-auth` | | Enable an interface for using different authentication credentials to sentinel nodes. | +| `subscriber-client` | | Enable a subscriber client interface that manages channel subscription state for callers. | +| `serde-json` | | Enable an interface to automatically convert Redis types to JSON via `serde-json`. | +| `mocks` | | Enable a mocking layer interface that can be used to intercept and process commands in tests. | +| `dns` | | Enable an interface that allows callers to override the DNS lookup logic. | +| `replicas` | | Enable an interface that routes commands to replica nodes. | +| `default-nil-types` | | Enable a looser parsing interface for `nil` values. | +| `sha-1` | | Enable an interface for hashing Lua scripts. | +| `unix-sockets` | | Enable Unix socket support. | +| `credential-provider` | | Enable an interface that can dynamically load auth credentials at runtime. 
| +| `specialize-into-bytes` | | Specialize `TryFrom for RedisValue>` to use `RedisValue::Bytes`, disabling `From for RedisValue`. This is a temporary feature flag that will be made the default in the next major version. | +| `glommio` | | Enable experimental [Glommio](https://github.com/DataDog/glommio) support. | ## Interface Features diff --git a/bin/benchmark/Cargo.toml b/bin/benchmark/Cargo.toml index 60a2bf9e..e6a4d531 100644 --- a/bin/benchmark/Cargo.toml +++ b/bin/benchmark/Cargo.toml @@ -23,12 +23,12 @@ tokio = { version = "1", features = ["full"] } futures = "0.3" rand = "0.8" indicatif = "=0.17.1" -bb8-redis = { version = "0.14", optional = true } +bb8-redis = { version = "0.17.0", optional = true } [dependencies.fred] #path = "../.." path = "/fred" -features = ["replicas", "unix-sockets"] +features = ["replicas", "unix-sockets", "i-all"] default-features = false [features] diff --git a/bin/benchmark/README.md b/bin/benchmark/README.md index b4e83984..eb4e137a 100644 --- a/bin/benchmark/README.md +++ b/bin/benchmark/README.md @@ -5,8 +5,8 @@ Redis includes a [benchmarking tool](https://redis.io/docs/management/optimizati measure the throughput of a client/connection pool. This module attempts to reproduce the same process with Tokio and Fred. -The general strategy involves using an atomic global counter and spawning `-c` Tokio tasks that share`-P` clients -in order to send `-n` total `INCR` commands to the server as quickly as possible. +The general strategy involves using an atomic global counter and spawning `-c` Tokio tasks that share `-P` clients in +order to send `-n` total `INCR` commands to the server as quickly as possible. Each of the `-c` Tokio tasks use a different random key so commands are uniformly distributed across a cluster or replica set. 
@@ -106,21 +106,21 @@ With `auto_pipeline` **disabled**: ``` $ ./run.sh --cluster -c 10000 -n 10000000 -P 15 -h redis-cluster-1 -p 30001 -a bar no-pipeline -Performed 10000000 operations in: 31.496934107s. Throughput: 317500 req/sec +Performed 10000000 operations in: 27.038434665s. Throughput: 369849 req/sec ``` With `auto_pipeline` **enabled**: ``` $ ./run.sh --cluster -c 10000 -n 10000000 -P 15 -h redis-cluster-1 -p 30001 -a bar pipeline -Performed 10000000 operations in: 4.125544401s. Throughput: 2424242 req/sec +Performed 10000000 operations in: 3.728232639s. Throughput: 2682403 req/sec ``` With `auto_pipeline` **enabled** and using `GET` with replica nodes instead of `INCR` with primary nodes: ``` $ ./run.sh --cluster -c 10000 -n 10000000 -P 15 -h redis-cluster-1 -p 30001 -a bar --replicas pipeline -Performed 10000000 operations in: 3.356416674s. Throughput: 2979737 req/sec +erformed 10000000 operations in: 3.234255482s. Throughput: 3092145 req/sec ``` Maybe Relevant Specs: @@ -134,7 +134,7 @@ The `USE_REDIS_RS` environment variable can be toggled to [switch the benchmark use `redis-rs` instead of `fred`. There's also an `info` level log line that can confirm this at runtime. The `redis-rs` variant uses the same general strategy, but with [bb8-redis](https://crates.io/crates/bb8-redis) ( -specifically `Pool`) instead of `fred::clients::RedisPool`. All the other components +specifically `Pool`) instead of `fred::clients::RedisPool`. All the other components in the benchmark logic are the same. ### Examples @@ -151,13 +151,13 @@ These examples use the following parameters: ``` # fred without `auto_pipeline` $ ./run.sh -h redis-main -p 6379 -a bar -n 10000000 -P 15 -c 10000 no-pipeline -Performed 10000000 operations in: 62.79547495s. Throughput: 159248 req/sec +Performed 10000000 operations in: 52.156700826s. 
Throughput: 191732 req/sec # redis-rs via bb8-redis $ USE_REDIS_RS=1 ./run.sh -h redis-main -p 6379 -a bar -n 10000000 -P 15 -c 10000 -Performed 10000000 operations in: 62.583055519s. Throughput: 159787 req/sec +Performed 10000000 operations in: 102.953612933s. Throughput: 97131 req/sec # fred with `auto_pipeline` $ ./run.sh -h redis-main -p 6379 -a bar -n 10000000 -P 15 -c 10000 pipeline -Performed 10000000 operations in: 5.882182708s. Throughput: 1700102 req/sec +Performed 10000000 operations in: 5.74236423s. Throughput: 1741553 req/sec ``` \ No newline at end of file diff --git a/bin/benchmark/src/_fred.rs b/bin/benchmark/src/_fred.rs index ffc607ed..99cd028e 100644 --- a/bin/benchmark/src/_fred.rs +++ b/bin/benchmark/src/_fred.rs @@ -8,6 +8,7 @@ use std::{ }; use tokio::task::JoinHandle; +use fred::types::ClusterDiscoveryPolicy; #[cfg(any( feature = "enable-rustls", feature = "enable-native-tls", @@ -42,7 +43,8 @@ pub async fn init(argv: &Arc) -> Result { } } else if argv.cluster { ServerConfig::Clustered { - hosts: vec![Server::new(&argv.host, argv.port)], + hosts: vec![Server::new(&argv.host, argv.port)], + policy: ClusterDiscoveryPolicy::ConfigEndpoint, } } else { ServerConfig::new_centralized(&argv.host, argv.port) @@ -122,7 +124,7 @@ pub async fn run(argv: Arc, counter: Arc, bar: Option, key: &str) -> i64 { +async fn incr_key(pool: &Pool, key: &str) -> i64 { let mut conn = pool.get().await.map_err(utils::crash).unwrap(); cmd("INCR") .arg(key) @@ -24,7 +24,7 @@ async fn incr_key(pool: &Pool, key: &str) -> .unwrap() } -async fn del_key(pool: &Pool, key: &str) -> i64 { +async fn del_key(pool: &Pool, key: &str) -> i64 { let mut conn = pool.get().await.map_err(utils::crash).unwrap(); cmd("DEL") .arg(key) @@ -36,7 +36,7 @@ async fn del_key(pool: &Pool, key: &str) -> i fn spawn_client_task( bar: &Option, - pool: &Pool, + pool: &Pool, counter: &Arc, argv: &Arc, ) -> JoinHandle<()> { @@ -66,7 +66,7 @@ fn spawn_client_task( } // TODO support clustered deployments 
-async fn init(argv: &Arc) -> Pool { +async fn init(argv: &Arc) -> Pool { let (username, password) = utils::read_auth_env(); let url = if let Some(password) = password { let username = username.map(|s| format!("{s}:")).unwrap_or("".into()); @@ -76,7 +76,7 @@ async fn init(argv: &Arc) -> Pool { }; debug!("Redis conn: {}", url); - let manager = RedisMultiplexedConnectionManager::new(url).expect("Failed to create redis connection manager"); + let manager = RedisConnectionManager::new(url).expect("Failed to create redis connection manager"); let pool = bb8::Pool::builder() .max_size(argv.pool as u32) .build(manager) @@ -85,7 +85,7 @@ async fn init(argv: &Arc) -> Pool { // try to warm up the pool first let mut warmup_ft = Vec::with_capacity(argv.pool + 1); - for _ in 0..argv.pool + 1 { + for _ in 0 .. argv.pool + 1 { warmup_ft.push(async { incr_key(&pool, "foo").await }); } futures::future::join_all(warmup_ft).await; @@ -105,7 +105,7 @@ pub async fn run(argv: Arc, counter: Arc, bar: Option, + pub sentinel_auth: Option, } fn parse_argv() -> Argv { @@ -68,6 +70,8 @@ fn parse_argv() -> Argv { .map(|v| v.parse::().expect("Invalid wait")) .unwrap_or(0); let auth = matches.value_of("auth").map(|v| v.to_owned()).unwrap_or("".into()); + let sentinel = matches.value_of("sentinel").map(|v| v.to_owned()); + let sentinel_auth = matches.value_of("sentinel-auth").map(|v| v.to_owned()); Argv { cluster, @@ -79,6 +83,8 @@ fn parse_argv() -> Argv { wait, replicas, tracing, + sentinel, + sentinel_auth, } } @@ -155,7 +161,16 @@ async fn main() -> Result<(), RedisError> { server: if argv.cluster { ServerConfig::new_clustered(vec![(&argv.host, argv.port)]) } else { - ServerConfig::new_centralized(&argv.host, argv.port) + if let Some(sentinel) = argv.sentinel.as_ref() { + ServerConfig::Sentinel { + service_name: sentinel.to_string(), + hosts: vec![Server::new(&argv.host, argv.port)], + password: argv.sentinel_auth.clone(), + username: None, + } + } else { + 
ServerConfig::new_centralized(&argv.host, argv.port) + } }, password: if argv.auth.is_empty() { None @@ -168,12 +183,12 @@ async fn main() -> Result<(), RedisError> { .with_connection_config(|config| { config.max_command_attempts = 3; config.unresponsive = UnresponsiveConfig { - interval: Duration::from_secs(1), + interval: Duration::from_secs(1), max_timeout: Some(Duration::from_secs(5)), }; config.connection_timeout = Duration::from_secs(3); config.internal_command_timeout = Duration::from_secs(2); - config.cluster_cache_update_delay = Duration::from_secs(20); + //config.cluster_cache_update_delay = Duration::from_secs(20); if argv.replicas { config.replica = ReplicaConfig { lazy_connections: true, diff --git a/examples/README.md b/examples/README.md index 45438553..6f533dae 100644 --- a/examples/README.md +++ b/examples/README.md @@ -27,6 +27,6 @@ Examples * [Misc](./misc.rs) - Miscellaneous or advanced features. * [Replicas](./replicas.rs) - Interact with cluster replica nodes via a `RedisPool`. * [Glommio](./glommio.rs) - Use the [Glommio](https://github.com/DataDog/glommio) runtime. - See [the source docs](../src/glommio/README.md) for more information. + See [the source docs](../src/runtime/glommio/README.md) for more information. Or see the [tests](../tests/integration) for more examples. 
\ No newline at end of file diff --git a/src/clients/pipeline.rs b/src/clients/pipeline.rs index 952092b7..eaa6b34a 100644 --- a/src/clients/pipeline.rs +++ b/src/clients/pipeline.rs @@ -14,14 +14,7 @@ use crate::{ use std::{collections::VecDeque, fmt, fmt::Formatter}; fn clone_buffered_commands(buffer: &Mutex>) -> VecDeque { - let guard = buffer.lock(); - let mut out = VecDeque::with_capacity(guard.len()); - - for command in guard.iter() { - out.push_back(command.duplicate(ResponseKind::Skip)); - } - - out + buffer.lock().iter().map(|c| c.duplicate(ResponseKind::Skip)).collect() } fn prepare_all_commands( diff --git a/src/clients/redis.rs b/src/clients/redis.rs index 1057e212..89c14acc 100644 --- a/src/clients/redis.rs +++ b/src/clients/redis.rs @@ -191,6 +191,9 @@ impl RedisClient { /// /// The scan operation can be canceled by dropping the returned stream. /// + /// See [scan_buffered](Self::scan_buffered) or [scan_cluster_buffered](Self::scan_cluster_buffered) for + /// alternatives that automatically continue scanning in the background. + /// /// pub fn scan

( &self, @@ -204,6 +207,28 @@ impl RedisClient { commands::scan::scan(&self.inner, pattern.into(), count, r#type, None) } + /// Scan the keys in the keyspace, buffering all results in memory as quickly as the server returns them. + /// + /// This function should be used with care as it can result in the caller buffering the entire keyspace in memory if + /// results are not processed quickly. Additionally, since results are paged in the background the cursor is not + /// exposed to the caller with each page of results. + /// + /// See [scan](Self::scan) or [scan_cluster](Self::scan_cluster) for alternatives that allow callers to control the + /// rate at which pages are scanned. + /// + /// + pub fn scan_buffered

( + &self, + pattern: P, + count: Option, + r#type: Option, + ) -> impl Stream> + where + P: Into, + { + commands::scan::scan_buffered(&self.inner, pattern.into(), count, r#type, None) + } + /// Run the `SCAN` command on each primary/main node in a cluster concurrently. /// /// In order for this function to work reliably the cluster state must not change while scanning. If nodes are added @@ -213,6 +238,9 @@ impl RedisClient { /// /// Unlike `SCAN`, `HSCAN`, etc, the returned stream may continue even if /// [has_more](crate::types::ScanResult::has_more) returns false on a given page of keys. + /// + /// See [scan_buffered](Self::scan_buffered) or [scan_cluster_buffered](Self::scan_cluster_buffered) for + /// alternatives that automatically continue scanning in the background. pub fn scan_cluster

( &self, pattern: P, @@ -225,6 +253,29 @@ impl RedisClient { commands::scan::scan_cluster(&self.inner, pattern.into(), count, r#type) } + /// Scan the keys in the keyspace concurrently across all nodes in the cluster, buffering all results in memory as + /// quickly as the server returns them. + /// + /// This function should be used with care as it can result in the caller buffering the entire keyspace in memory if + /// results are not processed quickly. Additionally, since results are paged in the background the cursor is not + /// exposed to the caller with each page of results. + /// + /// See [scan](Self::scan) or [scan_cluster](Self::scan_cluster) for alternatives that allow callers to control the + /// rate at which pages are scanned. + /// + /// + pub fn scan_cluster_buffered

( + &self, + pattern: P, + count: Option, + r#type: Option, + ) -> impl Stream> + where + P: Into, + { + commands::scan::scan_cluster_buffered(&self.inner, pattern.into(), count, r#type) + } + /// Incrementally iterate over pages of the hash map stored at `key`, returning `count` results per page, if /// specified. /// diff --git a/src/commands/impls/acl.rs b/src/commands/impls/acl.rs index 32b36f6c..f15a2e8d 100644 --- a/src/commands/impls/acl.rs +++ b/src/commands/impls/acl.rs @@ -29,12 +29,8 @@ pub async fn acl_setuser(client: &C, username: Str, rules: Multip protocol_utils::expect_ok(&response) } -pub async fn acl_getuser(client: &C, username: Str) -> Result { - let frame = utils::request_response(client, move || { - Ok((RedisCommandKind::AclGetUser, vec![username.into()])) - }) - .await?; - protocol_utils::frame_to_results(frame) +pub async fn acl_getuser(client: &C, username: RedisValue) -> Result { + one_arg_value_cmd(client, RedisCommandKind::AclGetUser, username).await } pub async fn acl_deluser(client: &C, usernames: MultipleKeys) -> Result { diff --git a/src/commands/impls/keys.rs b/src/commands/impls/keys.rs index 19af508f..31cb0f55 100644 --- a/src/commands/impls/keys.rs +++ b/src/commands/impls/keys.rs @@ -400,7 +400,7 @@ pub async fn copy( if let Some(db) = db { args.push(static_val!(DB)); - args.push(db.into()); + args.push((db as i64).into()); } if replace { args.push(static_val!(REPLACE)); diff --git a/src/commands/impls/scan.rs b/src/commands/impls/scan.rs index 9a31050e..347bccfd 100644 --- a/src/commands/impls/scan.rs +++ b/src/commands/impls/scan.rs @@ -35,6 +35,21 @@ fn values_args(key: RedisKey, pattern: Str, count: Option) -> Vec, pattern: Str, count: Option, r#type: Option) { + args.push(static_val!(STARTING_CURSOR)); + args.push(static_val!(MATCH)); + args.push(pattern.into()); + + if let Some(count) = count { + args.push(static_val!(COUNT)); + args.push(count.into()); + } + if let Some(r#type) = r#type { + args.push(static_val!(TYPE)); + 
args.push(r#type.to_str().into()); + } +} + pub fn scan_cluster( inner: &RefCount, pattern: Str, @@ -55,22 +70,51 @@ pub fn scan_cluster( }; let mut args = Vec::with_capacity(7); - args.push(static_val!(STARTING_CURSOR)); - args.push(static_val!(MATCH)); - args.push(pattern.into()); + create_scan_args(&mut args, pattern, count, r#type); + for slot in hash_slots.into_iter() { + _trace!(inner, "Scan cluster hash slot server: {}", slot); + let response = ResponseKind::KeyScan(KeyScanInner { + hash_slot: Some(slot), + args: args.clone(), + cursor_idx: 0, + tx: tx.clone(), + server: None, + }); + let command: RedisCommand = (RedisCommandKind::Scan, Vec::new(), response).into(); - if let Some(count) = count { - args.push(static_val!(COUNT)); - args.push(count.into()); - } - if let Some(r#type) = r#type { - args.push(static_val!(TYPE)); - args.push(r#type.to_str().into()); + if let Err(e) = interfaces::default_send_command(inner, command) { + let _ = tx.send(Err(e)); + break; + } } + rx_stream(rx) +} + +pub fn scan_cluster_buffered( + inner: &RefCount, + pattern: Str, + count: Option, + r#type: Option, +) -> impl Stream> { + let (tx, rx) = unbounded_channel(); + #[cfg(feature = "glommio")] + let tx: UnboundedSender<_> = tx.into(); + + let hash_slots = inner.with_cluster_state(|state| Ok(state.unique_hash_slots())); + let hash_slots = match hash_slots { + Ok(slots) => slots, + Err(e) => { + let _ = tx.send(Err(e)); + return rx_stream(rx); + }, + }; + + let mut args = Vec::with_capacity(7); + create_scan_args(&mut args, pattern, count, r#type); for slot in hash_slots.into_iter() { - _trace!(inner, "Scan cluster hash slot server: {}", slot); - let response = ResponseKind::KeyScan(KeyScanInner { + _trace!(inner, "Scan cluster buffered hash slot server: {}", slot); + let response = ResponseKind::KeyScanBuffered(KeyScanBufferedInner { hash_slot: Some(slot), args: args.clone(), cursor_idx: 0, @@ -110,20 +154,47 @@ pub fn scan( }; let mut args = Vec::with_capacity(7); - 
args.push(static_val!(STARTING_CURSOR)); - args.push(static_val!(MATCH)); - args.push(pattern.into()); + create_scan_args(&mut args, pattern, count, r#type); + let response = ResponseKind::KeyScan(KeyScanInner { + hash_slot, + args, + server, + cursor_idx: 0, + tx: tx.clone(), + }); + let command: RedisCommand = (RedisCommandKind::Scan, Vec::new(), response).into(); - if let Some(count) = count { - args.push(static_val!(COUNT)); - args.push(count.into()); - } - if let Some(r#type) = r#type { - args.push(static_val!(TYPE)); - args.push(r#type.to_str().into()); + if let Err(e) = interfaces::default_send_command(inner, command) { + let _ = tx.send(Err(e)); } - let response = ResponseKind::KeyScan(KeyScanInner { + rx_stream(rx) +} + +pub fn scan_buffered( + inner: &RefCount, + pattern: Str, + count: Option, + r#type: Option, + server: Option, +) -> impl Stream> { + let (tx, rx) = unbounded_channel(); + #[cfg(feature = "glommio")] + let tx: UnboundedSender<_> = tx.into(); + + let hash_slot = if inner.config.server.is_clustered() { + if utils::clustered_scan_pattern_has_hash_tag(inner, &pattern) { + Some(redis_protocol::redis_keyslot(pattern.as_bytes())) + } else { + None + } + } else { + None + }; + + let mut args = Vec::with_capacity(7); + create_scan_args(&mut args, pattern, count, r#type); + let response = ResponseKind::KeyScanBuffered(KeyScanBufferedInner { hash_slot, args, server, diff --git a/src/commands/impls/server.rs b/src/commands/impls/server.rs index d9ffb0d4..849be25f 100644 --- a/src/commands/impls/server.rs +++ b/src/commands/impls/server.rs @@ -150,7 +150,7 @@ pub async fn ping(client: &C) -> Result { } pub async fn select(client: &C, db: u8) -> Result { - let frame = utils::request_response(client, || Ok((RedisCommandKind::Select, vec![db.into()]))).await?; + let frame = utils::request_response(client, || Ok((RedisCommandKind::Select, vec![(db as i64).into()]))).await?; protocol_utils::frame_to_results(frame) } diff --git 
a/src/commands/interfaces/acl.rs b/src/commands/interfaces/acl.rs index 86253098..ff58b845 100644 --- a/src/commands/interfaces/acl.rs +++ b/src/commands/interfaces/acl.rs @@ -2,7 +2,7 @@ use crate::{ commands, error::RedisError, interfaces::{ClientLike, RedisResult}, - types::{FromRedis, MultipleStrings, MultipleValues}, + types::{FromRedis, MultipleStrings, MultipleValues, RedisValue}, }; use bytes_utils::Str; use fred_macros::rm_send_if; @@ -66,13 +66,14 @@ pub trait AclInterface: ClientLike + Sized { /// The command returns all the rules defined for an existing ACL user. /// /// - fn acl_getuser(&self, username: S) -> impl Future> + Send + fn acl_getuser(&self, username: U) -> impl Future> + Send where R: FromRedis, - S: Into + Send, + U: TryInto + Send, + U::Error: Into + Send, { async move { - into!(username); + try_into!(username); commands::acl::acl_getuser(self, username).await?.convert() } } diff --git a/src/commands/interfaces/client.rs b/src/commands/interfaces/client.rs index 984307b2..ef0dc19c 100644 --- a/src/commands/interfaces/client.rs +++ b/src/commands/interfaces/client.rs @@ -162,7 +162,7 @@ pub trait ClientInterface: ClientLike + Sized { /// Returns message. /// - /// https://redis.io/docs/latest/commands/echo/ + /// fn echo(&self, message: M) -> impl Future> + Send where R: FromRedis, diff --git a/src/glommio/README.md b/src/glommio/README.md deleted file mode 100644 index cc3c7959..00000000 --- a/src/glommio/README.md +++ /dev/null @@ -1,45 +0,0 @@ -# Glommio - -See the [Glommio Introduction](https://www.datadoghq.com/blog/engineering/introducing-glommio/) for more info. - -Tokio and Glommio have an important difference in their scheduling interfaces: - -* [tokio::spawn](https://docs.rs/tokio/latest/tokio/task/fn.spawn.html) requires a `Send` bound on the spawned - future so that the Tokio scheduler can implement work-stealing across threads. 
-* Glommio's scheduling interface is intended to be used in cases where runtime threads do not need to share or - synchronize any state. Both the [spawn_local](https://docs.rs/glommio/latest/glommio/fn.spawn_local.html) - and [spawn_local_into](https://docs.rs/glommio/latest/glommio/fn.spawn_local_into.html) functions - spawn tasks on the same thread and therefore do not have a `Send` bound. - -`fred` was originally written with message-passing design patterns targeting a Tokio runtime and therefore the `Send` -bound from `tokio::spawn` leaked into all the public interfaces that send messages across tasks. This includes nearly -all the public command traits, including the base `ClientLike` trait. - -When building with `--features glommio` the public interface will change in several ways: - -* The `Send + Sync` bounds will be removed from all generic input parameters, where clause predicates, and `impl Trait` - return types. -* Internal `Arc` usages will change to `Rc`. -* Internal `RwLock` and `Mutex` usages will change to `RefCell`. -* Internal usages of `std::sync::atomic` types will change to thin wrappers around a `RefCell`. -* Any Tokio message passing interfaces (`BroadcastSender`, etc) will change to the closest Glommio equivalent. -* A Tokio compatability layer will be used to map between the two runtime's versions of `AsyncRead` and - `AsyncWrite`. This enables the existing codec interface (`Encoder` + `Decoder`) to work with Glommio's network types. - As a result, for now some Tokio dependencies are still required when using Glommio features. 
- -[Glommio Example](https://github.com/aembke/fred.rs/blob/main/examples/glommio.rs) - -The public docs -on [docs.rs](https://docs.rs/fred/latest) will continue to show the Tokio interfaces that require `Send` bounds, but -callers can find the latest rustdocs for both runtimes on the -`gh-pages` branch: - -[Glommio Documentation](https://aembke.github.io/fred.rs/glommio/fred/index.html) - -[Tokio Documentation](https://aembke.github.io/fred.rs/tokio/fred/index.html) - -Callers can rebuild Glommio docs via the [doc-glommio.sh](../../tests/doc-glommio.sh) script: - -``` -path/to/fred/tests/doc-glommio.sh --open -``` \ No newline at end of file diff --git a/src/interfaces.rs b/src/interfaces.rs index 136674a7..16865734 100644 --- a/src/interfaces.rs +++ b/src/interfaces.rs @@ -13,6 +13,9 @@ use futures::Future; pub use redis_protocol::resp3::types::BytesFrame as Resp3Frame; use std::time::Duration; +pub(crate) use crate::runtime::spawn_event_listener; +pub use crate::runtime::ClientLike; + /// Type alias for `Result`. pub type RedisResult = Result; @@ -92,16 +95,6 @@ pub(crate) fn send_to_router(inner: &RefCount, command: Router } } -#[cfg(not(feature = "glommio"))] -pub use crate::_tokio::ClientLike; -#[cfg(feature = "glommio")] -pub use crate::glommio::interfaces::ClientLike; - -#[cfg(not(feature = "glommio"))] -pub use crate::_tokio::spawn_event_listener; -#[cfg(feature = "glommio")] -pub use crate::glommio::interfaces::spawn_event_listener; - /// Functions that provide a connection heartbeat interface. #[rm_send_if(feature = "glommio")] pub trait HeartbeatInterface: ClientLike { diff --git a/src/lib.rs b/src/lib.rs index eb920c44..944f232d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -72,15 +72,7 @@ pub mod monitor; /// The structs and enums used by the Redis client. 
pub mod types; -#[cfg(feature = "glommio")] -mod glommio; -#[cfg(feature = "glommio")] -pub(crate) use glommio::compat as runtime; - -#[cfg(not(feature = "glommio"))] -mod _tokio; -#[cfg(not(feature = "glommio"))] -pub(crate) use _tokio as runtime; +mod runtime; /// Various client utility functions. pub mod util { diff --git a/src/protocol/command.rs b/src/protocol/command.rs index c61909fd..effdac84 100644 --- a/src/protocol/command.rs +++ b/src/protocol/command.rs @@ -1756,8 +1756,8 @@ impl RedisCommand { response: ResponseKind::Respond(None), hasher: ClusterHash::default(), router_tx: RefCount::new(Mutex::new(None)), - attempts_remaining: 0, - redirections_remaining: 0, + attempts_remaining: 1, + redirections_remaining: 1, can_pipeline: true, skip_backpressure: false, transaction_id: None, @@ -1787,8 +1787,8 @@ impl RedisCommand { timeout_dur: None, response: ResponseKind::Respond(None), router_tx: RefCount::new(Mutex::new(None)), - attempts_remaining: 0, - redirections_remaining: 0, + attempts_remaining: 1, + redirections_remaining: 1, can_pipeline: true, skip_backpressure: false, transaction_id: None, @@ -1878,6 +1878,7 @@ impl RedisCommand { match self.response { ResponseKind::ValueScan(ref inner) => &inner.args, ResponseKind::KeyScan(ref inner) => &inner.args, + ResponseKind::KeyScanBuffered(ref inner) => &inner.args, _ => &self.arguments, } } @@ -1914,6 +1915,7 @@ impl RedisCommand { match self.response { ResponseKind::ValueScan(ref mut inner) => inner.args.drain(..).collect(), ResponseKind::KeyScan(ref mut inner) => inner.args.drain(..).collect(), + ResponseKind::KeyScanBuffered(ref mut inner) => inner.args.drain(..).collect(), _ => self.arguments.drain(..).collect(), } } @@ -2034,9 +2036,29 @@ impl RedisCommand { /// Respond to the caller, taking the response channel in the process. 
pub fn respond_to_caller(&mut self, result: Result) { - #[allow(unused_mut)] - if let Some(mut tx) = self.take_responder() { - let _ = tx.send(result); + match self.response { + ResponseKind::KeyScanBuffered(ref inner) => { + if let Err(error) = result { + let _ = inner.tx.send(Err(error)); + } + }, + ResponseKind::KeyScan(ref inner) => { + if let Err(error) = result { + let _ = inner.tx.send(Err(error)); + } + }, + ResponseKind::ValueScan(ref inner) => { + if let Err(error) = result { + let _ = inner.tx.send(Err(error)); + } + }, + _ => + { + #[allow(unused_mut)] + if let Some(mut tx) = self.take_responder() { + let _ = tx.send(result); + } + }, } } @@ -2064,6 +2086,7 @@ impl RedisCommand { pub fn scan_hash_slot(&self) -> Option { match self.response { ResponseKind::KeyScan(ref inner) => inner.hash_slot, + ResponseKind::KeyScanBuffered(ref inner) => inner.hash_slot, _ => None, } } diff --git a/src/protocol/connection.rs b/src/protocol/connection.rs index fbfd16c5..3ab18296 100644 --- a/src/protocol/connection.rs +++ b/src/protocol/connection.rs @@ -38,7 +38,7 @@ use socket2::SockRef; #[cfg(feature = "glommio")] use glommio::net::TcpStream as BaseTcpStream; #[cfg(feature = "glommio")] -pub type TcpStream = crate::glommio::io_compat::TokioIO; +pub type TcpStream = crate::runtime::glommio::io_compat::TokioIO; #[cfg(not(feature = "glommio"))] use tokio::net::TcpStream; @@ -171,7 +171,7 @@ async fn tcp_connect_any( } #[cfg(feature = "glommio")] - let socket = crate::glommio::io_compat::TokioIO(socket); + let socket = crate::runtime::glommio::io_compat::TokioIO(socket); return Ok((socket, *addr)); } @@ -778,7 +778,7 @@ impl RedisTransport { }; _trace!(inner, "Selecting database {} after connecting.", db); - let command = RedisCommand::new(RedisCommandKind::Select, vec![db.into()]); + let command = RedisCommand::new(RedisCommandKind::Select, vec![(db as i64).into()]); let response = self.request_response(command, inner.is_resp3()).await?; if let Some(error) = 
protocol_utils::frame_to_error(&response) { diff --git a/src/protocol/responders.rs b/src/protocol/responders.rs index 7e25c0ce..489fa527 100644 --- a/src/protocol/responders.rs +++ b/src/protocol/responders.rs @@ -1,10 +1,11 @@ use crate::{ error::{RedisError, RedisErrorKind}, + interfaces, interfaces::Resp3Frame, modules::inner::RedisClientInner, protocol::{ command::{RedisCommand, RedisCommandKind, ResponseSender, RouterResponse}, - types::{KeyScanInner, Server, ValueScanInner, ValueScanResult}, + types::{KeyScanBufferedInner, KeyScanInner, Server, ValueScanInner, ValueScanResult}, utils as protocol_utils, }, runtime::{AtomicUsize, Mutex, RefCount}, @@ -56,6 +57,8 @@ pub enum ResponseKind { ValueScan(ValueScanInner), /// Handle the response as a page of keys from a SCAN command. KeyScan(KeyScanInner), + /// Handle the response as a buffered key SCAN command. + KeyScanBuffered(KeyScanBufferedInner), } impl fmt::Debug for ResponseKind { @@ -66,6 +69,7 @@ impl fmt::Debug for ResponseKind { ResponseKind::Respond(_) => "Respond", ResponseKind::KeyScan(_) => "KeyScan", ResponseKind::ValueScan(_) => "ValueScan", + ResponseKind::KeyScanBuffered(_) => "KeyScanBuffered", }) } } @@ -94,7 +98,7 @@ impl ResponseKind { expected: *expected, error_early: *error_early, }, - ResponseKind::KeyScan(_) | ResponseKind::ValueScan(_) => return None, + ResponseKind::KeyScan(_) | ResponseKind::ValueScan(_) | ResponseKind::KeyScanBuffered(_) => return None, }) } @@ -165,7 +169,7 @@ impl ResponseKind { match self { ResponseKind::Skip | ResponseKind::Respond(_) => 1, ResponseKind::Buffer { ref expected, .. 
} => *expected, - ResponseKind::ValueScan(_) | ResponseKind::KeyScan(_) => 1, + ResponseKind::ValueScan(_) | ResponseKind::KeyScan(_) | ResponseKind::KeyScanBuffered(_) => 1, } } @@ -377,7 +381,7 @@ fn send_value_scan_result( let state = ValueScanResult::ZScan(ZScanResult { can_continue, inner: inner.clone(), - scan_state: scanner, + scan_state: Some(scanner), results: Some(results), }); @@ -391,7 +395,7 @@ fn send_value_scan_result( let state = ValueScanResult::SScan(SScanResult { can_continue, inner: inner.clone(), - scan_state: scanner, + scan_state: Some(scanner), results: Some(result), }); @@ -406,7 +410,7 @@ fn send_value_scan_result( let state = ValueScanResult::HScan(HScanResult { can_continue, inner: inner.clone(), - scan_state: scanner, + scan_state: Some(scanner), results: Some(results), }); @@ -568,7 +572,7 @@ pub fn respond_key_scan( command.respond_to_router(inner, RouterResponse::Continue); let scan_result = ScanResult { - scan_state: scanner, + scan_state: Some(scanner), inner: inner.clone(), results: Some(keys), can_continue, @@ -580,6 +584,50 @@ pub fn respond_key_scan( Ok(()) } +pub fn respond_key_scan_buffered( + inner: &RefCount, + server: &Server, + command: RedisCommand, + mut scanner: KeyScanBufferedInner, + frame: Resp3Frame, +) -> Result<(), RedisError> { + _trace!( + inner, + "Handling `KeyScanBuffered` response from {} for {}", + server, + command.kind.to_str_debug() + ); + + let (next_cursor, keys) = match parse_key_scan_frame(frame) { + Ok(result) => result, + Err(e) => { + scanner.send_error(e); + command.respond_to_router(inner, RouterResponse::Continue); + return Ok(()); + }, + }; + let scan_stream = scanner.tx.clone(); + let can_continue = next_cursor != LAST_CURSOR; + scanner.update_cursor(next_cursor); + command.respond_to_router(inner, RouterResponse::Continue); + + for key in keys.into_iter() { + if let Err(_) = scan_stream.send(Ok(key)) { + _debug!(inner, "Error sending SCAN key."); + break; + } + } + + if can_continue { + let 
mut command = RedisCommand::new(RedisCommandKind::Scan, Vec::new()); + command.response = ResponseKind::KeyScanBuffered(scanner); + if let Err(e) = interfaces::default_send_command(inner, command) { + let _ = scan_stream.send(Err(e)); + }; + } + Ok(()) +} + /// Respond to the caller of a value scanning operation. pub fn respond_value_scan( inner: &RefCount, diff --git a/src/protocol/types.rs b/src/protocol/types.rs index 194fd013..005b849f 100644 --- a/src/protocol/types.rs +++ b/src/protocol/types.rs @@ -345,6 +345,19 @@ pub struct KeyScanInner { pub tx: UnboundedSender>, } +pub struct KeyScanBufferedInner { + /// The hash slot for the command. + pub hash_slot: Option, + /// An optional server override. + pub server: Option, + /// The index of the cursor in `args`. + pub cursor_idx: usize, + /// The arguments sent in each scan command. + pub args: Vec, + /// The sender half of the results channel. + pub tx: UnboundedSender>, +} + impl KeyScanInner { /// Update the cursor in place in the arguments. pub fn update_cursor(&mut self, cursor: Str) { @@ -357,6 +370,18 @@ impl KeyScanInner { } } +impl KeyScanBufferedInner { + /// Update the cursor in place in the arguments. + pub fn update_cursor(&mut self, cursor: Str) { + self.args[self.cursor_idx] = cursor.into(); + } + + /// Send an error on the response stream. 
+ pub fn send_error(&self, error: RedisError) { + let _ = self.tx.send(Err(error)); + } +} + pub enum ValueScanResult { SScan(SScanResult), HScan(HScanResult), diff --git a/src/router/centralized.rs b/src/router/centralized.rs index 75c9d7f9..85dbd4ba 100644 --- a/src/router/centralized.rs +++ b/src/router/centralized.rs @@ -189,6 +189,9 @@ pub async fn process_response_frame( ), ResponseKind::KeyScan(scanner) => responders::respond_key_scan(inner, server, command, scanner, frame), ResponseKind::ValueScan(scanner) => responders::respond_value_scan(inner, server, command, scanner, frame), + ResponseKind::KeyScanBuffered(scanner) => { + responders::respond_key_scan_buffered(inner, server, command, scanner, frame) + }, } } diff --git a/src/router/clustered.rs b/src/router/clustered.rs index 9ad85385..f721e762 100644 --- a/src/router/clustered.rs +++ b/src/router/clustered.rs @@ -508,6 +508,9 @@ pub async fn process_response_frame( ), ResponseKind::KeyScan(scanner) => responders::respond_key_scan(inner, server, command, scanner, frame), ResponseKind::ValueScan(scanner) => responders::respond_value_scan(inner, server, command, scanner, frame), + ResponseKind::KeyScanBuffered(scanner) => { + responders::respond_key_scan_buffered(inner, server, command, scanner, frame) + }, } } diff --git a/src/runtime/README.md b/src/runtime/README.md new file mode 100644 index 00000000..7b03338c --- /dev/null +++ b/src/runtime/README.md @@ -0,0 +1,58 @@ +# Alternative Runtimes + +`fred` was originally written for Tokio runtimes, but callers can enable experimental support for other runtimes via the +`glommio`, `monoio` and `smol` features. Only one runtime interface feature can be enabled at a time. + +These runtimes have some important differences in their scheduling interfaces based on whether they implement a +work-stealing or thread-per-core scheduling model. 
+ +* [tokio::spawn](https://docs.rs/tokio/latest/tokio/task/fn.spawn.html) + and [async_task::spawn](https://docs.rs/async-task/latest/async_task/fn.spawn.html) require a `Send` bound on the + spawned future so that the scheduler can implement work-stealing across threads. +* Glommio and Monoio both use a thread-per-core model that works best when threads do not need to share or synchronize + any state. Both the [spawn_local](https://docs.rs/glommio/latest/glommio/fn.spawn_local.html) + and [spawn_local_into](https://docs.rs/glommio/latest/glommio/fn.spawn_local_into.html) functions + spawn tasks on the same thread and therefore do not have a `Send` bound. + +`fred` was originally written with message-passing design patterns targeting a Tokio runtime and therefore the `Send` +bound from `tokio::spawn` leaked into all the public interfaces that send messages across tasks. This includes nearly +all the public command traits, including the base `ClientLike` trait. + +If any of the alternative runtime features are enabled the client's interface and internal implementation will change in +several ways based on the runtime scheduling model. For thread-per-core runtimes (`glommio` and `monoio`) +this includes: + +* The `Send + Sync` bounds will be removed from all generic input parameters, where clause predicates, and `impl Trait` + return types. +* Internal `Arc` usages will change to `Rc`. +* Internal `RwLock` and `Mutex` usages will change to `RefCell`. +* Internal usages of `std::sync::atomic` types will change to thin wrappers around a `RefCell`. +* Any Tokio message passing interfaces (`BroadcastReceiver`, etc) will change to the closest equivalent provided by the + runtime. 
+ + The public docs + on [docs.rs](https://docs.rs/fred/latest) will continue to show the Tokio interfaces that require `Send` bounds, but + callers can find the latest rustdocs for both runtimes on the + `gh-pages` branch: + + [Glommio Documentation](https://aembke.github.io/fred.rs/glommio/fred/index.html) + + [Tokio Documentation](https://aembke.github.io/fred.rs/tokio/fred/index.html) + + ## Compatibility + + # Glommio + + See the [Glommio Introduction](https://www.datadoghq.com/blog/engineering/introducing-glommio/) for more info. + + When building with `--features glommio` a Tokio compatibility layer will be used to map between the two runtimes' + versions of `AsyncRead` and `AsyncWrite`. This enables the existing codec interface (`Encoder` + `Decoder`) to work with + Glommio's network types. As a result, for now some Tokio dependencies are still required when using Glommio features. + + This approach also allows the `tokio-native-tls` and `tokio-rustls` modules to work with Glommio's network types. 
+ +[Glommio Example](https://github.com/aembke/fred.rs/blob/main/examples/glommio.rs) + +# Monoio + +Work In Progress diff --git a/src/_tokio.rs b/src/runtime/_tokio.rs similarity index 100% rename from src/_tokio.rs rename to src/runtime/_tokio.rs diff --git a/src/glommio/broadcast.rs b/src/runtime/glommio/broadcast.rs similarity index 100% rename from src/glommio/broadcast.rs rename to src/runtime/glommio/broadcast.rs diff --git a/src/glommio/interfaces.rs b/src/runtime/glommio/interfaces.rs similarity index 99% rename from src/glommio/interfaces.rs rename to src/runtime/glommio/interfaces.rs index 39b16fdc..a213e4d1 100644 --- a/src/glommio/interfaces.rs +++ b/src/runtime/glommio/interfaces.rs @@ -4,13 +4,12 @@ use crate::{ clients::WithOptions, commands, error::RedisError, - glommio::compat::spawn_into, interfaces::{RedisResult, Resp3Frame}, modules::inner::RedisClientInner, prelude::default_send_command, protocol::command::RedisCommand, router::commands as router_commands, - runtime::{spawn, BroadcastReceiver, JoinHandle, RefCount}, + runtime::{glommio::compat::spawn_into, spawn, BroadcastReceiver, JoinHandle, RefCount}, types::{ ClientState, ConnectHandle, diff --git a/src/glommio/io_compat.rs b/src/runtime/glommio/io_compat.rs similarity index 100% rename from src/glommio/io_compat.rs rename to src/runtime/glommio/io_compat.rs diff --git a/src/glommio/mod.rs b/src/runtime/glommio/mod.rs similarity index 81% rename from src/glommio/mod.rs rename to src/runtime/glommio/mod.rs index 3d21b683..e33efe7e 100644 --- a/src/glommio/mod.rs +++ b/src/runtime/glommio/mod.rs @@ -5,13 +5,11 @@ pub(crate) mod broadcast; pub(crate) mod interfaces; pub(crate) mod io_compat; pub(crate) mod mpsc; -pub(crate) mod sync; pub(crate) mod compat { pub use super::{ broadcast::{BroadcastReceiver, BroadcastSender}, mpsc::{rx_stream, UnboundedReceiver, UnboundedSender}, - sync::*, }; use crate::error::RedisError; use futures::Future; @@ -23,7 +21,7 @@ pub(crate) mod compat { }; pub use 
oneshot::{channel as oneshot_channel, Receiver as OneshotReceiver, Sender as OneshotSender}; use std::{ - cell::RefCell, + cell::Cell, pin::Pin, rc::Rc, task::{Context, Poll}, @@ -48,11 +46,11 @@ pub(crate) mod compat { /// [JoinHandle](tokio::task::JoinHandle) pub struct JoinHandle { pub(crate) inner: GlommioJoinHandle, - pub(crate) finished: Rc>, + pub(crate) finished: Rc>, } pub fn spawn(ft: impl Future + 'static) -> JoinHandle { - let finished = Rc::new(RefCell::new(false)); + let finished = Rc::new(Cell::new(false)); let _finished = finished.clone(); let inner = glommio::spawn_local(async move { let result = ft.await; @@ -65,7 +63,7 @@ pub(crate) mod compat { } pub fn spawn_into(ft: impl Future + 'static, tq: TaskQueueHandle) -> JoinHandle { - let finished = Rc::new(RefCell::new(false)); + let finished = Rc::new(Cell::new(false)); let _finished = finished.clone(); let inner = glommio::spawn_local_into( async move { @@ -81,6 +79,7 @@ pub(crate) mod compat { JoinHandle { inner, finished } } + // map from futures_lite::Future to std::future::Future impl Future for JoinHandle { type Output = Result; @@ -107,7 +106,7 @@ pub(crate) mod compat { } pub fn is_finished(&self) -> bool { - *self.finished.as_ref().borrow() + self.finished.get() } pub fn abort(&self) { @@ -115,4 +114,24 @@ pub(crate) mod compat { self.set_finished(); } } + + pub struct AsyncRwLock { + inner: glommio::sync::RwLock, + } + + impl AsyncRwLock { + pub fn new(val: T) -> Self { + AsyncRwLock { + inner: glommio::sync::RwLock::new(val), + } + } + + pub async fn write(&self) -> glommio::sync::RwLockWriteGuard { + self.inner.write().await.unwrap() + } + + pub async fn read(&self) -> glommio::sync::RwLockReadGuard { + self.inner.read().await.unwrap() + } + } } diff --git a/src/glommio/mpsc.rs b/src/runtime/glommio/mpsc.rs similarity index 100% rename from src/glommio/mpsc.rs rename to src/runtime/glommio/mpsc.rs diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs new file mode 100644 index 
00000000..86da981d --- /dev/null +++ b/src/runtime/mod.rs @@ -0,0 +1,25 @@ +#[cfg(not(any(feature = "glommio", feature = "smol", feature = "monoio")))] +mod _tokio; +#[cfg(not(any(feature = "glommio", feature = "smol", feature = "monoio")))] +pub use _tokio::*; + +#[cfg(feature = "glommio")] +pub(crate) mod glommio; +#[cfg(feature = "glommio")] +pub use glommio::compat::*; + +#[cfg(any(feature = "glommio", feature = "smol", feature = "monoio"))] +mod sync; +#[cfg(any(feature = "glommio", feature = "smol", feature = "monoio"))] +pub use sync::*; + +#[cfg(not(feature = "glommio"))] +pub use _tokio::ClientLike; +#[cfg(feature = "glommio")] +pub use glommio::interfaces::ClientLike; + +#[cfg(not(feature = "glommio"))] +pub(crate) use _tokio::spawn_event_listener; +#[cfg(feature = "glommio")] +#[doc(hidden)] +pub(crate) use glommio::interfaces::spawn_event_listener; diff --git a/src/glommio/notes.md b/src/runtime/monoio/codec.rs similarity index 100% rename from src/glommio/notes.md rename to src/runtime/monoio/codec.rs diff --git a/src/runtime/monoio/mod.rs b/src/runtime/monoio/mod.rs new file mode 100644 index 00000000..e69de29b diff --git a/src/runtime/smol/mod.rs b/src/runtime/smol/mod.rs new file mode 100644 index 00000000..e69de29b diff --git a/src/glommio/sync.rs b/src/runtime/sync.rs similarity index 64% rename from src/glommio/sync.rs rename to src/runtime/sync.rs index 4d4cacf4..2178a127 100644 --- a/src/glommio/sync.rs +++ b/src/runtime/sync.rs @@ -1,10 +1,11 @@ use std::{ - cell::{Ref, RefCell, RefMut}, + cell::{Cell, Ref, RefCell, RefMut}, fmt, mem, sync::atomic::Ordering, }; +/// A !Send flavor of `ArcSwap` with an interface similar to std::sync::atomic types. 
pub struct RefSwap { inner: RefCell, } @@ -29,87 +30,55 @@ impl RefSwap { } } -pub struct AsyncRwLock { - inner: glommio::sync::RwLock, -} - -impl AsyncRwLock { - pub fn new(val: T) -> Self { - AsyncRwLock { - inner: glommio::sync::RwLock::new(val), - } - } - - pub async fn write(&self) -> glommio::sync::RwLockWriteGuard { - self.inner.write().await.unwrap() - } - - pub async fn read(&self) -> glommio::sync::RwLockReadGuard { - self.inner.read().await.unwrap() - } -} - +/// A !Send flavor of `AtomicUsize`, with the same interface. #[derive(Debug)] pub struct AtomicUsize { - inner: RefCell, + inner: Cell, } impl AtomicUsize { pub fn new(val: usize) -> Self { - AtomicUsize { - inner: RefCell::new(val), - } + AtomicUsize { inner: Cell::new(val) } } pub fn fetch_add(&self, val: usize, _: Ordering) -> usize { - let mut guard = self.inner.borrow_mut(); - - let new = guard.saturating_add(val); - *guard = new; - new + let tmp = self.inner.get().saturating_add(val); + self.inner.replace(tmp); + tmp } pub fn fetch_sub(&self, val: usize, _: Ordering) -> usize { - let mut guard = self.inner.borrow_mut(); - - let new = guard.saturating_sub(val); - *guard = new; - new + let tmp = self.inner.get().saturating_sub(val); + self.inner.replace(tmp); + tmp } pub fn load(&self, _: Ordering) -> usize { - *self.inner.borrow() + self.inner.get() } pub fn swap(&self, val: usize, _: Ordering) -> usize { - let mut guard = self.inner.borrow_mut(); - let old = *guard; - *guard = val; - old + self.inner.replace(val) } } +/// A !Send flavor of `AtomicBool`, with the same interface. 
#[derive(Debug)] pub struct AtomicBool { - inner: RefCell, + inner: Cell, } impl AtomicBool { pub fn new(val: bool) -> Self { - AtomicBool { - inner: RefCell::new(val), - } + AtomicBool { inner: Cell::new(val) } } pub fn load(&self, _: Ordering) -> bool { - *self.inner.borrow() + self.inner.get() } pub fn swap(&self, val: bool, _: Ordering) -> bool { - let mut guard = self.inner.borrow_mut(); - let old = *guard; - *guard = val; - old + self.inner.replace(val) } } diff --git a/src/types/args.rs b/src/types/args.rs index 90c6225f..1829227b 100644 --- a/src/types/args.rs +++ b/src/types/args.rs @@ -1391,6 +1391,8 @@ impl Hash for RedisValue { } } +#[cfg(not(feature = "specialize-into-bytes"))] +#[cfg_attr(docsrs, doc(cfg(not(feature = "specialize-into-bytes"))))] impl From for RedisValue { fn from(d: u8) -> Self { RedisValue::Integer(d as i64) @@ -1590,6 +1592,16 @@ where } } +#[cfg(feature = "specialize-into-bytes")] +#[cfg_attr(docsrs, doc(cfg(feature = "specialize-into-bytes")))] +impl TryFrom> for RedisValue { + type Error = RedisError; + + fn try_from(value: Vec) -> Result { + Ok(RedisValue::Bytes(value.into())) + } +} + impl TryFrom> for RedisValue where T: TryInto, @@ -1703,7 +1715,7 @@ mod tests { // requires specialization of TryFrom> for RedisValue #[test] - #[ignore] + #[cfg(feature = "specialize-into-bytes")] fn redis_bytes_from_vec_u8() { let input: Vec = vec![0, 1, 2]; let output: RedisValue = input.clone().try_into().unwrap(); diff --git a/src/types/scan.rs b/src/types/scan.rs index f5d4f62f..08e84828 100644 --- a/src/types/scan.rs +++ b/src/types/scan.rs @@ -70,10 +70,12 @@ pub trait Scanner { /// Move on to the next page of results from the SCAN operation. If no more results are available this may close the /// stream. /// - /// **This must be called to continue scanning the keyspace.** Results are not automatically scanned in the - /// background since this could cause the buffer backing the stream to grow too large very quickly. 
This - /// interface provides a mechanism for throttling the throughput of the SCAN call. If this struct is dropped - /// without calling this function the stream will close without an error. + /// If callers do not call this function the scanning will continue when this struct is dropped. Results are not + /// automatically scanned in the background since this could cause the buffer backing the stream to grow too large + /// very quickly. This interface provides a mechanism for throttling the throughput of the SCAN call. Callers can + /// use [scan_buffered](crate::clients::RedisClient::scan_buffered) or + /// [scan_cluster_buffered](crate::clients::RedisClient::scan_cluster_buffered) to automatically continue scanning + /// in the background. /// /// If this function returns an error the scan call cannot continue as the client has been closed, or some other /// fatal error has occurred. If this happens the error will appear in the stream from the original SCAN call. @@ -84,15 +86,38 @@ pub trait Scanner { pub struct ScanResult { pub(crate) results: Option>, pub(crate) inner: RefCount, - pub(crate) scan_state: KeyScanInner, + pub(crate) scan_state: Option, pub(crate) can_continue: bool, } +fn next_key_page(inner: &RefCount, state: &mut Option) { + if let Some(state) = state.take() { + let cluster_node = state.server.clone(); + let response = ResponseKind::KeyScan(state); + let mut cmd: RedisCommand = (RedisCommandKind::Scan, Vec::new(), response).into(); + cmd.cluster_node = cluster_node; + + let _ = interfaces::default_send_command(inner, cmd); + } +} + +impl Drop for ScanResult { + fn drop(&mut self) { + if self.can_continue { + next_key_page(&self.inner, &mut self.scan_state); + } + } +} + impl Scanner for ScanResult { type Page = Vec; fn cursor(&self) -> Option> { - self.scan_state.args[self.scan_state.cursor_idx].as_str() + if let Some(ref state) = self.scan_state { + state.args[state.cursor_idx].as_str() + } else { + None + } } fn has_more(&self) -> bool { 
@@ -113,17 +138,15 @@ impl Scanner for ScanResult { } } + /// TODO remove Result wrapper in next major version fn next(self) -> Result<(), RedisError> { if !self.can_continue { return Ok(()); } - let cluster_node = self.scan_state.server.clone(); - let response = ResponseKind::KeyScan(self.scan_state); - let mut cmd: RedisCommand = (RedisCommandKind::Scan, Vec::new(), response).into(); - cmd.cluster_node = cluster_node; - - interfaces::default_send_command(&self.inner, cmd) + let mut _self = self; + next_key_page(&_self.inner, &mut _self.scan_state); + Ok(()) } } @@ -131,15 +154,35 @@ impl Scanner for ScanResult { pub struct HScanResult { pub(crate) results: Option<RedisMap>, pub(crate) inner: RefCount<RedisClientInner>, - pub(crate) scan_state: ValueScanInner, + pub(crate) scan_state: Option<ValueScanInner>, pub(crate) can_continue: bool, } +fn next_hscan_page(inner: &RefCount<RedisClientInner>, state: &mut Option<ValueScanInner>) { + if let Some(state) = state.take() { + let response = ResponseKind::ValueScan(state); + let cmd: RedisCommand = (RedisCommandKind::Hscan, Vec::new(), response).into(); + let _ = interfaces::default_send_command(inner, cmd); + } +} + +impl Drop for HScanResult { + fn drop(&mut self) { + if self.can_continue { + next_hscan_page(&self.inner, &mut self.scan_state); + } + } +} + impl Scanner for HScanResult { type Page = RedisMap; fn cursor(&self) -> Option<Cow<str>> { - self.scan_state.args[self.scan_state.cursor_idx].as_str() + if let Some(ref state) = self.scan_state { + state.args[state.cursor_idx].as_str() + } else { + None + } } fn has_more(&self) -> bool { @@ -165,9 +208,9 @@ impl Scanner for HScanResult { return Ok(()); } - let response = ResponseKind::ValueScan(self.scan_state); - let cmd: RedisCommand = (RedisCommandKind::Hscan, Vec::new(), response).into(); - interfaces::default_send_command(&self.inner, cmd) + let mut _self = self; + next_hscan_page(&_self.inner, &mut _self.scan_state); + Ok(()) } } @@ -175,15 +218,35 @@ impl Scanner for HScanResult { pub struct SScanResult { pub(crate) results: Option<Vec<RedisValue>>, 
pub(crate) inner: RefCount<RedisClientInner>, - pub(crate) scan_state: ValueScanInner, + pub(crate) scan_state: Option<ValueScanInner>, pub(crate) can_continue: bool, } +fn next_sscan_page(inner: &RefCount<RedisClientInner>, state: &mut Option<ValueScanInner>) { + if let Some(state) = state.take() { + let response = ResponseKind::ValueScan(state); + let cmd: RedisCommand = (RedisCommandKind::Sscan, Vec::new(), response).into(); + let _ = interfaces::default_send_command(inner, cmd); + } +} + +impl Drop for SScanResult { + fn drop(&mut self) { + if self.can_continue { + next_sscan_page(&self.inner, &mut self.scan_state); + } + } +} + impl Scanner for SScanResult { type Page = Vec<RedisValue>; fn cursor(&self) -> Option<Cow<str>> { - self.scan_state.args[self.scan_state.cursor_idx].as_str() + if let Some(ref state) = self.scan_state { + state.args[state.cursor_idx].as_str() + } else { + None + } } fn results(&self) -> &Option<Self::Page> { @@ -209,9 +272,9 @@ impl Scanner for SScanResult { return Ok(()); } - let response = ResponseKind::ValueScan(self.scan_state); - let cmd: RedisCommand = (RedisCommandKind::Sscan, Vec::new(), response).into(); - interfaces::default_send_command(&self.inner, cmd) + let mut _self = self; + next_sscan_page(&_self.inner, &mut _self.scan_state); + Ok(()) } } @@ -219,15 +282,35 @@ impl Scanner for SScanResult { pub struct ZScanResult { pub(crate) results: Option<Vec<(RedisValue, f64)>>, pub(crate) inner: RefCount<RedisClientInner>, - pub(crate) scan_state: ValueScanInner, + pub(crate) scan_state: Option<ValueScanInner>, pub(crate) can_continue: bool, } +fn next_zscan_page(inner: &RefCount<RedisClientInner>, state: &mut Option<ValueScanInner>) { + if let Some(state) = state.take() { + let response = ResponseKind::ValueScan(state); + let cmd: RedisCommand = (RedisCommandKind::Zscan, Vec::new(), response).into(); + let _ = interfaces::default_send_command(inner, cmd); + } +} + +impl Drop for ZScanResult { + fn drop(&mut self) { + if self.can_continue { + next_zscan_page(&self.inner, &mut self.scan_state); + } + } +} + impl Scanner for ZScanResult { type Page = Vec<(RedisValue, f64)>; fn cursor(&self) -> Option<Cow<str>> { - 
self.scan_state.args[self.scan_state.cursor_idx].as_str() + if let Some(ref state) = self.scan_state { + state.args[state.cursor_idx].as_str() + } else { + None + } } fn has_more(&self) -> bool { @@ -253,8 +336,8 @@ impl Scanner for ZScanResult { return Ok(()); } - let response = ResponseKind::ValueScan(self.scan_state); - let cmd: RedisCommand = (RedisCommandKind::Zscan, Vec::new(), response).into(); - interfaces::default_send_command(&self.inner, cmd) + let mut _self = self; + next_zscan_page(&_self.inner, &mut _self.scan_state); + Ok(()) } } diff --git a/tests/docker/runners/bash/all-features.sh b/tests/docker/runners/bash/all-features.sh index 5ab31598..b1d3f757 100755 --- a/tests/docker/runners/bash/all-features.sh +++ b/tests/docker/runners/bash/all-features.sh @@ -15,7 +15,7 @@ done # those features individually. FEATURES="network-logs custom-reconnect-errors serde-json blocking-encoding full-tracing monitor metrics sentinel-client subscriber-client dns debug-ids - replicas sha-1 transactions i-all credential-provider" + replicas sha-1 transactions i-all credential-provider specialize-into-bytes" if [ -z "$FRED_CI_NEXTEST" ]; then cargo test --release --lib --tests --features "$FEATURES" -- --test-threads=1 "$@" diff --git a/tests/integration/centralized.rs b/tests/integration/centralized.rs index 4640dffe..94b3ab3e 100644 --- a/tests/integration/centralized.rs +++ b/tests/integration/centralized.rs @@ -159,13 +159,17 @@ mod hyperloglog { mod scanning { #[cfg(feature = "i-keys")] - cluster_test!(scanning, should_scan_keyspace); + centralized_test!(scanning, should_scan_keyspace); #[cfg(feature = "i-hashes")] - cluster_test!(scanning, should_hscan_hash); + centralized_test!(scanning, should_hscan_hash); #[cfg(feature = "i-sets")] - cluster_test!(scanning, should_sscan_set); + centralized_test!(scanning, should_sscan_set); #[cfg(feature = "i-sorted-sets")] - cluster_test!(scanning, should_zscan_sorted_set); + centralized_test!(scanning, 
should_zscan_sorted_set); + #[cfg(feature = "i-keys")] + centralized_test!(scanning, should_scan_buffered); + #[cfg(feature = "i-keys")] + centralized_test!(scanning, should_continue_scanning_on_page_drop); } #[cfg(feature = "i-slowlog")] diff --git a/tests/integration/clustered.rs b/tests/integration/clustered.rs index af0b1a21..83a20ea9 100644 --- a/tests/integration/clustered.rs +++ b/tests/integration/clustered.rs @@ -173,6 +173,12 @@ mod scanning { cluster_test!(scanning, should_zscan_sorted_set); #[cfg(feature = "i-keys")] cluster_test!(scanning, should_scan_cluster); + #[cfg(feature = "i-keys")] + cluster_test!(scanning, should_scan_buffered); + #[cfg(feature = "i-keys")] + cluster_test!(scanning, should_scan_cluster_buffered); + #[cfg(feature = "i-keys")] + cluster_test!(scanning, should_continue_scanning_on_page_drop); } #[cfg(feature = "i-slowlog")] diff --git a/tests/integration/scanning/mod.rs b/tests/integration/scanning/mod.rs index 450e06a2..039c05ca 100644 --- a/tests/integration/scanning/mod.rs +++ b/tests/integration/scanning/mod.rs @@ -1,6 +1,10 @@ #![allow(dead_code)] -use fred::{prelude::*, types::Scanner}; -use futures::TryStreamExt; +use fred::{ + prelude::*, + types::{ScanResult, Scanner}, +}; +use futures::{Stream, TryStreamExt}; +// tokio_stream has a more flexible version of `collect` use tokio_stream::StreamExt; const SCAN_KEYS: i64 = 100; @@ -146,3 +150,73 @@ pub async fn should_scan_cluster(client: RedisClient, _: RedisConfig) -> Result< assert_eq!(count, 2000); Ok(()) } + +#[cfg(feature = "i-keys")] +pub async fn should_scan_buffered(client: RedisClient, _: RedisConfig) -> Result<(), RedisError> { + let mut expected = Vec::with_capacity(100); + for idx in 0 .. 
100 { + // write everything to the same cluster node + let key: RedisKey = format!("foo-{{1}}-{}", idx).into(); + expected.push(key.clone()); + let _: () = client.set(key, idx, None, None, false).await?; + } + expected.sort(); + + let mut keys: Vec<RedisKey> = client + .scan_buffered("foo-{1}*", Some(20), None) + .collect::<Result<Vec<RedisKey>, RedisError>>() + .await?; + keys.sort(); + + assert_eq!(keys, expected); + Ok(()) +} + +#[cfg(feature = "i-keys")] +pub async fn should_scan_cluster_buffered(client: RedisClient, _: RedisConfig) -> Result<(), RedisError> { + let mut expected = Vec::with_capacity(100); + for idx in 0 .. 100 { + let key: RedisKey = format!("foo-{}", idx).into(); + expected.push(key.clone()); + let _: () = client.set(key, idx, None, None, false).await?; + } + expected.sort(); + + let mut keys: Vec<RedisKey> = client + .scan_cluster_buffered("foo*", Some(20), None) + .collect::<Result<Vec<RedisKey>, RedisError>>() + .await?; + keys.sort(); + + assert_eq!(keys, expected); + Ok(()) +} + +#[cfg(feature = "i-keys")] +fn scan_all(client: &RedisClient, page_size: Option<u32>) -> impl Stream<Item = Result<ScanResult, RedisError>> { + use futures::StreamExt; + + if client.is_clustered() { + client.scan_cluster("*", page_size, None).boxed() + } else { + client.scan("*", page_size, None).boxed() + } +} + +#[cfg(feature = "i-keys")] +pub async fn should_continue_scanning_on_page_drop(client: RedisClient, _: RedisConfig) -> Result<(), RedisError> { + for idx in 0 .. 100 { + let key: RedisKey = format!("foo-{}", idx).into(); + let _: () = client.set(key, idx, None, None, false).await?; + } + + let mut count = 0; + let mut scanner = scan_all(&client, Some(10)); + while let Some(Ok(mut page)) = scanner.next().await { + let keys = page.take_results().unwrap(); + count += keys.len(); + } + assert_eq!(count, 100); + + Ok(()) +}