Skip to content

Commit

Permalink
Handle error properly
Browse files Browse the repository at this point in the history
  • Loading branch information
jopemachine committed Jan 30, 2024
1 parent 7590451 commit dba1f27
Show file tree
Hide file tree
Showing 10 changed files with 407 additions and 148 deletions.
113 changes: 108 additions & 5 deletions etcd_client.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Type hints for Native Rust Extension
"""

from enum import Enum
from typing import Any, AsyncIterator, Final, Optional

class CompareOp:
Expand Down Expand Up @@ -31,8 +32,8 @@ class Compare:
def value(key: str, cmp: "CompareOp", value: str) -> "Compare": ...
@classmethod
def lease(key: str, cmp: "CompareOp", lease: int) -> "Compare": ...
def with_range(end: list[int]) -> "Compare": ...
def with_prefix() -> "Compare": ...
def with_range(self, end: list[int]) -> "Compare": ...
def with_prefix(self) -> "Compare": ...

class Txn:
def __init__(self) -> None: ...
Expand All @@ -56,13 +57,24 @@ class TxnResponse:
class Client:
""" """

def __init__(self, endpoints: list[str]) -> None:
def __init__(
self, endpoints: list[str], options: Optional["ConnectOptions"] = None
) -> None:
""" """
def connect(self) -> "Client":
""" """
async def __aenter__(self) -> "Communicator":
""" """

class ConnectOptions:
def __init__(self) -> None: ...
def with_username(self, user: str, password: str) -> "ConnectOptions": ...
def with_keep_alive(self, interval: int, timeout: int) -> "ConnectOptions": ...
def with_keep_alive_while_idle(self, enabled: bool) -> "ConnectOptions": ...
def with_connect_timeout(self, connect_timeout: int) -> "ConnectOptions": ...
def with_timeout(self, timeout: int) -> "ConnectOptions": ...
def with_tcp_keepalive(self, tcp_keepalive: int) -> "ConnectOptions": ...

class Watch:
""" """

Expand All @@ -78,7 +90,7 @@ class CondVar:
""" """
async def wait(self) -> None:
""" """
async def notify_all(self) -> None:
async def notify_waiters(self) -> None:
""" """

class Communicator:
Expand Down Expand Up @@ -126,7 +138,10 @@ class WatchEvent:
prev_value: Optional[str]

def __init__(
key: str, value: str, event_type: "WatchEventType", prev_value: Optional[str]
key: str,
value: str,
event_type: "WatchEventType",
prev_value: Optional[str] = None,
) -> None: ...

class WatchEventType:
Expand All @@ -138,3 +153,91 @@ class WatchEventType:
DELETE: Final[Any]
"""
"""

class ClientError(Exception):
""" """

class GRpcStatusError(ClientError):
""" """

class InvalidArgsError(ClientError):
""" """

class IoError(ClientError):
""" """

class InvalidUriError(ClientError):
""" """

class TransportError(ClientError):
""" """

class WatchError(ClientError):
""" """

class Utf8Error(ClientError):
""" """

class LeaseKeepAliveError(ClientError):
""" """

class ElectError(ClientError):
""" """

class InvalidHeaderValueError(ClientError):
""" """

class EndpointError(ClientError):
""" """

class GRpcStatusCode(Enum):
Ok = 0
"""The operation completed successfully."""

Cancelled = 1
"""The operation was cancelled."""

Unknown = 2
"""Unknown error."""

InvalidArgument = 3
"""Client specified an invalid argument."""

DeadlineExceeded = 4
"""Deadline expired before operation could complete."""

NotFound = 5
"""Some requested entity was not found."""

AlreadyExists = 6
"""Some entity that we attempted to create already exists."""

PermissionDenied = 7
"""The caller does not have permission to execute the specified operation."""

ResourceExhausted = 8
"""Some resource has been exhausted."""

FailedPrecondition = 9
"""The system is not in a state required for the operation's execution."""

Aborted = 10
"""The operation was aborted."""

OutOfRange = 11
"""Operation was attempted past the valid range."""

Unimplemented = 12
"""Operation is not implemented or not supported."""

Internal = 13
"""Internal error."""

Unavailable = 14
"""The service is currently unavailable."""

DataLoss = 15
"""Unrecoverable data loss or corruption."""

Unauthenticated = 16
"""The request does not have valid authentication credentials."""
66 changes: 60 additions & 6 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,77 @@
use etcd_client::Client as EtcdClient;
use etcd_client::{Client as EtcdClient, ConnectOptions};
use pyo3::prelude::*;
use pyo3::types::PyTuple;
use pyo3_asyncio::tokio::future_into_py;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;

use crate::communicator::PyCommunicator;
use crate::error::Error;
use crate::error::PyClientError;

#[pyclass(name = "ConnectOptions")]
#[derive(Clone, Default)]
pub struct PyConnectOptions(pub ConnectOptions);

