Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
jopemachine committed Jan 26, 2024
1 parent d6371ce commit 0e755bd
Show file tree
Hide file tree
Showing 10 changed files with 230 additions and 149 deletions.
42 changes: 22 additions & 20 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ crate-type = ["cdylib"]
[dependencies]
async-recursion = "1.0.5"
etcd-client = "0.10"
pyo3 = { version = "0.18", features = ["extension-module", "multiple-pymethods"] }
pyo3-asyncio = { version = "0.18", features = ["tokio-runtime"] }
pyo3 = { version = "0.20.2", features = ["extension-module", "multiple-pymethods"] }
pyo3-asyncio = { version = "0.20.0", features = ["tokio-runtime"] }
serde = { version = "1.0.195", features = ["derive"] }
serde_json = "1.0.111"
tokio = { version = "1", features = ["sync"] }
tokio-stream = "0.1"
Expand Down
178 changes: 58 additions & 120 deletions src/communicator.rs
Original file line number Diff line number Diff line change
@@ -1,78 +1,17 @@
use async_recursion::async_recursion;
use etcd_client::Client as RustClient;
use etcd_client::{DeleteOptions, GetOptions, WatchOptions};
use pyo3::prelude::*;
use pyo3::types::PyDict;
use pyo3_asyncio::tokio::future_into_py;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use url::form_urlencoded;

use crate::error::Error;
use crate::utils::nested_hashmap::{
convert_pydict_to_nested_map, insert_into_map, put_recursive, NestedHashMap,
};
use crate::Watch;

type NestedHashMap = HashMap<String, NestedValue>;

#[derive(Clone)]
enum NestedValue {
StringValue(String),
MapValue(NestedHashMap),
}

fn convert_pydict_to_nested_map(py: Python, py_dict: &PyDict) -> PyResult<NestedHashMap> {
let mut map = NestedHashMap::new();
for (key, value) in py_dict.iter() {
let key: String = key.extract::<String>()?;

if let Ok(inner_dict) = value.downcast::<PyDict>() {
map.insert(
key,
NestedValue::MapValue(convert_pydict_to_nested_map(py, inner_dict)?),
);
} else if let Ok(val_str) = value.extract::<String>() {
map.insert(key, NestedValue::StringValue(val_str));
}
}
Ok(map)
}

fn encode_string(input: &str) -> String {
form_urlencoded::byte_serialize(input.as_bytes()).collect()
}

#[async_recursion]
async fn put_recursive(
client: Arc<Mutex<RustClient>>,
prefix: &str,
dict: &NestedHashMap,
) -> Result<(), Error> {
for (key, value) in dict {
match value {
NestedValue::StringValue(val_str) => {
let mut client = client.lock().await;

let full_key = if key.is_empty() {
prefix.to_owned()
} else {
format!("{}/{}", prefix, encode_string(key))
};

client.put(full_key, val_str.clone(), None).await;
}
NestedValue::MapValue(map) => {
put_recursive(
client.clone(),
&format!("{}/{}", prefix, encode_string(key)),
map,
)
.await?;
}
}
}
Ok(())
}

#[pyclass]
pub struct Communicator(pub Arc<Mutex<RustClient>>);

Expand All @@ -96,67 +35,27 @@ impl Communicator {
})
}

fn get_prefix<'a>(&'a self, py: Python<'a>, key: String) -> PyResult<&'a PyAny> {
let client = self.0.clone();
future_into_py(py, async move {
let mut client = client.lock().await;
let options = GetOptions::new().with_prefix();
let result = client.get(key.clone(), Some(options)).await;
result
.map(|response| {
let mut result = HashMap::new();
let kvs = response.kvs();
for kv in kvs {
let key = String::from_utf8(kv.key().to_owned())
.unwrap()
.strip_prefix(&key)
.unwrap()
.to_owned();
let value = String::from_utf8(kv.value().to_owned()).unwrap();
result.insert(key, value);
}
result
})
.map_err(|e| Error(e).into())
})
}

