-
Notifications
You must be signed in to change notification settings - Fork 64
/
client.rs
132 lines (108 loc) · 3.64 KB
/
client.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
use super::*;
use crate::{
protocol::{
command::{Command, CommandKind},
utils as protocol_utils,
},
types::{client::*, ClientUnblockFlag, Key},
utils,
};
use bytes_utils::Str;
// NOTE(review): `value_cmd!` is defined outside this file; presumably each invocation expands to an
// async `fn <name>(client)` that sends the named `CommandKind` with no arguments and returns the
// decoded `Value` - confirm against the macro definition.
value_cmd!(client_id, ClientID);
value_cmd!(client_info, ClientInfo);
/// Issues the `ClientKill` command with the provided filters.
///
/// Every filter is rendered through `to_str()` into a `(field, value)` pair, and both halves are
/// appended to the argument list in the order the filters were supplied. The decoded reply is
/// returned as-is.
pub async fn client_kill<C: ClientLike>(client: &C, filters: Vec<ClientKillFilter>) -> Result<Value, Error> {
  let reply = utils::request_response(client, move || {
    // Exactly two arguments are emitted per filter.
    let mut args: Vec<Value> = Vec::with_capacity(2 * filters.len());

    for entry in filters {
      let (name, setting) = entry.to_str();
      args.push(name.into());
      args.push(setting.into());
    }

    Ok((CommandKind::ClientKill, args))
  })
  .await?;

  protocol_utils::frame_to_results(reply)
}
/// Issues the `ClientList` command, optionally narrowed by connection type and/or client IDs.
///
/// * `r#type` - when present, appends the `TYPE` token followed by the kind's string form.
/// * `ids` - when present and non-empty, appends the `ID` token followed by each ID. An empty
///   list is treated the same as `None`.
///
/// Returns the decoded reply without further interpretation.
pub async fn client_list<C: ClientLike>(
  client: &C,
  r#type: Option<ClientKillType>,
  ids: Option<Vec<String>>,
) -> Result<Value, Error> {
  // Convert up front so the closure only has to move already-typed keys.
  let ids: Option<Vec<Key>> = ids.map(|ids| ids.into_iter().map(|id| id.into()).collect());

  let frame = utils::request_response(client, move || {
    // Worst case: `TYPE <kind>` (2) + the `ID` token (1) + one argument per ID. The previous
    // hint of `2 + len` left out the `ID` token and forced a reallocation when both were set.
    let max_args = 3 + ids.as_ref().map_or(0, Vec::len);
    let mut args = Vec::with_capacity(max_args);

    if let Some(kind) = r#type {
      args.push(static_val!(TYPE));
      args.push(kind.to_str().into());
    }

    // Skip the `ID` token entirely when there is nothing to list after it.
    if let Some(ids) = ids.filter(|ids| !ids.is_empty()) {
      args.push(static_val!(ID));
      for id in ids {
        args.push(id.into());
      }
    }

    Ok((CommandKind::ClientList, args))
  })
  .await?;

  protocol_utils::frame_to_results(frame)
}
pub async fn client_pause<C: ClientLike>(
client: &C,
timeout: i64,
mode: Option<ClientPauseKind>,
) -> Result<(), Error> {
let frame = utils::request_response(client, move || {
let mut args = Vec::with_capacity(2);
args.push(timeout.into());
if let Some(mode) = mode {
args.push(mode.to_str().into());
}
Ok((CommandKind::ClientPause, args))
})
.await?;
let response = protocol_utils::frame_to_results(frame)?;
protocol_utils::expect_ok(&response)
}
// NOTE(review): presumably expands to an async `client_getname` that sends `CommandKind::ClientGetName`
// with no arguments and returns the decoded `Value` - `value_cmd!` is defined elsewhere; confirm.
value_cmd!(client_getname, ClientGetName);
/// Issues the `ClientSetname` command with `name` as its single argument.
///
/// Returns `Ok(())` only when the decoded reply passes `protocol_utils::expect_ok`.
pub async fn client_setname<C: ClientLike>(client: &C, name: Str) -> Result<(), Error> {
  let reply = utils::request_response(client, move || {
    let args = vec![name.into()];
    Ok((CommandKind::ClientSetname, args))
  })
  .await?;

  protocol_utils::frame_to_results(reply).and_then(|value| protocol_utils::expect_ok(&value))
}
// NOTE(review): presumably expands to an async `client_unpause` that sends `CommandKind::ClientUnpause`
// with no arguments and requires an OK reply - `ok_cmd!` is defined elsewhere; confirm.
ok_cmd!(client_unpause, ClientUnpause);
/// Issues the `ClientReply` command with the string form of `flag` as its only argument.
///
/// Returns `Ok(())` only when the decoded reply passes `protocol_utils::expect_ok`.
pub async fn client_reply<C: ClientLike>(client: &C, flag: ClientReplyFlag) -> Result<(), Error> {
  let reply = utils::request_response(client, move || {
    let args = vec![flag.to_str().into()];
    Ok((CommandKind::ClientReply, args))
  })
  .await?;

  protocol_utils::frame_to_results(reply).and_then(|value| protocol_utils::expect_ok(&value))
}
/// Issues the `ClientUnblock` command for the connection identified by `id`.
///
/// Unlike the other commands in this file, the request is sent through
/// `utils::backchannel_request_response` rather than `utils::request_response`.
///
/// * `id` - sent verbatim as the first argument.
/// * `flag` - optional unblock flag; its string form is appended only when provided.
pub async fn client_unblock<C: ClientLike>(
  client: &C,
  id: Value,
  flag: Option<ClientUnblockFlag>,
) -> Result<Value, Error> {
  // At most two arguments: the client ID and the optional flag.
  let mut args = Vec::with_capacity(2);
  args.push(id);
  if let Some(mode) = flag {
    args.push(mode.to_str().into());
  }

  let request = Command::new(CommandKind::ClientUnblock, args);
  let reply = utils::backchannel_request_response(client.inner(), request, false).await?;

  protocol_utils::frame_to_results(reply)
}
pub async fn unblock_self<C: ClientLike>(client: &C, flag: Option<ClientUnblockFlag>) -> Result<(), Error> {
let inner = client.inner();
let flag = flag.unwrap_or(ClientUnblockFlag::Error);
let result = utils::interrupt_blocked_connection(inner, flag).await;
inner.backchannel.set_unblocked();
result
}
/// Issues the `Echo` command with `message` as its single argument.
///
/// Delegates to `one_arg_value_cmd` and returns whatever it produces.
pub async fn echo<C: ClientLike>(client: &C, message: Value) -> Result<Value, Error> {
one_arg_value_cmd(client, CommandKind::Echo, message).await
}