This document gives some background on how the library is structured and how to contribute.
- Use 2 spaces instead of tabs.
- Run rustfmt before submitting any changes.
- Clean up any compiler warnings.
- Support custom DNS resolvers on the client.
- Add a feature flag that uses rustls instead of tokio-native-tls.
- Add a feature flag for Redis v7 changes (GET, XAUTOCLAIM, etc).
- Any missing commands.
- Support unix domain sockets
- Switch to `ArcStr` from `Arc<String>` for map identifiers in the multiplexer.
- General cleanup and refactoring. A lot of the lower level logic was written before async/await, before `impl Trait`, and before NLLs. It could certainly be more modern and generic.
This section covers some useful design considerations and assumptions that went into this module.
- Debugging Redis issues late at night is not fun. If you find yourself adding log lines that help debug an issue, please clean them up and leave them in the code. The one exception is that logs should never include potentially sensitive user data (i.e. don't commit changes that log full requests or responses). The `network-logs` feature can enable sensitive logs if needed.
- Any client struct needs to be `Send + Sync` to work effectively with Tokio.
- Any client struct should be fast and cheap to `Clone`.
- The primary command interfaces should be as flexible as possible via use of `Into` and `TryInto` for arguments.
- Assume nearly any command might be used in the context of a transaction, and so it could return a `QUEUED` response even if the docs only mention bulk strings, arrays, etc. There are some exceptions to this where return values could be typed to exactly match the Rust-equivalent type of the return value, but generally speaking every command should return a `RedisValue`.
There are other Redis libraries for Rust that have different goals, but the main goal of this library is to provide callers with a high level interface that abstracts away everything to do with safe and reliable connection management. This also includes some optional features to automatically handle common use cases around error handling, reconnection & backoff, retry, metrics, etc.
This section covers how code close to the network layer works.
- This library uses the redis-protocol crate to encode and decode commands via a `RedisCodec` from the codec.rs module. Each `RedisCodec` instance also tracks payload size metrics for encoded and decoded frames.
- Connections are created by combining a `RedisCodec` with a `TcpStream` or `TlsStream` to create a `Framed` stream.
- `Framed` streams are split after being created. The writer half is stored on a struct that processes and routes commands, and the reader half is used in a background task that processes the stream of frames coming from the socket.
- If TLS features are disabled the TLS type aliases become references to the TCP types.
- The writer half exposes two functions for sending frames to the socket: `feed` and `send`. `send` flushes the socket and `feed` does not. The client will use `send` under any of the following conditions (see the sketch after this list):
  - There are no queued commands following the current command.
  - The max number of fed commands (`max_feed_count`) was reached. In this case the client will use `send` once and reset the feed counter.
  - The current command is `QUIT` or `SHUTDOWN`.
  - The current command ends a transaction.
  - The client has pipelining disabled.
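The sketch below illustrates the flush decision described above. The names (`Counters`, `should_flush`, the string comparison on the command name) are hypothetical simplifications for illustration, not the library's actual types.

```rust
/// Hypothetical, simplified counters tracked per connection.
struct Counters {
  queued_commands: usize, // commands waiting behind the current one
  feed_count: usize,      // frames fed (not flushed) since the last `send`
  max_feed_count: usize,  // configurable flush threshold
}

/// Returns true when the client should use `send` (flush) instead of `feed`.
fn should_flush(counters: &Counters, command: &str, ends_transaction: bool, pipelined: bool) -> bool {
  counters.queued_commands == 0
    || counters.feed_count >= counters.max_feed_count
    || command == "QUIT"
    || command == "SHUTDOWN"
    || ends_transaction
    || !pipelined
}
```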
All frames are automatically converted to RESP3 frames, even in RESP2 mode, to provide a single interface for callers to parse responses. This works because RESP3 is a superset of RESP2 and the type conversion logic accounts for the different possible representations of most data types.
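As a rough illustration of why a single interface works, here is a toy conversion of a few RESP2 frame variants into their RESP3 equivalents. The enums below are simplified stand-ins for illustration only, not the redis-protocol crate's real types, which handle many more cases.

```rust
// Toy frame types for illustration; the real types come from the redis-protocol crate.
enum Resp2Frame {
  SimpleString(String),
  BulkString(Vec<u8>),
  Integer(i64),
  Array(Vec<Resp2Frame>),
  Null,
}

enum Resp3Frame {
  SimpleString(String),
  BlobString(Vec<u8>),
  Number(i64),
  Array(Vec<Resp3Frame>),
  Null,
}

fn resp2_to_resp3(frame: Resp2Frame) -> Resp3Frame {
  match frame {
    // Every RESP2 type has a direct RESP3 counterpart.
    Resp2Frame::SimpleString(s) => Resp3Frame::SimpleString(s),
    Resp2Frame::BulkString(b) => Resp3Frame::BlobString(b),
    Resp2Frame::Integer(i) => Resp3Frame::Number(i),
    Resp2Frame::Array(inner) => Resp3Frame::Array(inner.into_iter().map(resp2_to_resp3).collect()),
    Resp2Frame::Null => Resp3Frame::Null,
  }
}
```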
When establishing fresh connections to the server(s), or rebuilding existing connections, the client does the following:
- Try to connect to any of the nodes included in the client's `RedisConfig`.
- Upon successfully connecting and authenticating, run the `CLUSTER NODES` command.
- Parse the response and build a `ClusterKeyCache` struct.
- Connect and authenticate to all of the primary/main nodes in the cluster.
- Split each connection, storing the writer half on the `Multiplexer` and spawning a background task to read from the reader half.
The `ClusterKeyCache` struct does the following:

- Parses the `CLUSTER NODES` response, building out a sorted array of `SlotRange` structs for each slot range in the cluster. Each `SlotRange` contains a reference to the server that owns the hash slot range and the starting and ending slots.
- Provides an interface for mapping a Redis key to a `SlotRange`. This interface uses the `redis_keyslot` mapping function from the `redis_protocol` crate to hash a Redis key, followed by a binary search of the `SlotRange` array (see the sketch after this list).
- Provides interfaces for looking up a random node in the cluster and rebuilding the existing cache in place.
When writing a command to a Redis cluster the client does the following:
1. Hash the command's key if it has a key, otherwise pick a random cluster node.
2. Find the associated slot range for the key's hash slot.
3. Use the server ID of the hash slot range from step 2 to look up the writer half of the socket connected to that node.
4. Convert the command to a `Frame`.
5. Push the command onto the back of a queue of commands attached to the socket. This will be covered more in the pipelining section.
6. Pass the frame from step 4 to the `Framed` sink on top of the socket writer half from step 3, possibly flushing the socket in the process.
Redis allows clients to switch between several states:
- The client can be in a pipelined request-response state.
- The client can be in a non-pipelined request-response state.
- The client can be in a blocking request-response state.
- The client can be in a subscriber state for pubsub or keyspace event messages.
In order to support use cases where a client may switch between these states at any time, this library implements a struct called the `Multiplexer` that manages all connection-related state. The `Multiplexer` struct is not directly attached to a `RedisClient`, but instead exists as a part of a separate Tokio task that processes commands from the client.
The `Multiplexer` struct stores the following state (a rough sketch follows the list):

- A connection map that maps server IDs to `Framed` sinks, or just one `Framed` sink when connected to a centralized Redis deployment.
- A map that maps server IDs to a queue of in-flight commands (`VecDeque<RedisCommand>`). This is a different map/queue because it has different locking requirements than the connection map. This is the only lock that should see any real contention, but it is a requirement to implement pipelining.
- A broadcast sender used to broadcast connection issues from either the reader or writer half of the socket.
- An instance of a `ClusterKeyCache` used to route commands to specific nodes in the cluster, if necessary.
- An `Arc<RedisClientInner>`, used to communicate messages back to the client when a connection dies, state changes, pubsub messages are received, etc.
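The following is a rough, simplified sketch of that state. The type aliases and placeholder structs are hypothetical; the real struct has more fields and different generics.

```rust
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use tokio::sync::broadcast;

// Hypothetical placeholders for the real types.
type ServerId = String;
type FramedSink = (); // stands in for the writer half of a split Framed stream
struct RedisCommand;
struct ClusterKeyCache;
struct RedisClientInner;
#[derive(Clone, Debug)]
struct RedisError;

/// Simplified illustration of the state described above.
struct Multiplexer {
  /// Server ID -> writer half (a single entry for centralized deployments).
  connections: HashMap<ServerId, FramedSink>,
  /// Server ID -> in-flight commands awaiting responses (separate lock from `connections`).
  in_flight: HashMap<ServerId, VecDeque<RedisCommand>>,
  /// Broadcasts connection errors from either half of a socket.
  connection_errors: broadcast::Sender<RedisError>,
  /// Routes commands to cluster nodes, when clustered.
  cluster_cache: Option<ClusterKeyCache>,
  /// Shared client state used to notify the client of events.
  inner: Arc<RedisClientInner>,
}
```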
All commands from a `RedisClient` instance move through one channel where the client instance writes to the channel and the `Multiplexer` reads from the channel. Clones of a `RedisClient` all share the same command channel, so cloning a `RedisClient` mostly boils down to cloning an `Arc<UnboundedSender>`.
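A minimal sketch of that pattern, using hypothetical names: clones share the same unbounded sender, and a background task owns the receiver.

```rust
use std::sync::Arc;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};

struct RedisCommand; // placeholder for the real command type

/// Cloning the client only clones the Arc around the command sender.
#[derive(Clone)]
struct RedisClient {
  command_tx: Arc<UnboundedSender<RedisCommand>>,
}

fn spawn_multiplexer() -> RedisClient {
  let (tx, mut rx) = unbounded_channel::<RedisCommand>();

  // The multiplexer task owns the receiver and processes commands in order.
  tokio::spawn(async move {
    while let Some(_command) = rx.recv().await {
      // Route the command to the appropriate connection here.
    }
  });

  RedisClient { command_tx: Arc::new(tx) }
}
```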
The rate at which commands are processed from this channel depends on the client's settings.
When the client sends a command to the server the following operations occur:
1. The client prepares the command, creating a `Vec<RedisValue>` array of arguments.
2. The client attaches a oneshot sender to the command on which the response will be sent.
3. The client sends the command to the `command_tx` sender channel on the `RedisClientInner` struct.
4. The client calls `await` on the receiver half of the oneshot channel from step 2.
5. Some time later the `Multiplexer` receives the command from the command stream running in a separate Tokio task.
6. The `Multiplexer` checks the command's flags to determine if it makes sense to send in the current connection context.
7. The `Multiplexer` checks if the command should be pipelined or not. If not it attaches another oneshot sender to the command.
8. The `Multiplexer` hashes the command key (if necessary), looks up the socket connected to the node that should receive the command, and writes the command to the socket.
9. If the command cannot be written due to backpressure or network connectivity issues the `Multiplexer` either calls sleep or rebuilds the connections, replays any in-flight commands, and tries again.
10. If writing the command succeeds the `Multiplexer` decides to wait on a response or not based on the checks from step 7. If not it moves on to the next command without waiting on a response.
11. Some time later a response is received by the reader task, which forwards the response to the oneshot channels from step 2 and step 7 (if necessary).
12. When the future from step 4 resolves, the client converts the response frame(s) to the correct output types for the Redis command and returns them to the caller.
Contributors only need to consider steps 1 and 12 when implementing commands - the other steps are the same regardless of the command.
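The oneshot handshake in steps 2, 4, and 11 follows the standard Tokio pattern sketched below. The names and placeholder types are hypothetical, not the library's actual definitions.

```rust
use tokio::sync::{mpsc::UnboundedSender, oneshot};

struct RedisValue; // placeholder
struct RedisError; // placeholder

/// A command paired with the channel its response should be sent on.
struct RedisCommand {
  args: Vec<RedisValue>,
  response_tx: oneshot::Sender<Result<RedisValue, RedisError>>,
}

/// Steps 1-4: build the command, attach a oneshot sender, send it to the
/// multiplexer task, and await the receiver.
async fn send_command(
  command_tx: &UnboundedSender<RedisCommand>,
  args: Vec<RedisValue>,
) -> Result<RedisValue, RedisError> {
  let (response_tx, response_rx) = oneshot::channel();
  let command = RedisCommand { args, response_tx };

  command_tx.send(command).map_err(|_| RedisError)?;
  // Step 11 happens elsewhere: the reader task eventually calls
  // `command.response_tx.send(...)` with the response frame.
  response_rx.await.map_err(|_| RedisError)?
}
```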
Once a connection is established to a Redis server the client splits the connection into separate reader and writer halves. The writer half is covered above, and this section covers how the reader half is used.
The reader half processes frames asynchronously with respect to the writer half via a separate Tokio task. After the socket is split, the client spawns another Tokio task with access to a shallow clone of the `Multiplexer` struct.
Once a connection is established the `Multiplexer` does the following:

- Split the connection. The writer half is covered above.
- Spawn a task with access to the reader half, a reference to the server ID to which the reader half is connected, and a shallow clone of the `Multiplexer`.
- Convert the reader half to a `Stream`, calling `try_fold` on it in the process. While this does mean the stream is processed in series, the reader task never `await`s a future, so there wouldn't be any real benefit to processing the stream concurrently on an event loop. Processing the stream in series also makes it very easy to handle situations where a command should be retried or reconnection needs to occur, since the reader task can just put the command back at the front of the in-flight queue without worrying about another task having popped from the queue in the meantime.
Inside the `try_fold` loop the reader task does the following (a sketch follows the list):
- Check if the incoming frame is a pubsub message, keyspace event, or some other out-of-band message, and if so forward the message onto any of the appropriate channels.
- Check if the incoming frame is the response to a command. If so, determine whether more responses are necessary; if not, start processing the response.
- Link the response to the command at the front of the in-flight command queue, popping the command off the queue in the process.
- Take the response oneshot channel sender from the command, and forward the response frame to this channel.
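A simplified sketch of the reader loop, assuming hypothetical `Frame`, `RedisError`, and `Multiplexer` placeholder types and using `try_fold` from the `futures` crate:

```rust
use futures::{TryStream, TryStreamExt};

// Hypothetical placeholders for the real types.
struct Frame;
struct RedisError;
struct Multiplexer;

impl Multiplexer {
  async fn process_frame(&self, _server_id: &str, _frame: Frame) -> Result<(), RedisError> {
    // Forward pubsub/keyspace/out-of-band messages to their channels, or pop
    // the front of the in-flight queue and send the frame to that command's
    // oneshot response sender.
    Ok(())
  }
}

fn spawn_reader_task<S>(multiplexer: Multiplexer, server_id: String, stream: S)
where
  S: TryStream<Ok = Frame, Error = RedisError> + Send + 'static,
{
  tokio::spawn(async move {
    let multiplexer = &multiplexer;
    let server_id = server_id.as_str();

    // Fold over the incoming frames in series; the accumulator is unused.
    let result = stream
      .try_fold((), |_, frame| async move {
        multiplexer.process_frame(server_id, frame).await
      })
      .await;

    if result.is_err() {
      // Broadcast a connection error here so reconnection logic can run.
    }
  });
}
```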
If the `try_fold` stream closes unexpectedly the reader task will broadcast a message on the `Multiplexer` connection error broadcast channel. This will then trigger reconnection and replay logic in a separate task.
The response handling logic is in the responses.rs file.
Automatic tracing in client libraries can be very useful. However, Redis is fast enough that emitting trace data for a Redis client can result in a ton of network traffic to the tracing collector.
This document covers what is traced and how those traces relate to the two optional tracing features: `full-tracing` and `partial-tracing`. While writing and testing this library I often saw tracing exporter errors due to having filled up the buffer between the subscriber and exporter. As a result the tracing logic was separated into the two features mentioned earlier. Callers can see a lot of benefit just from the `partial-tracing` feature, but it may also be beneficial to enable `full-tracing` while debugging a difficult issue. However, callers that use `full-tracing` should expect to spend some time tuning their subscriber and exporter settings to handle a lot of tracing output.
One thing to note with the tracing features is that they have a big impact on performance. For example, using the included `pipeline_test` module in this app:
```
$ RUST_LOG=pipeline_test=debug cargo run --release --features "partial-tracing" -- -c 300000 -C 20 pipeline
Finished release [optimized + debuginfo] target(s) in 0.04s
Running `target/release/pipeline_test -c 300000 -C 20 pipeline`
INFO pipeline_test > Running with configuration: Argv { tracing: false, count: 300000, tasks: 20, host: "127.0.0.1", port: 6379, pipeline: true }
INFO pipeline_test > Initialized opentelemetry-jaeger pipeline.
INFO pipeline_test > Connecting to 127.0.0.1:6379...
INFO pipeline_test > Connected to 127.0.0.1:6379.
INFO pipeline_test > Starting commands...
Performed 300000 operations in: 4.951572778s. Throughput: 60593 req/sec
```

```
$ RUST_LOG=pipeline_test=debug cargo run --release -- -c 300000 -C 20 pipeline
Finished release [optimized + debuginfo] target(s) in 0.04s
Running `target/release/pipeline_test -c 300000 -C 20 pipeline`
INFO pipeline_test > Running with configuration: Argv { tracing: false, count: 300000, tasks: 20, host: "127.0.0.1", port: 6379, pipeline: true }
INFO pipeline_test > Initialized opentelemetry-jaeger pipeline.
INFO pipeline_test > Connecting to 127.0.0.1:6379...
INFO pipeline_test > Connected to 127.0.0.1:6379.
INFO pipeline_test > Starting commands...
Performed 300000 operations in: 2.495155038s. Throughput: 120240 req/sec
```
Note that in the first example above the tracing data isn't even being emitted to the collector (the sampler is `AlwaysOff`). Just including the tracing logic to add and track spans is enough to cut performance in half.
Many applications are not bounded by Redis throughput and so enabling the tracing features likely won't have any noticeable effect. However, the tracing features are opt-in for callers because of this performance impact. Callers will have to weigh the significant benefits of tracing against the performance loss to their application's use cases.
All commands depend on a utility function called `request_response` in the command utils. This function implements tracing generically so that developers adding commands don't need to add any tracing logic to individual command functions.

One of the arguments to the `request_response` utility is a closure that should be used to format and prepare any arguments. This is implemented as a closure so that the library can trace the time spent in this function without requiring callers to write the same tracing boilerplate in each command function.
This pattern is used in most commands, but in some commands the preprocessing is so trivial that the closure essentially becomes a no-op. However it's good practice to move as much of the preprocessing logic into this closure as possible so this time shows up in traces.
Individual commands are represented by the `RedisCommand` struct and `RedisCommandKind` enum in the protocol types file. All commands have the same general structure, represented by the `RedisCommand` struct, while command-specific data is stored in the associated `RedisCommandKind` enum variant.
There are several types of commands that are handled differently by the `Multiplexer`:

- Blocking commands.
- Compound commands (any command with a subcommand separated by a space in the command string).
- Commands that expect more than one distinct array of response frames (`PSUBSCRIBE`, etc).
- Scanning commands.
- Commands that start or end a transaction.
- Special commands that don't map directly to a Redis command (`flushall_cluster`, `split_cluster`, etc).
All commands also operate on `RedisValue` enums as arguments, even for keys or other types of values. Contributors should use `Into` or `TryInto` as much as possible to leverage the type conversion utilities that exist in the top level types file.
One thing to consider when implementing any command is how that command will function in the context of a transaction. The `RedisValue` enum supports a `Queued` variant to make this easy, but not all commands should be allowed while inside a transaction. When a command is not allowed inside a transaction, developers can return types that more closely match the command's return values instead of a generic `RedisValue`.

However, most commands should work inside a transaction, so most command functions should be written to just return a `RedisValue`.

If a command should not work inside a transaction then the command should use the `disallow_during_transaction` utility function before calling the command function. Any command function that does not return a `RedisValue` must perform this check.
This section will cover how to add new commands to the client.
There are usually only 2 files that require modifications to add a command, although certain commands may require modifying 3 or 4 files.
- The appropriate client file sometimes requires a line to implement a new interface trait. This is usually only the case when implementing the first command in a new command category.
- The interface file almost always requires changes to implement the generic interface to the command.
- The implementation file almost always requires changes to implement the actual command logic.
- The protocol types file sometimes requires changes to add a new variant to the `RedisCommandKind` enum.
When adding new commands a few new things often need to be added to the protocol types file.
- Add a variant to the `RedisCommandKind` enum for the command. For most commands this variant will be empty. However, any special flags or state needed by the command should go inside this new variant declaration.
- Add the string representation of the new variant to the `to_str_debug` function. This is what will be used in tracing fields and log lines.
- Add the first word of the string representation of the command to the `cmd_str` function.
- If the command is a compound command, add the subcommand string to the `subcommand_str` function. If not then skip this step.
- If the command is a blocking command, add it to the `is_blocking_command` function's match statement.
- If the command uses a unique key structure, such as a set of keys at the end of the command args (like `XREAD`), it may be necessary to change the `custom_key_slot` function to account for this. This is very rare though since almost every command takes the key as the first argument.
Commands are organized in two folders within the commands folder. The trait declarations exist in the interfaces folder by category, and the actual command implementations exist in the impls folder by category. The interfaces file often calls the associated function from the impls file.
These files contain the public interface declarations for subsets of the Redis interface.
- Async functions are not supported in traits, so we return an `AsyncResult` from each of these functions. This struct implements the `Future` trait so callers can use it like an async function.
- Functions should take generic arguments to be flexible to the caller. Use `Into<RedisKey>`, `TryInto<RedisValue>`, etc. There are quite a few helper structs in the types folder to make this easier.
- These functions must convert generic arguments to the actual underlying types used by the associated impl file/function. The `into!()` and `try_into!()` macros can convert multiple types automatically, and are written to break out early with `AsyncResult` errors as needed.
- These functions should return generic response types in most cases. This usually means declaring the response as `FromRedis + Unpin + Send`.
- These functions should use the `async_spawn` function from the top-level interfaces file to call an async block from a non-async function.
- Contributors should add some docs from the Redis website (try to limit to one sentence or so), and a link to the actual command documentation.
These files contain the actual implementation details for each command. They are not called directly by users, but rather by the associated file/function in the interface file.
- All private command functions in this folder take their first argument as a `&Arc<RedisClientInner>`. This struct contains all the necessary state for any command.
- These functions do not need to be written in a generic way. They can assume any callers will have converted values to any intermediate structs/enums. In the past they were written generically, so you may see that in some older code paths, but I'm trying to standardize these to use non-generic arguments going forward.
- Some helpful command function generation macros exist in the command mod.rs file to remove boilerplate for simple commands.
- All commands should use the `request_response` utility function from the top level utils file.
- Private command functions are responsible for preparing their arguments array and converting the response frame to the appropriate return value type.
- It should not be necessary to add any tracing logic to individual command functions.
There are two functions in the protocol utils for converting response frames into `RedisValue` enums.

- `frame_to_results` - Converts an arbitrarily nested response frame into an arbitrarily nested `RedisValue`, including support for `QUEUED` responses during a transaction.
- `frame_to_single_result` - The same as `frame_to_results`, but with an added validation layer that only allows for non-nested `RedisValue` variants. This is useful for detecting unexpected protocol errors if a command should only return a `BulkString` but receives an `Array` instead, for example.

Both of these functions will automatically check for error frames and will generate the appropriate `RedisError`, if necessary.
Both of these functions will automatically convert single-element response arrays to the first element in the array. This is done because RESP2 sends all responses as an array of bulk strings, even when the response only contains one element in the array. It's up to the developer to consider when an array is an appropriate return type for a command.
Additionally, if callers need to avoid the logic that automatically unwraps single-element arrays they can use the `frame_to_results_raw` function, which will not modify responses in any way.
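A toy illustration of the single-element unwrapping behavior, using a simplified value type rather than the library's real types:

```rust
#[derive(Debug, PartialEq)]
enum Value {
  String(String),
  Array(Vec<Value>),
}

/// Unwrap single-element arrays, mirroring the behavior described above.
fn unwrap_single_element(value: Value) -> Value {
  match value {
    Value::Array(mut inner) if inner.len() == 1 => inner.pop().unwrap(),
    other => other,
  }
}

fn main() {
  let nested = Value::Array(vec![Value::String("foo".into())]);
  assert_eq!(unwrap_single_element(nested), Value::String("foo".into()));
}
```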
There are also some utility functions for converting to other data types:
- `frame_to_map` - Convert a frame representing an array of nested frames with an even number of elements to a `RedisMap`.
- `array_to_map` - Convert an array with an even number of `RedisValue` structs to a `RedisMap`.
- `frame_to_error` - Convert a `Frame` to a `RedisError`.
... and some others.
Additionally, the `convert` function on any `RedisValue` can convert to any type that implements `FromRedis`. See the response type conversion file for more information.
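A hedged usage sketch of that conversion is shown below. It assumes the crate's prelude exports the client, value, and error types along with the trait that provides `get`; the key name is just an example.

```rust
use fred::prelude::*;

// Hypothetical usage; exact imports and trait impls depend on the library version.
async fn read_count(client: &RedisClient) -> Result<i64, RedisError> {
  let value: RedisValue = client.get("user:count").await?;
  // `convert` uses the FromRedis trait to produce the requested type.
  value.convert()
}
```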
Once the trait interface function and private impl function have been implemented the same function needs to be exposed on any relevant client structs.
In most cases this will not require any changes, but when adding a new command category it can. If needed, contributors should go to the relevant client file and `impl` the new trait.
This example shows how to add `MGET` to the commands.

1. Add the new variant to the `RedisCommandKind` enum, if needed.
```rust
pub enum RedisCommandKind {
  // ...
  Mget,
  // ...
}

impl RedisCommandKind {
  // ..

  pub fn to_str_debug(&self) -> &'static str {
    match *self {
      // ..
      RedisCommandKind::Mget => "MGET",
      // ..
    }
  }

  // ..

  pub fn cmd_str(&self) -> &'static str {
    match *self {
      // ..
      RedisCommandKind::Mget => "MGET",
      // ..
    }
  }

  // ..
}
```
2. Create the private function implementing the command in src/commands/impls/keys.rs.
```rust
pub async fn mget(inner: &Arc<RedisClientInner>, keys: MultipleKeys) -> Result<RedisValue, RedisError> {
  utils::check_empty_keys(&keys)?;

  let frame = utils::request_response(inner, move || {
    // time spent here will show up in traces
    let args = keys.inner().into_iter().map(|k| k.into()).collect();
    Ok((RedisCommandKind::Mget, args))
  })
  .await?;

  protocol_utils::frame_to_results(frame)
}
```
3. Create the public function in the src/commands/interfaces/keys.rs file.
```rust
// ...

pub trait KeysInterface: ClientLike + Sized {
  // ...

  /// Returns the values of all specified keys. For every key that does not hold a string value or does not exist, the special value nil is returned.
  ///
  /// <https://redis.io/commands/mget>
  fn mget<R, K>(&self, keys: K) -> AsyncResult<R>
  where
    R: FromRedis + Unpin + Send,
    K: Into<MultipleKeys>,
  {
    into!(keys);
    async_spawn(self, |inner| async move {
      commands::keys::mget(&inner, keys).await?.convert()
    })
  }

  // ...
}
```
Finally, if the actual client struct (such as the `RedisClient`) doesn't already have a line to implement the `KeysInterface` then contributors need to add that.

```rust
impl KeysInterface for RedisClient {}
```
Integration tests are in the tests/integration folder organized by category. See the tests README for more information.
Using `MGET` as an example:

1. Write tests in the keys file.
```rust
pub async fn should_mget_values(client: RedisClient, _: RedisConfig) -> Result<(), RedisError> {
  check_null!(client, "a{1}");
  check_null!(client, "b{1}");
  check_null!(client, "c{1}");

  let expected: Vec<(&str, RedisValue)> = vec![("a{1}", 1.into()), ("b{1}", 2.into()), ("c{1}", 3.into())];
  for (key, value) in expected.iter() {
    let _: () = client.set(*key, value.clone(), None, None, false).await?;
  }

  let values: Vec<i64> = client.mget(vec!["a{1}", "b{1}", "c{1}"]).await?;
  assert_eq!(values, vec![1, 2, 3]);

  Ok(())
}
```
2. Call the tests from the centralized server tests.
```rust
mod keys {
  // ..
  centralized_test!(keys, should_mget_values);
}
```
3. Call the tests from the cluster server tests.
```rust
mod keys {
  // ..
  cluster_test!(keys, should_mget_values);
}
```
This will generate test wrappers to call your test function against both centralized and clustered redis servers with pipelined and non-pipelined clients in RESP2 and RESP3 modes.
Scanning is implemented in a way where callers can throttle the rate at which they page results from the server. Scanning typically works by repeatedly running the `SCAN` (or `HSCAN`, `SSCAN`, etc) command with a different cursor each time, where the new cursor comes from the response to the previous `SCAN` call.
There are a few ways to implement `SCAN`, but this library opted for a method that allows the caller to throttle the scanning for a few reasons:

- `SCAN` only returns keys, so it's very common for callers to read the associated values for each page of results.
- Background scanning can cause the in-memory buffer of pages to grow very quickly.
- Background scanning can have a real impact on server CPU resources if it's not throttled in some way.
- Background scanning can have a real impact on network resources if it's not throttled.
In order to implement this pattern the client returns a stream of values such as the `ScanResult` struct. One of the functions on this struct is called `next()`, which controls when the next `SCAN` call with an updated cursor is sent to the server. Since the `SCAN` function returns a stream of results, this function essentially controls when the next item in the stream will arrive, which gives the caller a mechanism to throttle the scanning.
If callers want to scan the keyspace as fast as possible they can call `next` at the top of the function that handles scan results, otherwise they can call it after an `await` point to throttle the scanning.
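A hedged sketch of that pattern is shown below. The `scan` arguments and the `take_results` method are assumptions based on the description above; check the current API docs for exact names and signatures.

```rust
use fred::prelude::*;
use futures::StreamExt;

// Hedged sketch of throttled scanning; names and signatures may differ.
async fn scan_keyspace(client: &RedisClient) -> Result<(), RedisError> {
  let mut scan_stream = client.scan("*", Some(100), None);

  while let Some(result) = scan_stream.next().await {
    let mut page = result?;
    if let Some(keys) = page.take_results() {
      for _key in keys {
        // Read the associated value, do some work, etc. Any awaits here
        // throttle the scan since the next page hasn't been requested yet.
      }
    }
    // Tell the client to send the next SCAN call with the updated cursor.
    let _ = page.next();
  }
  Ok(())
}
```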
As mentioned earlier this module has been used in a few applications over the last few years. These apps used Elasticache and encountered a few fun problems in that time. Some of these issues involved adding workarounds to the app layer, and in some cases these workarounds made their way into this library.
One of the more interesting issues resulted in the addition of the `reconnect-on-auth-error` feature. This feature was added to work around the fact that sometimes Elasticache would start returning NOAUTH errors (meaning a client had not provided a password) to a client that had authenticated already. This would happen even without network interruptions or any password changes. While debugging this issue we noticed that similar issues had been filed on other Redis clients as well, but only when using Elasticache, and none of those issues had good answers as to why this was happening.
The workaround enabled by the `reconnect-on-auth-error` feature was to treat NOAUTH errors like connection errors such that they trigger a reconnection. After reconnecting, the client will then run the AUTH command again before replaying any failed commands, including the one that previously saw the NOAUTH error.
While profiling this module a few things stood out:
- The process of copying a command's args into a redis-protocol `Frame` can use a non-trivial amount of CPU for large values.
- The process of encoding `Frame` values into byte arrays can use a non-trivial amount of CPU for large values.
- The process of decoding a byte array into a `Frame` can use a non-trivial amount of CPU for large values.
This library uses the codec interface to encode and decode frames, which does not allow for async functions to do the encoding or decoding. This means we cannot use `spawn_blocking` since we can't call `await` on the returned `JoinHandle`.
Fortunately Tokio has a mechanism for tasks like this: `block_in_place`. However, this function only works when using a multi-thread runtime. Trying to use this function on a current-thread runtime will panic.
To make these operations less impactful on an application, this library provides a feature called `blocking-encoding`. This feature will use `block_in_place` for CPU-bound operations that operate on values over a certain size, but it should only be enabled if the caller uses a multi-thread runtime.
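A minimal sketch of that pattern, assuming a hypothetical size threshold constant and a placeholder `encode_frame` function (the real threshold is configurable, as noted below):

```rust
use tokio::task;

// Hypothetical threshold; the real value is configurable via the globals file.
const BLOCKING_ENCODE_THRESHOLD: usize = 50 * 1024 * 1024;

/// Placeholder for the CPU-bound encoding work done by the codec.
fn encode_frame(payload: &[u8]) -> Vec<u8> {
  payload.to_vec()
}

/// Use `block_in_place` only for large values, and only on a multi-thread runtime.
fn encode_maybe_blocking(payload: &[u8]) -> Vec<u8> {
  if payload.len() >= BLOCKING_ENCODE_THRESHOLD {
    // Tells the runtime this worker thread is about to block so it can move
    // other scheduled tasks elsewhere. Panics on a current-thread runtime.
    task::block_in_place(|| encode_frame(payload))
  } else {
    encode_frame(payload)
  }
}
```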
See the globals file for information on configuring the size threshold where CPU-bound tasks will use this interface.