Skip to content

Commit

Permalink
Replace polling with tokio::sync::Notify
Browse files Browse the repository at this point in the history
  • Loading branch information
jopemachine committed Jan 29, 2024
1 parent 2b498b6 commit 64ae685
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 7 deletions.
4 changes: 2 additions & 2 deletions src/condvar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl PyCondVar {
})
}

pub fn notify_all<'a>(&'a self, py: Python<'a>) -> PyResult<&'a PyAny> {
pub fn notify_waiters<'a>(&'a self, py: Python<'a>) -> PyResult<&'a PyAny> {
let inner = self.inner.clone();
let condition = self.condition.clone();
future_into_py(py, async move {
Expand All @@ -43,7 +43,7 @@ impl PyCondVar {
}

impl PyCondVar {
pub async fn _notify_all(&self) {
pub async fn _notify_waiters(&self) {
let inner = self.inner.clone();
let condition = self.condition.clone();
*condition.lock().await = true;
Expand Down
16 changes: 11 additions & 5 deletions src/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use etcd_client::WatchOptions;
use pyo3::exceptions::PyStopAsyncIteration;
use pyo3::prelude::*;
use pyo3_asyncio::tokio::future_into_py;
use tokio::sync::Notify;
use std::sync::Arc;
use tokio::sync::Mutex;

Expand All @@ -16,6 +17,7 @@ pub struct PyWatch {
client: Arc<Mutex<EtcdClient>>,
key: String,
options: Option<WatchOptions>,
event_stream_init_notifier: Arc<Notify>,
event_stream: Arc<Mutex<Option<PyEventStream>>>,
ready_event: Option<PyCondVar>,
cleanup_event: Option<PyCondVar>,
Expand All @@ -33,6 +35,7 @@ impl PyWatch {
client,
key,
options,
event_stream_init_notifier: Arc::new(Notify::new()),
event_stream: Arc::new(Mutex::new(None)),
ready_event,
cleanup_event,
Expand All @@ -46,14 +49,17 @@ impl PyWatch {
return Ok(());
}

let event_stream_init_notifier = self.event_stream_init_notifier.clone();

let mut client = self.client.lock().await;

match client.watch(self.key.clone(), self.options.clone()).await {
Ok((_, stream)) => {
*event_stream = Some(PyEventStream::new(stream));
event_stream_init_notifier.notify_waiters();

if let Some(ready_event) = &self.ready_event {
ready_event._notify_all().await;
ready_event._notify_waiters().await;
}
Ok(())
}
Expand All @@ -70,16 +76,16 @@ impl PyWatch {

fn __anext__<'a>(&'a mut self, py: Python<'a>) -> PyResult<Option<PyObject>> {
let watch = Arc::new(Mutex::new(self.clone()));
let event_stream_init_notifier = self.event_stream_init_notifier.clone();

let result = future_into_py(py, async move {
let mut watch = watch.lock().await;
watch.init().await?;

let mut event_stream = watch.event_stream.lock().await;

while event_stream.is_none() {
// Wait for a short duration before checking again
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
event_stream = watch.event_stream.lock().await;
if event_stream.is_none() {
event_stream_init_notifier.notified().await;
}

let event_stream = event_stream.as_mut().unwrap();
Expand Down

0 comments on commit 64ae685

Please sign in to comment.