#[pymethods]
impl PyConnectOptions {
#[new]
fn new() -> Self {
Self(ConnectOptions::default())
}

fn with_user(&self, name: String, password: String) -> Self {
PyConnectOptions(self.0.clone().with_user(name, password))
}

fn with_keep_alive(&self, interval: u64, timeout: u64) -> Self {
PyConnectOptions(
self.0
.clone()
.with_keep_alive(Duration::from_secs(interval), Duration::from_secs(timeout)),
)
}

fn with_keep_alive_while_idle(&self, enabled: bool) -> Self {
PyConnectOptions(self.0.clone().with_keep_alive_while_idle(enabled))
}

fn with_connect_timeout(&self, connect_timeout: u64) -> Self {
PyConnectOptions(
self.0
.clone()
.with_connect_timeout(Duration::from_secs(connect_timeout)),
)
}

fn with_timeout(&self, timeout: u64) -> Self {
PyConnectOptions(self.0.clone().with_timeout(Duration::from_secs(timeout)))
}

fn with_tcp_keepalive(&self, tcp_keepalive: u64) -> Self {
PyConnectOptions(
self.0
.clone()
.with_tcp_keepalive(Duration::from_secs(tcp_keepalive)),
)
}

// TODO: Implement "tls", "tls-openssl" authentification
}

#[pyclass(name = "Client")]
#[derive(Clone)]
pub struct PyClient {
endpoints: Vec<String>,
options: PyConnectOptions,
}

#[pymethods]
impl PyClient {
#[new]
fn new(endpoints: Vec<String>) -> Self {
Self { endpoints }
fn new(endpoints: Vec<String>, options: Option<PyConnectOptions>) -> Self {
let options = options.unwrap_or(PyConnectOptions::default());
Self { endpoints, options }
}

fn connect(&self) -> Self {
Expand All @@ -27,11 +80,12 @@ impl PyClient {

fn __aenter__<'a>(&'a self, py: Python<'a>) -> PyResult<&'a PyAny> {
let endpoints = self.endpoints.clone();
let options = self.options.clone();
future_into_py(py, async move {
let result = EtcdClient::connect(endpoints, None).await;
let result = EtcdClient::connect(endpoints, Some(options.0)).await;
result
.map(|client| PyCommunicator(Arc::new(Mutex::new(client))))
.map_err(|e| Error(e).into())
.map_err(|e| PyClientError(e).into())
})
}

Expand Down
26 changes: 14 additions & 12 deletions src/communicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use std::sync::Arc;
use tokio::sync::Mutex;

use crate::condvar::PyCondVar;
use crate::error::Error;
use crate::transaction::PyTxn;
use crate::error::PyClientError;
use crate::txn::PyTxn;
use crate::txn_response::PyTxnResponse;
use crate::watch::PyWatch;

Expand All @@ -32,7 +32,7 @@ impl PyCommunicator {
None
}
})
.map_err(|e| Error(e).into())
.map_err(|e| PyClientError(e).into())
})
}

Expand All @@ -53,7 +53,7 @@ impl PyCommunicator {
}
result
})
.map_err(|e| Error(e).into())
.map_err(|e| PyClientError(e).into())
})
}

Expand All @@ -62,7 +62,7 @@ impl PyCommunicator {
future_into_py(py, async move {
let mut client = client.lock().await;
let result = client.put(key, value, None).await;
result.map(|_| ()).map_err(|e| Error(e).into())
result.map(|_| ()).map_err(|e| PyClientError(e).into())
})
}

Expand All @@ -77,7 +77,7 @@ impl PyCommunicator {
let mut client = client.lock().await;
let options = PutOptions::new().with_prev_key();
let result = client.put(prefix, value, Some(options)).await;
result.map(|_| ()).map_err(|e| Error(e).into())
result.map(|_| ()).map_err(|e| PyClientError(e).into())
})
}

Expand All @@ -87,7 +87,7 @@ impl PyCommunicator {
let mut client = client.lock().await;

let result = client.delete(key, None).await;
result.map(|_| ()).map_err(|e| Error(e).into())
result.map(|_| ()).map_err(|e| PyClientError(e).into())
})
}

Expand All @@ -97,7 +97,7 @@ impl PyCommunicator {
let mut client = client.lock().await;
let options = DeleteOptions::new().with_prefix();
let result = client.delete(key, Some(options)).await;
result.map(|_| ()).map_err(|e| Error(e).into())
result.map(|_| ()).map_err(|e| PyClientError(e).into())
})
}

Expand All @@ -107,7 +107,9 @@ impl PyCommunicator {
future_into_py(py, async move {
let mut client = client.lock().await;
let result = client.txn(txn.0).await;
result.map(PyTxnResponse).map_err(|e| Error(e).into())
result
.map(PyTxnResponse)
.map_err(|e| PyClientError(e).into())
})
}

Expand All @@ -127,7 +129,7 @@ impl PyCommunicator {
if *key_value.value_str().unwrap() == initial_val {
match client.put(key, new_val, None).await {
Ok(_) => Ok(true), // replace successful
Err(e) => Err(Error(e)),
Err(e) => Err(PyClientError(e)),
}
} else {
Ok(false) // initial_val not equal to current value
Expand All @@ -136,7 +138,7 @@ impl PyCommunicator {
Ok(false) // Key does not exist
}
}
Err(e) => Err(Error(e)),
Err(e) => Err(PyClientError(e)),
}
.map_err(|e| PyErr::new::<PyException, _>(format!("{}", e.0)))
})
Expand All @@ -158,7 +160,7 @@ impl PyCommunicator {
}
result
})
.map_err(|e| Error(e).into())
.map_err(|e| PyClientError(e).into())
})
}

Expand Down
Loading

0 comments on commit dba1f27

Please sign in to comment.