Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
jopemachine committed Jan 29, 2024
1 parent 1260699 commit 816d030
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 69 deletions.
62 changes: 56 additions & 6 deletions etcd_client.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,55 @@
Type hints for Native Rust Extension
"""

from typing import Any, AsyncIterator, Final, Iterator, Optional
from typing import Any, AsyncIterator, Final, Optional

class CompareOp:
""" """

EQUAL: Final[Any]
"""
"""
NOT_EQUAL: Final[Any]
"""
"""
GREATER: Final[Any]
"""
"""
LESS: Final[Any]
"""
"""

class Compare:
@classmethod
def version(key: str, cmp: "CompareOp", version: int) -> "Compare": ...
@classmethod
def create_revision(key: str, cmp: "CompareOp", revision: int) -> "Compare": ...
@classmethod
def mod_revision(key: str, cmp: "CompareOp", revision: int) -> "Compare": ...
@classmethod
def value(key: str, cmp: "CompareOp", value: str) -> "Compare": ...
@classmethod
def lease(key: str, cmp: "CompareOp", lease: int) -> "Compare": ...
def with_range(end: list[int]) -> "Compare": ...
def with_prefix() -> "Compare": ...

class Txn:
def when(self, compares: list["Compare"]) -> "Txn": ...
def and_then(self, operations: list["TxnOp"]) -> "Txn": ...
def or_else(self, operations: list["TxnOp"]) -> "Txn": ...

class TxnOp:
@classmethod
def get(key: str) -> "TxnOp": ...
@classmethod
def put(key: str, value: str) -> "TxnOp": ...
@classmethod
def delete(key: str) -> "TxnOp": ...
@classmethod
def txn(txn: "Txn") -> "TxnOp": ...

class TxnResponse:
def succeeded(self) -> bool: ...

class Client:
""" """
Expand All @@ -19,7 +67,7 @@ class Watch:

async def __aiter__(self) -> AsyncIterator["Watch"]:
""" """
async def __anext__(self) -> "Event":
async def __anext__(self) -> "WatchEvent":
""" """

class CondVar:
Expand All @@ -41,6 +89,8 @@ class Communicator:
""" """
async def put_prefix(self, key: str, value: dict[str, Any]) -> None:
""" """
async def txn(self, txn: "Txn") -> "TxnResponse":
""" """
async def delete(self, key: str) -> None:
""" """
async def delete_prefix(self, key: str) -> None:
Expand All @@ -66,19 +116,19 @@ class Communicator:
) -> "Watch":
""" """

class Event:
class WatchEvent:
""" """

key: str
value: str
event_type: "EventType"
event_type: "WatchEventType"
prev_value: Optional[str]

def __init__(
key: str, value: str, event_type: "EventType", prev_value: Optional[str]
key: str, value: str, event_type: "WatchEventType", prev_value: Optional[str]
) -> None: ...

class EventType:
class WatchEventType:
""" """

PUT: Final[Any]
Expand Down
37 changes: 10 additions & 27 deletions src/communicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use tokio::sync::Mutex;

use crate::condvar::PyCondVar;
use crate::error::Error;
use crate::transaction::PyTxn;
use crate::txn_response::PyTxnResponse;
use crate::utils::nested_hashmap::{
convert_pydict_to_nested_map, insert_into_map, put_recursive, NestedHashMap,
};
Expand Down Expand Up @@ -106,35 +108,16 @@ impl PyCommunicator {
})
}

fn delete_multi<'a>(&'a self, py: Python<'a>, keys: Vec<String>) -> PyResult<&'a PyAny> {
fn txn<'a>(&'a self, py: Python<'a>, txn: PyTxn) -> PyResult<&'a PyAny> {
let client = self.0.clone();
todo!();

// future_into_py(py, async move {
// let mut client = client.lock().await;
// result.map(|_| ()).map_err(|e| Error(e).into())
// })
}

fn txn<'a>(&'a self, py: Python<'a>, keys: Vec<String>) -> PyResult<&'a PyAny> {
let client = self.0.clone();
todo!();

// future_into_py(py, async move {
// let mut client = client.lock().await;
// client.txn(txn)
// result.map(|_| ()).map_err(|e| Error(e).into())
// })
}

fn txn_compare<'a>(&'a self, py: Python<'a>, keys: Vec<String>) -> PyResult<&'a PyAny> {
let client = self.0.clone();
todo!();

// future_into_py(py, async move {
// let mut client = client.lock().await;
// result.map(|_| ()).map_err(|e| Error(e).into())
// })
future_into_py(py, async move {
let mut client = client.lock().await;
let result = client.txn(txn.0).await;
result
.map(|response| PyTxnResponse(response))
.map_err(|e| Error(e).into())
})
}

fn replace<'a>(
Expand Down
27 changes: 16 additions & 11 deletions src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,25 @@ use pyo3::prelude::*;
use pyo3::pyclass::CompareOp;

// Note: Event = namedtuple("Event", "key event value"), not asyncio.Event, threading.Event
#[pyclass(get_all, name = "Event")]
#[pyclass(get_all, name = "WatchEvent")]
#[derive(PartialEq, Eq, Clone, Debug)]
pub struct PyEvent {
pub struct PyWatchEvent {
key: String,
value: String,
event: PyEventType,
event: PyWatchEventType,
prev_value: Option<String>,
}

#[pymethods]
impl PyEvent {
impl PyWatchEvent {
#[new]
#[pyo3(signature = (key, value, event, prev_value))]
fn new(key: String, value: String, event: PyEventType, prev_value: Option<String>) -> Self {
fn new(
key: String,
value: String,
event: PyWatchEventType,
prev_value: Option<String>,
) -> Self {
Self {
key,
value,
Expand All @@ -42,13 +47,13 @@ impl PyEvent {
}
}

impl From<EtcdClientEvent> for PyEvent {
impl From<EtcdClientEvent> for PyWatchEvent {
fn from(event: EtcdClientEvent) -> Self {
let kv = event.kv().unwrap();
let key = String::from_utf8(kv.key().to_owned()).unwrap();
let value = String::from_utf8(kv.value().to_owned()).unwrap();
let prev_value = None;
let event = PyEventType(event.event_type());
let event = PyWatchEventType(event.event_type());
Self {
key,
value,
Expand All @@ -58,12 +63,12 @@ impl From<EtcdClientEvent> for PyEvent {
}
}

#[pyclass(name = "EventType")]
#[pyclass(name = "WatchEventType")]
#[derive(PartialEq, Eq, Clone, Debug)]
pub struct PyEventType(EtcdClientEventType);
pub struct PyWatchEventType(EtcdClientEventType);

#[pymethods]
impl PyEventType {
impl PyWatchEventType {
#[classattr]
const PUT: Self = Self(EtcdClientEventType::Put);

Expand All @@ -77,7 +82,7 @@ impl PyEventType {
}
}

pub fn __richcmp__(&self, py: Python, rhs: &PyEventType, op: CompareOp) -> PyObject {
pub fn __richcmp__(&self, py: Python, rhs: &PyWatchEventType, op: CompareOp) -> PyObject {
match op {
CompareOp::Eq => (self.0 == rhs.0).into_py(py),
CompareOp::Ne => (self.0 != rhs.0).into_py(py),
Expand Down
9 changes: 6 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ mod event;
mod request_generator;
mod stream;
mod transaction;
mod txn_response;
mod utils;
mod watch;

use client::PyClient;
use communicator::PyCommunicator;
use condvar::PyCondVar;
use error::ClientError;
use event::{PyEvent, PyEventType};
use event::{PyWatchEvent, PyWatchEventType};
use pyo3::prelude::*;
use transaction::PyTxn;
use watch::PyWatch;
Expand All @@ -23,9 +24,11 @@ use watch::PyWatch;
fn etcd_client(py: Python, module: &PyModule) -> PyResult<()> {
module.add_class::<PyClient>()?;
module.add_class::<PyCommunicator>()?;
module.add_class::<PyEvent>()?;
module.add_class::<PyEventType>()?;

module.add_class::<PyWatch>()?;
module.add_class::<PyWatchEvent>()?;
module.add_class::<PyWatchEventType>()?;

module.add_class::<PyCondVar>()?;
module.add_class::<PyTxn>()?;

Expand Down
12 changes: 6 additions & 6 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@ use etcd_client::WatchStream;
use pyo3::pyclass;
use tokio_stream::StreamExt;

use crate::{error::Error, event::PyEvent};
use crate::{error::Error, event::PyWatchEvent};

#[pyclass(name = "EventStream")]
pub struct PyEventStream {
#[pyclass(name = "WatchEventStream")]
pub struct PyWatchEventStream {
stream: WatchStream,
events: Vec<PyEvent>,
events: Vec<PyWatchEvent>,
index: usize,
once: bool,
}

impl PyEventStream {
impl PyWatchEventStream {
pub fn new(stream: WatchStream, once: bool) -> Self {
Self {
stream,
Expand All @@ -22,7 +22,7 @@ impl PyEventStream {
}
}

pub async fn next(&mut self) -> Option<Result<PyEvent, Error>> {
pub async fn next(&mut self) -> Option<Result<PyWatchEvent, Error>> {
if self.once && self.index > 0 {
return None;
}
Expand Down
13 changes: 9 additions & 4 deletions src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use pyo3::prelude::*;

use crate::compare::PyCompare;

#[derive(Clone)]
#[derive(Debug, Clone)]
#[pyclass(name = "TxnOp")]
pub struct PyTxnOp(TxnOp);
pub struct PyTxnOp(pub TxnOp);

#[pymethods]
impl PyTxnOp {
Expand All @@ -25,11 +25,16 @@ impl PyTxnOp {
let options = DeleteOptions::new();
Ok(PyTxnOp(TxnOp::delete(key, Some(options))))
}

#[staticmethod]
fn txn(txn: PyTxn) -> PyResult<Self> {
Ok(PyTxnOp(TxnOp::txn(txn.0)))
}
}

#[derive(Clone)]
#[derive(Debug, Default, Clone)]
#[pyclass(name = "Txn")]
pub struct PyTxn(Txn);
pub struct PyTxn(pub Txn);

#[pymethods]
impl PyTxn {
Expand Down
14 changes: 14 additions & 0 deletions src/txn_response.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use etcd_client::TxnResponse;
use pyo3::prelude::*;

#[derive(Clone)]
#[pyclass(name = "TxnResponse")]
pub struct PyTxnResponse(pub TxnResponse);

// TODO: Add ResponseHeader, TxnOpResponse
#[pymethods]
impl PyTxnResponse {
pub fn succeeded(&self) -> PyResult<bool> {
Ok(self.0.succeeded())
}
}
6 changes: 3 additions & 3 deletions src/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use tokio::sync::Notify;

use crate::condvar::PyCondVar;
use crate::error::Error;
use crate::stream::PyEventStream;
use crate::stream::PyWatchEventStream;

#[pyclass(name = "Watch")]
#[derive(Clone)]
Expand All @@ -21,7 +21,7 @@ pub struct PyWatch {
options: Option<WatchOptions>,
watcher: Arc<Mutex<Option<Watcher>>>,
event_stream_init_notifier: Arc<Notify>,
event_stream: Arc<Mutex<Option<PyEventStream>>>,
event_stream: Arc<Mutex<Option<PyWatchEventStream>>>,
ready_event: Option<PyCondVar>,
cleanup_event: Option<PyCondVar>,
}
Expand Down Expand Up @@ -61,7 +61,7 @@ impl PyWatch {

match client.watch(self.key.clone(), self.options.clone()).await {
Ok((watcher, stream)) => {
*event_stream = Some(PyEventStream::new(stream, self.once));
*event_stream = Some(PyWatchEventStream::new(stream, self.once));
*self.watcher.lock().await = Some(watcher);

event_stream_init_notifier.notify_waiters();
Expand Down
Loading

0 comments on commit 816d030

Please sign in to comment.