Skip to content

Commit

Permalink
WIP - Add etcd test harness
Browse files Browse the repository at this point in the history
  • Loading branch information
jopemachine committed Jan 29, 2024
1 parent 47c1e6a commit d04de81
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 98 deletions.
46 changes: 20 additions & 26 deletions src/communicator.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
use etcd_client::Client as EtcdClient;
use etcd_client::{Client as EtcdClient, PutOptions};
use etcd_client::{DeleteOptions, GetOptions, WatchOptions};
use pyo3::exceptions::PyException;
use pyo3::prelude::*;
use pyo3::types::PyDict;
use pyo3_asyncio::tokio::future_into_py;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;

use crate::condvar::PyCondVar;
use crate::error::Error;
use crate::transaction::PyTxn;
use crate::txn_response::PyTxnResponse;
use crate::utils::nested_hashmap::{
convert_pydict_to_nested_map, insert_into_map, put_recursive, NestedHashMap,
};
use crate::PyWatch;

#[pyclass(name = "Communicator")]
Expand Down Expand Up @@ -44,23 +42,19 @@ impl PyCommunicator {
future_into_py(py, async move {
let mut client = client.lock().await;
let options = GetOptions::new().with_prefix();
let response = client.get(prefix.clone(), Some(options)).await.unwrap();

let mut result = NestedHashMap::new();
for kv in response.kvs() {
let key = String::from_utf8(kv.key().to_owned())
.unwrap()
// .strip_prefix(&format!("{}/", prefix))
.strip_prefix(&prefix)
.unwrap()
.to_owned();

let value = String::from_utf8(kv.value().to_owned()).unwrap();
let parts: Vec<&str> = key.split('/').collect();
insert_into_map(&mut result, &parts, value);
}

Ok(result)
let result = client.get(prefix, Some(options)).await;
result
.map(|response| {
let mut result = HashMap::new();
let kvs = response.kvs();
for kv in kvs {
let key = String::from_utf8(kv.key().to_owned()).unwrap();
let value = String::from_utf8(kv.value().to_owned()).unwrap();
result.insert(key, value);
}
result
})
.map_err(|e| Error(e).into())
})
}

Expand All @@ -77,14 +71,14 @@ impl PyCommunicator {
&'a self,
py: Python<'a>,
prefix: String,
dict: &PyDict,
value: String,
) -> PyResult<&'a PyAny> {
let client = self.0.clone();
let dict = convert_pydict_to_nested_map(py, dict).unwrap();
future_into_py(py, async move {
let result = put_recursive(client, prefix.as_str(), &dict).await;
result.map(|_| ()).map_err(|e| Error(e)).unwrap();
Ok(())
let mut client = client.lock().await;
let options = PutOptions::new().with_prev_key();
let result = client.put(prefix, value, Some(options)).await;
result.map(|_| ()).map_err(|e| Error(e).into())
})
}