fn keys_prefix<'a>(&'a self, py: Python<'a>, key: String) -> PyResult<&'a PyAny> {
fn get_prefix<'a>(&'a self, py: Python<'a>, prefix: String) -> PyResult<&'a PyAny> {
let client = self.0.clone();
future_into_py(py, async move {
let mut client = client.lock().await;
let options = GetOptions::new().with_prefix();
let result = client.get(key, Some(options)).await;
result
.map(|response| {
let mut result = Vec::new();
let kvs = response.kvs();
for kv in kvs {
let key = String::from_utf8(kv.key().to_owned()).unwrap();
result.push(key);
}
result
})
.map_err(|e| Error(e).into())
})
}

fn delete<'a>(&'a self, py: Python<'a>, key: String) -> PyResult<&'a PyAny> {
let client = self.0.clone();
future_into_py(py, async move {
let mut client = client.lock().await;
let result = client.delete(key, None).await;
result.map(|_| ()).map_err(|e| Error(e).into())
})
}
let response = client.get(prefix.clone(), Some(options)).await.unwrap();

let mut result = NestedHashMap::new();
for kv in response.kvs() {
let key = String::from_utf8(kv.key().to_owned())
.unwrap()
.strip_prefix(&format!("{}/", prefix))
.unwrap()
.to_owned();

let value = String::from_utf8(kv.value().to_owned()).unwrap();
let parts: Vec<&str> = key.split('/').collect();
insert_into_map(&mut result, &parts, value);
}

fn delete_prefix<'a>(&'a self, py: Python<'a>, key: String) -> PyResult<&'a PyAny> {
let client = self.0.clone();
future_into_py(py, async move {
let mut client = client.lock().await;
let options = DeleteOptions::new().with_prefix();
let result = client.delete(key, Some(options)).await;
result.map(|_| ()).map_err(|e| Error(e).into())
Ok(result)
})
}

Expand Down Expand Up @@ -187,6 +86,25 @@ impl Communicator {
})
}

fn delete<'a>(&'a self, py: Python<'a>, key: String) -> PyResult<&'a PyAny> {
let client = self.0.clone();
future_into_py(py, async move {
let mut client = client.lock().await;
let result = client.delete(key, None).await;
result.map(|_| ()).map_err(|e| Error(e).into())
})
}

fn delete_prefix<'a>(&'a self, py: Python<'a>, key: String) -> PyResult<&'a PyAny> {
let client = self.0.clone();
future_into_py(py, async move {
let mut client = client.lock().await;
let options = DeleteOptions::new().with_prefix();
let result = client.delete(key, Some(options)).await;
result.map(|_| ()).map_err(|e| Error(e).into())
})
}

// fn replace<'a>(&'a self, py: Python<'a>, key: String, initial_val: String, new_val: String) -> PyResult<&'a PyAny> {
// let client = self.0.clone();

Expand Down Expand Up @@ -215,6 +133,26 @@ impl Communicator {
// })
// }

fn keys_prefix<'a>(&'a self, py: Python<'a>, key: String) -> PyResult<&'a PyAny> {
let client = self.0.clone();
future_into_py(py, async move {
let mut client = client.lock().await;
let options = GetOptions::new().with_prefix();
let result = client.get(key, Some(options)).await;
result
.map(|response| {
let mut result = Vec::new();
let kvs = response.kvs();
for kv in kvs {
let key = String::from_utf8(kv.key().to_owned()).unwrap();
result.push(key);
}
result
})
.map_err(|e| Error(e).into())
})
}

fn watch(&self, key: String) -> Watch {
let client = self.0.clone();
Watch::new(client, key, None)
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod communicator;
mod error;
mod event;
mod stream;
mod utils;
mod watch;

use client::Client;
Expand Down
2 changes: 2 additions & 0 deletions src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod nested_hashmap;
pub mod url;
Loading

0 comments on commit 0e755bd

Please sign in to comment.