Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] WakeOnWrite #1

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,8 @@ authors = [
[features]

[dependencies]
async-std = "1"

[dev-dependencies]
async-std = { version = "1", features = ["attributes", "unstable"] }
pin-utils = "0.1.0-alpha.4"
1 change: 1 addition & 0 deletions rustfmt.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
version = "Two"
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@
#![forbid(unsafe_code, future_incompatible, rust_2018_idioms)]
#![deny(missing_debug_implementations, nonstandard_style)]
#![warn(missing_docs, missing_doc_code_examples, unreachable_pub)]

pub mod wake_on;
117 changes: 117 additions & 0 deletions src/wake_on.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
//! Smart pointers to wake tasks on access
use async_std::task::Waker;
use std::ops::{Deref, DerefMut};

/// A wrapper type which wakes tasks whenever the wrapped value is accessed
/// through an `&mut` reference.
///
/// `T` is the type of the value being wrapped. This struct is `Deref` and
/// `DerefMut` for that type, giving `&T` and `&mut T` respectively.
/// When a `Waker` is registered with `set_waker`, that `Waker` is woken
/// whenever the wrapped value is accessed through an `&mut` reference
/// and therefore potentially mutated.
///
/// This is useful when there is a future polling the state of the wrapped
/// value. It needs to be awoken whenever that value changes so that they
/// can check whether or not its value is in a state that will let the
/// future make progress. That future can register the `Waker` from the
/// `Context` it is passed with the `WakeOnWrite` wrapping the value it is
/// interested in so that all mutations cause it to be woken going forward.
///
/// This type isn't effective for observing changes on values with interior
/// mutablity, because it only wakes on `&mut` access.
#[derive(Default, Debug, Clone)]
pub struct WakeOnWrite<T> {
inner: T,
waker: Option<Waker>,
}

impl<T> WakeOnWrite<T> {
/// Create a new `WakeOnWrite` with the given value.
pub fn new(value: T) -> Self {
Self {
inner: value,
waker: None,
}
}

/// Set the `Waker` to be awoken when this value is mutated.
///
/// Returns the currently registered `Waker`, if there is one.
pub fn set_waker(wow: &mut Self, waker: Waker) -> Option<Waker> {
wow.waker.replace(waker)
}

/// Removes and returns the currently registered `Waker`, if there is one.
pub fn take_waker(wow: &mut Self) -> Option<Waker> {
wow.waker.take()
}

NoraCodes marked this conversation as resolved.
Show resolved Hide resolved
/// Returns the currently registered `Waker`, leaving it registered, if
/// there is one.
pub fn waker(wow: &Self) -> Option<&Waker> {
wow.waker.as_ref()
}
}

impl<T> Deref for WakeOnWrite<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl<T> DerefMut for WakeOnWrite<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.waker.as_ref().map(|w| w.wake_by_ref());
&mut self.inner
}
}

#[async_std::test]
async fn wow_wakes_target_on_mut_access() {
use async_std::future::poll_fn;
use async_std::prelude::*;
use async_std::sync::Arc;
use async_std::sync::Mutex;
use async_std::task::Poll;
use pin_utils::pin_mut;
use std::future::Future;

let data: Arc<Mutex<WakeOnWrite<u8>>> = Default::default();
let data_checker = {
let data_ref = data.clone();
poll_fn(move |ctx| {
// This is an inefficient use of futures, but it does work in this
// case.
let data_lock_future = data_ref.lock();
pin_mut!(data_lock_future);
match data_lock_future.poll(ctx) {
Poll::Ready(mut lock) => match **lock {
10 => Poll::Ready(()),
_ => {
WakeOnWrite::set_waker(&mut lock, ctx.waker().clone());
Poll::Pending
}
},
Poll::Pending => Poll::Pending,
}
})
};

let data_incrementor = {
let data_ref = data.clone();
async move {
for _ in 0..10u8 {
let mut lock = data_ref.lock().await;
**lock += 1;
}
}
};

data_checker
.join(data_incrementor)
.timeout(core::time::Duration::new(1, 0))
.await
.unwrap();
}