Expand Down
12 changes: 6 additions & 6 deletions src/compare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,31 +42,31 @@ pub struct PyCompare(pub Compare);
#[pymethods]
impl PyCompare {
#[staticmethod]
fn version(key: Vec<u8>, cmp: PyCompareOp, version: i64) -> PyResult<Self> {
fn version(key: String, cmp: PyCompareOp, version: i64) -> PyResult<Self> {
Ok(PyCompare(Compare::version(key, cmp.0, version)))
}

#[staticmethod]
fn create_revision(key: Vec<u8>, cmp: PyCompareOp, revision: i64) -> PyResult<Self> {
fn create_revision(key: String, cmp: PyCompareOp, revision: i64) -> PyResult<Self> {
Ok(PyCompare(Compare::create_revision(key, cmp.0, revision)))
}

#[staticmethod]
fn mod_revision(key: Vec<u8>, cmp: PyCompareOp, revision: i64) -> PyResult<Self> {
fn mod_revision(key: String, cmp: PyCompareOp, revision: i64) -> PyResult<Self> {
Ok(PyCompare(Compare::mod_revision(key, cmp.0, revision)))
}

#[staticmethod]
fn value(key: Vec<u8>, cmp: PyCompareOp, value: Vec<u8>) -> PyResult<Self> {
fn value(key: String, cmp: PyCompareOp, value: String) -> PyResult<Self> {
Ok(PyCompare(Compare::value(key, cmp.0, value)))
}

#[staticmethod]
fn lease(key: Vec<u8>, cmp: PyCompareOp, lease: i64) -> PyResult<Self> {
fn lease(key: String, cmp: PyCompareOp, lease: i64) -> PyResult<Self> {
Ok(PyCompare(Compare::lease(key, cmp.0, lease)))
}

fn with_range(&self, end: Vec<u8>) -> PyResult<Self> {
fn with_range(&self, end: String) -> PyResult<Self> {
Ok(PyCompare(self.0.clone().with_range(end)))
}

Expand Down
9 changes: 8 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ mod watch;

use client::PyClient;
use communicator::PyCommunicator;
use compare::{PyCompare, PyCompareOp};
use condvar::PyCondVar;
use error::ClientError;
use event::{PyWatchEvent, PyWatchEventType};
use pyo3::prelude::*;
use transaction::PyTxn;
use transaction::{PyTxn, PyTxnOp};
use txn_response::PyTxnResponse;
use watch::PyWatch;

#[pymodule]
Expand All @@ -30,7 +32,12 @@ fn etcd_client(py: Python, module: &PyModule) -> PyResult<()> {
module.add_class::<PyWatchEventType>()?;

module.add_class::<PyCondVar>()?;
module.add_class::<PyCompare>()?;
module.add_class::<PyCompareOp>()?;

module.add_class::<PyTxn>()?;
module.add_class::<PyTxnOp>()?;
module.add_class::<PyTxnResponse>()?;

module.add("ClientError", py.get_type::<ClientError>())?;
Ok(())
Expand Down
43 changes: 43 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import pytest
from tests.harness import AsyncEtcd, ConfigScopes, HostPortPair


@pytest.fixture
async def etcd():
etcd = AsyncEtcd(
addr=HostPortPair(host="127.0.0.1", port=2379),
namespace="test",
scope_prefix_map={
ConfigScopes.GLOBAL: "global",
ConfigScopes.SGROUP: "sgroup/testing",
ConfigScopes.NODE: "node/i-test",
},
)
try:
await etcd.delete_prefix("", scope=ConfigScopes.GLOBAL)
await etcd.delete_prefix("", scope=ConfigScopes.SGROUP)
await etcd.delete_prefix("", scope=ConfigScopes.NODE)
return etcd
finally:
await etcd.delete_prefix("", scope=ConfigScopes.GLOBAL)
await etcd.delete_prefix("", scope=ConfigScopes.SGROUP)
await etcd.delete_prefix("", scope=ConfigScopes.NODE)
await etcd.close()
del etcd


@pytest.fixture
async def gateway_etcd():
etcd = AsyncEtcd(
addr=HostPortPair(host="127.0.0.1", port=2379),
namespace="test",
scope_prefix_map={
ConfigScopes.GLOBAL: "",
},
)
try:
await etcd.delete_prefix("", scope=ConfigScopes.GLOBAL)
yield etcd
finally:
await etcd.delete_prefix("", scope=ConfigScopes.GLOBAL)
del etcd
21 changes: 5 additions & 16 deletions tests/harness.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def __init__(
self.watch_reconnect_intvl = watch_reconnect_intvl

self.etcd = EtcdClient(
["http://localhost:2379"],
[f"http://{addr.host}:{addr.port}"],
# credentials=self._creds,
# encoding=self.encoding,
)
Expand Down Expand Up @@ -425,27 +425,16 @@ async def replace(
scope_prefix = self._merge_scope_prefix_map(scope_prefix_map)[scope]
mangled_key = self._mangle_key(f"{_slash(scope_prefix)}{key}")

# def _txn(success: EtcdTransactionAction, _):
# success.put(mangled_key, new_val)

# async with self.etcd.connect() as communicator:
# _, success = await communicator.txn_compare(
# [
# CompareKey(mangled_key).value == initial_val,
# ],
# _txn,
# )
# return success

# TODO: Test below transaction codes
async with self.etcd.connect() as communicator:
put_action = TxnOp.put(mangled_key, new_val)

txn = EtcdTransactionAction()

communicator.txn(txn.when([
result = await communicator.txn(txn.when([
CompareKey.value(mangled_key, CompareOp.EQUAL, initial_val),
]).and_then(list[put_action]).or_else([]))
]).and_then([put_action]).or_else([]))

return result.succeeded()

async def delete(
self,
Expand Down
99 changes: 50 additions & 49 deletions tests/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,63 +2,64 @@

import pytest
from etcd_client import Client, CondVar, WatchEventType
from tests.harness import AsyncEtcd, HostPortPair

etcd_client = Client(["http://localhost:2379"])

# @pytest.mark.asyncio
# async def test_basic_crud() -> None:
# async with etcd_client.connect() as etcd:
# await etcd.put("wow", "abc")
@pytest.mark.asyncio
async def test_basic_crud(etcd: AsyncEtcd) -> None:
etcd = await etcd

# v = await etcd.get("wow")
# assert v == "abc"
# vp = await etcd.get_prefix("wow")
# assert len(vp) == 1
# assert vp == {"": "abc"}
await etcd.put("wow", "abc")

# r = await etcd.replace("wow", "aaa", "ccc")
# assert r is False
# r = await etcd.replace("wow", "abc", "def")
# assert r is True
# v = await etcd.get("wow")
# assert v == "def"
v = await etcd.get("wow")
assert v == "abc"
vp = await etcd.get_prefix("wow")
assert len(vp) == 1
assert vp == {"": "abc"}

# await etcd.delete("wow")
r = await etcd.replace("wow", "aaa", "ccc")
assert r is False
r = await etcd.replace("wow", "abc", "def")
assert r is True
v = await etcd.get("wow")
assert v == "def"

# v = await etcd.get("wow")
# assert v is None
# vp = await etcd.get_prefix("wow")
# assert len(vp) == 0
await etcd.delete("wow")

v = await etcd.get("wow")
assert v is None
vp = await etcd.get_prefix("wow")
assert len(vp) == 0

@pytest.mark.asyncio
async def test_quote_for_put_prefix() -> None:
async with etcd_client.connect() as etcd:
await etcd.put_prefix(
"data",
{
"aa:bb": {
"option1": "value1",
"option2": "value2",
"myhost/path": "this",
},
"aa:cc": "wow",
"aa:dd": {
"": "oops",
},
},
)

v = await etcd.get("data/aa%3Abb/option1")
assert v == "value1"
v = await etcd.get("data/aa%3Abb/option2")
assert v == "value2"
v = await etcd.get("data/aa%3Abb/myhost%2Fpath")
assert v == "this"
v = await etcd.get("data/aa%3Acc")
assert v == "wow"
v = await etcd.get("data/aa%3Add")
assert v == "oops"

# @pytest.mark.asyncio
# async def test_quote_for_put_prefix() -> None:
# async with etcd_client.connect() as etcd:
# await etcd.put_prefix(
# "data",
# {
# "aa:bb": {
# "option1": "value1",
# "option2": "value2",
# "myhost/path": "this",
# },
# "aa:cc": "wow",
# "aa:dd": {
# "": "oops",
# },
# },
# )

# v = await etcd.get("data/aa%3Abb/option1")
# assert v == "value1"
# v = await etcd.get("data/aa%3Abb/option2")
# assert v == "value2"
# v = await etcd.get("data/aa%3Abb/myhost%2Fpath")
# assert v == "this"
# v = await etcd.get("data/aa%3Acc")
# assert v == "wow"
# v = await etcd.get("data/aa%3Add")
# assert v == "oops"


# @pytest.mark.asyncio
Expand Down

0 comments on commit d04de81

Please sign in to comment.