-
Notifications
You must be signed in to change notification settings - Fork 64
/
replica.rs
143 lines (132 loc) · 5.17 KB
/
replica.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
use crate::{
clients::{Client, Pipeline},
error::Error,
interfaces::{self, *},
modules::inner::ClientInner,
protocol::command::{Command, RouterCommand},
runtime::{oneshot_channel, RefCount},
types::config::Server,
};
use std::{collections::HashMap, fmt, fmt::Formatter};
/// A client wrapper that routes commands to cluster replica nodes.
///
/// Every command sent through this interface is directed at a replica node when one is available. The connections
/// underneath are shared with the main client, which keeps the view of the system current when replicas change or
/// are promoted. The client refreshes its cached replica routing table whenever it follows a cluster redirection or
/// any connection closes.
///
/// Keep in mind that [Redis replication is asynchronous](https://redis.io/docs/management/replication/).
#[derive(Clone)]
#[cfg_attr(docsrs, doc(cfg(feature = "replicas")))]
pub struct Replicas<C>
where
    C: ClientLike,
{
    /// The wrapped client that this interface delegates to.
    pub(crate) client: C,
}
impl<C> fmt::Debug for Replicas<C>
where
    C: ClientLike,
{
    /// Format the wrapper as a `Replicas` struct showing only the `id` of the shared client state.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let inner = self.client.inner();
        let mut builder = f.debug_struct("Replicas");
        builder.field("id", &inner.id);
        builder.finish()
    }
}
#[doc(hidden)]
impl From<&RefCount<ClientInner>> for Replicas<Client> {
    /// Build a replica interface around a `Client` created from the shared inner state.
    fn from(inner: &RefCount<ClientInner>) -> Self {
        let client = Client::from(inner);
        Self { client }
    }
}
impl<C: ClientLike> ClientLike for Replicas<C> {
#[doc(hidden)]
fn inner(&self) -> &RefCount<ClientInner> {
self.client.inner()
}
#[doc(hidden)]
fn change_command(&self, command: &mut Command) {
command.use_replica = true;
self.client.change_command(command);
}
#[doc(hidden)]
fn send_command<T>(&self, command: T) -> Result<(), Error>
where
T: Into<Command>,
{
self.client.send_command(command)
}
}
// Feature-gated command interfaces exposed on `Replicas`.
//
// Each impl body is empty, so every method comes from the corresponding trait's provided defaults. Each impl is
// compiled only when its Cargo feature is enabled, and the `docsrs` attribute records that requirement in the
// generated documentation.

#[cfg(feature = "i-redis-json")]
#[cfg_attr(docsrs, doc(cfg(feature = "i-redis-json")))]
impl<C> RedisJsonInterface for Replicas<C> where C: ClientLike {}

#[cfg(feature = "i-time-series")]
#[cfg_attr(docsrs, doc(cfg(feature = "i-time-series")))]
impl<C> TimeSeriesInterface for Replicas<C> where C: ClientLike {}

#[cfg(feature = "i-cluster")]
#[cfg_attr(docsrs, doc(cfg(feature = "i-cluster")))]
impl<C> ClusterInterface for Replicas<C> where C: ClientLike {}

#[cfg(feature = "i-config")]
#[cfg_attr(docsrs, doc(cfg(feature = "i-config")))]
impl<C> ConfigInterface for Replicas<C> where C: ClientLike {}

#[cfg(feature = "i-geo")]
#[cfg_attr(docsrs, doc(cfg(feature = "i-geo")))]
impl<C> GeoInterface for Replicas<C> where C: ClientLike {}

#[cfg(feature = "i-hashes")]
#[cfg_attr(docsrs, doc(cfg(feature = "i-hashes")))]
impl<C> HashesInterface for Replicas<C> where C: ClientLike {}

#[cfg(feature = "i-hyperloglog")]
#[cfg_attr(docsrs, doc(cfg(feature = "i-hyperloglog")))]
impl<C> HyperloglogInterface for Replicas<C> where C: ClientLike {}

