Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

deadline adoption for multi-fragment message #1510

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 46 additions & 13 deletions io/zenoh-transport/src/common/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,40 @@ impl StageInMutex {
}
}

struct Deadline {
deadline: Option<Option<Instant>>,
wait_time: Duration,
}

impl Deadline {
fn new(wait_time: Duration) -> Self {
let deadline = Self::calc_deadline(wait_time);
Self {
deadline,
wait_time,
}
}

#[inline]
fn wait(&self, s_ref: &StageInRefill) -> bool {
match self.deadline {
None => false,
Some(None) => s_ref.wait(),
Some(Some(deadline)) => s_ref.wait_deadline(deadline),
}
}

#[inline]
fn on_next_fragment(&mut self) {
self.deadline = Self::calc_deadline(self.wait_time);
}

#[inline]
fn calc_deadline(wait_time: Duration) -> Option<Option<Instant>> {
(!wait_time.is_zero()).then_some(Instant::now().checked_add(wait_time))
}
}

// This is the initial stage of the pipeline where messages are serliazed on
struct StageIn {
s_ref: StageInRefill,
Expand All @@ -141,13 +175,13 @@ impl StageIn {
&mut self,
msg: &mut NetworkMessage,
priority: Priority,
deadline: Option<Option<Instant>>,
deadline: &mut Deadline,
) -> bool {
// Lock the current serialization batch.
let mut c_guard = self.mutex.current();

macro_rules! zgetbatch_rets {
($fragment:expr, $restore_sn:expr) => {
($restore_sn:expr) => {
loop {
match c_guard.take() {
Some(batch) => break batch,
Expand All @@ -163,11 +197,8 @@ impl StageIn {
None => {
drop(c_guard);
// Wait for an available batch until deadline
if !match deadline {
None => false,
Some(None) => self.s_ref.wait(),
Some(Some(deadline)) => self.s_ref.wait_deadline(deadline),
} {
if !deadline.wait(&self.s_ref) {
// tracing::warn!("Zenoh message dropped because of deadline",);
// Still no available batch.
// Restore the sequence number and drop the message
$restore_sn;
Expand Down Expand Up @@ -198,7 +229,7 @@ impl StageIn {
}

// Get the current serialization batch.
let mut batch = zgetbatch_rets!(false, {});
let mut batch = zgetbatch_rets!({});
// Attempt the serialization on the current batch
let e = match batch.encode(&*msg) {
Ok(_) => zretok!(batch, msg),
Expand Down Expand Up @@ -228,7 +259,7 @@ impl StageIn {
if !batch.is_empty() {
// Move out existing batch
self.s_out.move_batch(batch);
batch = zgetbatch_rets!(false, tch.sn.set(sn).unwrap());
batch = zgetbatch_rets!(tch.sn.set(sn).unwrap());
}

// Attempt a second serialization on fully empty batch
Expand Down Expand Up @@ -258,8 +289,7 @@ impl StageIn {
let mut reader = self.fragbuf.reader();
while reader.can_read() {
// Get the current serialization batch
// Treat all messages as non-droppable once we start fragmenting
batch = zgetbatch_rets!(true, tch.sn.set(sn).unwrap());
batch = zgetbatch_rets!(tch.sn.set(sn).unwrap());

// Serialize the message fragment
match batch.encode((&mut reader, &mut fragment)) {
Expand All @@ -281,6 +311,9 @@ impl StageIn {
break;
}
}

// adopt deadline for the next fragment
deadline.on_next_fragment();
}

// Clean the fragbuf
Expand Down Expand Up @@ -628,10 +661,10 @@ impl TransmissionPipelineProducer {
} else {
self.wait_before_close
};
let deadline = (!wait_time.is_zero()).then_some(Instant::now().checked_add(wait_time));
let mut deadline = Deadline::new(wait_time);
// Lock the channel. We are the only one that will be writing on it.
let mut queue = zlock!(self.stage_in[idx]);
queue.push_network_message(&mut msg, priority, deadline)
queue.push_network_message(&mut msg, priority, &mut deadline)
}

#[inline]
Expand Down