Skip to content

Commit

Permalink
Encapsulate flume::Receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
fuzzypixelz committed Oct 3, 2024
1 parent 15a84de commit c39b25e
Show file tree
Hide file tree
Showing 2 changed files with 230 additions and 11 deletions.
212 changes: 209 additions & 3 deletions zenoh/src/api/handlers/fifo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@

//! Callback handler trait.

use std::sync::Arc;
use std::{
sync::Arc,
time::{Duration, Instant},
};

use zenoh_result::ZResult;

use crate::api::handlers::{callback::Callback, IntoHandler, API_DATA_RECEPTION_CHANNEL_SIZE};

Expand All @@ -41,11 +46,212 @@ impl Default for FifoChannel {
}
}

/// [`FifoChannel`] handler.
#[derive(Debug, Clone)]
pub struct FifoChannelHandler<T>(flume::Receiver<T>);

impl<T: Send + 'static> IntoHandler<T> for FifoChannel {
type Handler = flume::Receiver<T>;
type Handler = FifoChannelHandler<T>;

fn into_handler(self) -> (Callback<T>, Self::Handler) {
flume::bounded(self.capacity).into_handler()
let (sender, receiver) = flume::bounded(self.capacity);
(
Callback::new(Arc::new(move |t| {
if let Err(error) = sender.send(t) {
tracing::error!(%error)
}
})),
FifoChannelHandler(receiver),
)
}
}

impl<T> FifoChannelHandler<T> {
/// Attempt to fetch an incoming value from the channel associated with this receiver, returning
/// an error if the channel is empty or if all senders have been dropped.
pub fn try_recv(&self) -> ZResult<T> {
self.0.try_recv().map_err(Into::into)
}

/// Wait for an incoming value from the channel associated with this receiver, returning an
/// error if all senders have been dropped.
pub fn recv(&self) -> ZResult<T> {
self.0.recv().map_err(Into::into)
}

/// Wait for an incoming value from the channel associated with this receiver, returning an
/// error if all senders have been dropped or the deadline has passed.
pub fn recv_deadline(&self, deadline: Instant) -> ZResult<T> {
self.0.recv_deadline(deadline).map_err(Into::into)
}

/// Wait for an incoming value from the channel associated with this receiver, returning an
/// error if all senders have been dropped or the timeout has expired.
pub fn recv_timeout(&self, dur: Duration) -> ZResult<T> {
self.0
.recv_deadline(Instant::now().checked_add(dur).unwrap())
.map_err(Into::into)
}

/// Create a blocking iterator over the values received on the channel that finishes iteration
/// when all senders have been dropped.
pub fn iter(&self) -> Iter<'_, T> {
Iter(self.0.iter())
}

/// A non-blocking iterator over the values received on the channel that finishes iteration when
/// all senders have been dropped or the channel is empty.
pub fn try_iter(&self) -> TryIter<'_, T> {
TryIter(self.0.try_iter())
}

/// Take all msgs currently sitting in the channel and produce an iterator over them. Unlike
/// `try_iter`, the iterator will not attempt to fetch any more values from the channel once
/// the function has been called.
pub fn drain(&self) -> Drain<'_, T> {
Drain(self.0.drain())
}

/// Returns true if all senders for this channel have been dropped.
pub fn is_disconnected(&self) -> bool {
self.0.is_disconnected()
}

/// Returns true if the channel is empty.
/// Note: Zero-capacity channels are always empty.
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}

/// Returns true if the channel is full.
/// Note: Zero-capacity channels are always full.
pub fn is_full(&self) -> bool {
self.0.is_full()
}

/// Returns the number of messages in the channel.
pub fn len(&self) -> usize {
self.0.len()
}

/// If the channel is bounded, returns its capacity.
pub fn capacity(&self) -> Option<usize> {
self.0.capacity()
}

/// Get the number of senders that currently exist.
pub fn sender_count(&self) -> usize {
self.0.sender_count()
}

/// Get the number of receivers that currently exist, including this one.
pub fn receiver_count(&self) -> usize {
self.0.receiver_count()
}

/// Returns whether the receivers are belong to the same channel.
pub fn same_channel(&self, other: &Self) -> bool {
self.0.same_channel(&other.0)
}
}

/// This exists as a shorthand for [`FifoChannelHandler::iter`].
impl<'a, T> IntoIterator for &'a FifoChannelHandler<T> {
type Item = T;
type IntoIter = Iter<'a, T>;

fn into_iter(self) -> Self::IntoIter {
Iter(self.0.iter())
}
}

impl<T> IntoIterator for FifoChannelHandler<T> {
type Item = T;
type IntoIter = IntoIter<T>;

/// Creates a self-owned but semantically equivalent alternative to [`FifoChannelHandler::iter`].
fn into_iter(self) -> Self::IntoIter {
IntoIter(self.0.into_iter())
}
}

/// An iterator over the msgs received from a channel.
pub struct Iter<'a, T>(flume::Iter<'a, T>);

impl<'a, T> Iterator for Iter<'a, T> {
type Item = T;

fn next(&mut self) -> Option<Self::Item> {
self.0.next()
}
}

/// An non-blocking iterator over the msgs received from a channel.
pub struct TryIter<'a, T>(flume::TryIter<'a, T>);

impl<'a, T> Iterator for TryIter<'a, T> {
type Item = T;

fn next(&mut self) -> Option<Self::Item> {
self.0.next()
}
}