#[cfg(feature = "i-keys")]
#[cfg_attr(docsrs, doc(cfg(feature = "i-keys")))]
impl<C> KeysInterface for Replicas<C> where C: ClientLike {}

#[cfg(feature = "i-scripts")]
#[cfg_attr(docsrs, doc(cfg(feature = "i-scripts")))]
impl<C> LuaInterface for Replicas<C> where C: ClientLike {}

#[cfg(feature = "i-lists")]
#[cfg_attr(docsrs, doc(cfg(feature = "i-lists")))]
impl<C> ListInterface for Replicas<C> where C: ClientLike {}

#[cfg(feature = "i-memory")]
#[cfg_attr(docsrs, doc(cfg(feature = "i-memory")))]
impl<C> MemoryInterface for Replicas<C> where C: ClientLike {}

#[cfg(feature = "i-server")]
#[cfg_attr(docsrs, doc(cfg(feature = "i-server")))]
impl<C> ServerInterface for Replicas<C> where C: ClientLike {}

#[cfg(feature = "i-slowlog")]
#[cfg_attr(docsrs, doc(cfg(feature = "i-slowlog")))]
impl<C> SlowlogInterface for Replicas<C> where C: ClientLike {}

#[cfg(feature = "i-sets")]
#[cfg_attr(docsrs, doc(cfg(feature = "i-sets")))]
impl<C> SetsInterface for Replicas<C> where C: ClientLike {}

#[cfg(feature = "i-sorted-sets")]
#[cfg_attr(docsrs, doc(cfg(feature = "i-sorted-sets")))]
impl<C> SortedSetsInterface for Replicas<C> where C: ClientLike {}

#[cfg(feature = "i-streams")]
#[cfg_attr(docsrs, doc(cfg(feature = "i-streams")))]
impl<C> StreamsInterface for Replicas<C> where C: ClientLike {}

// Gated by the same `i-scripts` feature as `LuaInterface`.
#[cfg(feature = "i-scripts")]
#[cfg_attr(docsrs, doc(cfg(feature = "i-scripts")))]
impl<C> FunctionInterface for Replicas<C> where C: ClientLike {}

#[cfg(feature = "i-redisearch")]
#[cfg_attr(docsrs, doc(cfg(feature = "i-redisearch")))]
impl<C> RediSearchInterface for Replicas<C> where C: ClientLike {}
impl<C> Replicas<C>
where
    C: ClientLike,
{
    /// Return a copy of the cached replica map, where each key is a replica server ID and each value is the ID of
    /// its primary server.
    pub fn nodes(&self) -> HashMap<Server, Server> {
        let inner = self.client.inner();
        // The read guard is held only for the duration of the clone.
        let state = inner.server_state.read();
        state.replicas.clone()
    }

    /// Create a [pipeline](https://redis.io/docs/manual/pipelining/) for sending a series of commands through a
    /// clone of this replica interface.
    pub fn pipeline(&self) -> Pipeline<Replicas<C>> {
        let replicas: Replicas<C> = self.clone();
        Pipeline::from(replicas)
    }

    /// Create a [Client](crate::clients::Client) that interacts with primary nodes, built from the same shared
    /// inner state as this interface.
    pub fn client(&self) -> Client {
        let inner = self.client.inner();
        Client::from(inner)
    }

    /// Sync the cached replica routing table with the server(s).
    ///
    /// With `reset: true` the client forcefully disconnects from replicas, even when the existing connections could
    /// otherwise be reused.
    ///
    /// # Errors
    ///
    /// Returns an error if the request cannot be sent to the router, if the response channel fails, or if the
    /// router responds with an error.
    pub async fn sync(&self, reset: bool) -> Result<(), Error> {
        let (reply_tx, reply_rx) = oneshot_channel();
        let request = RouterCommand::SyncReplicas { tx: reply_tx, reset };
        interfaces::send_to_router(self.client.inner(), request)?;

        // `?` propagates a channel failure; the value received is itself the `Result` returned to the caller.
        reply_rx.await?
    }
}