Skip to content

Commit

Permalink
10.0.3 (#324)
Browse files Browse the repository at this point in the history
* fix: support scan functions in mocks
  • Loading branch information
aembke authored Dec 29, 2024
1 parent bdfe4f8 commit 32a5e07
Show file tree
Hide file tree
Showing 6 changed files with 207 additions and 10 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 10.0.3

* Support SCAN functions in mocks

## 10.0.2

* Fix intermittent transaction timeouts
Expand Down
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ name = "fred"
readme = "README.md"
repository = "https://github.com/aembke/fred.rs"
rust-version = "1.75"
version = "10.0.2"
version = "10.0.3"

[package.metadata.docs.rs]
# do not show the glommio version of the docs
Expand Down Expand Up @@ -51,7 +51,7 @@ custom-reconnect-errors = []
default-nil-types = []
dns = ["hickory-resolver"]
metrics = []
mocks = []
mocks = ["glob-match"]
monitor = ["nom"]
replicas = []
sentinel-auth = []
Expand Down Expand Up @@ -186,6 +186,7 @@ sha-1 = { version = "0.10", optional = true }
socket2 = "0.5"
tracing = { version = "0.1", optional = true }
tracing-futures = { version = "0.2", optional = true }
glob-match = { version = "0.2.1", optional = true }
url = "2.4"
urlencoding = "2.1"
# DNS Features
Expand Down
2 changes: 1 addition & 1 deletion examples/lua.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use fred::{
prelude::*,
types::{Library, Script},
types::scripts::{Library, Script},
util as fred_utils,
};

Expand Down
90 changes: 89 additions & 1 deletion src/modules/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::{
};
use bytes_utils::Str;
use fred_macros::rm_send_if;
use glob_match::glob_match;
use std::{
collections::{HashMap, VecDeque},
fmt::Debug,
Expand Down Expand Up @@ -206,6 +207,40 @@ impl SimpleMap {

Ok(count.into())
}

/// Perform a `SCAN` operation, returning all matching keys in one page.
pub fn scan(&self, args: Vec<Value>) -> Result<Value, Error> {
let match_idx = args.iter().enumerate().find_map(|(i, a)| {
if let Some("MATCH") = a.as_str().as_ref().map(|s| s.as_ref()) {
Some(i + 1)
} else {
None
}
});
let pattern = match_idx.and_then(|i| args[i].as_string());

let keys = self
.values
.lock()
.keys()
.filter_map(|k| {
if let Some(pattern) = pattern.as_ref() {
if let Some(_k) = k.as_str() {
if glob_match(pattern, _k) {
k.as_bytes_str().map(Value::String)
} else {
None
}
} else {
None
}
} else {
k.as_bytes_str().map(Value::String)
}
})
.collect();
Ok(Value::Array(vec![Value::from_static_str("0"), Value::Array(keys)]))
}
}

impl Mocks for SimpleMap {
Expand All @@ -214,6 +249,7 @@ impl Mocks for SimpleMap {
"GET" => self.get(command.args),
"SET" => self.set(command.args),
"DEL" => self.del(command.args),
"SCAN" => self.scan(command.args),
_ => Err(Error::new(ErrorKind::Unknown, "Unimplemented.")),
}
}
Expand Down Expand Up @@ -329,9 +365,10 @@ mod tests {
mocks::{Buffer, Echo, Mocks, SimpleMap},
prelude::Expiration,
runtime::JoinHandle,
types::{config::Config, SetOptions, Value},
types::{config::Config, scan::Scanner, SetOptions, Value},
};
use std::sync::Arc;
use tokio_stream::StreamExt;

async fn create_mock_client(mocks: Arc<dyn Mocks>) -> (Client, JoinHandle<Result<(), Error>>) {
let config = Config {
Expand Down Expand Up @@ -421,4 +458,55 @@ mod tests {
let last: Vec<String> = pipeline.last().await.unwrap();
assert_eq!(last, vec!["bar"]);
}

#[tokio::test]
async fn should_mock_scans() {
let (client, _) = create_mock_client(Arc::new(SimpleMap::new())).await;
client
.set::<(), _, _>("foo1", "bar1", None, None, false)
.await
.expect("Failed to call SET");
client
.set::<(), _, _>("foo2", "bar2", None, None, false)
.await
.expect("Failed to call SET");
let mut all: Vec<String> = Vec::new();
let mut scan_stream = client.scan("foo*", Some(10), None);
while let Some(mut page) = scan_stream.try_next().await.expect("failed to call try_next") {
if let Some(keys) = page.take_results() {
all.append(
&mut keys
.into_iter()
.filter_map(|v| v.as_str().map(|v| v.to_string()))
.collect(),
);
}
page.next();
}
all.sort();
assert_eq!(all, vec!["foo1".to_string(), "foo2".to_string()]);
}

#[tokio::test]
async fn should_mock_scans_buffered() {
let (client, _) = create_mock_client(Arc::new(SimpleMap::new())).await;
client
.set::<(), _, _>("foo1", "bar1", None, None, false)
.await
.expect("Failed to call SET");
client
.set::<(), _, _>("foo2", "bar2", None, None, false)
.await
.expect("Failed to call SET");

let mut keys: Vec<String> = client
.scan_buffered("foo*", Some(10), None)
.map(|k| k.map(|k| k.into_string().unwrap()))
.collect::<Result<Vec<String>, Error>>()
.await
.unwrap();
keys.sort();

assert_eq!(keys, vec!["foo1".to_string(), "foo2".to_string()]);
}
}
34 changes: 32 additions & 2 deletions src/protocol/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,16 @@ use std::{
time::{Duration, Instant},
};

#[cfg(feature = "mocks")]
use crate::modules::mocks::MockCommand;
#[cfg(any(feature = "full-tracing", feature = "partial-tracing"))]
use crate::trace::CommandTraces;
#[cfg(feature = "mocks")]
use crate::{
modules::mocks::MockCommand,
protocol::types::ValueScanResult,
runtime::Sender,
types::scan::ScanResult,
types::Key,
};

#[cfg(feature = "debug-ids")]
static COMMAND_COUNTER: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
Expand Down Expand Up @@ -1911,6 +1917,30 @@ impl Command {
}
}

#[cfg(feature = "mocks")]
pub fn take_key_scan_tx(&mut self) -> Option<Sender<Result<ScanResult, Error>>> {
match mem::replace(&mut self.response, ResponseKind::Skip) {
ResponseKind::KeyScan(inner) => Some(inner.tx),
_ => None,
}
}

#[cfg(feature = "mocks")]
pub fn take_key_scan_buffered_tx(&mut self) -> Option<Sender<Result<Key, Error>>> {
match mem::replace(&mut self.response, ResponseKind::Skip) {
ResponseKind::KeyScanBuffered(inner) => Some(inner.tx),
_ => None,
}
}

#[cfg(feature = "mocks")]
pub fn take_value_scan_tx(&mut self) -> Option<Sender<Result<ValueScanResult, Error>>> {
match mem::replace(&mut self.response, ResponseKind::Skip) {
ResponseKind::ValueScan(inner) => Some(inner.tx),
_ => None,
}
}

/// Whether the command has a channel for sending responses to the caller.
pub fn has_response_tx(&self) -> bool {
match self.response {
Expand Down
82 changes: 78 additions & 4 deletions src/router/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,13 +575,60 @@ mod mocking {
use super::*;
use crate::{
modules::mocks::Mocks,
protocol::{responders::ResponseKind, utils as protocol_utils},
protocol::{
responders,
responders::ResponseKind,
types::{KeyScanBufferedInner, KeyScanInner, ValueScanInner},
utils as protocol_utils,
},
};
use redis_protocol::resp3::types::BytesFrame;
use std::sync::Arc;

fn mock_host(inner: &RefCount<ClientInner>) -> Server {
inner
.config
.server
.hosts()
.first()
.cloned()
.unwrap_or(Server::from(("localhost", 6379)))
}

fn key_scanner(command: &mut Command) -> KeyScanInner {
KeyScanInner {
args: command.args().to_vec(),
hash_slot: None,
server: None,
cursor_idx: 0,
tx: command.take_key_scan_tx().unwrap(),
}
}

fn buffered_key_scanner(command: &mut Command) -> KeyScanBufferedInner {
KeyScanBufferedInner {
args: command.args().to_vec(),
hash_slot: None,
server: None,
cursor_idx: 0,
tx: command.take_key_scan_buffered_tx().unwrap(),
}
}

fn value_scanner(command: &mut Command) -> ValueScanInner {
ValueScanInner {
args: command.args().to_vec(),
cursor_idx: 0,
tx: command.take_value_scan_tx().unwrap(),
}
}

/// Process any kind of router command.
pub fn process_command(mocks: &Arc<dyn Mocks>, command: RouterCommand) -> Result<(), Error> {
pub fn process_command(
inner: &RefCount<ClientInner>,
mocks: &Arc<dyn Mocks>,
command: RouterCommand,
) -> Result<(), Error> {
match command {
#[cfg(feature = "transactions")]
RouterCommand::Transaction { commands, mut tx, .. } => {
Expand Down Expand Up @@ -642,7 +689,34 @@ mod mocking {
let result = mocks
.process_command(command.to_mocked())
.map(protocol_utils::mocked_value_to_frame);
command.respond_to_caller(result);

match result {
Ok(frame) => match command.kind {
CommandKind::Scan => {
let is_buffered = matches!(command.response, ResponseKind::KeyScanBuffered(_));
let server = mock_host(inner);

if is_buffered {
let scanner = buffered_key_scanner(&mut command);
responders::respond_key_scan_buffered(inner, &server, command, scanner, frame)
.expect("Failed to respond key scan buffered");
} else {
let scanner = key_scanner(&mut command);
responders::respond_key_scan(inner, &server, command, scanner, frame)
.expect("Failed to respond key scan");
}
},
CommandKind::Sscan | CommandKind::Hscan | CommandKind::Zscan => {
let server = mock_host(inner);
let scanner = value_scanner(&mut command);

responders::respond_value_scan(inner, &server, command, scanner, frame)
.expect("Failed to respond value scan");
},
_ => command.respond_to_caller(Ok(frame)),
},
Err(err) => command.respond_to_caller(Err(err)),
};

Ok(())
},
Expand All @@ -659,7 +733,7 @@ mod mocking {
inner.counters.decr_cmd_buffer_len();

_trace!(inner, "Recv mock command: {:?}", command);
if let Err(e) = process_command(mocks, command) {
if let Err(e) = process_command(inner, mocks, command) {
// errors on this interface end the client connection task
_error!(inner, "Ending early after error processing mock command: {:?}", e);
if e.is_canceled() {
Expand Down

0 comments on commit 32a5e07

Please sign in to comment.