Skip to content

Commit

Permalink
5.0.0 (#38)
Browse files Browse the repository at this point in the history
* URL parsing into `RedisConfig`
* Change `bzpopmin` and `bzpopmax` return value types
* Bug fixes
* Remove unimplemented mocks feature
  • Loading branch information
aembke authored May 3, 2022
1 parent f20c0a5 commit 9a16ee7
Show file tree
Hide file tree
Showing 21 changed files with 712 additions and 194 deletions.
9 changes: 7 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@ TLDR
* Versions 5.x are focused on feature parity with newer Redis features (streams, RESP3, etc)
* Versions 6.x will be focused on performance.

## 5.0.0

* Bug fixes
* Support URL parsing into a `RedisConfig`
* Update `bzpopmin` and `bzpopmax` return type
* Remove unimplemented `mocks` feature

## 5.0.0-beta.1

* Rewrite the [protocol parser](https://github.com/aembke/redis-protocol.rs) so it can decode frames without moving or copying the underlying bytes
Expand All @@ -14,8 +21,6 @@ TLDR
* Relax some restrictions on certain commands being used in a transaction
* Implement the Streams interface (XADD, XREAD, etc)
* RESP3 support
* Minor perf improvements via the removal of some locks...
* Minor perf regressions from workarounds required to use [async functions with traits](https://smallcultfollowing.com/babysteps/blog/2019/10/26/async-fn-in-traits-are-hard/). In the end it's a wash.
* Move most perf configuration options from `globals` to client-specific config structs
* Add backpressure configuration options to the client config struct
* Fix bugs that can occur when using non-UTF8 byte arrays as keys
Expand Down
1 change: 1 addition & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ This document gives some background on how the library is structured and how to
* Use 2 spaces instead of tabs.
* Run rustfmt before submitting any changes.
* Clean up any compiler warnings.
* Submit PRs against the `develop` branch.

## TODO List

Expand Down
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fred"
version = "5.0.0-beta.1"
version = "5.0.0"
authors = ["Alec Embke <[email protected]>"]
edition = "2018"
description = "An async Redis client for Rust built on Futures and Tokio."
Expand Down Expand Up @@ -86,7 +86,6 @@ metrics = []
ignore-auth-error = []
enable-tls = ["native-tls", "tokio-native-tls"]
vendored-tls = ["enable-tls", "native-tls/vendored"]
mocks = []
reconnect-on-auth-error = []
pool-prefer-active = []
full-tracing = ["partial-tracing", "tracing", "tracing-futures"]
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ async fn main() -> Result<(), RedisError> {
// connect to the server, returning a handle to the task that drives the connection
let _ = client.connect(Some(policy));
let _ = client.wait_for_connect().await?;
// attention: delete the keys in all databases
let _ = client.flushall(false).await?;

// convert responses to many common Rust types
Expand Down
28 changes: 16 additions & 12 deletions examples/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ const DATABASE: u8 = 2;

#[tokio::main]
async fn main() -> Result<(), RedisError> {
// example showing how to parse a redis URL (from an environment variable, etc)
// see the `RedisConfig::from_url` function documentation for more information
let config = RedisConfig::from_url("redis://username:[email protected]:6379/1")?;

// example showing a full kitchen sink configuration
// use `..Default::default` to fill in defaults wherever needed
let config = RedisConfig {
Expand All @@ -16,15 +20,15 @@ async fn main() -> Result<(), RedisError> {
server: ServerConfig::new_centralized("127.0.0.1", 6379),
// how to handle commands sent while a connection is blocked
blocking: Blocking::Block,
// an optional username, if using ACL rules
// an optional username, if using ACL rules. use "default" if you need to specify a username but have not configured ACL rules.
username: None,
// an optional authentication key or password
password: None,
// optional TLS settings
// optional TLS settings (requires the `enable-tls` feature)
tls: None,
// whether to enable tracing
// whether to enable tracing (only used with `partial-tracing` or `full-tracing` features)
tracing: false,
// the protocol version to use
// the protocol version to use. note: upgrading an existing codebase to RESP3 can be non-trivial. be careful.
version: RespVersion::RESP2,
// the database to automatically select after connecting or reconnecting
database: Some(DATABASE),
Expand All @@ -36,7 +40,7 @@ async fn main() -> Result<(), RedisError> {
max_feed_count: 1000,
// a default timeout to apply to all commands (0 means no timeout)
default_command_timeout_ms: 0,
// the amount of time to wait before syncing cluster state after a MOVED or ASK error
// the amount of time to wait before rebuilding the client's cached cluster state after a MOVED or ASK error.
cluster_cache_update_delay_ms: 10,
// the maximum number of times to retry commands when connections close unexpectedly
max_command_attempts: 3,
Expand All @@ -59,15 +63,14 @@ async fn main() -> Result<(), RedisError> {

// run a function when the connection closes unexpectedly
tokio::spawn(client.on_error().for_each(|e| async move {
println!("Client received connection error: {:?}", e);
println!("Client disconnected with error: {:?}", e);
}));
// run a function whenever the client reconnects
tokio::spawn(client.on_reconnect().for_each(move |client| async move {
println!("Client {} reconnected.", client.id());
Ok(())
}));

let _ = client.connect(Some(policy));
let connection_task = client.connect(Some(policy));
let _ = client.wait_for_connect().await?;

// declare types on response values
Expand All @@ -82,11 +85,12 @@ async fn main() -> Result<(), RedisError> {
println!("Foo: {:?}", client.get::<String, _>("foo").await?);

// update performance config options as needed
client.update_perf_config(PerformanceConfig {
max_command_attempts: 100,
..Default::default()
});
let mut perf_config = client.client_config().performance;
perf_config.max_command_attempts = 100;
client.update_perf_config(perf_config);

let _ = client.quit().await?;
// or manage the connection task directly if needed. however, calling `quit` or `shutdown` also ends the connection task.
connection_task.abort();
Ok(())
}
33 changes: 20 additions & 13 deletions examples/resilience.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,13 @@
use fred::globals::{self, ReconnectError};
use fred::prelude::*;
use fred::types::InfoKind::Default;
use fred::types::PerformanceConfig;
use futures::StreamExt;

const DATABASE: u8 = 2;

#[tokio::main]
async fn main() -> Result<(), RedisError> {
// try to write commands up to 20 times. this value is incremented for a command whenever the connection closes while the
// command is in-flight or whenever the client receives a MOVED error in response to the command. in the case of a MOVED
// error the client will not try the command again until the hash slot is finished migrating to the destination node.
globals::set_max_command_attempts(20);
// configure the amount of time to wait when checking the state of migrating hash slots
globals::set_cluster_error_cache_delay_ms(100);
// apply a global timeout on commands, if necessary. otherwise the client will attempt to write them until the max attempt
// count is reached (however long that takes depends on the reconnect policy, etc).
globals::set_default_command_timeout(60_000);
// automatically trigger reconnection and retry logic whenever errors are received with the provided prefixes
globals::set_custom_reconnect_errors(vec![
ReconnectError::Loading,
Expand All @@ -27,6 +20,18 @@ async fn main() -> Result<(), RedisError> {
// if you use this feature make sure your server config is correct or you wont see errors until the reconnection policy
// max attempts value is reached (unless certain logging is enabled).
fail_fast: false,
performance: PerformanceConfig {
// try to write commands up to 20 times. this value is incremented for a command whenever the connection closes while the
// command is in-flight, or whenever the client receives a MOVED/ASK error in response to the command. in the case of a
// MOVED/ASK error the client will not try the command again until the hash slot is finished migrating to the destination node.
max_command_attempts: 20,
// configure the amount of time to wait when checking the state of migrating hash slots.
cluster_cache_update_delay_ms: 100,
// apply a global timeout on commands, if necessary. otherwise the client will attempt to write them until the max attempt
// count is reached (however long that takes depends on the reconnect policy, etc).
default_command_timeout_ms: 60_000,
..Default::default()
},
..Default::default()
};
// configure exponential backoff when reconnecting, starting at 100 ms, and doubling each time up to 30 sec.
Expand All @@ -42,10 +47,12 @@ async fn main() -> Result<(), RedisError> {
// run a function whenever the client reconnects
tokio::spawn(client.on_reconnect().for_each(move |client| async move {
println!("Client {} reconnected.", client.id());
// set up db selection logic to run whenever we reconnect
let _ = client.select(DATABASE).await;
// if using pubsub features then SUBSCRIBE calls should go here too
// if it's necessary to change users then AUTH calls should go here too
// (re)subscribe to any pubsub channels upon connecting or reconnecting.
let _ = client.subscribe("foo").await?;
// it is recommended to use the `database`, `username`, and `password` fields on the `RedisConfig` instead of
// manually adding SELECT or AUTH/HELLO calls to this block. in-flight commands will retry before commands
// specified in this block, which may lead to some difficult bugs. however, the client will automatically
// authenticate and select the correct database first if the associated configuration options are provided.
}));

let _ = client.connect(Some(policy));
Expand Down
3 changes: 2 additions & 1 deletion src/commands/impls/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@ where
{
let key = key.into();
let frame = utils::request_response(inner, move || {
let mut args = Vec::with_capacity(2);
let mut args = Vec::with_capacity(3);
args.push(key.into());

if let Some(samples) = samples {
args.push(static_val!(SAMPLES));
args.push(samples.into());
}

Expand Down
1 change: 1 addition & 0 deletions src/commands/impls/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub static IDLE: &'static str = "IDLE";
pub static TIME: &'static str = "TIME";
pub static RETRYCOUNT: &'static str = "RETRYCOUNT";
pub static JUSTID: &'static str = "JUSTID";
pub static SAMPLES: &'static str = "SAMPLES";

/// Macro to generate a command function that takes no arguments and expects an OK response - returning `()` to the caller.
macro_rules! ok_cmd(
Expand Down
1 change: 1 addition & 0 deletions src/commands/impls/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ pub fn scan_cluster(
}

for slot in hash_slots.into_iter() {
_trace!(inner, "Scan cluster hash slot server: {}", slot);
let scan_inner = KeyScanInner {
key_slot: Some(slot),
tx: tx.clone(),
Expand Down
77 changes: 4 additions & 73 deletions src/commands/impls/sorted_sets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ use crate::protocol::types::*;
use crate::protocol::utils as protocol_utils;
use crate::types::*;
use crate::utils;
use redis_protocol::resp3::types::Frame;
use std::convert::TryInto;
use std::str;
use std::sync::Arc;

fn new_range_error(kind: &Option<ZSort>) -> Result<(), RedisError> {
Expand Down Expand Up @@ -49,54 +47,7 @@ fn check_range_types(min: &ZRange, max: &ZRange, kind: &Option<ZSort>) -> Result
Ok(())
}

fn bytes_to_f64(b: &[u8]) -> Result<f64, RedisError> {
str::from_utf8(b)
.map_err(|e| e.into())
.and_then(|s| s.parse::<f64>().map_err(|e| e.into()))
}

fn frames_to_bzpop_result(mut frames: Vec<Frame>) -> Result<Option<(RedisKey, RedisValue, f64)>, RedisError> {
if frames.len() != 3 {
return Err(RedisError::new(
RedisErrorKind::ProtocolError,
"Expected 3 element array.",
));
}
let score_frame = frames.pop().unwrap();
let value_frame = frames.pop().unwrap();
let key_frame = frames.pop().unwrap();

let score = match score_frame {
Frame::SimpleString { data, .. } => bytes_to_f64(&data)?,
Frame::BlobString { data, .. } => bytes_to_f64(&data)?,
Frame::Double { data, .. } => data,
_ => {
return Err(RedisError::new(
RedisErrorKind::ProtocolError,
"Expected bulk string score.",
))
}
};
let value = protocol_utils::frame_to_results(value_frame)?;
let key = match key_frame {
Frame::SimpleString { data, .. } => data.into(),
Frame::BlobString { data, .. } => data.into(),
_ => {
return Err(RedisError::new(
RedisErrorKind::ProtocolError,
"Expected bulk string key,",
))
}
};

Ok(Some((key, value, score)))
}

pub async fn bzpopmin<K>(
inner: &Arc<RedisClientInner>,
keys: K,
timeout: f64,
) -> Result<Option<(RedisKey, RedisValue, f64)>, RedisError>
pub async fn bzpopmin<K>(inner: &Arc<RedisClientInner>, keys: K, timeout: f64) -> Result<RedisValue, RedisError>
where
K: Into<MultipleKeys>,
{
Expand All @@ -113,22 +64,10 @@ where
})
.await?;

if let Frame::Array { data, .. } = frame {
frames_to_bzpop_result(data)
} else {
if protocol_utils::is_null(&frame) {
Ok(None)
} else {
Err(RedisError::new(RedisErrorKind::ProtocolError, "Expected nil or array."))
}
}
protocol_utils::frame_to_results(frame)
}

pub async fn bzpopmax<K>(
inner: &Arc<RedisClientInner>,
keys: K,
timeout: f64,
) -> Result<Option<(RedisKey, RedisValue, f64)>, RedisError>
pub async fn bzpopmax<K>(inner: &Arc<RedisClientInner>, keys: K, timeout: f64) -> Result<RedisValue, RedisError>
where
K: Into<MultipleKeys>,
{
Expand All @@ -145,15 +84,7 @@ where
})
.await?;

if let Frame::Array { data, .. } = frame {
frames_to_bzpop_result(data)
} else {
if protocol_utils::is_null(&frame) {
Ok(None)
} else {
Err(RedisError::new(RedisErrorKind::ProtocolError, "Expected nil or array."))
}
}
protocol_utils::frame_to_results(frame)
}

pub async fn zadd<K>(
Expand Down
2 changes: 1 addition & 1 deletion src/commands/interfaces/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub trait KeysInterface: ClientLike + Sized {

/// Set a value with optional NX|XX, EX|PX|EXAT|PXAT|KEEPTTL, and GET arguments.
///
/// Note: the `get` flag was added in 6.2.0.
/// Note: the `get` flag was added in 6.2.0. Setting it as `false` works with Redis versions <=6.2.0.
///
/// <https://redis.io/commands/set>
fn set<R, K, V>(
Expand Down
10 changes: 6 additions & 4 deletions src/commands/interfaces/sorted_sets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,28 @@ pub trait SortedSetsInterface: ClientLike + Sized {
/// The blocking variant of the ZPOPMIN command.
///
/// <https://redis.io/commands/bzpopmin>
fn bzpopmin<K>(&self, keys: K, timeout: f64) -> AsyncResult<Option<(RedisKey, RedisValue, f64)>>
fn bzpopmin<R, K>(&self, keys: K, timeout: f64) -> AsyncResult<R>
where
R: FromRedis + Unpin + Send,
K: Into<MultipleKeys>,
{
into!(keys);
async_spawn(self, |inner| async move {
commands::sorted_sets::bzpopmin(&inner, keys, timeout).await
commands::sorted_sets::bzpopmin(&inner, keys, timeout).await?.convert()
})
}

/// The blocking variant of the ZPOPMAX command.
///
/// <https://redis.io/commands/bzpopmax>
fn bzpopmax<K>(&self, keys: K, timeout: f64) -> AsyncResult<Option<(RedisKey, RedisValue, f64)>>
fn bzpopmax<R, K>(&self, keys: K, timeout: f64) -> AsyncResult<R>
where
R: FromRedis + Unpin + Send,
K: Into<MultipleKeys>,
{
into!(keys);
async_spawn(self, |inner| async move {
commands::sorted_sets::bzpopmax(&inner, keys, timeout).await
commands::sorted_sets::bzpopmax(&inner, keys, timeout).await?.convert()
})
}

Expand Down
Loading

0 comments on commit 9a16ee7

Please sign in to comment.