"Parse Error: Cannot convert to map." when using xread_map #319
-
I'm trying to understand why an xread_map operation fails with "Parse Error: Cannot convert to map.". I'm using something like: let entries: XReadResponse<String, String, String, RedisValue> = xread_map(None, Some(1000), keys, ids).await?; where keys and ids are two vectors (with 1 entry, if that matters, but there could be many). If I do the XREAD operation using redis-cli, I get "CROSSSLOT Keys in request don't hash to the same slot", which I guess is related to the fact that I'm using a cluster. But I don't know if this is the reason my code is failing, and if so, how come the streams example does not fail? Does this ring any bells for anybody? Any advice on how to troubleshoot this further?
-
Hey @hmacias-avaya, so the issue is probably related to the CROSSSLOT error you got from redis-cli. That comes from the fact that the keys you're passing to XREAD hash to different cluster slots, so they may live on different nodes. There's a utility function that can make this easier, but you'll end up having to send multiple commands and join the responses together at the end. Something like this is probably what you want:

```rust
let keys = vec!["foo", "bar", "baz"]; // etc
let groups = fred::util::group_by_hash_slot(keys)?;
let mut results = Vec::with_capacity(groups.len());
for (_, keys) in groups.into_iter() {
  results.push(client.xread_map(None, Some(1000), keys, ids).await?);
}
// at this point you'd have a `Vec<XReadResponse<...>>`
// then join together the results somehow, depending on whether you'd expect key collisions, etc
```

In your case it may be slightly more complicated if the IDs are specific to the keys in each group. But ultimately the issue is that each key may be located on a separate cluster node, so the server requires a separate command/query for them.
-
For what it's worth, here's probably how I'd do this. Unfortunately this is probably the most complicated use case for multi-key cluster commands, since there's some complexity around managing the mapping between input keys and input IDs.

```rust
async fn xread_map_cluster<I: FromValue, K2: FromKey + Hash + Eq, V: FromValue>(
  client: &Client,
  keys: &HashMap<Key, XID>,
  count: Option<u64>,
  block: Option<u64>,
) -> Result<XReadResponse<Key, I, K2, V>, Error> {
  let mut out = HashMap::with_capacity(keys.len());
  // group_by_hash_slot() probably isn't that useful here since XREAD takes both a set of keys and a set of
  // associated IDs, and group_by_hash_slot doesn't preserve the mapping between these inputs. it was intended
  // more for things like MGET, SSUBSCRIBE, etc
  for (key, id) in keys.iter() {
    // only one key will be returned per command so we can use into_iter().next() to read it. but you probably
    // want to do something smarter than unwrap() here if the key is not found.
    let (key, results) = client
      .xread_map::<Key, I, K2, V, _, _>(count, block, key, id)
      .await?
      .into_iter()
      .next()
      .unwrap();
    out.insert(key, results);
  }
  Ok(out)
}
```
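If you'd rather batch keys per node instead of sending one command per key, the key-to-ID mapping can be preserved by grouping pairs into index-aligned vectors. This is a hedged sketch in plain Rust: `slot_of` below is a deliberately fake stand-in (a real implementation would use CRC16(key) % 16384 per the Redis cluster spec, respecting hash tags), and the function names are hypothetical, not part of fred.

```rust
use std::collections::HashMap;

// Stand-in slot function for illustration only; NOT the real Redis cluster
// hashing, which is CRC16(key) % 16384 with `{hash tag}` handling.
fn slot_of(key: &str) -> u16 {
    (key.bytes().map(|b| b as u32).sum::<u32>() % 16384) as u16
}

// Group (key, id) pairs by slot so each batch can go to one node, keeping
// each key index-aligned with its own ID within the group.
fn group_keys_with_ids(pairs: &HashMap<String, String>) -> HashMap<u16, (Vec<String>, Vec<String>)> {
    let mut groups: HashMap<u16, (Vec<String>, Vec<String>)> = HashMap::new();
    for (key, id) in pairs {
        let entry = groups.entry(slot_of(key)).or_default();
        entry.0.push(key.clone()); // keys and ids stay index-aligned per group
        entry.1.push(id.clone());
    }
    groups
}
```

Each group's two vectors could then be passed as the `keys` and `ids` arguments of a single multi-key read, one command per slot group rather than one per key.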
They share the same connection(s) under the hood, so it depends on a few things. Generally speaking, I'd recommend using entirely separate clients/connections if one of them is expected to block. In your case what will happen is that the XADD commands will just wait until the XREAD call finishes or times out, then all the queued XADD commands will run, then it'll do the next XREAD call, etc. In most cases that's not what callers want, so usually I'd recommend using separate connections. The `clone_new` function can be used to create a new client that will use new connections if you need that - just don't forget to `init` the new client after calling it.