/// An fixed-sized iterator over the msgs drained from a channel.
#[derive(Debug)]
pub struct Drain<'a, T>(flume::Drain<'a, T>);

impl<'a, T> Iterator for Drain<'a, T> {
type Item = T;

fn next(&mut self) -> Option<Self::Item> {
self.0.next()
}
}

impl<'a, T> ExactSizeIterator for Drain<'a, T> {
fn len(&self) -> usize {
self.0.len()
}
}

/// An owned iterator over the msgs received from a channel.
pub struct IntoIter<T>(flume::IntoIter<T>);

impl<T> Iterator for IntoIter<T> {
type Item = T;

fn next(&mut self) -> Option<Self::Item> {
self.0.next()
}
}

impl<T> FifoChannelHandler<T> {
/// Asynchronously receive a value from the channel, returning an error if all senders have been
/// dropped. If the channel is empty, the returned future will yield to the async runtime.
pub async fn recv_async(&self) -> ZResult<T> {
self.0.recv_async().await.map_err(Into::into)
}

/// Convert this receiver into a future that asynchronously receives a single message from the
/// channel, returning an error if all senders have been dropped. If the channel is empty, this
/// future will yield to the async runtime.
pub async fn into_recv_async(self) -> ZResult<T> {
self.0.into_recv_async().await.map_err(Into::into)
}

/// Create an asynchronous stream that uses this receiver to asynchronously receive messages
/// from the channel. The receiver will continue to be usable after the stream has been dropped.
pub fn stream(&self) -> impl futures::Stream<Item = T> + '_ {
self.0.stream()
}
}

impl<'a, T: 'a> FifoChannelHandler<T> {
/// Convert this receiver into a stream that allows asynchronously receiving messages from the
/// channel.
pub fn into_stream(self) -> impl futures::Stream<Item = T> + 'a {
self.0.into_stream()
}
}

Expand Down
29 changes: 21 additions & 8 deletions zenoh/tests/matching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

use std::time::Duration;

use flume::RecvTimeoutError;
use zenoh::{sample::Locality, Result as ZResult, Session};
use zenoh_config::{ModeDependentValue, WhatAmI};
use zenoh_core::ztimeout;
Expand Down Expand Up @@ -59,7 +58,9 @@ async fn zenoh_matching_status_any() -> ZResult<()> {
let matching_listener = ztimeout!(publisher1.matching_listener()).unwrap();

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.err() == Some(RecvTimeoutError::Timeout));
assert!(
received_status.err().unwrap().downcast_ref() == Some(&flume::RecvTimeoutError::Timeout)
);

let matching_status = ztimeout!(publisher1.matching_status()).unwrap();
assert!(!matching_status.matching_subscribers());
Expand Down Expand Up @@ -113,23 +114,29 @@ async fn zenoh_matching_status_remote() -> ZResult<()> {
let matching_listener = ztimeout!(publisher1.matching_listener()).unwrap();

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.err() == Some(RecvTimeoutError::Timeout));
assert!(
received_status.err().unwrap().downcast_ref() == Some(&flume::RecvTimeoutError::Timeout)
);

let matching_status = ztimeout!(publisher1.matching_status()).unwrap();
assert!(!matching_status.matching_subscribers());

let sub = ztimeout!(session1.declare_subscriber("zenoh_matching_status_remote_test")).unwrap();

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.err() == Some(RecvTimeoutError::Timeout));
assert!(
received_status.err().unwrap().downcast_ref() == Some(&flume::RecvTimeoutError::Timeout)
);

let matching_status = ztimeout!(publisher1.matching_status()).unwrap();
assert!(!matching_status.matching_subscribers());

ztimeout!(sub.undeclare()).unwrap();

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.err() == Some(RecvTimeoutError::Timeout));
assert!(
received_status.err().unwrap().downcast_ref() == Some(&flume::RecvTimeoutError::Timeout)
);

let matching_status = ztimeout!(publisher1.matching_status()).unwrap();
assert!(!matching_status.matching_subscribers());
Expand Down Expand Up @@ -168,7 +175,9 @@ async fn zenoh_matching_status_local() -> ZResult<()> {
let matching_listener = ztimeout!(publisher1.matching_listener()).unwrap();

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.err() == Some(RecvTimeoutError::Timeout));
assert!(
received_status.err().unwrap().downcast_ref() == Some(&flume::RecvTimeoutError::Timeout)
);

let matching_status = ztimeout!(publisher1.matching_status()).unwrap();
assert!(!matching_status.matching_subscribers());
Expand All @@ -192,15 +201,19 @@ async fn zenoh_matching_status_local() -> ZResult<()> {
let sub = ztimeout!(session2.declare_subscriber("zenoh_matching_status_local_test")).unwrap();

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.err() == Some(RecvTimeoutError::Timeout));
assert!(
received_status.err().unwrap().downcast_ref() == Some(&flume::RecvTimeoutError::Timeout)
);

let matching_status = ztimeout!(publisher1.matching_status()).unwrap();
assert!(!matching_status.matching_subscribers());

ztimeout!(sub.undeclare()).unwrap();

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.err() == Some(RecvTimeoutError::Timeout));
assert!(
received_status.err().unwrap().downcast_ref() == Some(&flume::RecvTimeoutError::Timeout)
);

let matching_status = ztimeout!(publisher1.matching_status()).unwrap();
assert!(!matching_status.matching_subscribers());
Expand Down

0 comments on commit c39b25e

Please sign in to comment.