Skip to content

Commit

Permalink
6.1.0
Browse files Browse the repository at this point in the history
* feat: client tracking, interface example
* feat: with_cluster_node interface
* feat: rustls 0.21
* fix: add StreamInterface to Transaction
* docs: fix `TracingConfig` not included in docs (#119)
* fix: close public broadcast receivers after QUIT or SHUTDOWN (#120)
* feat: add all command traits to subscriber client (#122)
* doc: comment edit to better reflect clustered behavior

---------

Co-authored-by: Thaler Benedek <[email protected]>
Co-authored-by: Artem Khramov <[email protected]>
Co-authored-by: Sanchith Hegde <[email protected]>
Co-authored-by: Kirill Pertsev <[email protected]>
  • Loading branch information
5 people authored Apr 23, 2023
1 parent 46ff203 commit 1544f47
Show file tree
Hide file tree
Showing 47 changed files with 1,615 additions and 251 deletions.
9 changes: 8 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
## 6.1.0

* Add a [client tracking](https://redis.io/docs/manual/client-side-caching/) interface.
* Add a global config value for broadcast channel capacity.
* Add an interface to interact with individual cluster nodes.
* Fix missing `impl StreamInterface for Transaction`
* Add all `RedisClient` command traits to `SubscriberClient`

## 6.0.0

* Refactored the connection and protocol layer.
Expand Down Expand Up @@ -33,7 +41,6 @@ Potentially breaking changes in 6.x:
* Removed or renamed some fields on `RedisConfig`.
* Changed the pubsub receiver interface to use `Message` instead of `(String, RedisValue)` tuples.
* Changed the `on_*` family of functions to return a [BroadcastReceiver](https://docs.rs/tokio/latest/tokio/sync/broadcast/struct.Receiver.html).
* This usually means changing `next()` to `recv()` in `while let` loops, etc.
* The `FromRedis` trait converts `RedisValue::Null` to `"nil"` with `String` and `Str`.

## 5.2.0
Expand Down
20 changes: 15 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fred"
version = "6.0.0"
version = "6.1.0"
authors = ["Alec Embke <[email protected]>"]
edition = "2021"
description = "An async Redis client built on Tokio."
Expand All @@ -20,13 +20,16 @@ features = [
"dns",
"enable-rustls",
"enable-native-tls",
"full-tracing",
"partial-tracing",
"blocking-encoding",
"custom-reconnect-errors",
"monitor",
"sentinel-client",
"sentinel-auth",
"check-unresponsive",
"replicas"
"replicas",
"client-tracking"
]
rustdoc-args = ["--cfg", "docsrs"]

Expand All @@ -51,21 +54,23 @@ sha-1 = "0.10"
rand = "0.8"
async-trait = "0.1"
semver = "1.0"
rustls = { version = "0.20", optional = true }
rustls = { version = "0.21", optional = true }
native-tls = { version = "0.2", optional = true }
tokio-native-tls = { version = "0.3", optional = true }
tracing = { version = "0.1", optional = true }
tracing-futures = { version = "0.2", optional = true }
nom = { version = "7.1", optional = true }
serde_json = { version = "1", optional = true }
tokio-rustls = { version = "0.23", optional = true }
tokio-rustls = { version = "0.24", optional = true }
webpki = { package = "rustls-webpki", version = "0.100", features = ["alloc", "std"], optional = true }
rustls-native-certs = { version = "0.6", optional = true }
trust-dns-resolver = { version = "0.22", optional = true }

[dev-dependencies]
prometheus = "0.12"
base64 = "0.13"
subprocess = "0.2.7"
serde = { version = "1.0", features = ["derive"] }

[lib]
doc = true
Expand All @@ -92,6 +97,10 @@ required-features = ["serde-json"]
name = "dns"
required-features = ["dns"]

[[example]]
name = "client_tracking"
required-features = ["client-tracking"]

[features]
default = ["ignore-auth-error", "pool-prefer-active"]
fallback = []
Expand All @@ -101,7 +110,7 @@ metrics = []
mocks = []
dns = ["trust-dns-resolver", "trust-dns-resolver/tokio"]
ignore-auth-error = []
enable-rustls = ["rustls", "tokio-rustls", "rustls-native-certs"]
enable-rustls = ["rustls", "tokio-rustls", "rustls-native-certs", "webpki"]
enable-native-tls = ["native-tls", "tokio-native-tls"]
vendored-openssl = ["enable-native-tls", "native-tls/vendored"]
reconnect-on-auth-error = []
Expand All @@ -117,6 +126,7 @@ sentinel-auth = []
no-client-setname = []
check-unresponsive = []
replicas = []
client-tracking = []
# Testing Features
debug-ids = []
sentinel-tests = []
51 changes: 26 additions & 25 deletions README.md

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,7 @@ Examples
* [Serde](./serde.rs) - Use the `serde-json` feature to convert between Redis types and JSON.
* [Custom](./custom.rs) - Send custom commands or operate on RESP frames.
* [DNS](./dns.rs) - Customize the DNS resolution logic.
* [Client Tracking](./client_tracking.rs) - Implement [client side caching](https://redis.io/docs/manual/client-side-caching/).
* [Misc](./misc.rs) - Miscellaneous features or examples.

Or see the [tests](../tests/integration) for more examples.
52 changes: 2 additions & 50 deletions examples/basic.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,17 @@
use fred::{
prelude::*,
types::{BackpressureConfig, BackpressurePolicy, PerformanceConfig, RespVersion},
};
use fred::{prelude::*, types::RespVersion};

#[cfg(feature = "mocks")]
use fred::mocks::Echo;
#[cfg(feature = "partial-tracing")]
use fred::tracing::Level;
#[cfg(any(feature = "enable-native-tls", feature = "enable-rustls"))]
use fred::types::TlsConfig;
#[cfg(feature = "partial-tracing")]
use fred::types::TracingConfig;
#[cfg(feature = "mocks")]
use std::{default::Default, sync::Arc};

#[tokio::main]
async fn main() -> Result<(), RedisError> {
pretty_env_logger::init();

let _ = RedisConfig::from_url("redis://username:[email protected]:6379/1")?;

// full configuration with testing values
let config = RedisConfig {
fail_fast: true,
Expand All @@ -38,38 +30,11 @@ async fn main() -> Result<(), RedisError> {
#[cfg(feature = "full-tracing")]
full_tracing_level: Level::DEBUG,
},
#[cfg(feature = "mocks")]
mocks: Arc::new(Echo),
};
// full configuration for performance tuning options
let perf = PerformanceConfig {
// whether or not to automatically pipeline commands across tasks
auto_pipeline: true,
// the max number of frames to feed into a socket before flushing it
max_feed_count: 1000,
// a default timeout to apply to all commands (0 means no timeout)
default_command_timeout_ms: 0,
// the amount of time to wait before rebuilding the client's cached cluster state after a MOVED error.
cluster_cache_update_delay_ms: 10,
// the maximum number of times to retry commands
max_command_attempts: 3,
// backpressure config options
backpressure: BackpressureConfig {
// whether to disable automatic backpressure features
disable_auto_backpressure: false,
// the max number of in-flight commands before applying backpressure or returning backpressure errors
max_in_flight_commands: 5000,
// the policy to apply when the max in-flight commands count is reached
policy: BackpressurePolicy::Drain,
},
// the amount of time a command can wait in memory without a response before the connection is considered
// unresponsive
#[cfg(feature = "check-unresponsive")]
network_timeout_ms: 60_000,
};

// configure exponential backoff when reconnecting, starting at 100 ms, and doubling each time up to 30 sec.
let policy = ReconnectPolicy::new_exponential(0, 100, 30_000, 2);
let perf = PerformanceConfig::default();
let client = RedisClient::new(config, Some(perf), Some(policy));

// spawn tasks that listen for connection close or reconnect events
Expand Down Expand Up @@ -101,19 +66,6 @@ async fn main() -> Result<(), RedisError> {
// or use turbofish. the first type is always the response type.
println!("Foo: {:?}", client.get::<String, _>("foo").await?);

// update performance config options as needed
let mut perf_config = client.perf_config();
perf_config.max_command_attempts = 100;
perf_config.max_feed_count = 1000;
client.update_perf_config(perf_config);

// send commands in a pipeline
let pipeline = client.pipeline();
let _ = pipeline.incr("bar").await?;
let _ = pipeline.incr("bar").await?;
let (first, second): (i64, i64) = pipeline.all().await?;
assert_eq!((first, second), (1, 2));

let _ = client.quit().await?;
let _ = connection_task.await;
Ok(())
Expand Down
91 changes: 91 additions & 0 deletions examples/client_tracking.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
use fred::{interfaces::TrackingInterface, prelude::*, types::RespVersion};

// this library exposes 2 interfaces for implementing client-side caching - a high level `TrackingInterface` trait
// that requires RESP3 and works with all deployment types, and a lower level interface that directly exposes the
// `CLIENT TRACKING` commands but often requires a centralized server config.

async fn resp3_tracking_interface_example() -> Result<(), RedisError> {
let policy = ReconnectPolicy::new_constant(0, 1000);
let mut config = RedisConfig::default();
config.version = RespVersion::RESP3;

let client = RedisClient::new(config, None, Some(policy));
let _ = client.connect();
let _ = client.wait_for_connect().await?;

// spawn a task that processes invalidation messages.
let mut invalidations = client.on_invalidation();
tokio::spawn(async move {
while let Ok(invalidation) = invalidations.recv().await {
println!("{}: Invalidate {:?}", invalidation.server, invalidation.keys);
}
});

// enable client tracking on all connections. it's usually a good idea to do this in an `on_reconnect` block.
let _ = client.start_tracking(None, false, false, false, false).await?;
let _: () = client.get("foo").await?;

// send `CLIENT CACHING yes|no` before subsequent commands. the preceding `CLIENT CACHING yes|no` command will be
// sent when the command is retried as well.
println!("foo: {}", client.caching(false).incr::<i64, _>("foo").await?);
println!("foo: {}", client.caching(true).incr::<i64, _>("foo").await?);
let _ = client.stop_tracking().await?;

Ok(())
}

async fn resp2_basic_interface_example() -> Result<(), RedisError> {
let subscriber = RedisClient::default();
let client = RedisClient::default();

// RESP2 requires two connections
let _ = subscriber.connect();
let _ = client.connect();
let _ = subscriber.wait_for_connect().await?;
let _ = client.wait_for_connect().await?;

// the invalidation subscriber interface is the same as above even in RESP2 mode **as long as the `client-tracking`
// feature is enabled**. if the feature is disabled then the message will appear on the `on_message` receiver.
let mut invalidations = subscriber.on_invalidation();
tokio::spawn(async move {
while let Ok(invalidation) = invalidations.recv().await {
println!("{}: Invalidate {:?}", invalidation.server, invalidation.keys);
}
});
// in RESP2 mode we must manually subscribe to the invalidation channel. the `start_tracking` function does this
// automatically with the RESP3 interface.
let _: () = subscriber.subscribe("__redis__:invalidate").await?;

// enable client tracking, sending invalidation messages to the subscriber client
let (_, connection_id) = subscriber
.connection_ids()
.await
.into_iter()
.next()
.expect("Failed to read subscriber connection ID");
let _ = client
.client_tracking("on", Some(connection_id), None, false, false, false, false)
.await?;

println!("Tracking info: {:?}", client.client_trackinginfo::<RedisValue>().await?);
println!("Redirection: {}", client.client_getredir::<i64>().await?);

let pipeline = client.pipeline();
// it's recommended to pipeline `CLIENT CACHING yes|no` if the client is used across multiple tasks
let _: () = pipeline.client_caching(true).await?;
let _: () = pipeline.incr("foo").await?;
println!("Foo: {}", pipeline.last::<i64>().await?);

Ok(())
}

#[tokio::main]
// see https://redis.io/docs/manual/client-side-caching/ for more information
async fn main() -> Result<(), RedisError> {
pretty_env_logger::init();

resp3_tracking_interface_example().await?;
// resp2_basic_interface_example().await?;

Ok(())
}
61 changes: 61 additions & 0 deletions examples/misc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use fred::{
prelude::*,
types::{BackpressureConfig, BackpressurePolicy, PerformanceConfig},
};

#[tokio::main]
async fn main() -> Result<(), RedisError> {
// full configuration for performance tuning options
let perf = PerformanceConfig {
// whether or not to automatically pipeline commands across tasks
auto_pipeline: true,
// the max number of frames to feed into a socket before flushing it
max_feed_count: 1000,
// a default timeout to apply to all commands (0 means no timeout)
default_command_timeout_ms: 0,
// the amount of time to wait before rebuilding the client's cached cluster state after a MOVED error.
cluster_cache_update_delay_ms: 10,
// the maximum number of times to retry commands
max_command_attempts: 3,
// backpressure config options
backpressure: BackpressureConfig {
// whether to disable automatic backpressure features
disable_auto_backpressure: false,
// the max number of in-flight commands before applying backpressure or returning backpressure errors
max_in_flight_commands: 5000,
// the policy to apply when the max in-flight commands count is reached
policy: BackpressurePolicy::Drain,
},
// the amount of time a command can wait in memory without a response before the connection is considered
// unresponsive
#[cfg(feature = "check-unresponsive")]
network_timeout_ms: 60_000,
};
let config = RedisConfig {
server: ServerConfig::default_clustered(),
..RedisConfig::default()
};

let client = RedisClient::new(config, Some(perf), None);
let _ = client.connect();
let _ = client.wait_for_connect().await?;

// update performance config options
let mut perf_config = client.perf_config();
perf_config.max_command_attempts = 100;
perf_config.max_feed_count = 1000;
client.update_perf_config(perf_config);

// interact with specific cluster nodes
if client.is_clustered() {
let connections = client.active_connections().await?;

for server in connections.into_iter() {
let info: String = client.with_cluster_node(&server).client_info().await?;
println!("Client info for {}: {}", server, info);
}
}

let _ = client.quit().await?;
Ok(())
}
29 changes: 28 additions & 1 deletion examples/serde.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
use fred::prelude::*;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};

// from the serde json docs
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
struct Person {
name: String,
age: u8,
phones: Vec<String>,
}

#[tokio::main]
async fn main() -> Result<(), RedisError> {
pretty_env_logger::init();

let client = RedisClient::default();
let _ = client.connect();
let _ = client.wait_for_connect().await?;
Expand All @@ -22,9 +33,25 @@ async fn main() -> Result<(), RedisError> {
println!("GET Result: {}", get_result);
let hget_result: Value = client.hgetall("wobble").await?;
println!("HGETALL Result: {}", hget_result);

assert_eq!(value, get_result);
assert_eq!(value, hget_result);

// or store types as json strings via Serialize and Deserialize
let person = Person {
name: "Foo".into(),
age: 42,
phones: vec!["abc".into(), "123".into()],
};

let serialized = serde_json::to_string(&person)?;
let _: () = client.set("person 1", serialized, None, None, false).await?;
// deserialize as a json value
let deserialized: Person = serde_json::from_value(client.get::<Value, _>("person 1").await?)?;
assert_eq!(person, deserialized);
// or as a json string
let deserialized: Person = serde_json::from_str(&client.get::<String, _>("person 1").await?)?;
assert_eq!(person, deserialized);

let _ = client.quit().await;
Ok(())
}
Loading

0 comments on commit 1544f47

Please sign in to comment.