Skip to content

Commit

Permalink
6.2.0 (#127)
Browse files Browse the repository at this point in the history
* feat: add pipeline.try_all
* fix: fix recovery issue in kubernetes
* fix: hgetall queued response
* feat: pubsub channels, numpat, numsub, shard channels, shard numsub
---------

Co-authored-by: stiffme <[email protected]>
  • Loading branch information
aembke and stiffme committed May 1, 2023
1 parent b63d373 commit c388b8c
Show file tree
Hide file tree
Showing 21 changed files with 508 additions and 61 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## 6.2.0

* Add `Pipeline::try_all`
* Add missing pubsub commands
* Improve docs, examples

## 6.1.0

* Add a [client tracking](https://redis.io/docs/manual/client-side-caching/) interface.
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fred"
version = "6.1.0"
version = "6.2.0"
authors = ["Alec Embke <[email protected]>"]
edition = "2021"
description = "An async Redis client built on Tokio."
Expand Down
9 changes: 9 additions & 0 deletions examples/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ async fn main() -> Result<(), RedisError> {
let _: () = pipeline.incr("foo").await?;
assert_eq!(pipeline.last::<i64>().await?, 2);

let _: () = client.del("foo").await?;
// or handle each command result individually
let pipeline = client.pipeline();
let _: () = pipeline.incr("foo").await?;
let _: () = pipeline.hgetall("foo").await?; // this will result in a `WRONGTYPE` error
let results = pipeline.try_all::<i64>().await;
assert_eq!(results[0].clone().unwrap(), 1);
assert!(results[1].is_err());

let _ = client.quit().await?;
Ok(())
}
2 changes: 2 additions & 0 deletions src/clients/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use bytes_utils::Str;
use futures::Stream;
use std::sync::Arc;

use crate::interfaces::PubsubInterface;
#[cfg(feature = "client-tracking")]
use crate::{
interfaces::RedisResult,
Expand Down Expand Up @@ -117,6 +118,7 @@ impl SetsInterface for Node {}
impl SortedSetsInterface for Node {}
impl StreamsInterface for Node {}
impl FunctionInterface for Node {}
impl PubsubInterface for Node {}

// remove the restriction on clustered deployments with the basic `CLIENT TRACKING` commands here
#[async_trait]
Expand Down
114 changes: 93 additions & 21 deletions src/clients/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ use crate::{
ListInterface,
MemoryInterface,
PubsubInterface,
Resp3Frame,
ServerInterface,
SetsInterface,
SlowlogInterface,
SortedSetsInterface,
StreamsInterface,
},
modules::{inner::RedisClientInner, response::FromRedis},
prelude::RedisValue,
prelude::{RedisResult, RedisValue},
protocol::{
command::{RedisCommand, RouterCommand},
responders::ResponseKind,
Expand All @@ -33,7 +34,33 @@ use crate::{
};
use parking_lot::Mutex;
use std::{collections::VecDeque, fmt, fmt::Formatter, sync::Arc};
use tokio::sync::oneshot::channel as oneshot_channel;
use tokio::sync::oneshot::{channel as oneshot_channel, Receiver as OneshotReceiver};

fn prepare_all_commands(
commands: VecDeque<RedisCommand>,
error_early: bool,
) -> (RouterCommand, OneshotReceiver<Result<Resp3Frame, RedisError>>) {
let (tx, rx) = oneshot_channel();
let expected_responses = commands
.iter()
.fold(0, |count, cmd| count + cmd.response.expected_response_frames());

let mut response = ResponseKind::new_buffer_with_size(expected_responses, tx);
response.set_error_early(error_early);

let commands: Vec<RedisCommand> = commands
.into_iter()
.enumerate()
.map(|(idx, mut cmd)| {
cmd.response = response.duplicate().unwrap_or(ResponseKind::Skip);
cmd.response.set_expected_index(idx);
cmd
})
.collect();
let command = RouterCommand::Pipeline { commands };

(command, rx)
}

/// Send a series of commands in a [pipeline](https://redis.io/docs/manual/pipelining/).
pub struct Pipeline<C: ClientLike> {
Expand Down Expand Up @@ -127,7 +154,6 @@ impl<C: ClientLike> Pipeline<C> {
///
/// ```rust no_run
/// # use fred::prelude::*;
///
/// async fn example(client: &RedisClient) -> Result<(), RedisError> {
/// let _ = client.mset(vec![("foo", 1), ("bar", 2)]).await?;
///
Expand All @@ -148,11 +174,43 @@ impl<C: ClientLike> Pipeline<C> {
send_all(self.client.inner(), commands).await?.convert()
}

/// Send the pipeline and respond with only the result of the last command.
/// Send the pipeline and respond with each individual result.
///
/// Note: use `RedisValue` as the return type (and [convert](crate::types::RedisValue::convert) as needed) to
/// support an array of different return types.
///
/// ```rust no_run
/// # use fred::prelude::*;
/// async fn example(client: &RedisClient) -> Result<(), RedisError> {
/// let _ = client.mset(vec![("foo", 1), ("bar", 2)]).await?;
///
/// let pipeline = client.pipeline();
/// let _: () = pipeline.get("foo").await?;
/// let _: () = pipeline.hgetall("bar").await?; // this will error since `bar` is an integer
///
/// let results = pipeline.try_all::<RedisValue>().await; // note the lack of `?`
/// assert!(results[0].unwrap().convert::<i64>(), 1);
/// assert!(results[1].is_err());
///
/// Ok(())
/// }
/// ```
pub async fn try_all<R>(self) -> Vec<RedisResult<R>>
where
R: FromRedis,
{
let commands = { self.commands.lock().drain(..).collect() };
try_send_all(self.client.inner(), commands)
.await
.into_iter()
.map(|v| v.and_then(|v| v.convert()))
.collect()
}

/// Send the pipeline and respond with only the result of the last command.
///
/// ```rust no_run
/// # use fred::prelude::*;
/// async fn example(client: &RedisClient) -> Result<(), RedisError> {
/// let pipeline = client.pipeline();
/// let _: () = pipeline.incr("foo").await?; // returns when the command is queued in memory
Expand All @@ -172,28 +230,42 @@ impl<C: ClientLike> Pipeline<C> {
}
}

async fn send_all(inner: &Arc<RedisClientInner>, commands: VecDeque<RedisCommand>) -> Result<RedisValue, RedisError> {
async fn try_send_all(
inner: &Arc<RedisClientInner>,
commands: VecDeque<RedisCommand>,
) -> Vec<Result<RedisValue, RedisError>> {
if commands.is_empty() {
return Ok(RedisValue::Array(Vec::new()));
return Vec::new();
}

let (tx, rx) = oneshot_channel();
let expected_responses = commands
.iter()
.fold(0, |count, cmd| count + cmd.response.expected_response_frames());
let (command, rx) = prepare_all_commands(commands, false);
if let Err(e) = interfaces::send_to_router(inner, command) {
return vec![Err(e)];
};
let frame = match utils::apply_timeout(rx, inner.default_command_timeout()).await {
Ok(result) => match result {
Ok(f) => f,
Err(e) => return vec![Err(e)],
},
Err(e) => return vec![Err(e)],
};

let response = ResponseKind::new_buffer_with_size(expected_responses, tx);
let commands: Vec<RedisCommand> = commands
.into_iter()
.enumerate()
.map(|(idx, mut cmd)| {
cmd.response = response.duplicate().unwrap_or(ResponseKind::Skip);
cmd.response.set_expected_index(idx);
cmd
})
.collect();
let command = RouterCommand::Pipeline { commands };
if let Resp3Frame::Array { data, .. } = frame {
data
.into_iter()
.map(|frame| protocol_utils::frame_to_results(frame))
.collect()
} else {
vec![protocol_utils::frame_to_results_raw(frame)]
}
}

async fn send_all(inner: &Arc<RedisClientInner>, commands: VecDeque<RedisCommand>) -> Result<RedisValue, RedisError> {
if commands.is_empty() {
return Ok(RedisValue::Array(Vec::new()));
}

let (command, rx) = prepare_all_commands(commands, true);
let _ = interfaces::send_to_router(inner, command)?;
let frame = utils::apply_timeout(rx, inner.default_command_timeout()).await??;
protocol_utils::frame_to_results_raw(frame)
Expand Down
9 changes: 7 additions & 2 deletions src/commands/impls/hashes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,12 @@ pub async fn hget<C: ClientLike>(client: &C, key: RedisKey, field: RedisKey) ->

pub async fn hgetall<C: ClientLike>(client: &C, key: RedisKey) -> Result<RedisValue, RedisError> {
let frame = utils::request_response(client, move || Ok((RedisCommandKind::HGetAll, vec![key.into()]))).await?;
Ok(RedisValue::Map(protocol_utils::frame_to_map(frame)?))

if protocol_utils::frame_is_queued(&frame) {
protocol_utils::frame_to_results(frame)
} else {
Ok(RedisValue::Map(protocol_utils::frame_to_map(frame)?))
}
}

pub async fn hincrby<C: ClientLike>(
Expand Down Expand Up @@ -151,7 +156,7 @@ pub async fn hrandfield<C: ClientLike>(
.await?;

if has_count {
if has_values {
if has_values && !protocol_utils::frame_is_queued(&frame) {
let frame = protocol_utils::flatten_frame(frame);
protocol_utils::frame_to_map(frame).map(|m| RedisValue::Map(m))
} else {
Expand Down
71 changes: 71 additions & 0 deletions src/commands/impls/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,74 @@ pub async fn sunsubscribe<C: ClientLike>(client: &C, channels: MultipleStrings)
let _ = rx.await??;
Ok(RedisValue::Null)
}

pub async fn pubsub_channels<C: ClientLike>(client: &C, pattern: Str) -> Result<RedisValue, RedisError> {
let frame = utils::request_response(client, || {
let args = if pattern.is_empty() {
vec![]
} else {
vec![pattern.into()]
};

let mut command: RedisCommand = RedisCommand::new(RedisCommandKind::PubsubChannels, args);
cluster_hash_legacy_command(client, &mut command);

Ok(command)
})
.await?;

protocol_utils::frame_to_results_raw(frame)
}

pub async fn pubsub_numpat<C: ClientLike>(client: &C) -> Result<RedisValue, RedisError> {
let frame = utils::request_response(client, || {
let mut command: RedisCommand = RedisCommand::new(RedisCommandKind::PubsubNumpat, vec![]);
cluster_hash_legacy_command(client, &mut command);

Ok(command)
})
.await?;

protocol_utils::frame_to_results(frame)
}

pub async fn pubsub_numsub<C: ClientLike>(client: &C, channels: MultipleStrings) -> Result<RedisValue, RedisError> {
let frame = utils::request_response(client, || {
let args: Vec<RedisValue> = channels.inner().into_iter().map(|s| s.into()).collect();
let mut command: RedisCommand = RedisCommand::new(RedisCommandKind::PubsubNumsub, args);
cluster_hash_legacy_command(client, &mut command);

Ok(command)
})
.await?;

protocol_utils::frame_to_results_raw(frame)
}

pub async fn pubsub_shardchannels<C: ClientLike>(client: &C, pattern: Str) -> Result<RedisValue, RedisError> {
let frame = utils::request_response(client, || {
Ok((RedisCommandKind::PubsubShardchannels, vec![pattern.into()]))
})
.await?;

protocol_utils::frame_to_results_raw(frame)
}

pub async fn pubsub_shardnumsub<C: ClientLike>(
client: &C,
channels: MultipleStrings,
) -> Result<RedisValue, RedisError> {
let frame = utils::request_response(client, || {
let args: Vec<RedisValue> = channels.inner().into_iter().map(|s| s.into()).collect();
let has_args = args.len() > 0;
let mut command: RedisCommand = RedisCommand::new(RedisCommandKind::PubsubShardnumsub, args);
if !has_args {
cluster_hash_legacy_command(client, &mut command);
}

Ok(command)
})
.await?;

protocol_utils::frame_to_results_raw(frame)
}
2 changes: 2 additions & 0 deletions src/commands/impls/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub async fn quit<C: ClientLike>(client: &C) -> Result<(), RedisError> {
let _ = rx.await??;
utils::abort_network_timeout_task(&inner);
inner.notifications.close_public_receivers();
inner.backchannel.write().await.check_and_disconnect(&inner, None).await;

Ok(())
}
Expand Down Expand Up @@ -64,6 +65,7 @@ pub async fn shutdown<C: ClientLike>(client: &C, flags: Option<ShutdownFlags>) -
let _ = rx.await??;
utils::abort_network_timeout_task(&inner);
inner.notifications.close_public_receivers();
inner.backchannel.write().await.check_and_disconnect(&inner, None).await;

Ok(())
}
Expand Down
58 changes: 57 additions & 1 deletion src/commands/interfaces/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,5 +136,61 @@ pub trait PubsubInterface: ClientLike + Sized {
commands::pubsub::spublish(self, channel, message).await?.convert()
}

// TODO pubsub channels, pubsub numpat, pubsub numsub, pubsub shardchannels, pubsub shardnumsub
/// Lists the currently active channels.
///
/// <https://redis.io/commands/pubsub-channels/>
async fn pubsub_channels<R, S>(&self, pattern: S) -> RedisResult<R>
where
R: FromRedis,
S: Into<Str> + Send,
{
into!(pattern);
commands::pubsub::pubsub_channels(self, pattern).await?.convert()
}

/// Returns the number of unique patterns that are subscribed to by clients.
///
/// <https://redis.io/commands/pubsub-numpat/>
async fn pubsub_numpat<R>(&self) -> RedisResult<R>
where
R: FromRedis,
{
commands::pubsub::pubsub_numpat(self).await?.convert()
}

/// Returns the number of subscribers (exclusive of clients subscribed to patterns) for the specified channels.
///
/// <https://redis.io/commands/pubsub-numsub/>
async fn pubsub_numsub<R, S>(&self, channels: S) -> RedisResult<R>
where
R: FromRedis,
S: Into<MultipleStrings> + Send,
{
into!(channels);
commands::pubsub::pubsub_numsub(self, channels).await?.convert()
}

/// Lists the currently active shard channels.
///
/// <https://redis.io/commands/pubsub-shardchannels/>
async fn pubsub_shardchannels<R, S>(&self, pattern: S) -> RedisResult<R>
where
R: FromRedis,
S: Into<Str> + Send,
{
into!(pattern);
commands::pubsub::pubsub_shardchannels(self, pattern).await?.convert()
}

/// Returns the number of subscribers for the specified shard channels.
///
/// <https://redis.io/commands/pubsub-shardnumsub/>
async fn pubsub_shardnumsub<R, S>(&self, channels: S) -> RedisResult<R>
where
R: FromRedis,
S: Into<MultipleStrings> + Send,
{
into!(channels);
commands::pubsub::pubsub_shardnumsub(self, channels).await?.convert()
}
}
Loading

0 comments on commit c388b8c

Please sign in to comment.