-
Notifications
You must be signed in to change notification settings - Fork 64
/
cluster.rs
169 lines (135 loc) · 5.26 KB
/
cluster.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
use super::*;
use crate::{
interfaces,
protocol::{
command::{CommandKind, RouterCommand},
utils as protocol_utils,
},
runtime::oneshot_channel,
types::{cluster::*, Key, MultipleHashSlots},
utils,
};
use bytes_utils::Str;
use std::convert::TryInto;
value_cmd!(cluster_bumpepoch, ClusterBumpEpoch);
ok_cmd!(cluster_flushslots, ClusterFlushSlots);
value_cmd!(cluster_myid, ClusterMyID);
value_cmd!(cluster_nodes, ClusterNodes);
ok_cmd!(cluster_saveconfig, ClusterSaveConfig);
values_cmd!(cluster_slots, ClusterSlots);
pub async fn cluster_info<C: ClientLike>(client: &C) -> Result<Value, Error> {
let frame = utils::request_response(client, || Ok((CommandKind::ClusterInfo, vec![]))).await?;
protocol_utils::frame_to_results(frame)
}
pub async fn cluster_add_slots<C: ClientLike>(client: &C, slots: MultipleHashSlots) -> Result<(), Error> {
let frame = utils::request_response(client, move || {
let mut args = Vec::with_capacity(slots.len());
for slot in slots.inner().into_iter() {
args.push(slot.into());
}
Ok((CommandKind::ClusterAddSlots, args))
})
.await?;
let response = protocol_utils::frame_to_results(frame)?;
protocol_utils::expect_ok(&response)
}
pub async fn cluster_count_failure_reports<C: ClientLike>(client: &C, node_id: Str) -> Result<Value, Error> {
let frame = utils::request_response(client, move || {
Ok((CommandKind::ClusterCountFailureReports, vec![node_id.into()]))
})
.await?;
protocol_utils::frame_to_results(frame)
}
pub async fn cluster_count_keys_in_slot<C: ClientLike>(client: &C, slot: u16) -> Result<Value, Error> {
let frame = utils::request_response(client, move || {
Ok((CommandKind::ClusterCountKeysInSlot, vec![slot.into()]))
})
.await?;
protocol_utils::frame_to_results(frame)
}
pub async fn cluster_del_slots<C: ClientLike>(client: &C, slots: MultipleHashSlots) -> Result<(), Error> {
let frame = utils::request_response(client, move || {
let mut args = Vec::with_capacity(slots.len());
for slot in slots.inner().into_iter() {
args.push(slot.into());
}
Ok((CommandKind::ClusterDelSlots, args))
})
.await?;
let response = protocol_utils::frame_to_results(frame)?;
protocol_utils::expect_ok(&response)
}
pub async fn cluster_failover<C: ClientLike>(client: &C, flag: Option<ClusterFailoverFlag>) -> Result<(), Error> {
let frame = utils::request_response(client, move || {
let args = if let Some(flag) = flag {
vec![flag.to_str().into()]
} else {
Vec::new()
};
Ok((CommandKind::ClusterFailOver, args))
})
.await?;
let response = protocol_utils::frame_to_results(frame)?;
protocol_utils::expect_ok(&response)
}
pub async fn cluster_forget<C: ClientLike>(client: &C, node_id: Str) -> Result<(), Error> {
one_arg_ok_cmd(client, CommandKind::ClusterForget, node_id.into()).await
}
pub async fn cluster_get_keys_in_slot<C: ClientLike>(client: &C, slot: u16, count: u64) -> Result<Value, Error> {
let count: Value = count.try_into()?;
let frame = utils::request_response(client, move || {
let args = vec![slot.into(), count];
Ok((CommandKind::ClusterGetKeysInSlot, args))
})
.await?;
protocol_utils::frame_to_results(frame)
}
pub async fn cluster_keyslot<C: ClientLike>(client: &C, key: Key) -> Result<Value, Error> {
one_arg_value_cmd(client, CommandKind::ClusterKeySlot, key.into()).await
}
pub async fn cluster_meet<C: ClientLike>(client: &C, ip: Str, port: u16) -> Result<(), Error> {
args_ok_cmd(client, CommandKind::ClusterMeet, vec![ip.into(), port.into()]).await
}
pub async fn cluster_replicate<C: ClientLike>(client: &C, node_id: Str) -> Result<(), Error> {
one_arg_ok_cmd(client, CommandKind::ClusterReplicate, node_id.into()).await
}
pub async fn cluster_replicas<C: ClientLike>(client: &C, node_id: Str) -> Result<Value, Error> {
one_arg_value_cmd(client, CommandKind::ClusterReplicas, node_id.into()).await
}
pub async fn cluster_reset<C: ClientLike>(client: &C, mode: Option<ClusterResetFlag>) -> Result<(), Error> {
let frame = utils::request_response(client, move || {
let mut args = Vec::with_capacity(1);
if let Some(flag) = mode {
args.push(flag.to_str().into());
}
Ok((CommandKind::ClusterReset, args))
})
.await?;
let response = protocol_utils::frame_to_results(frame)?;
protocol_utils::expect_ok(&response)
}
pub async fn cluster_set_config_epoch<C: ClientLike>(client: &C, epoch: u64) -> Result<(), Error> {
let epoch: Value = epoch.try_into()?;
one_arg_ok_cmd(client, CommandKind::ClusterSetConfigEpoch, epoch).await
}
pub async fn cluster_setslot<C: ClientLike>(client: &C, slot: u16, state: ClusterSetSlotState) -> Result<(), Error> {
let frame = utils::request_response(client, move || {
let mut args = Vec::with_capacity(3);
args.push(slot.into());
let (state, arg) = state.to_str();
args.push(state.into());
if let Some(arg) = arg {
args.push(arg.into());
}
Ok((CommandKind::ClusterSetSlot, args))
})
.await?;
let response = protocol_utils::frame_to_results(frame)?;
protocol_utils::expect_ok(&response)
}
pub async fn sync_cluster<C: ClientLike>(client: &C) -> Result<(), Error> {
let (tx, rx) = oneshot_channel();
let command = RouterCommand::SyncCluster { tx };
interfaces::send_to_router(client.inner(), command)?;
rx.await?
}