Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replacing Tokio's TcpStream with async-std TcpStream #475

Merged
merged 13 commits into from
Jan 30, 2024
Next Next commit
adding trace logs to every function call.
adding trace logs for the stream's readiness to read data
b-yap committed Jan 26, 2024
commit 13d839e6c2afa14ec1f8a936370cc1f6f53c3dd1
Original file line number Diff line number Diff line change
@@ -26,22 +26,30 @@ pub(crate) async fn poll_messages_from_stellar(
log::info!("poll_messages_from_stellar(): started.");

loop {
log::trace!("poll_messages_from_stellar(): start loop");
if send_to_user_sender.is_closed() {
log::info!("poll_messages_from_stellar(): closing receiver during disconnection");
// close this channel as communication to user was closed.
break
}

log::trace!("poll_messages_from_stellar(): checking for messages from the user...");
// check for messages from user.
match send_to_node_receiver.try_recv() {
Ok(msg) =>
if let Err(e) = connector.send_to_node(msg).await {
log::error!("poll_messages_from_stellar(): Error occurred during sending message to node: {e:?}");
},
Err(TryRecvError::Disconnected) => break,
Err(TryRecvError::Empty) => {},
Err(TryRecvError::Disconnected) => {
log::trace!("poll_messages_from_stellar(): Recv channel (for sending message to node) got disconnected.");
break;
},
Err(TryRecvError::Empty) => {
log::trace!("poll_messages_from_stellar(): Recv channel (for sending message to node) received empty messages.");
},
}

log::trace!("poll_messages_from_stellar(): checking for messages from Stellar Node...");
// check for messages from Stellar Node.
match read_message_from_stellar(&mut read_stream_overlay, connector.timeout_in_secs).await {
Err(e) => {
@@ -54,7 +62,9 @@ pub(crate) async fn poll_messages_from_stellar(
if let Err(e) = send_to_user_sender.send(stellar_msg).await {
log::warn!("poll_messages_from_stellar(): Error occurred during sending message to user: {e:?}");
},
Ok(_) => {},
Ok(None) => {
log::trace!("poll_messages_from_stellar(): No message to send to user.");
},
Err(e) => {
log::error!("poll_messages_from_stellar(): Error occurred during processing xdr message: {e:?}");
break
@@ -63,6 +73,7 @@ pub(crate) async fn poll_messages_from_stellar(
}
}

log::trace!("poll_messages_from_stellar(): stop polling for messages...");
// make sure to drop/shutdown the stream
connector.write_stream_overlay.forget();
drop(read_stream_overlay);
@@ -78,6 +89,8 @@ async fn read_message_from_stellar(
r_stream: &mut tcp::OwnedReadHalf,
timeout_in_secs: u64,
) -> Result<Xdr, Error> {
log::trace!("read_message_from_stellar(): start");

// holds the number of bytes that were missing from the previous stellar message.
let mut lack_bytes_from_prev = 0;
let mut readbuf: Vec<u8> = vec![];
@@ -87,6 +100,26 @@ async fn read_message_from_stellar(
// check whether or not we should read the bytes as:
// 1. the length of the next stellar message
// 2. the remaining bytes of the previous stellar message
let read_ready = r_stream.ready(tokio::io::Interest::READABLE).await
.map_err(|e| {
log::error!("read_message_from_stellar(): failed to check if stream is ready: {e:?}");
Error::ReadFailed(e.to_string())
})?;

if read_ready.is_empty() {
log::trace!("read_message_from_stellar(): reading returned empty.");
continue
}

if read_ready.is_read_closed() {
log::trace!("read_message_from_stellar(): read stream is closed");
return Err(Error::Disconnected);
}

if read_ready.is_readable() {
log::trace!("read_message_from_stellar(): stream is read ready");
}

match timeout(Duration::from_secs(timeout_in_secs), r_stream.peek(&mut buff_for_peeking))
.await
{