From 13d839e6c2afa14ec1f8a936370cc1f6f53c3dd1 Mon Sep 17 00:00:00 2001 From: b-yap <2826165+b-yap@users.noreply.github.com> Date: Thu, 11 Jan 2024 21:11:38 +0800 Subject: [PATCH 01/13] adding trace logs to every function call. adding trace logs for the stream's readiness to read data --- .../connection/connector/message_reader.rs | 39 +++++++++++++++++-- 1 file changed, 36 insertions(+), 3 deletions(-) diff --git a/clients/stellar-relay-lib/src/connection/connector/message_reader.rs b/clients/stellar-relay-lib/src/connection/connector/message_reader.rs index 8836fd998..50ae7a046 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_reader.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_reader.rs @@ -26,22 +26,30 @@ pub(crate) async fn poll_messages_from_stellar( log::info!("poll_messages_from_stellar(): started."); loop { + log::trace!("poll_messages_from_stellar(): start loop"); if send_to_user_sender.is_closed() { log::info!("poll_messages_from_stellar(): closing receiver during disconnection"); // close this channel as communication to user was closed. break } + log::trace!("poll_messages_from_stellar(): checking for messages from the user..."); // check for messages from user. match send_to_node_receiver.try_recv() { Ok(msg) => if let Err(e) = connector.send_to_node(msg).await { log::error!("poll_messages_from_stellar(): Error occurred during sending message to node: {e:?}"); }, - Err(TryRecvError::Disconnected) => break, - Err(TryRecvError::Empty) => {}, + Err(TryRecvError::Disconnected) => { + log::trace!("poll_messages_from_stellar(): Recv channel (for sending message to node) got disconnected."); + break; + }, + Err(TryRecvError::Empty) => { + log::trace!("poll_messages_from_stellar(): Recv channel (for sending message to node) received empty messages."); + }, } + log::trace!("poll_messages_from_stellar(): checking for messages from Stellar Node..."); // check for messages from Stellar Node. match read_message_from_stellar(&mut read_stream_overlay, connector.timeout_in_secs).await { Err(e) => { @@ -54,7 +62,9 @@ pub(crate) async fn poll_messages_from_stellar( if let Err(e) = send_to_user_sender.send(stellar_msg).await { log::warn!("poll_messages_from_stellar(): Error occurred during sending message to user: {e:?}"); }, - Ok(_) => {}, + Ok(None) => { + log::trace!("poll_messages_from_stellar(): No message to send to user."); + }, Err(e) => { log::error!("poll_messages_from_stellar(): Error occurred during processing xdr message: {e:?}"); break @@ -63,6 +73,7 @@ pub(crate) async fn poll_messages_from_stellar( } } + log::trace!("poll_messages_from_stellar(): stop polling for messages..."); // make sure to drop/shutdown the stream connector.write_stream_overlay.forget(); drop(read_stream_overlay); @@ -78,6 +89,8 @@ async fn read_message_from_stellar( r_stream: &mut tcp::OwnedReadHalf, timeout_in_secs: u64, ) -> Result { + log::trace!("read_message_from_stellar(): start"); + // holds the number of bytes that were missing from the previous stellar message. let mut lack_bytes_from_prev = 0; let mut readbuf: Vec = vec![]; @@ -87,6 +100,26 @@ async fn read_message_from_stellar( // check whether or not we should read the bytes as: // 1. the length of the next stellar message // 2. the remaining bytes of the previous stellar message + let read_ready = r_stream.ready(tokio::io::Interest::READABLE).await + .map_err(|e| { + log::error!("read_message_from_stellar(): failed to check if stream is ready: {e:?}"); + Error::ReadFailed(e.to_string()) + })?; + + if read_ready.is_empty() { + log::trace!("read_message_from_stellar(): reading returned empty."); + continue + } + + if read_ready.is_read_closed() { + log::trace!("read_message_from_stellar(): read stream is closed"); + return Err(Error::Disconnected); + } + + if read_ready.is_readable() { + log::trace!("read_message_from_stellar(): stream is read ready"); + } + match timeout(Duration::from_secs(timeout_in_secs), r_stream.peek(&mut buff_for_peeking)) .await { From 68c98097d5aa215a72ee2fac2f534bd106115e35 Mon Sep 17 00:00:00 2001 From: b-yap <2826165+b-yap@users.noreply.github.com> Date: Mon, 15 Jan 2024 19:53:39 +0800 Subject: [PATCH 02/13] adding trace logs for ready status --- clients/stellar-relay-lib/examples/connect.rs | 12 ++++---- .../connection/connector/message_reader.rs | 29 ++++++++++++++++++- .../connection/connector/message_sender.rs | 28 +++++++++++++++++- .../src/connection/helper.rs | 19 ++++++++++++ 4 files changed, 80 insertions(+), 8 deletions(-) diff --git a/clients/stellar-relay-lib/examples/connect.rs b/clients/stellar-relay-lib/examples/connect.rs index 94a8ab1be..49092cf79 100644 --- a/clients/stellar-relay-lib/examples/connect.rs +++ b/clients/stellar-relay-lib/examples/connect.rs @@ -40,12 +40,12 @@ async fn main() -> Result<(), Box> { ScpStatementPledges::ScpStExternalize(_) => "ScpStExternalize", ScpStatementPledges::ScpStNominate(_) => "ScpStNominate ", }; - log::info!( - "{} sent StellarMessage of type {} for ledger {}", - node_id, - stmt_type, - slot - ); + // log::info!( + // "{} sent StellarMessage of type {} for ledger {}", + // node_id, + // stmt_type, + // slot + // ); }, _ => { let _ = overlay_connection.send_to_node(StellarMessage::GetPeers).await; diff --git a/clients/stellar-relay-lib/src/connection/connector/message_reader.rs b/clients/stellar-relay-lib/src/connection/connector/message_reader.rs index 50ae7a046..1d80366d9 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_reader.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_reader.rs @@ -100,7 +100,34 @@ async fn read_message_from_stellar( // check whether or not we should read the bytes as: // 1. the length of the next stellar message // 2. the remaining bytes of the previous stellar message - let read_ready = r_stream.ready(tokio::io::Interest::READABLE).await + match timeout(Duration::from_secs(timeout_in_secs), + r_stream.ready( + tokio::io::Interest::READABLE | + tokio::io::Interest::WRITABLE) + ).await { + Ok(Ok(res)) => { + if res.is_readable() { + log::trace!("read_message_from_stellar(): stream is readable"); + } + + if res.is_writable() { + log::trace!("read_message_from_stellar(): stream is writable"); + } + + if res.is_empty() { + log::trace!("read_message_from_stellar(): stream is empty"); + } + } + Ok(Err(e)) => { + log::error!("read_message_from_stellar(): Error occurred during checking ready status: {e:?}"); + } + Err(_) => { + log::error!("read_message_from_stellar(): timeout elapsed for checking ready status"); + return Err(Error::Timeout) + } + } + let read_ready = + r_stream.ready(tokio::io::Interest::READABLE).await .map_err(|e| { log::error!("read_message_from_stellar(): failed to check if stream is ready: {e:?}"); Error::ReadFailed(e.to_string()) diff --git a/clients/stellar-relay-lib/src/connection/connector/message_sender.rs b/clients/stellar-relay-lib/src/connection/connector/message_sender.rs index d5504e5c9..775eae459 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_sender.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_sender.rs @@ -1,6 +1,7 @@ use std::time::Duration; use substrate_stellar_sdk::types::{MessageType, SendMore, StellarMessage}; use tokio::{io::AsyncWriteExt, time::timeout}; +use tokio::io::Interest; use crate::connection::{ flow_controller::MAX_FLOOD_MSG_CAP, @@ -19,7 +20,32 @@ impl Connector { ) .await { - Ok(res) => res.map_err(|e| Error::WriteFailed(e.to_string())), + Ok(res) => { + let result = res.map_err(|e| { + log::error!("send_to_node(): Failed to send message to node: {e:?}"); + Error::WriteFailed(e.to_string()) + }); + + let ready_status = self.write_stream_overlay.ready(Interest::WRITABLE | Interest::READABLE) + .await.map_err(|e| { + log::error!("send_to_node(): Stream not ready for reading or writing: {e:?}"); + Error::ConnectionFailed(e.to_string()) + })?; + + if ready_status.is_readable() { + log::trace!("send_to_node(): stream is readable"); + } + + if ready_status.is_writable() { + log::trace!("send_to_node(): stream is writable"); + } + + if ready_status.is_empty() { + log::trace!("send_to_node(): stream is empty"); + } + + result + }, Err(_) => Err(Error::Timeout), } } diff --git a/clients/stellar-relay-lib/src/connection/helper.rs b/clients/stellar-relay-lib/src/connection/helper.rs index 051357777..a07c4f150 100644 --- a/clients/stellar-relay-lib/src/connection/helper.rs +++ b/clients/stellar-relay-lib/src/connection/helper.rs @@ -5,6 +5,7 @@ use substrate_stellar_sdk::{ types::{Error, Uint256}, SecretKey, XdrCodec, }; +use tokio::io::Interest; use tokio::net::{tcp, TcpStream}; /// Returns a new BigNumber with a pseudo-random value equal to or greater than 0 and less than 1. @@ -49,5 +50,23 @@ pub async fn create_stream( .await .map_err(|e| crate::Error::ConnectionFailed(e.to_string()))?; + let res = stream.ready(Interest::READABLE | Interest::WRITABLE).await + .map_err(|e| { + log::error!("create_stream(): Stream not ready for reading or writing: {e:?}"); + crate::Error::ConnectionFailed(e.to_string()) + })?; + + if res.is_readable() { + log::trace!("create_stream(): stream is readable"); + } + + if res.is_writable() { + log::trace!("create_stream(): stream is writable"); + } + + if res.is_empty() { + log::trace!("create_stream(): stream is empty"); + } + Ok(stream.into_split()) } From fc3fb12f40c8fd18c817f742161767c57761877d Mon Sep 17 00:00:00 2001 From: b-yap <2826165+b-yap@users.noreply.github.com> Date: Tue, 16 Jan 2024 10:38:39 +0800 Subject: [PATCH 03/13] show peer_addr and local_addr --- .../connection/connector/message_reader.rs | 45 ++++++++++--------- .../src/connection/helper.rs | 4 ++ 2 files changed, 29 insertions(+), 20 deletions(-) diff --git a/clients/stellar-relay-lib/src/connection/connector/message_reader.rs b/clients/stellar-relay-lib/src/connection/connector/message_reader.rs index 1d80366d9..8558c5ac3 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_reader.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_reader.rs @@ -106,6 +106,24 @@ async fn read_message_from_stellar( tokio::io::Interest::WRITABLE) ).await { Ok(Ok(res)) => { + match r_stream.peer_addr() { + Ok(peer_addr) => { + log::trace!("read_message_from_stellar(): peer_addr: {peer_addr}"); + } + Err(e) => { + log::error!("read_message_from_stellar(): no peer addresses returned: {e:?}"); + } + } + + match r_stream.local_addr() { + Ok(local_addr) => { + log::trace!("read_message_from_stellar(): local_addr: {local_addr}"); + } + Err(e) => { + log::error!("read_message_from_stellar(): no local addresses returned: {e:?}"); + } + } + if res.is_readable() { log::trace!("read_message_from_stellar(): stream is readable"); } @@ -117,6 +135,13 @@ async fn read_message_from_stellar( if res.is_empty() { log::trace!("read_message_from_stellar(): stream is empty"); } + + if res.is_read_closed() { + log::error!("read_message_from_stellar(): stream's read half is closed"); + drop(r_stream); + return Err(Error::Disconnected); + } + } Ok(Err(e)) => { log::error!("read_message_from_stellar(): Error occurred during checking ready status: {e:?}"); @@ -126,26 +151,6 @@ async fn read_message_from_stellar( return Err(Error::Timeout) } } - let read_ready = - r_stream.ready(tokio::io::Interest::READABLE).await - .map_err(|e| { - log::error!("read_message_from_stellar(): failed to check if stream is ready: {e:?}"); - Error::ReadFailed(e.to_string()) - })?; - - if read_ready.is_empty() { - log::trace!("read_message_from_stellar(): reading returned empty."); - continue - } - - if read_ready.is_read_closed() { - log::trace!("read_message_from_stellar(): read stream is closed"); - return Err(Error::Disconnected); - } - - if read_ready.is_readable() { - log::trace!("read_message_from_stellar(): stream is read ready"); - } match timeout(Duration::from_secs(timeout_in_secs), r_stream.peek(&mut buff_for_peeking)) .await diff --git a/clients/stellar-relay-lib/src/connection/helper.rs b/clients/stellar-relay-lib/src/connection/helper.rs index a07c4f150..2b4a4873c 100644 --- a/clients/stellar-relay-lib/src/connection/helper.rs +++ b/clients/stellar-relay-lib/src/connection/helper.rs @@ -68,5 +68,9 @@ pub async fn create_stream( log::trace!("create_stream(): stream is empty"); } + if res.is_read_closed() { + log::trace!("create_strea(): stream's read half is closed."); + } + Ok(stream.into_split()) } From 828da4bdefa6a129ecc54f730d3dffdd4dc544c0 Mon Sep 17 00:00:00 2001 From: b-yap <2826165+b-yap@users.noreply.github.com> Date: Tue, 16 Jan 2024 14:14:08 +0800 Subject: [PATCH 04/13] adding std::net::TcpStream --- .../src/connection/helper.rs | 11 +++++++-- clients/stellar-relay-lib/src/overlay.rs | 24 ++++++++++++++++++- 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/clients/stellar-relay-lib/src/connection/helper.rs b/clients/stellar-relay-lib/src/connection/helper.rs index 2b4a4873c..49736ca4b 100644 --- a/clients/stellar-relay-lib/src/connection/helper.rs +++ b/clients/stellar-relay-lib/src/connection/helper.rs @@ -45,7 +45,14 @@ pub fn to_base64_xdr_string(msg: &T) -> String { pub async fn create_stream( address: &str, -) -> Result<(tcp::OwnedReadHalf, tcp::OwnedWriteHalf), crate::Error> { +) -> Result<((tcp::OwnedReadHalf, tcp::OwnedWriteHalf), std::net::TcpStream), crate::Error> { + let net_stream = std::net::TcpStream::connect(address) + .map_err(|e| { + log::error!("create_stream(): net stream failed to connect: {e:?}"); + crate::Error::ConnectionFailed(e.to_string()) + })?; + + let stream = TcpStream::connect(address) .await .map_err(|e| crate::Error::ConnectionFailed(e.to_string()))?; @@ -72,5 +79,5 @@ pub async fn create_stream( log::trace!("create_strea(): stream's read half is closed."); } - Ok(stream.into_split()) + Ok((stream.into_split(), net_stream)) } diff --git a/clients/stellar-relay-lib/src/overlay.rs b/clients/stellar-relay-lib/src/overlay.rs index 82b3f52ee..3736ed647 100644 --- a/clients/stellar-relay-lib/src/overlay.rs +++ b/clients/stellar-relay-lib/src/overlay.rs @@ -1,3 +1,4 @@ +use std::io::{Read, Write}; use substrate_stellar_sdk::types::{ErrorCode, StellarMessage}; use tokio::sync::{ mpsc, @@ -13,6 +14,8 @@ use crate::{ node::NodeInfo, Error, }; +use crate::helper::time_now; +use crate::xdr_converter::get_xdr_message_length; /// Used to send/receive messages to/from Stellar Node pub struct StellarOverlayConnection { @@ -42,10 +45,29 @@ impl StellarOverlayConnection { let (send_to_node_sender, send_to_node_receiver) = mpsc::channel::(1024); // split the stream for easy handling of read and write - let (read_stream_overlay, write_stream_overlay) = + let ((read_stream_overlay, write_stream_overlay), mut net_stream) = create_stream(&conn_info.address()).await?; let mut connector = Connector::new(local_node_info, conn_info, write_stream_overlay); + + let hello = connector.create_hello_message(time_now())?; + let hello = connector.create_xdr_message(hello)?; + + let size = net_stream.write(&hello).map_err(|e| { + log::error!("connect(): Failed to send hello message to net tcpstream: {e:?}"); + Error::WriteFailed(e.to_string()) + })?; + log::trace!("connect(): sent hello message to net tcpstream: {size} bytes"); + + let mut buffer = [0; 4]; + let _ = net_stream.read(&mut buffer[..]) + .map_err(|e| { + log::error!("connect(): Failed to read from net tcpstream: {e:?}"); + Error::ReadFailed(e.to_string()) + })?; + let xdr_msg_len = get_xdr_message_length(&buffer); + log::trace!("connect(): taken from net tcp stream: the xdr msg len is {xdr_msg_len}"); + connector.send_hello_message().await?; tokio::spawn(poll_messages_from_stellar( From 4ec63a44f130bd83233118d8d07d1b6fdc6e68cc Mon Sep 17 00:00:00 2001 From: b-yap <2826165+b-yap@users.noreply.github.com> Date: Tue, 16 Jan 2024 19:23:14 +0800 Subject: [PATCH 05/13] convert std::net::TcpStream to tokio --- .../src/connection/connector/connector.rs | 4 +++ .../src/connection/helper.rs | 11 ++------ clients/stellar-relay-lib/src/overlay.rs | 28 ++++++++++++------- 3 files changed, 24 insertions(+), 19 deletions(-) diff --git a/clients/stellar-relay-lib/src/connection/connector/connector.rs b/clients/stellar-relay-lib/src/connection/connector/connector.rs index 05e6a70c1..dd35a44f7 100644 --- a/clients/stellar-relay-lib/src/connection/connector/connector.rs +++ b/clients/stellar-relay-lib/src/connection/connector/connector.rs @@ -135,6 +135,10 @@ impl Connector { } } + pub fn set_write_stream_overlay(&mut self, write_stream_overlay: OwnedWriteHalf) { + self.write_stream_overlay = write_stream_overlay; + } + pub fn local(&self) -> &LocalInfo { &self.local } diff --git a/clients/stellar-relay-lib/src/connection/helper.rs b/clients/stellar-relay-lib/src/connection/helper.rs index 49736ca4b..2b4a4873c 100644 --- a/clients/stellar-relay-lib/src/connection/helper.rs +++ b/clients/stellar-relay-lib/src/connection/helper.rs @@ -45,14 +45,7 @@ pub fn to_base64_xdr_string(msg: &T) -> String { pub async fn create_stream( address: &str, -) -> Result<((tcp::OwnedReadHalf, tcp::OwnedWriteHalf), std::net::TcpStream), crate::Error> { - let net_stream = std::net::TcpStream::connect(address) - .map_err(|e| { - log::error!("create_stream(): net stream failed to connect: {e:?}"); - crate::Error::ConnectionFailed(e.to_string()) - })?; - - +) -> Result<(tcp::OwnedReadHalf, tcp::OwnedWriteHalf), crate::Error> { let stream = TcpStream::connect(address) .await .map_err(|e| crate::Error::ConnectionFailed(e.to_string()))?; @@ -79,5 +72,5 @@ pub async fn create_stream( log::trace!("create_strea(): stream's read half is closed."); } - Ok((stream.into_split(), net_stream)) + Ok(stream.into_split()) } diff --git a/clients/stellar-relay-lib/src/overlay.rs b/clients/stellar-relay-lib/src/overlay.rs index 3736ed647..c9457d6e8 100644 --- a/clients/stellar-relay-lib/src/overlay.rs +++ b/clients/stellar-relay-lib/src/overlay.rs @@ -1,5 +1,6 @@ use std::io::{Read, Write}; use substrate_stellar_sdk::types::{ErrorCode, StellarMessage}; +use tokio::net::TcpStream; use tokio::sync::{ mpsc, mpsc::{ @@ -45,10 +46,18 @@ impl StellarOverlayConnection { let (send_to_node_sender, send_to_node_receiver) = mpsc::channel::(1024); // split the stream for easy handling of read and write - let ((read_stream_overlay, write_stream_overlay), mut net_stream) = + let mut net_stream = std::net::TcpStream::connect(&conn_info.address()) + .map_err(|e| { + log::error!("connect(): net stream failed to connect: {e:?}"); + Error::ConnectionFailed(e.to_string()) + })?; + + + let (read_stream_overlay, write_stream_overlay) = create_stream(&conn_info.address()).await?; let mut connector = Connector::new(local_node_info, conn_info, write_stream_overlay); + drop(read_stream_overlay); let hello = connector.create_hello_message(time_now())?; let hello = connector.create_xdr_message(hello)?; @@ -59,16 +68,15 @@ impl StellarOverlayConnection { })?; log::trace!("connect(): sent hello message to net tcpstream: {size} bytes"); - let mut buffer = [0; 4]; - let _ = net_stream.read(&mut buffer[..]) - .map_err(|e| { - log::error!("connect(): Failed to read from net tcpstream: {e:?}"); - Error::ReadFailed(e.to_string()) - })?; - let xdr_msg_len = get_xdr_message_length(&buffer); - log::trace!("connect(): taken from net tcp stream: the xdr msg len is {xdr_msg_len}"); + let new_stream = TcpStream::from_std(net_stream).map_err(|e| { + log::error!("connect(): Failed to convert net tcpstream to tokio tcpstream: {e:?}"); + Error::ConnectionFailed(e.to_string()) + })?; + let (read_stream_overlay, write_stream_overlay) = new_stream.into_split(); + + connector.set_write_stream_overlay(write_stream_overlay); - connector.send_hello_message().await?; + //connector.send_hello_message().await?; tokio::spawn(poll_messages_from_stellar( connector, From de9ac932ff178b2e2f212c5bb759e8b58b8fb9d5 Mon Sep 17 00:00:00 2001 From: b-yap <2826165+b-yap@users.noreply.github.com> Date: Wed, 17 Jan 2024 16:46:49 +0800 Subject: [PATCH 06/13] completely change tokio to std::net --- clients/stellar-relay-lib/examples/connect.rs | 12 +- .../src/connection/connector/connector.rs | 90 +++++---- .../connection/connector/message_handler.rs | 16 +- .../connection/connector/message_reader.rs | 185 ++++++------------ .../connection/connector/message_sender.rs | 59 ++---- .../stellar-relay-lib/src/connection/error.rs | 9 + .../src/connection/helper.rs | 15 +- clients/stellar-relay-lib/src/overlay.rs | 40 +--- clients/stellar-relay-lib/src/tests/mod.rs | 11 +- 9 files changed, 160 insertions(+), 277 deletions(-) diff --git a/clients/stellar-relay-lib/examples/connect.rs b/clients/stellar-relay-lib/examples/connect.rs index 49092cf79..94a8ab1be 100644 --- a/clients/stellar-relay-lib/examples/connect.rs +++ b/clients/stellar-relay-lib/examples/connect.rs @@ -40,12 +40,12 @@ async fn main() -> Result<(), Box> { ScpStatementPledges::ScpStExternalize(_) => "ScpStExternalize", ScpStatementPledges::ScpStNominate(_) => "ScpStNominate ", }; - // log::info!( - // "{} sent StellarMessage of type {} for ledger {}", - // node_id, - // stmt_type, - // slot - // ); + log::info!( + "{} sent StellarMessage of type {} for ledger {}", + node_id, + stmt_type, + slot + ); }, _ => { let _ = overlay_connection.send_to_node(StellarMessage::GetPeers).await; diff --git a/clients/stellar-relay-lib/src/connection/connector/connector.rs b/clients/stellar-relay-lib/src/connection/connector/connector.rs index dd35a44f7..6f9240e2b 100644 --- a/clients/stellar-relay-lib/src/connection/connector/connector.rs +++ b/clients/stellar-relay-lib/src/connection/connector/connector.rs @@ -1,9 +1,12 @@ -use std::fmt::{Debug, Formatter}; +use std::{ + fmt::{Debug, Formatter}, + net::TcpStream, + time::Duration, +}; use substrate_stellar_sdk::{ types::{AuthenticatedMessageV0, Curve25519Public, HmacSha256Mac, MessageType}, XdrCodec, }; -use tokio::net::tcp::OwnedWriteHalf; use crate::{ connection::{ @@ -33,7 +36,7 @@ pub struct Connector { flow_controller: FlowController, /// for writing xdr messages to stream. - pub(crate) write_stream_overlay: OwnedWriteHalf, + pub(crate) tcp_stream: TcpStream, } impl Debug for Connector { @@ -109,18 +112,24 @@ impl Connector { } } - pub fn new( - local_node: NodeInfo, - conn_info: ConnectionInfo, - write_stream_overlay: OwnedWriteHalf, - ) -> Self { + /// returns a Connector and starts creating a connection to Stellar + pub fn start(local_node: NodeInfo, conn_info: ConnectionInfo) -> Result { let connection_auth = ConnectionAuth::new( &local_node.network_id, conn_info.keypair(), conn_info.auth_cert_expiration, ); - Connector { + let tcp_stream = TcpStream::connect(conn_info.address()) + .map_err(|e| Error::ConnectionFailed(e.to_string()))?; + + if let Err(e) = + tcp_stream.set_read_timeout(Some(Duration::from_secs(conn_info.timeout_in_secs))) + { + log::warn!("start(): failed to set ttl: {e:?}"); + } + + let mut connector = Connector { local: LocalInfo::new(local_node), remote_info: None, hmac_keys: None, @@ -131,14 +140,17 @@ impl Connector { receive_scp_messages: conn_info.recv_scp_msgs, handshake_state: HandshakeState::Connecting, flow_controller: FlowController::default(), - write_stream_overlay, - } - } + tcp_stream, + }; + + connector.send_hello_message()?; - pub fn set_write_stream_overlay(&mut self, write_stream_overlay: OwnedWriteHalf) { - self.write_stream_overlay = write_stream_overlay; + Ok(connector) } +} +// getters setters +impl Connector { pub fn local(&self) -> &LocalInfo { &self.local } @@ -211,25 +223,24 @@ impl Connector { self.flow_controller.enable(local_overlay_version, remote_overlay_version) } } - #[cfg(test)] mod test { use crate::{connection::hmac::HMacKeys, node::RemoteInfo, StellarOverlayConfig}; use serial_test::serial; + use std::net::Shutdown; use substrate_stellar_sdk::{ compound_types::LimitedString, types::{Hello, MessageType}, PublicKey, }; - use tokio::{io::AsyncWriteExt, net::tcp::OwnedReadHalf}; use crate::{ connection::{ authentication::{create_auth_cert, ConnectionAuth}, Connector, }, - helper::{create_stream, time_now}, + helper::time_now, node::NodeInfo, ConnectionInfo, }; @@ -249,14 +260,14 @@ mod test { } impl Connector { - fn shutdown(&mut self, read_half: OwnedReadHalf) { - let _ = self.write_stream_overlay.shutdown(); - - drop(read_half); + fn shutdown(&mut self) { + self.tcp_stream + .shutdown(Shutdown::Both) + .expect("should shutdown both read and write of stream"); } } - async fn create_connector() -> (NodeInfo, ConnectionInfo, Connector, OwnedReadHalf) { + async fn create_connector() -> (NodeInfo, ConnectionInfo, Connector) { let cfg_file_path = "./resources/config/testnet/stellar_relay_config_sdftest1.json"; let secret_key_path = "./resources/secretkey/stellar_secretkey_testnet"; let secret_key = @@ -268,16 +279,15 @@ mod test { let conn_info = cfg.connection_info(&secret_key).expect("should create a connection info"); // this is a channel to communicate with the connection/config (this needs renaming) - let (read_half, write_half) = - create_stream(&conn_info.address()).await.expect("should return a stream"); - let connector = Connector::new(node_info.clone(), conn_info.clone(), write_half); - (node_info, conn_info, connector, read_half) + let connector = Connector::start(node_info.clone(), conn_info.clone()) + .expect("should create a connector"); + (node_info, conn_info, connector) } #[tokio::test] #[serial] async fn create_new_connector_works() { - let (node_info, _, mut connector, read_half) = create_connector().await; + let (node_info, _, mut connector) = create_connector().await; let connector_local_node = connector.local.node(); @@ -287,24 +297,24 @@ mod test { assert_eq!(connector_local_node.version_str, node_info.version_str); assert_eq!(connector_local_node.network_id, node_info.network_id); - connector.shutdown(read_half); + connector.shutdown(); } #[tokio::test] #[serial] async fn connector_local_sequence_works() { - let (_, _, mut connector, read_half) = create_connector().await; + let (_, _, mut connector) = create_connector().await; assert_eq!(connector.local_sequence(), 0); connector.increment_local_sequence(); assert_eq!(connector.local_sequence(), 1); - connector.shutdown(read_half); + connector.shutdown(); } #[tokio::test] #[serial] async fn connector_set_remote_works() { - let (_, _, mut connector, read_half) = create_connector().await; + let (_, _, mut connector) = create_connector().await; let connector_auth = &connector.connection_auth; let new_auth_cert = create_auth_cert_from_connection_auth(connector_auth); @@ -324,13 +334,13 @@ mod test { assert!(connector.remote().is_some()); - connector.shutdown(read_half); + connector.shutdown(); } #[tokio::test] #[serial] async fn connector_increment_remote_sequence_works() { - let (_, _, mut connector, read_half) = create_connector().await; + let (_, _, mut connector) = create_connector().await; let connector_auth = &connector.connection_auth; let new_auth_cert = create_auth_cert_from_connection_auth(connector_auth); @@ -354,14 +364,14 @@ mod test { connector.increment_remote_sequence().unwrap(); assert_eq!(connector.remote().unwrap().sequence(), 3); - connector.shutdown(read_half); + connector.shutdown(); } #[tokio::test] #[serial] async fn connector_get_and_set_hmac_keys_works() { //arrange - let (_, _, mut connector, read_half) = create_connector().await; + let (_, _, mut connector) = create_connector().await; let connector_auth = &connector.connection_auth; let new_auth_cert = create_auth_cert_from_connection_auth(connector_auth); @@ -392,13 +402,13 @@ mod test { //assert assert!(connector.hmac_keys().is_some()); - connector.shutdown(read_half); + connector.shutdown(); } #[tokio::test] #[serial] async fn connector_method_works() { - let (_, conn_config, mut connector, read_half) = create_connector().await; + let (_, conn_config, mut connector) = create_connector().await; assert_eq!(connector.remote_called_us(), conn_config.remote_called_us); assert_eq!(connector.receive_tx_messages(), conn_config.recv_tx_msgs); @@ -410,17 +420,17 @@ mod test { connector.handshake_completed(); assert!(connector.is_handshake_created()); - connector.shutdown(read_half); + connector.shutdown(); } #[tokio::test] #[serial] async fn enable_flow_controller_works() { - let (node_info, _, mut connector, read_half) = create_connector().await; + let (node_info, _, mut connector) = create_connector().await; assert!(!connector.inner_check_to_send_more(MessageType::ScpMessage)); connector.enable_flow_controller(node_info.overlay_version, node_info.overlay_version); - connector.shutdown(read_half); + connector.shutdown(); } } diff --git a/clients/stellar-relay-lib/src/connection/connector/message_handler.rs b/clients/stellar-relay-lib/src/connection/connector/message_handler.rs index 3333f09c5..6434c73ca 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_handler.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_handler.rs @@ -24,7 +24,7 @@ impl Connector { match msg_type { MessageType::Transaction | MessageType::FloodAdvert if !self.receive_tx_messages() => { self.increment_remote_sequence()?; - self.check_to_send_more(MessageType::Transaction).await?; + self.check_to_send_more(MessageType::Transaction)?; }, MessageType::ScpMessage if !self.receive_scp_messages() => { @@ -76,15 +76,15 @@ impl Connector { self.got_hello(); if self.remote_called_us() { - self.send_hello_message().await?; + self.send_hello_message()?; } else { - self.send_auth_message().await?; + self.send_auth_message()?; } log::info!("process_stellar_message(): Hello message processed successfully"); }, StellarMessage::Auth(_) => { - self.process_auth_message().await?; + self.process_auth_message()?; }, StellarMessage::ErrorMsg(e) => { @@ -101,7 +101,7 @@ impl Connector { log::trace!( "process_stellar_message(): Processing {other:?} message: received from overlay" ); - self.check_to_send_more(msg_type).await?; + self.check_to_send_more(msg_type)?; return Ok(Some(other)) }, } @@ -109,9 +109,9 @@ impl Connector { Ok(None) } - async fn process_auth_message(&mut self) -> Result<(), Error> { + fn process_auth_message(&mut self) -> Result<(), Error> { if self.remote_called_us() { - self.send_auth_message().await?; + self.send_auth_message()?; } self.handshake_completed(); @@ -126,7 +126,7 @@ impl Connector { log::warn!("process_auth_message(): No remote overlay version after handshake."); } - self.check_to_send_more(MessageType::Auth).await + self.check_to_send_more(MessageType::Auth) } /// Updates the config based on the hello message that was received from the Stellar Node diff --git a/clients/stellar-relay-lib/src/connection/connector/message_reader.rs b/clients/stellar-relay-lib/src/connection/connector/message_reader.rs index 8558c5ac3..5030ce2da 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_reader.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_reader.rs @@ -1,13 +1,11 @@ use crate::connection::{xdr_converter::get_xdr_message_length, Connector, Error, Xdr}; -use std::time::Duration; +use std::{ + io::Read, + net::{Shutdown, TcpStream}, +}; use substrate_stellar_sdk::types::StellarMessage; -use tokio::{ - io::AsyncReadExt, - net::{tcp, tcp::OwnedReadHalf}, - sync::{mpsc, mpsc::error::TryRecvError}, - time::timeout, -}; +use tokio::sync::{mpsc, mpsc::error::TryRecvError}; /// Polls for messages coming from the Stellar Node and communicates it back to the user /// @@ -19,7 +17,6 @@ use tokio::{ /// stream. pub(crate) async fn poll_messages_from_stellar( mut connector: Connector, - mut read_stream_overlay: OwnedReadHalf, send_to_user_sender: mpsc::Sender, mut send_to_node_receiver: mpsc::Receiver, ) { @@ -37,46 +34,57 @@ pub(crate) async fn poll_messages_from_stellar( // check for messages from user. match send_to_node_receiver.try_recv() { Ok(msg) => - if let Err(e) = connector.send_to_node(msg).await { + if let Err(e) = connector.send_to_node(msg) { log::error!("poll_messages_from_stellar(): Error occurred during sending message to node: {e:?}"); }, Err(TryRecvError::Disconnected) => { log::trace!("poll_messages_from_stellar(): Recv channel (for sending message to node) got disconnected."); - break; + break }, Err(TryRecvError::Empty) => { log::trace!("poll_messages_from_stellar(): Recv channel (for sending message to node) received empty messages."); }, } + let stream_clone = + match connector.tcp_stream.try_clone() { + Err(e) => { + log::error!("poll_messages_from_stellar(): Error occurred during cloning tcp stream: {e:?}"); + break + }, + Ok(stream_clone) => stream_clone, + }; + log::trace!("poll_messages_from_stellar(): checking for messages from Stellar Node..."); + // check for messages from Stellar Node. - match read_message_from_stellar(&mut read_stream_overlay, connector.timeout_in_secs).await { + let xdr = match read_message_from_stellar(stream_clone) { Err(e) => { log::error!("poll_messages_from_stellar(): {e:?}"); break }, - Ok(xdr) => match connector.process_raw_message(xdr).await { - Ok(Some(stellar_msg)) => - // push message to user - if let Err(e) = send_to_user_sender.send(stellar_msg).await { - log::warn!("poll_messages_from_stellar(): Error occurred during sending message to user: {e:?}"); - }, - Ok(None) => { - log::trace!("poll_messages_from_stellar(): No message to send to user."); - }, - Err(e) => { - log::error!("poll_messages_from_stellar(): Error occurred during processing xdr message: {e:?}"); - break + Ok(xdr) => xdr, + }; + + match connector.process_raw_message(xdr).await { + Ok(Some(stellar_msg)) => + // push message to user + if let Err(e) = send_to_user_sender.send(stellar_msg).await { + log::warn!("poll_messages_from_stellar(): Error occurred during sending message to user: {e:?}"); }, + Ok(None) => log::trace!("poll_messages_from_stellar(): No message to send to user."), + Err(e) => { + log::error!("poll_messages_from_stellar(): Error occurred during processing xdr message: {e:?}"); + break }, } } log::trace!("poll_messages_from_stellar(): stop polling for messages..."); // make sure to drop/shutdown the stream - connector.write_stream_overlay.forget(); - drop(read_stream_overlay); + if let Err(e) = connector.tcp_stream.shutdown(Shutdown::Both) { + log::error!("poll_messages_from_stellar(): Failed to shutdown the tcp stream: {e:?}"); + }; send_to_node_receiver.close(); drop(send_to_user_sender); @@ -85,10 +93,7 @@ pub(crate) async fn poll_messages_from_stellar( } /// Returns Xdr format of the `StellarMessage` sent from the Stellar Node -async fn read_message_from_stellar( - r_stream: &mut tcp::OwnedReadHalf, - timeout_in_secs: u64, -) -> Result { +fn read_message_from_stellar(mut stream: TcpStream) -> Result { log::trace!("read_message_from_stellar(): start"); // holds the number of bytes that were missing from the previous stellar message. @@ -97,74 +102,18 @@ async fn read_message_from_stellar( let mut buff_for_peeking = vec![0; 4]; loop { + let stream_clone = stream.try_clone()?; // check whether or not we should read the bytes as: // 1. the length of the next stellar message // 2. the remaining bytes of the previous stellar message - match timeout(Duration::from_secs(timeout_in_secs), - r_stream.ready( - tokio::io::Interest::READABLE | - tokio::io::Interest::WRITABLE) - ).await { - Ok(Ok(res)) => { - match r_stream.peer_addr() { - Ok(peer_addr) => { - log::trace!("read_message_from_stellar(): peer_addr: {peer_addr}"); - } - Err(e) => { - log::error!("read_message_from_stellar(): no peer addresses returned: {e:?}"); - } - } - - match r_stream.local_addr() { - Ok(local_addr) => { - log::trace!("read_message_from_stellar(): local_addr: {local_addr}"); - } - Err(e) => { - log::error!("read_message_from_stellar(): no local addresses returned: {e:?}"); - } - } - - if res.is_readable() { - log::trace!("read_message_from_stellar(): stream is readable"); - } - - if res.is_writable() { - log::trace!("read_message_from_stellar(): stream is writable"); - } - - if res.is_empty() { - log::trace!("read_message_from_stellar(): stream is empty"); - } - - if res.is_read_closed() { - log::error!("read_message_from_stellar(): stream's read half is closed"); - drop(r_stream); - return Err(Error::Disconnected); - } - - } - Ok(Err(e)) => { - log::error!("read_message_from_stellar(): Error occurred during checking ready status: {e:?}"); - } - Err(_) => { - log::error!("read_message_from_stellar(): timeout elapsed for checking ready status"); - return Err(Error::Timeout) - } - } - - match timeout(Duration::from_secs(timeout_in_secs), r_stream.peek(&mut buff_for_peeking)) - .await - { - Ok(Ok(0)) => { - log::trace!("read_message_from_stellar(): ERROR: Received 0 size"); - return Err(Error::Disconnected) - }, - - Ok(Ok(_)) if lack_bytes_from_prev == 0 => { + match stream.read(&mut buff_for_peeking) { + Ok(size) if size == 0 => continue, + Ok(_) if lack_bytes_from_prev == 0 => { // if there are no more bytes lacking from the previous message, // then check the size of next stellar message. // If it's not enough, skip it. - let expect_msg_len = next_message_length(r_stream).await; + let expect_msg_len = get_xdr_message_length(&buff_for_peeking); + log::trace!( "read_message_from_stellar(): The next message length is {expect_msg_len}" ); @@ -181,13 +130,11 @@ async fn read_message_from_stellar( readbuf = vec![0; expect_msg_len]; match read_message( - r_stream, + stream_clone, &mut lack_bytes_from_prev, &mut readbuf, expect_msg_len, - ) - .await - { + ) { Ok(None) => continue, Ok(Some(xdr)) => return Ok(xdr), Err(e) => { @@ -196,11 +143,13 @@ async fn read_message_from_stellar( }, } }, + Ok(size) => { + // The next few bytes was read. Add it to the readbuf. + lack_bytes_from_prev -= size; + readbuf.append(&mut buff_for_peeking); - Ok(Ok(_)) => { // let's read the continuation number of bytes from the previous message. - match read_unfinished_message(r_stream, &mut lack_bytes_from_prev, &mut readbuf) - .await + match read_unfinished_message(stream_clone, &mut lack_bytes_from_prev, &mut readbuf) { Ok(None) => continue, Ok(Some(xdr)) => return Ok(xdr), @@ -210,14 +159,10 @@ async fn read_message_from_stellar( }, } }, - Ok(Err(e)) => { - log::trace!("read_message_from_stellar(): ERROR: {e:?}"); - return Err(Error::ReadFailed(e.to_string())) - }, - Err(_) => { - log::error!("read_message_from_stellar(): timeout elapsed."); - return Err(Error::Timeout) + Err(e) => { + log::trace!("read_message_from_stellar(): ERROR peeking for messages: {e:?}"); + return Err(Error::ReadFailed(e.to_string())) }, } } @@ -231,13 +176,13 @@ async fn read_message_from_stellar( /// * `lack_bytes_from_prev` - the number of bytes remaining, to complete the previous message /// * `readbuf` - the buffer that holds the bytes of the previous and incomplete message /// * `xpect_msg_len` - the expected # of bytes of the Stellar message -async fn read_message( - r_stream: &mut tcp::OwnedReadHalf, +fn read_message( + mut stream: TcpStream, lack_bytes_from_prev: &mut usize, readbuf: &mut Vec, xpect_msg_len: usize, ) -> Result, Error> { - let actual_msg_len = read_stream(r_stream, readbuf).await?; + let actual_msg_len = stream.read(readbuf).map_err(|e| Error::ReadFailed(e.to_string()))?; // only when the message has the exact expected size bytes, should we send to user. if actual_msg_len == xpect_msg_len { @@ -262,15 +207,16 @@ async fn read_message( /// * `r_stream` - the read stream for reading the xdr stellar message /// * `lack_bytes_from_prev` - the number of bytes remaining, to complete the previous message /// * `readbuf` - the buffer that holds the bytes of the previous and incomplete message -async fn read_unfinished_message( - r_stream: &mut tcp::OwnedReadHalf, +fn read_unfinished_message( + mut stream: TcpStream, lack_bytes_from_prev: &mut usize, readbuf: &mut Vec, ) -> Result, Error> { // let's read the continuation number of bytes from the previous message. let mut cont_buf = vec![0; *lack_bytes_from_prev]; - let actual_msg_len = read_stream(r_stream, &mut cont_buf).await?; + let actual_msg_len = + stream.read(&mut cont_buf).map_err(|e| Error::ReadFailed(e.to_string()))?; // this partial message completes the previous message. if actual_msg_len == *lack_bytes_from_prev { @@ -292,20 +238,3 @@ async fn read_unfinished_message( Ok(None) } - -/// checks the length of the next stellar message. -async fn next_message_length(r_stream: &mut tcp::OwnedReadHalf) -> usize { - // let's check for messages. - let mut sizebuf = [0; 4]; - - if r_stream.read(&mut sizebuf).await.unwrap_or(0) == 0 { - return 0 - } - - get_xdr_message_length(&sizebuf) -} - -/// reads data from the stream and store to buffer -async fn read_stream(r_stream: &mut tcp::OwnedReadHalf, buffer: &mut [u8]) -> Result { - r_stream.read(buffer).await.map_err(|e| Error::ReadFailed(e.to_string())) -} diff --git a/clients/stellar-relay-lib/src/connection/connector/message_sender.rs b/clients/stellar-relay-lib/src/connection/connector/message_sender.rs index 775eae459..e2fe5de6e 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_sender.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_sender.rs @@ -1,7 +1,5 @@ -use std::time::Duration; +use std::io::Write; use substrate_stellar_sdk::types::{MessageType, SendMore, StellarMessage}; -use tokio::{io::AsyncWriteExt, time::timeout}; -use tokio::io::Interest; use crate::connection::{ flow_controller::MAX_FLOOD_MSG_CAP, @@ -11,68 +9,35 @@ use crate::connection::{ }; impl Connector { - pub async fn send_to_node(&mut self, msg: StellarMessage) -> Result<(), Error> { + pub fn send_to_node(&mut self, msg: StellarMessage) -> Result<(), Error> { let xdr_msg = &self.create_xdr_message(msg)?; - match timeout( - Duration::from_secs(self.timeout_in_secs), - self.write_stream_overlay.write_all(&xdr_msg), - ) - .await - { - Ok(res) => { - let result = res.map_err(|e| { - log::error!("send_to_node(): Failed to send message to node: {e:?}"); - Error::WriteFailed(e.to_string()) - }); - - let ready_status = self.write_stream_overlay.ready(Interest::WRITABLE | Interest::READABLE) - .await.map_err(|e| { - log::error!("send_to_node(): Stream not ready for reading or writing: {e:?}"); - Error::ConnectionFailed(e.to_string()) - })?; - - if ready_status.is_readable() { - log::trace!("send_to_node(): stream is readable"); - } - - if ready_status.is_writable() { - log::trace!("send_to_node(): stream is writable"); - } - - if ready_status.is_empty() { - log::trace!("send_to_node(): stream is empty"); - } - - result - }, - Err(_) => Err(Error::Timeout), - } + self.tcp_stream.write_all(&xdr_msg).map_err(|e| { + log::error!("send_to_node(): Failed to send message to node: {e:?}"); + Error::WriteFailed(e.to_string()) + }) } - pub async fn send_hello_message(&mut self) -> Result<(), Error> { + pub fn send_hello_message(&mut self) -> Result<(), Error> { let msg = self.create_hello_message(time_now())?; log::info!("send_hello_message(): Sending Hello Message: {}", to_base64_xdr_string(&msg)); - self.send_to_node(msg).await + self.send_to_node(msg) } - pub(super) async fn send_auth_message(&mut self) -> Result<(), Error> { + pub(super) fn send_auth_message(&mut self) -> Result<(), Error> { let msg = create_auth_message(); log::info!("send_auth_message(): Sending Auth Message: {}", to_base64_xdr_string(&msg)); - self.send_to_node(create_auth_message()).await + self.send_to_node(create_auth_message()) } - pub(super) async fn check_to_send_more( - &mut self, - message_type: MessageType, - ) -> Result<(), Error> { + pub(super) fn check_to_send_more(&mut self, message_type: MessageType) -> Result<(), Error> { if !self.inner_check_to_send_more(message_type) { return Ok(()) } let msg = StellarMessage::SendMore(SendMore { num_messages: MAX_FLOOD_MSG_CAP }); - self.send_to_node(msg).await + self.send_to_node(msg) } } diff --git a/clients/stellar-relay-lib/src/connection/error.rs b/clients/stellar-relay-lib/src/connection/error.rs index 334600383..ba3438e71 100644 --- a/clients/stellar-relay-lib/src/connection/error.rs +++ b/clients/stellar-relay-lib/src/connection/error.rs @@ -51,6 +51,9 @@ pub enum Error { #[error(display = "{:?}", _0)] XDRConversionError(XDRError), + #[error(display = "{:?}", _0)] + StdIOError(std::io::Error), + #[error(display = "{:?}", _0)] StellarSdkError(StellarSdkError), @@ -70,6 +73,12 @@ pub enum Error { VersionStrTooLong, } +impl From for Error { + fn from(e: std::io::Error) -> Self { + Error::StdIOError(e) + } +} + impl From for Error { fn from(e: XDRError) -> Self { Error::XDRConversionError(e) diff --git a/clients/stellar-relay-lib/src/connection/helper.rs b/clients/stellar-relay-lib/src/connection/helper.rs index 2b4a4873c..6180311d9 100644 --- a/clients/stellar-relay-lib/src/connection/helper.rs +++ b/clients/stellar-relay-lib/src/connection/helper.rs @@ -5,8 +5,10 @@ use substrate_stellar_sdk::{ types::{Error, Uint256}, SecretKey, XdrCodec, }; -use tokio::io::Interest; -use tokio::net::{tcp, TcpStream}; +use tokio::{ + io::Interest, + net::{tcp, TcpStream}, +}; /// Returns a new BigNumber with a pseudo-random value equal to or greater than 0 and less than 1. pub fn generate_random_nonce() -> Uint256 { @@ -50,11 +52,10 @@ pub async fn create_stream( .await .map_err(|e| crate::Error::ConnectionFailed(e.to_string()))?; - let res = stream.ready(Interest::READABLE | Interest::WRITABLE).await - .map_err(|e| { - log::error!("create_stream(): Stream not ready for reading or writing: {e:?}"); - crate::Error::ConnectionFailed(e.to_string()) - })?; + let res = stream.ready(Interest::READABLE | Interest::WRITABLE).await.map_err(|e| { + log::error!("create_stream(): Stream not ready for reading or writing: {e:?}"); + crate::Error::ConnectionFailed(e.to_string()) + })?; if res.is_readable() { log::trace!("create_stream(): stream is readable"); diff --git a/clients/stellar-relay-lib/src/overlay.rs b/clients/stellar-relay-lib/src/overlay.rs index c9457d6e8..4fe64db6b 100644 --- a/clients/stellar-relay-lib/src/overlay.rs +++ b/clients/stellar-relay-lib/src/overlay.rs @@ -1,6 +1,4 @@ -use std::io::{Read, Write}; use substrate_stellar_sdk::types::{ErrorCode, StellarMessage}; -use tokio::net::TcpStream; use tokio::sync::{ mpsc, mpsc::{ @@ -11,12 +9,10 @@ use tokio::sync::{ use crate::{ connection::{poll_messages_from_stellar, ConnectionInfo, Connector}, - helper::{create_stream, error_to_string}, + helper::error_to_string, node::NodeInfo, Error, }; -use crate::helper::time_now; -use crate::xdr_converter::get_xdr_message_length; /// Used to send/receive messages to/from Stellar Node pub struct StellarOverlayConnection { @@ -45,42 +41,10 @@ impl StellarOverlayConnection { let (send_to_node_sender, send_to_node_receiver) = mpsc::channel::(1024); - // split the stream for easy handling of read and write - let mut net_stream = std::net::TcpStream::connect(&conn_info.address()) - .map_err(|e| { - log::error!("connect(): net stream failed to connect: {e:?}"); - Error::ConnectionFailed(e.to_string()) - })?; - - - let (read_stream_overlay, write_stream_overlay) = - create_stream(&conn_info.address()).await?; - - let mut connector = Connector::new(local_node_info, conn_info, write_stream_overlay); - drop(read_stream_overlay); - - let hello = connector.create_hello_message(time_now())?; - let hello = connector.create_xdr_message(hello)?; - - let size = net_stream.write(&hello).map_err(|e| { - log::error!("connect(): Failed to send hello message to net tcpstream: {e:?}"); - Error::WriteFailed(e.to_string()) - })?; - log::trace!("connect(): sent hello message to net tcpstream: {size} bytes"); - - let new_stream = TcpStream::from_std(net_stream).map_err(|e| { - log::error!("connect(): Failed to convert net tcpstream to tokio tcpstream: {e:?}"); - Error::ConnectionFailed(e.to_string()) - })?; - let (read_stream_overlay, write_stream_overlay) = new_stream.into_split(); - - connector.set_write_stream_overlay(write_stream_overlay); - - //connector.send_hello_message().await?; + let connector = Connector::start(local_node_info, conn_info)?; tokio::spawn(poll_messages_from_stellar( connector, - read_stream_overlay, send_to_user_sender, send_to_node_receiver, )); diff --git a/clients/stellar-relay-lib/src/tests/mod.rs b/clients/stellar-relay-lib/src/tests/mod.rs index ad138ec2f..a1845e4f0 100644 --- a/clients/stellar-relay-lib/src/tests/mod.rs +++ b/clients/stellar-relay-lib/src/tests/mod.rs @@ -2,7 +2,7 @@ use crate::{ connection::ConnectionInfo, node::NodeInfo, StellarOverlayConfig, StellarOverlayConnection, }; use serial_test::serial; -use std::{sync::Arc, time::Duration}; +use std::{sync::Arc, thread::sleep, time::Duration}; use substrate_stellar_sdk::{ types::{ScpStatementExternalize, ScpStatementPledges, StellarMessage}, Hash, IntoHash, @@ -138,12 +138,17 @@ async fn stellar_overlay_should_receive_tx_set() { #[tokio::test(flavor = "multi_thread")] #[serial] async fn stellar_overlay_disconnect_works() { + env_logger::init(); let (node_info, conn_info) = overlay_infos(false); let mut overlay_connection = StellarOverlayConnection::connect(node_info.clone(), conn_info).await.unwrap(); - let _ = overlay_connection.listen().await.unwrap(); - + // let it run for a second, before disconnecting. + sleep(Duration::from_secs(1)); overlay_connection.disconnect(); + + // let the disconnection call pass for a few seconds, before checking its status. + sleep(Duration::from_secs(3)); + assert!(!overlay_connection.is_alive()); } From 2d390fb53fd8429db0cfbfded43cb25e308832a9 Mon Sep 17 00:00:00 2001 From: b-yap <2826165+b-yap@users.noreply.github.com> Date: Thu, 18 Jan 2024 14:27:48 +0800 Subject: [PATCH 07/13] code cleanup, ready for review --- .../src/connection/connector/connector.rs | 7 ++- .../connection/connector/message_handler.rs | 10 ++-- .../connection/connector/message_reader.rs | 46 ++++++++----------- .../src/connection/helper.rs | 35 -------------- clients/stellar-relay-lib/src/tests/mod.rs | 1 - 5 files changed, 28 insertions(+), 71 deletions(-) diff --git a/clients/stellar-relay-lib/src/connection/connector/connector.rs b/clients/stellar-relay-lib/src/connection/connector/connector.rs index 6f9240e2b..55a21e7e8 100644 --- a/clients/stellar-relay-lib/src/connection/connector/connector.rs +++ b/clients/stellar-relay-lib/src/connection/connector/connector.rs @@ -35,7 +35,7 @@ pub struct Connector { handshake_state: HandshakeState, flow_controller: FlowController, - /// for writing xdr messages to stream. + /// for writing/reading xdr messages to/from Stellar Node. pub(crate) tcp_stream: TcpStream, } @@ -120,13 +120,14 @@ impl Connector { conn_info.auth_cert_expiration, ); + // Create the stream let tcp_stream = TcpStream::connect(conn_info.address()) .map_err(|e| Error::ConnectionFailed(e.to_string()))?; if let Err(e) = tcp_stream.set_read_timeout(Some(Duration::from_secs(conn_info.timeout_in_secs))) { - log::warn!("start(): failed to set ttl: {e:?}"); + log::warn!("start(): failed to set read timeout for the stream: {e:?}"); } let mut connector = Connector { @@ -143,6 +144,7 @@ impl Connector { tcp_stream, }; + // To start the handshake, send a hello message to Stellar connector.send_hello_message()?; Ok(connector) @@ -223,6 +225,7 @@ impl Connector { self.flow_controller.enable(local_overlay_version, remote_overlay_version) } } + #[cfg(test)] mod test { use crate::{connection::hmac::HMacKeys, node::RemoteInfo, StellarOverlayConfig}; diff --git a/clients/stellar-relay-lib/src/connection/connector/message_handler.rs b/clients/stellar-relay-lib/src/connection/connector/message_handler.rs index 6434c73ca..d4a280dbd 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_handler.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_handler.rs @@ -40,7 +40,7 @@ impl Connector { return Err(Error::from(e)) }, other => log::error!( - "process_raw_message(): Received ErroMsg during authentication: {:?}", + "process_raw_message(): Received ErrorMsg during authentication: {:?}", other ), }, @@ -99,8 +99,11 @@ impl Connector { other => { log::trace!( - "process_stellar_message(): Processing {other:?} message: received from overlay" + "process_stellar_message(): Processing {} message: received from overlay", + String::from_utf8(other.to_base64_xdr()) + .unwrap_or(format!("{:?}", other.to_base64_xdr())) ); + self.check_to_send_more(msg_type)?; return Ok(Some(other)) }, @@ -117,13 +120,10 @@ impl Connector { self.handshake_completed(); if let Some(remote) = self.remote() { - log::debug!("process_auth_message(): sending connect message: {remote:?}"); self.enable_flow_controller( self.local().node().overlay_version, remote.node().overlay_version, ); - } else { - log::warn!("process_auth_message(): No remote overlay version after handshake."); } self.check_to_send_more(MessageType::Auth) diff --git a/clients/stellar-relay-lib/src/connection/connector/message_reader.rs b/clients/stellar-relay-lib/src/connection/connector/message_reader.rs index 5030ce2da..ea2fd0e5a 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_reader.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_reader.rs @@ -3,7 +3,7 @@ use std::{ io::Read, net::{Shutdown, TcpStream}, }; -use substrate_stellar_sdk::types::StellarMessage; +use substrate_stellar_sdk::{types::StellarMessage, XdrCodec}; use tokio::sync::{mpsc, mpsc::error::TryRecvError}; @@ -11,7 +11,6 @@ use tokio::sync::{mpsc, mpsc::error::TryRecvError}; /// /// # Arguments /// * `connector` - contains the config and necessary info for connecting to Stellar Node -/// * `read_stream_overlay` - the read half of the stream that is connected to Stellar Node /// * `send_to_user_sender` - sends message from Stellar to the user /// * `send_to_node_receiver` - receives message from user and writes it to the write half of the /// stream. @@ -23,14 +22,12 @@ pub(crate) async fn poll_messages_from_stellar( log::info!("poll_messages_from_stellar(): started."); loop { - log::trace!("poll_messages_from_stellar(): start loop"); if send_to_user_sender.is_closed() { log::info!("poll_messages_from_stellar(): closing receiver during disconnection"); // close this channel as communication to user was closed. break } - log::trace!("poll_messages_from_stellar(): checking for messages from the user..."); // check for messages from user. match send_to_node_receiver.try_recv() { Ok(msg) => @@ -41,11 +38,10 @@ pub(crate) async fn poll_messages_from_stellar( log::trace!("poll_messages_from_stellar(): Recv channel (for sending message to node) got disconnected."); break }, - Err(TryRecvError::Empty) => { - log::trace!("poll_messages_from_stellar(): Recv channel (for sending message to node) received empty messages."); - }, + Err(TryRecvError::Empty) => {}, } + // clone the stream to perform a read operation on the next function calls let stream_clone = match connector.tcp_stream.try_clone() { Err(e) => { @@ -55,8 +51,6 @@ pub(crate) async fn poll_messages_from_stellar( Ok(stream_clone) => stream_clone, }; - log::trace!("poll_messages_from_stellar(): checking for messages from Stellar Node..."); - // check for messages from Stellar Node. let xdr = match read_message_from_stellar(stream_clone) { Err(e) => { @@ -69,10 +63,13 @@ pub(crate) async fn poll_messages_from_stellar( match connector.process_raw_message(xdr).await { Ok(Some(stellar_msg)) => // push message to user - if let Err(e) = send_to_user_sender.send(stellar_msg).await { - log::warn!("poll_messages_from_stellar(): Error occurred during sending message to user: {e:?}"); + if let Err(e) = send_to_user_sender.send(stellar_msg.clone()).await { + log::warn!("poll_messages_from_stellar(): Error occurred during sending message {} to user: {e:?}", + String::from_utf8(stellar_msg.to_base64_xdr()) + .unwrap_or_else(|_| format!("{:?}", stellar_msg.to_base64_xdr())) + ); }, - Ok(None) => log::trace!("poll_messages_from_stellar(): No message to send to user."), + Ok(None) => {}, Err(e) => { log::error!("poll_messages_from_stellar(): Error occurred during processing xdr message: {e:?}"); break @@ -80,8 +77,7 @@ pub(crate) async fn poll_messages_from_stellar( } } - log::trace!("poll_messages_from_stellar(): stop polling for messages..."); - // make sure to drop/shutdown the stream + // make sure to shutdown the stream if let Err(e) = connector.tcp_stream.shutdown(Shutdown::Both) { log::error!("poll_messages_from_stellar(): Failed to shutdown the tcp stream: {e:?}"); }; @@ -94,30 +90,24 @@ pub(crate) async fn poll_messages_from_stellar( /// Returns Xdr format of the `StellarMessage` sent from the Stellar Node fn read_message_from_stellar(mut stream: TcpStream) -> Result { - log::trace!("read_message_from_stellar(): start"); - // holds the number of bytes that were missing from the previous stellar message. let mut lack_bytes_from_prev = 0; let mut readbuf: Vec = vec![]; - let mut buff_for_peeking = vec![0; 4]; + let mut buff_for_reading = vec![0; 4]; loop { let stream_clone = stream.try_clone()?; // check whether or not we should read the bytes as: // 1. the length of the next stellar message // 2. the remaining bytes of the previous stellar message - match stream.read(&mut buff_for_peeking) { + match stream.read(&mut buff_for_reading) { Ok(size) if size == 0 => continue, Ok(_) if lack_bytes_from_prev == 0 => { // if there are no more bytes lacking from the previous message, // then check the size of next stellar message. - // If it's not enough, skip it. - let expect_msg_len = get_xdr_message_length(&buff_for_peeking); - - log::trace!( - "read_message_from_stellar(): The next message length is {expect_msg_len}" - ); + let expect_msg_len = get_xdr_message_length(&buff_for_reading); + // If it's not enough, skip it. if expect_msg_len == 0 { // there's nothing to read; wait for the next iteration log::trace!( @@ -146,7 +136,7 @@ fn read_message_from_stellar(mut stream: TcpStream) -> Result { Ok(size) => { // The next few bytes was read. Add it to the readbuf. lack_bytes_from_prev -= size; - readbuf.append(&mut buff_for_peeking); + readbuf.append(&mut buff_for_reading); // let's read the continuation number of bytes from the previous message. match read_unfinished_message(stream_clone, &mut lack_bytes_from_prev, &mut readbuf) @@ -161,7 +151,7 @@ fn read_message_from_stellar(mut stream: TcpStream) -> Result { }, Err(e) => { - log::trace!("read_message_from_stellar(): ERROR peeking for messages: {e:?}"); + log::trace!("read_message_from_stellar(): ERROR reading messages: {e:?}"); return Err(Error::ReadFailed(e.to_string())) }, } @@ -172,7 +162,7 @@ fn read_message_from_stellar(mut stream: TcpStream) -> Result { /// This reads a number of bytes based on the expected message length. /// /// # Arguments -/// * `r_stream` - the read stream for reading the xdr stellar message +/// * `stream` - the TcpStream for reading the xdr stellar message /// * `lack_bytes_from_prev` - the number of bytes remaining, to complete the previous message /// * `readbuf` - the buffer that holds the bytes of the previous and incomplete message /// * `xpect_msg_len` - the expected # of bytes of the Stellar message @@ -204,7 +194,7 @@ fn read_message( /// Reads a continuation of bytes that belong to the previous message /// /// # Arguments -/// * `r_stream` - the read stream for reading the xdr stellar message +/// * `stream` - the TcpStream for reading the xdr stellar message /// * `lack_bytes_from_prev` - the number of bytes remaining, to complete the previous message /// * `readbuf` - the buffer that holds the bytes of the previous and incomplete message fn read_unfinished_message( diff --git a/clients/stellar-relay-lib/src/connection/helper.rs b/clients/stellar-relay-lib/src/connection/helper.rs index 6180311d9..57f9877b7 100644 --- a/clients/stellar-relay-lib/src/connection/helper.rs +++ b/clients/stellar-relay-lib/src/connection/helper.rs @@ -5,10 +5,6 @@ use substrate_stellar_sdk::{ types::{Error, Uint256}, SecretKey, XdrCodec, }; -use tokio::{ - io::Interest, - net::{tcp, TcpStream}, -}; /// Returns a new BigNumber with a pseudo-random value equal to or greater than 0 and less than 1. pub fn generate_random_nonce() -> Uint256 { @@ -44,34 +40,3 @@ pub fn to_base64_xdr_string(msg: &T) -> String { let xdr = msg.to_base64_xdr(); String::from_utf8(xdr.clone()).unwrap_or(format!("{:?}", xdr)) } - -pub async fn create_stream( - address: &str, -) -> Result<(tcp::OwnedReadHalf, tcp::OwnedWriteHalf), crate::Error> { - let stream = TcpStream::connect(address) - .await - .map_err(|e| crate::Error::ConnectionFailed(e.to_string()))?; - - let res = stream.ready(Interest::READABLE | Interest::WRITABLE).await.map_err(|e| { - log::error!("create_stream(): Stream not ready for reading or writing: {e:?}"); - crate::Error::ConnectionFailed(e.to_string()) - })?; - - if res.is_readable() { - log::trace!("create_stream(): stream is readable"); - } - - if res.is_writable() { - log::trace!("create_stream(): stream is writable"); - } - - if res.is_empty() { - log::trace!("create_stream(): stream is empty"); - } - - if res.is_read_closed() { - log::trace!("create_strea(): stream's read half is closed."); - } - - Ok(stream.into_split()) -} diff --git a/clients/stellar-relay-lib/src/tests/mod.rs b/clients/stellar-relay-lib/src/tests/mod.rs index a1845e4f0..1f91a17ed 100644 --- a/clients/stellar-relay-lib/src/tests/mod.rs +++ b/clients/stellar-relay-lib/src/tests/mod.rs @@ -138,7 +138,6 @@ async fn stellar_overlay_should_receive_tx_set() { #[tokio::test(flavor = "multi_thread")] #[serial] async fn stellar_overlay_disconnect_works() { - env_logger::init(); let (node_info, conn_info) = overlay_infos(false); let mut overlay_connection = From 1a71b00b91f844da967f8c4f25abf39921506a70 Mon Sep 17 00:00:00 2001 From: b-yap <2826165+b-yap@users.noreply.github.com> Date: Thu, 18 Jan 2024 14:30:56 +0800 Subject: [PATCH 08/13] remove unnecessary features of tokio --- clients/stellar-relay-lib/Cargo.toml | 2 -- 1 file changed, 2 deletions(-) diff --git a/clients/stellar-relay-lib/Cargo.toml b/clients/stellar-relay-lib/Cargo.toml index 3ebd68324..50bb38442 100644 --- a/clients/stellar-relay-lib/Cargo.toml +++ b/clients/stellar-relay-lib/Cargo.toml @@ -36,9 +36,7 @@ err-derive = "0.3.1" tokio = { version = "1.0", features = [ "macros", # allows main function to be async "rt-multi-thread", # for multi-thread runtime - "net", # contains the TcpStream "sync", # to make channels available - "io-util", # for async read/write operations "time" # for timeouts and sleep, when reconnecting ] } From 3ab5d1c98897808fdab3d80e3018bc31b96f7cce Mon Sep 17 00:00:00 2001 From: b-yap <2826165+b-yap@users.noreply.github.com> Date: Fri, 19 Jan 2024 21:19:08 +0800 Subject: [PATCH 09/13] from async to just normal funcs --- clients/stellar-relay-lib/examples/connect.rs | 2 +- clients/stellar-relay-lib/src/overlay.rs | 2 +- clients/stellar-relay-lib/src/tests/mod.rs | 4 ++-- clients/vault/src/oracle/agent.rs | 9 +++++---- 4 files changed, 9 insertions(+), 8 deletions(-) diff --git a/clients/stellar-relay-lib/examples/connect.rs b/clients/stellar-relay-lib/examples/connect.rs index 94a8ab1be..9dd308798 100644 --- a/clients/stellar-relay-lib/examples/connect.rs +++ b/clients/stellar-relay-lib/examples/connect.rs @@ -27,7 +27,7 @@ async fn main() -> Result<(), Box> { let mut overlay_connection = connect_to_stellar_overlay_network(cfg, &secret_key).await?; - while let Ok(Some(msg)) = overlay_connection.listen().await { + while let Ok(Some(msg)) = overlay_connection.listen() { match msg { StellarMessage::ScpMessage(msg) => { let node_id = msg.statement.node_id.to_encoding(); diff --git a/clients/stellar-relay-lib/src/overlay.rs b/clients/stellar-relay-lib/src/overlay.rs index 4fe64db6b..637e1cda5 100644 --- a/clients/stellar-relay-lib/src/overlay.rs +++ b/clients/stellar-relay-lib/src/overlay.rs @@ -55,7 +55,7 @@ impl StellarOverlayConnection { }) } - pub async fn listen(&mut self) -> Result, Error> { + pub fn listen(&mut self) -> Result, Error> { loop { if !self.is_alive() { self.disconnect(); diff --git a/clients/stellar-relay-lib/src/tests/mod.rs b/clients/stellar-relay-lib/src/tests/mod.rs index 1f91a17ed..0bd51ff45 100644 --- a/clients/stellar-relay-lib/src/tests/mod.rs +++ b/clients/stellar-relay-lib/src/tests/mod.rs @@ -56,7 +56,7 @@ async fn stellar_overlay_should_receive_scp_messages() { timeout(Duration::from_secs(300), async move { let mut ov_conn_locked = ov_conn.lock().await; - if let Ok(Some(msg)) = ov_conn_locked.listen().await { + if let Ok(Some(msg)) = ov_conn_locked.listen() { scps_vec_clone.lock().await.push(msg); ov_conn_locked.disconnect(); @@ -93,7 +93,7 @@ async fn stellar_overlay_should_receive_tx_set() { timeout(Duration::from_secs(500), async move { let mut ov_conn_locked = ov_conn.lock().await; - while let Ok(Some(msg)) = ov_conn_locked.listen().await { + while let Ok(Some(msg)) = ov_conn_locked.listen() { match msg { StellarMessage::ScpMessage(msg) => if let ScpStatementPledges::ScpStExternalize(stmt) = &msg.statement.pledges { diff --git a/clients/vault/src/oracle/agent.rs b/clients/vault/src/oracle/agent.rs index 83bd70978..8945bf878 100644 --- a/clients/vault/src/oracle/agent.rs +++ b/clients/vault/src/oracle/agent.rs @@ -98,7 +98,7 @@ pub async fn start_oracle_agent( } // listen for messages from Stellar - match overlay_conn.listen().await { + match overlay_conn.listen() { Ok(Some(msg)) => { let msg_as_str = to_base64_xdr_string(&msg); if let Err(e) = @@ -127,7 +127,7 @@ pub async fn start_oracle_agent( }); tokio::spawn(on_shutdown(shutdown_sender.clone(), async move { - tracing::debug!("start_oracle_agent(): sending signal to shutdown overlay connection..."); + tracing::info!("start_oracle_agent(): sending signal to shutdown overlay connection..."); if let Err(e) = disconnect_signal_sender.send(()).await { tracing::warn!("start_oracle_agent(): failed to send disconnect signal: {e:?}"); } @@ -193,7 +193,7 @@ impl OracleAgent { /// Stops listening for new SCP messages. pub fn stop(&self) -> Result<(), Error> { - tracing::debug!("stop(): Shutting down OracleAgent..."); + tracing::info!("stop(): Shutting down OracleAgent..."); if let Err(e) = self.shutdown_sender.send(()) { tracing::error!("stop(): Failed to send shutdown signal in OracleAgent: {:?}", e); } @@ -309,6 +309,7 @@ mod tests { #[tokio::test(flavor = "multi_thread")] #[serial] async fn test_get_proof_for_archived_slot_fails_without_archives() { + env_logger::init(); let scp_archive_storage = ScpArchiveStorage::default(); let tx_archive_storage = TransactionsArchiveStorage::default(); @@ -320,7 +321,6 @@ mod tests { let agent = start_oracle_agent(modified_config, &get_test_secret_key(true), shutdown) .await .expect("Failed to start agent"); - sleep(Duration::from_secs(5)).await; // This slot should be archived on the public network let target_slot = 44041116; @@ -332,6 +332,7 @@ mod tests { let _ = scp_archive_storage.remove_file(target_slot); let _ = tx_archive_storage.remove_file(target_slot); + println!("HOY PLEAAASE"); agent.stop().expect("Failed to stop the agent"); } } From d28282643f1b616cc489d5314f78069ab2ec1f86 Mon Sep 17 00:00:00 2001 From: Gianfranco Tasteri Date: Fri, 19 Jan 2024 18:07:45 -0300 Subject: [PATCH 10/13] testing fixes for test not stopping --- .../src/connection/connector/connector.rs | 12 ++- .../connection/connector/message_handler.rs | 16 ++-- .../connection/connector/message_reader.rs | 85 +++++++++++-------- .../connection/connector/message_sender.rs | 46 +++++++--- clients/stellar-relay-lib/src/overlay.rs | 2 +- clients/vault/src/oracle/agent.rs | 2 +- 6 files changed, 101 insertions(+), 62 deletions(-) diff --git a/clients/stellar-relay-lib/src/connection/connector/connector.rs b/clients/stellar-relay-lib/src/connection/connector/connector.rs index 55a21e7e8..f9bb8d56a 100644 --- a/clients/stellar-relay-lib/src/connection/connector/connector.rs +++ b/clients/stellar-relay-lib/src/connection/connector/connector.rs @@ -1,6 +1,7 @@ use std::{ fmt::{Debug, Formatter}, net::TcpStream, + sync::{Arc, Mutex}, time::Duration, }; use substrate_stellar_sdk::{ @@ -36,7 +37,7 @@ pub struct Connector { flow_controller: FlowController, /// for writing/reading xdr messages to/from Stellar Node. - pub(crate) tcp_stream: TcpStream, + pub(crate) tcp_stream: Arc>, } impl Debug for Connector { @@ -113,7 +114,7 @@ impl Connector { } /// returns a Connector and starts creating a connection to Stellar - pub fn start(local_node: NodeInfo, conn_info: ConnectionInfo) -> Result { + pub async fn start(local_node: NodeInfo, conn_info: ConnectionInfo) -> Result { let connection_auth = ConnectionAuth::new( &local_node.network_id, conn_info.keypair(), @@ -141,11 +142,11 @@ impl Connector { receive_scp_messages: conn_info.recv_scp_msgs, handshake_state: HandshakeState::Connecting, flow_controller: FlowController::default(), - tcp_stream, + tcp_stream: Arc::new(Mutex::new(tcp_stream)), }; // To start the handshake, send a hello message to Stellar - connector.send_hello_message()?; + connector.send_hello_message().await?; Ok(connector) } @@ -265,6 +266,8 @@ mod test { impl Connector { fn shutdown(&mut self) { self.tcp_stream + .lock() + .unwrap() .shutdown(Shutdown::Both) .expect("should shutdown both read and write of stream"); } @@ -283,6 +286,7 @@ mod test { // this is a channel to communicate with the connection/config (this needs renaming) let connector = Connector::start(node_info.clone(), conn_info.clone()) + .await .expect("should create a connector"); (node_info, conn_info, connector) } diff --git a/clients/stellar-relay-lib/src/connection/connector/message_handler.rs b/clients/stellar-relay-lib/src/connection/connector/message_handler.rs index d4a280dbd..87d2c2fde 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_handler.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_handler.rs @@ -24,7 +24,7 @@ impl Connector { match msg_type { MessageType::Transaction | MessageType::FloodAdvert if !self.receive_tx_messages() => { self.increment_remote_sequence()?; - self.check_to_send_more(MessageType::Transaction)?; + self.check_to_send_more(MessageType::Transaction).await?; }, MessageType::ScpMessage if !self.receive_scp_messages() => { @@ -76,15 +76,15 @@ impl Connector { self.got_hello(); if self.remote_called_us() { - self.send_hello_message()?; + self.send_hello_message().await?; } else { - self.send_auth_message()?; + self.send_auth_message().await?; } log::info!("process_stellar_message(): Hello message processed successfully"); }, StellarMessage::Auth(_) => { - self.process_auth_message()?; + self.process_auth_message().await?; }, StellarMessage::ErrorMsg(e) => { @@ -104,7 +104,7 @@ impl Connector { .unwrap_or(format!("{:?}", other.to_base64_xdr())) ); - self.check_to_send_more(msg_type)?; + self.check_to_send_more(msg_type).await?; return Ok(Some(other)) }, } @@ -112,9 +112,9 @@ impl Connector { Ok(None) } - fn process_auth_message(&mut self) -> Result<(), Error> { + async fn process_auth_message(&mut self) -> Result<(), Error> { if self.remote_called_us() { - self.send_auth_message()?; + self.send_auth_message().await?; } self.handshake_completed(); @@ -126,7 +126,7 @@ impl Connector { ); } - self.check_to_send_more(MessageType::Auth) + self.check_to_send_more(MessageType::Auth).await } /// Updates the config based on the hello message that was received from the Stellar Node diff --git a/clients/stellar-relay-lib/src/connection/connector/message_reader.rs b/clients/stellar-relay-lib/src/connection/connector/message_reader.rs index ea2fd0e5a..7bf633a6b 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_reader.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_reader.rs @@ -2,9 +2,11 @@ use crate::connection::{xdr_converter::get_xdr_message_length, Connector, Error, use std::{ io::Read, net::{Shutdown, TcpStream}, + sync::{Arc, Mutex}, }; use substrate_stellar_sdk::{types::StellarMessage, XdrCodec}; +use std::{thread, time::Duration}; use tokio::sync::{mpsc, mpsc::error::TryRecvError}; /// Polls for messages coming from the Stellar Node and communicates it back to the user @@ -20,7 +22,7 @@ pub(crate) async fn poll_messages_from_stellar( mut send_to_node_receiver: mpsc::Receiver, ) { log::info!("poll_messages_from_stellar(): started."); - + // clone the stream to perform a read operation on the next function calls loop { if send_to_user_sender.is_closed() { log::info!("poll_messages_from_stellar(): closing receiver during disconnection"); @@ -31,7 +33,7 @@ pub(crate) async fn poll_messages_from_stellar( // check for messages from user. match send_to_node_receiver.try_recv() { Ok(msg) => - if let Err(e) = connector.send_to_node(msg) { + if let Err(e) = connector.send_to_node(msg).await { log::error!("poll_messages_from_stellar(): Error occurred during sending message to node: {e:?}"); }, Err(TryRecvError::Disconnected) => { @@ -41,18 +43,13 @@ pub(crate) async fn poll_messages_from_stellar( Err(TryRecvError::Empty) => {}, } - // clone the stream to perform a read operation on the next function calls - let stream_clone = - match connector.tcp_stream.try_clone() { - Err(e) => { - log::error!("poll_messages_from_stellar(): Error occurred during cloning tcp stream: {e:?}"); - break - }, - Ok(stream_clone) => stream_clone, - }; - // check for messages from Stellar Node. - let xdr = match read_message_from_stellar(stream_clone) { + let stream_clone = connector.tcp_stream.clone(); + // Spawn a blocking task to read from the stream + let xdr_result = read_message_from_stellar(stream_clone).await; + + // Check the result of the blocking task + let xdr = match xdr_result { Err(e) => { log::error!("poll_messages_from_stellar(): {e:?}"); break @@ -76,32 +73,39 @@ pub(crate) async fn poll_messages_from_stellar( }, } } - // make sure to shutdown the stream - if let Err(e) = connector.tcp_stream.shutdown(Shutdown::Both) { + if let Err(e) = connector.tcp_stream.clone().lock().unwrap().shutdown(Shutdown::Both) { log::error!("poll_messages_from_stellar(): Failed to shutdown the tcp stream: {e:?}"); }; - send_to_node_receiver.close(); drop(send_to_user_sender); - log::debug!("poll_messages_from_stellar(): stopped."); + log::info!("poll_messages_from_stellar(): stopped."); } /// Returns Xdr format of the `StellarMessage` sent from the Stellar Node -fn read_message_from_stellar(mut stream: TcpStream) -> Result { +async fn read_message_from_stellar(stream_clone: Arc>) -> Result { // holds the number of bytes that were missing from the previous stellar message. let mut lack_bytes_from_prev = 0; let mut readbuf: Vec = vec![]; - let mut buff_for_reading = vec![0; 4]; + loop { - let stream_clone = stream.try_clone()?; // check whether or not we should read the bytes as: // 1. the length of the next stellar message // 2. the remaining bytes of the previous stellar message - match stream.read(&mut buff_for_reading) { - Ok(size) if size == 0 => continue, + // Temporary scope for locking + let result = { + let mut stream = stream_clone.lock().unwrap(); + stream.read(&mut buff_for_reading) + }; + + match result { + Ok(size) if size == 0 => { + // No data available to read + tokio::task::yield_now().await; + continue + }, Ok(_) if lack_bytes_from_prev == 0 => { // if there are no more bytes lacking from the previous message, // then check the size of next stellar message. @@ -113,19 +117,20 @@ fn read_message_from_stellar(mut stream: TcpStream) -> Result { log::trace!( "read_message_from_stellar(): Nothing left to read; waiting for next loop" ); + tokio::task::yield_now().await; continue } - - // let's start reading the actual stellar message. readbuf = vec![0; expect_msg_len]; - match read_message( - stream_clone, + stream_clone.clone(), &mut lack_bytes_from_prev, &mut readbuf, expect_msg_len, ) { - Ok(None) => continue, + Ok(None) => { + tokio::task::yield_now().await; + continue + }, Ok(Some(xdr)) => return Ok(xdr), Err(e) => { log::trace!("read_message_from_stellar(): ERROR: {e:?}"); @@ -139,9 +144,15 @@ fn read_message_from_stellar(mut stream: TcpStream) -> Result { readbuf.append(&mut buff_for_reading); // let's read the continuation number of bytes from the previous message. - match read_unfinished_message(stream_clone, &mut lack_bytes_from_prev, &mut readbuf) - { - Ok(None) => continue, + match read_unfinished_message( + stream_clone.clone(), + &mut lack_bytes_from_prev, + &mut readbuf, + ) { + Ok(None) => { + tokio::task::yield_now().await; + continue + }, Ok(Some(xdr)) => return Ok(xdr), Err(e) => { log::trace!("read_message_from_stellar(): ERROR: {e:?}"); @@ -149,7 +160,7 @@ fn read_message_from_stellar(mut stream: TcpStream) -> Result { }, } }, - + Err(e) => { log::trace!("read_message_from_stellar(): ERROR reading messages: {e:?}"); return Err(Error::ReadFailed(e.to_string())) @@ -167,11 +178,12 @@ fn read_message_from_stellar(mut stream: TcpStream) -> Result { /// * `readbuf` - the buffer that holds the bytes of the previous and incomplete message /// * `xpect_msg_len` - the expected # of bytes of the Stellar message fn read_message( - mut stream: TcpStream, + stream: Arc>, lack_bytes_from_prev: &mut usize, readbuf: &mut Vec, xpect_msg_len: usize, ) -> Result, Error> { + let mut stream = stream.lock().unwrap(); let actual_msg_len = stream.read(readbuf).map_err(|e| Error::ReadFailed(e.to_string()))?; // only when the message has the exact expected size bytes, should we send to user. @@ -198,15 +210,18 @@ fn read_message( /// * `lack_bytes_from_prev` - the number of bytes remaining, to complete the previous message /// * `readbuf` - the buffer that holds the bytes of the previous and incomplete message fn read_unfinished_message( - mut stream: TcpStream, + mut stream: Arc>, lack_bytes_from_prev: &mut usize, readbuf: &mut Vec, ) -> Result, Error> { // let's read the continuation number of bytes from the previous message. let mut cont_buf = vec![0; *lack_bytes_from_prev]; - let actual_msg_len = - stream.read(&mut cont_buf).map_err(|e| Error::ReadFailed(e.to_string()))?; + let actual_msg_len = stream + .lock() + .unwrap() + .read(&mut cont_buf) + .map_err(|e| Error::ReadFailed(e.to_string()))?; // this partial message completes the previous message. if actual_msg_len == *lack_bytes_from_prev { diff --git a/clients/stellar-relay-lib/src/connection/connector/message_sender.rs b/clients/stellar-relay-lib/src/connection/connector/message_sender.rs index e2fe5de6e..1f1a0916a 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_sender.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_sender.rs @@ -9,35 +9,55 @@ use crate::connection::{ }; impl Connector { - pub fn send_to_node(&mut self, msg: StellarMessage) -> Result<(), Error> { - let xdr_msg = &self.create_xdr_message(msg)?; - - self.tcp_stream.write_all(&xdr_msg).map_err(|e| { - log::error!("send_to_node(): Failed to send message to node: {e:?}"); - Error::WriteFailed(e.to_string()) - }) + pub async fn send_to_node(&mut self, msg: StellarMessage) -> Result<(), Error> { + // Create the XDR message outside the closure + let xdr_msg = self.create_xdr_message(msg)?; + + // Clone the TcpStream (or its Arc> wrapper) + let stream_clone = self.tcp_stream.clone(); + + // this may really not be necessary + let write_result = tokio::task::spawn_blocking(move || { + let mut stream = stream_clone.lock().unwrap(); + stream.write_all(&xdr_msg).map_err(|e| { + log::error!("send_to_node(): Failed to send message to node: {e:?}"); + Error::WriteFailed(e.to_string()) + }) + }); + + // Await the result of the blocking task + match write_result.await { + Ok(result) => result, + Err(e) => { + log::error!("send_to_node(): Error occurred in blocking task: {e:?}"); + Err(Error::WriteFailed(e.to_string())) + }, + } } - pub fn send_hello_message(&mut self) -> Result<(), Error> { + pub async fn send_hello_message(&mut self) -> Result<(), Error> { let msg = self.create_hello_message(time_now())?; log::info!("send_hello_message(): Sending Hello Message: {}", to_base64_xdr_string(&msg)); - self.send_to_node(msg) + self.send_to_node(msg).await } - pub(super) fn send_auth_message(&mut self) -> Result<(), Error> { + pub(super) async fn send_auth_message(&mut self) -> Result<(), Error> { let msg = create_auth_message(); log::info!("send_auth_message(): Sending Auth Message: {}", to_base64_xdr_string(&msg)); - self.send_to_node(create_auth_message()) + self.send_to_node(create_auth_message()).await } - pub(super) fn check_to_send_more(&mut self, message_type: MessageType) -> Result<(), Error> { + pub(super) async fn check_to_send_more( + &mut self, + message_type: MessageType, + ) -> Result<(), Error> { if !self.inner_check_to_send_more(message_type) { return Ok(()) } let msg = StellarMessage::SendMore(SendMore { num_messages: MAX_FLOOD_MSG_CAP }); - self.send_to_node(msg) + self.send_to_node(msg).await } } diff --git a/clients/stellar-relay-lib/src/overlay.rs b/clients/stellar-relay-lib/src/overlay.rs index 637e1cda5..fcf0e440d 100644 --- a/clients/stellar-relay-lib/src/overlay.rs +++ b/clients/stellar-relay-lib/src/overlay.rs @@ -41,7 +41,7 @@ impl StellarOverlayConnection { let (send_to_node_sender, send_to_node_receiver) = mpsc::channel::(1024); - let connector = Connector::start(local_node_info, conn_info)?; + let connector = Connector::start(local_node_info, conn_info).await?; tokio::spawn(poll_messages_from_stellar( connector, diff --git a/clients/vault/src/oracle/agent.rs b/clients/vault/src/oracle/agent.rs index 8945bf878..4e6026fb7 100644 --- a/clients/vault/src/oracle/agent.rs +++ b/clients/vault/src/oracle/agent.rs @@ -153,7 +153,7 @@ impl OracleAgent { let collector = self.collector.clone(); #[cfg(test)] - let timeout_seconds = 180; + let timeout_seconds = 60; #[cfg(not(test))] let timeout_seconds = 60; From d089c0fa270637d88f690ecef8f183ec0eb23aa3 Mon Sep 17 00:00:00 2001 From: Gianfranco Tasteri Date: Fri, 19 Jan 2024 18:36:12 -0300 Subject: [PATCH 11/13] set timeout for tests to 180 --- clients/vault/src/oracle/agent.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/vault/src/oracle/agent.rs b/clients/vault/src/oracle/agent.rs index 4e6026fb7..8945bf878 100644 --- a/clients/vault/src/oracle/agent.rs +++ b/clients/vault/src/oracle/agent.rs @@ -153,7 +153,7 @@ impl OracleAgent { let collector = self.collector.clone(); #[cfg(test)] - let timeout_seconds = 60; + let timeout_seconds = 180; #[cfg(not(test))] let timeout_seconds = 60; From 3e8c6a97470ecea3baf746b02a76e927457bca1d Mon Sep 17 00:00:00 2001 From: Gianfranco Tasteri Date: Fri, 19 Jan 2024 19:11:39 -0300 Subject: [PATCH 12/13] fixes warnings --- .../src/connection/connector/message_reader.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/clients/stellar-relay-lib/src/connection/connector/message_reader.rs b/clients/stellar-relay-lib/src/connection/connector/message_reader.rs index 7bf633a6b..7b48daf99 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_reader.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_reader.rs @@ -6,7 +6,6 @@ use std::{ }; use substrate_stellar_sdk::{types::StellarMessage, XdrCodec}; -use std::{thread, time::Duration}; use tokio::sync::{mpsc, mpsc::error::TryRecvError}; /// Polls for messages coming from the Stellar Node and communicates it back to the user @@ -160,7 +159,7 @@ async fn read_message_from_stellar(stream_clone: Arc>) -> Resul }, } }, - + Err(e) => { log::trace!("read_message_from_stellar(): ERROR reading messages: {e:?}"); return Err(Error::ReadFailed(e.to_string())) @@ -210,7 +209,7 @@ fn read_message( /// * `lack_bytes_from_prev` - the number of bytes remaining, to complete the previous message /// * `readbuf` - the buffer that holds the bytes of the previous and incomplete message fn read_unfinished_message( - mut stream: Arc>, + stream: Arc>, lack_bytes_from_prev: &mut usize, readbuf: &mut Vec, ) -> Result, Error> { From c52db4f932e4303855b981f9fdfa1f7f40586b4f Mon Sep 17 00:00:00 2001 From: "B. Yap" <2826165+b-yap@users.noreply.github.com> Date: Fri, 26 Jan 2024 13:10:31 +0800 Subject: [PATCH 13/13] Replace tokio's TcpStream with async-std (#481) * async-std * works, but still in progress * working test withouth extra signals * remove comments and re-add connector drop trait * cleanup * fix the failing test about current slot * fix the failing test about current slot, by connecting to specifically different nodes * update config files * use a different account for testing * fix rustfmt --------- Co-authored-by: Gianfranco --- Cargo.lock | 119 ++++++++++++++++++ clients/stellar-relay-lib/Cargo.toml | 1 + .../src/connection/connector/connector.rs | 79 ++++++------ .../connection/connector/message_handler.rs | 3 +- .../connection/connector/message_reader.rs | 107 +++++++--------- .../connection/connector/message_sender.rs | 31 ++--- .../stellar-relay-lib/src/connection/error.rs | 9 -- clients/stellar-relay-lib/src/overlay.rs | 9 +- clients/stellar-relay-lib/src/tests/mod.rs | 10 +- clients/vault/Cargo.toml | 1 + .../stellar_relay_config_sdftest1.json | 2 +- .../stellar_relay_config_sdftest2.json | 2 +- .../stellar_relay_config_sdftest3.json | 2 +- clients/vault/src/oracle/agent.rs | 50 +++++--- .../vault/src/oracle/collector/collector.rs | 4 +- clients/vault/src/oracle/storage/impls.rs | 6 +- clients/vault/src/oracle/testing_utils.rs | 38 ++++-- clients/vault/tests/helper/mod.rs | 4 +- 18 files changed, 293 insertions(+), 184 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 10847a510..6a2563ea7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -382,6 +382,16 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" +[[package]] +name = "async-attributes" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3203e79f4dd9bdda415ed03cf14dae5a2bf775c683a00f94e9cd1faf0f596e5" +dependencies = [ + "quote", + "syn 1.0.109", +] + [[package]] name = "async-channel" version = "1.9.0" @@ -393,6 +403,35 @@ dependencies = [ "futures-core", ] +[[package]] +name = "async-executor" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b0c4a4f319e45986f347ee47fef8bf5e81c9abc3f6f58dc2391439f30df65f0" +dependencies = [ + "async-lock", + "async-task", + "concurrent-queue", + "fastrand 2.0.0", + "futures-lite", + "slab", +] + +[[package]] +name = "async-global-executor" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1b6f5d7df27bd294849f8eec66ecfc63d11814df7a4f5d74168a2394467b776" +dependencies = [ + "async-channel", + "async-executor", + "async-io", + "async-lock", + "blocking", + "futures-lite", + "once_cell", +] + [[package]] name = "async-io" version = "1.13.0" @@ -422,6 +461,39 @@ dependencies = [ "event-listener", ] +[[package]] +name = "async-std" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d" +dependencies = [ + "async-attributes", + "async-channel", + "async-global-executor", + "async-io", + "async-lock", + "crossbeam-utils", + "futures-channel", + "futures-core", + "futures-io", + "futures-lite", + "gloo-timers", + "kv-log-macro", + "log", + "memchr", + "once_cell", + "pin-project-lite 0.2.10", + "pin-utils", + "slab", + "wasm-bindgen-futures", +] + +[[package]] +name = "async-task" +version = "4.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbb36e985947064623dbd357f727af08ffd077f93d696782f3c56365fa2e2799" + [[package]] name = "async-trait" version = "0.1.72" @@ -695,6 +767,22 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d696c370c750c948ada61c69a0ee2cbbb9c50b1019ddb86d9317157a99c2cae" +[[package]] +name = "blocking" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c36a4d0d48574b3dd360b4b7d95cc651d2b6557b6402848a27d4b228a473e2a" +dependencies = [ + "async-channel", + "async-lock", + "async-task", + "fastrand 2.0.0", + "futures-io", + "futures-lite", + "piper", + "tracing", +] + [[package]] name = "bounded-collections" version = "0.1.8" @@ -3826,6 +3914,15 @@ dependencies = [ "cpufeatures", ] +[[package]] +name = "kv-log-macro" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" +dependencies = [ + "log", +] + [[package]] name = "kvdb" version = "0.13.0" @@ -4495,6 +4592,9 @@ name = "log" version = "0.4.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4" +dependencies = [ + "value-bag", +] [[package]] name = "lru" @@ -6022,6 +6122,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "piper" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "668d31b1c4eba19242f2088b2bf3316b82ca31082a8335764db4e083db7485d4" +dependencies = [ + "atomic-waker", + "fastrand 2.0.0", + "futures-io", +] + [[package]] name = "pkcs8" version = "0.9.0" @@ -9965,6 +10076,7 @@ dependencies = [ name = "stellar-relay-lib" version = "1.0.3" dependencies = [ + "async-std", "base64 0.13.1", "env_logger 0.9.3", "err-derive", @@ -11119,10 +11231,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "value-bag" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cdbaf5e132e593e9fc1de6a15bbec912395b11fb9719e061cf64f804524c503" + [[package]] name = "vault" version = "1.0.3" dependencies = [ + "async-std", "async-trait", "base64 0.13.1", "bincode", diff --git a/clients/stellar-relay-lib/Cargo.toml b/clients/stellar-relay-lib/Cargo.toml index 50bb38442..5446a6534 100644 --- a/clients/stellar-relay-lib/Cargo.toml +++ b/clients/stellar-relay-lib/Cargo.toml @@ -39,6 +39,7 @@ tokio = { version = "1.0", features = [ "sync", # to make channels available "time" # for timeouts and sleep, when reconnecting ] } +async-std = { version = "1.12.0", features = ["attributes"] } [features] std = [ diff --git a/clients/stellar-relay-lib/src/connection/connector/connector.rs b/clients/stellar-relay-lib/src/connection/connector/connector.rs index f9bb8d56a..a588a46de 100644 --- a/clients/stellar-relay-lib/src/connection/connector/connector.rs +++ b/clients/stellar-relay-lib/src/connection/connector/connector.rs @@ -1,8 +1,7 @@ +use async_std::net::TcpStream; use std::{ fmt::{Debug, Formatter}, - net::TcpStream, - sync::{Arc, Mutex}, - time::Duration, + net::Shutdown, }; use substrate_stellar_sdk::{ types::{AuthenticatedMessageV0, Curve25519Public, HmacSha256Mac, MessageType}, @@ -37,7 +36,7 @@ pub struct Connector { flow_controller: FlowController, /// for writing/reading xdr messages to/from Stellar Node. - pub(crate) tcp_stream: Arc>, + pub(crate) tcp_stream: TcpStream, } impl Debug for Connector { @@ -53,10 +52,32 @@ impl Debug for Connector { .field("receive_scp_messages", &self.receive_scp_messages) .field("handshake_state", &self.handshake_state) .field("flow_controller", &self.flow_controller) + .field( + "local_addr", + &self + .tcp_stream + .local_addr() + .map(|addr| addr.to_string()) + .unwrap_or("cannot provide".to_string()), + ) + .field( + "peer_addr", + &self + .tcp_stream + .peer_addr() + .map(|addr| addr.to_string()) + .unwrap_or("cannot provide".to_string()), + ) .finish() } } +impl Drop for Connector { + fn drop(&mut self) { + self.stop(); + } +} + impl Connector { /// Verifies the AuthenticatedMessage, received from the Stellar Node pub(super) fn verify_auth( @@ -115,22 +136,17 @@ impl Connector { /// returns a Connector and starts creating a connection to Stellar pub async fn start(local_node: NodeInfo, conn_info: ConnectionInfo) -> Result { + // Create the stream + let tcp_stream = TcpStream::connect(conn_info.address()) + .await + .map_err(|e| Error::ConnectionFailed(e.to_string()))?; + let connection_auth = ConnectionAuth::new( &local_node.network_id, conn_info.keypair(), conn_info.auth_cert_expiration, ); - // Create the stream - let tcp_stream = TcpStream::connect(conn_info.address()) - .map_err(|e| Error::ConnectionFailed(e.to_string()))?; - - if let Err(e) = - tcp_stream.set_read_timeout(Some(Duration::from_secs(conn_info.timeout_in_secs))) - { - log::warn!("start(): failed to set read timeout for the stream: {e:?}"); - } - let mut connector = Connector { local: LocalInfo::new(local_node), remote_info: None, @@ -142,7 +158,7 @@ impl Connector { receive_scp_messages: conn_info.recv_scp_msgs, handshake_state: HandshakeState::Connecting, flow_controller: FlowController::default(), - tcp_stream: Arc::new(Mutex::new(tcp_stream)), + tcp_stream, }; // To start the handshake, send a hello message to Stellar @@ -150,6 +166,12 @@ impl Connector { Ok(connector) } + + pub fn stop(&mut self) { + if let Err(e) = self.tcp_stream.shutdown(Shutdown::Both) { + log::error!("stop(): failed to shutdown tcp stream: {}", e); + } + } } // getters setters @@ -231,7 +253,6 @@ impl Connector { mod test { use crate::{connection::hmac::HMacKeys, node::RemoteInfo, StellarOverlayConfig}; use serial_test::serial; - use std::net::Shutdown; use substrate_stellar_sdk::{ compound_types::LimitedString, @@ -263,16 +284,6 @@ mod test { new_auth_cert } - impl Connector { - fn shutdown(&mut self) { - self.tcp_stream - .lock() - .unwrap() - .shutdown(Shutdown::Both) - .expect("should shutdown both read and write of stream"); - } - } - async fn create_connector() -> (NodeInfo, ConnectionInfo, Connector) { let cfg_file_path = "./resources/config/testnet/stellar_relay_config_sdftest1.json"; let secret_key_path = "./resources/secretkey/stellar_secretkey_testnet"; @@ -294,7 +305,7 @@ mod test { #[tokio::test] #[serial] async fn create_new_connector_works() { - let (node_info, _, mut connector) = create_connector().await; + let (node_info, _, connector) = create_connector().await; let connector_local_node = connector.local.node(); @@ -303,8 +314,6 @@ mod test { assert_eq!(connector_local_node.overlay_min_version, node_info.overlay_min_version); assert_eq!(connector_local_node.version_str, node_info.version_str); assert_eq!(connector_local_node.network_id, node_info.network_id); - - connector.shutdown(); } #[tokio::test] @@ -314,8 +323,6 @@ mod test { assert_eq!(connector.local_sequence(), 0); connector.increment_local_sequence(); assert_eq!(connector.local_sequence(), 1); - - connector.shutdown(); } #[tokio::test] @@ -340,8 +347,6 @@ mod test { connector.set_remote(RemoteInfo::new(&hello)); assert!(connector.remote().is_some()); - - connector.shutdown(); } #[tokio::test] @@ -370,8 +375,6 @@ mod test { connector.increment_remote_sequence().unwrap(); connector.increment_remote_sequence().unwrap(); assert_eq!(connector.remote().unwrap().sequence(), 3); - - connector.shutdown(); } #[tokio::test] @@ -408,8 +411,6 @@ mod test { )); //assert assert!(connector.hmac_keys().is_some()); - - connector.shutdown(); } #[tokio::test] @@ -426,8 +427,6 @@ mod test { connector.handshake_completed(); assert!(connector.is_handshake_created()); - - connector.shutdown(); } #[tokio::test] @@ -437,7 +436,5 @@ mod test { assert!(!connector.inner_check_to_send_more(MessageType::ScpMessage)); connector.enable_flow_controller(node_info.overlay_version, node_info.overlay_version); - - connector.shutdown(); } } diff --git a/clients/stellar-relay-lib/src/connection/connector/message_handler.rs b/clients/stellar-relay-lib/src/connection/connector/message_handler.rs index 87d2c2fde..b4101ea71 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_handler.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_handler.rs @@ -103,7 +103,6 @@ impl Connector { String::from_utf8(other.to_base64_xdr()) .unwrap_or(format!("{:?}", other.to_base64_xdr())) ); - self.check_to_send_more(msg_type).await?; return Ok(Some(other)) }, @@ -124,6 +123,8 @@ impl Connector { self.local().node().overlay_version, remote.node().overlay_version, ); + } else { + log::warn!("process_auth_message(): No remote overlay version after handshake."); } self.check_to_send_more(MessageType::Auth).await diff --git a/clients/stellar-relay-lib/src/connection/connector/message_reader.rs b/clients/stellar-relay-lib/src/connection/connector/message_reader.rs index 7b48daf99..4e308c7ad 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_reader.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_reader.rs @@ -1,11 +1,6 @@ use crate::connection::{xdr_converter::get_xdr_message_length, Connector, Error, Xdr}; -use std::{ - io::Read, - net::{Shutdown, TcpStream}, - sync::{Arc, Mutex}, -}; +use async_std::io::ReadExt; use substrate_stellar_sdk::{types::StellarMessage, XdrCodec}; - use tokio::sync::{mpsc, mpsc::error::TryRecvError}; /// Polls for messages coming from the Stellar Node and communicates it back to the user @@ -21,7 +16,7 @@ pub(crate) async fn poll_messages_from_stellar( mut send_to_node_receiver: mpsc::Receiver, ) { log::info!("poll_messages_from_stellar(): started."); - // clone the stream to perform a read operation on the next function calls + loop { if send_to_user_sender.is_closed() { log::info!("poll_messages_from_stellar(): closing receiver during disconnection"); @@ -35,20 +30,12 @@ pub(crate) async fn poll_messages_from_stellar( if let Err(e) = connector.send_to_node(msg).await { log::error!("poll_messages_from_stellar(): Error occurred during sending message to node: {e:?}"); }, - Err(TryRecvError::Disconnected) => { - log::trace!("poll_messages_from_stellar(): Recv channel (for sending message to node) got disconnected."); - break - }, + Err(TryRecvError::Disconnected) => break, Err(TryRecvError::Empty) => {}, } // check for messages from Stellar Node. - let stream_clone = connector.tcp_stream.clone(); - // Spawn a blocking task to read from the stream - let xdr_result = read_message_from_stellar(stream_clone).await; - - // Check the result of the blocking task - let xdr = match xdr_result { + let xdr = match read_message_from_stellar(&mut connector).await { Err(e) => { log::error!("poll_messages_from_stellar(): {e:?}"); break @@ -72,18 +59,17 @@ pub(crate) async fn poll_messages_from_stellar( }, } } - // make sure to shutdown the stream - if let Err(e) = connector.tcp_stream.clone().lock().unwrap().shutdown(Shutdown::Both) { - log::error!("poll_messages_from_stellar(): Failed to shutdown the tcp stream: {e:?}"); - }; + + // make sure to shutdown the connector + connector.stop(); send_to_node_receiver.close(); drop(send_to_user_sender); - log::info!("poll_messages_from_stellar(): stopped."); + log::debug!("poll_messages_from_stellar(): stopped."); } /// Returns Xdr format of the `StellarMessage` sent from the Stellar Node -async fn read_message_from_stellar(stream_clone: Arc>) -> Result { +async fn read_message_from_stellar(connector: &mut Connector) -> Result { // holds the number of bytes that were missing from the previous stellar message. let mut lack_bytes_from_prev = 0; let mut readbuf: Vec = vec![]; @@ -93,18 +79,8 @@ async fn read_message_from_stellar(stream_clone: Arc>) -> Resul // check whether or not we should read the bytes as: // 1. the length of the next stellar message // 2. the remaining bytes of the previous stellar message - // Temporary scope for locking - let result = { - let mut stream = stream_clone.lock().unwrap(); - stream.read(&mut buff_for_reading) - }; - - match result { - Ok(size) if size == 0 => { - // No data available to read - tokio::task::yield_now().await; - continue - }, + match connector.tcp_stream.read(&mut buff_for_reading).await { + Ok(size) if size == 0 => continue, Ok(_) if lack_bytes_from_prev == 0 => { // if there are no more bytes lacking from the previous message, // then check the size of next stellar message. @@ -113,23 +89,22 @@ async fn read_message_from_stellar(stream_clone: Arc>) -> Resul // If it's not enough, skip it. if expect_msg_len == 0 { // there's nothing to read; wait for the next iteration - log::trace!( - "read_message_from_stellar(): Nothing left to read; waiting for next loop" - ); - tokio::task::yield_now().await; + log::trace!("read_message_from_stellar(): expect_msg_len == 0"); continue } + + // let's start reading the actual stellar message. readbuf = vec![0; expect_msg_len]; + match read_message( - stream_clone.clone(), + connector, &mut lack_bytes_from_prev, &mut readbuf, expect_msg_len, - ) { - Ok(None) => { - tokio::task::yield_now().await; - continue - }, + ) + .await + { + Ok(None) => continue, Ok(Some(xdr)) => return Ok(xdr), Err(e) => { log::trace!("read_message_from_stellar(): ERROR: {e:?}"); @@ -141,17 +116,14 @@ async fn read_message_from_stellar(stream_clone: Arc>) -> Resul // The next few bytes was read. Add it to the readbuf. lack_bytes_from_prev -= size; readbuf.append(&mut buff_for_reading); + // make sure to cleanup the buffer + buff_for_reading = vec![0; 4]; // let's read the continuation number of bytes from the previous message. - match read_unfinished_message( - stream_clone.clone(), - &mut lack_bytes_from_prev, - &mut readbuf, - ) { - Ok(None) => { - tokio::task::yield_now().await; - continue - }, + match read_unfinished_message(connector, &mut lack_bytes_from_prev, &mut readbuf) + .await + { + Ok(None) => continue, Ok(Some(xdr)) => return Ok(xdr), Err(e) => { log::trace!("read_message_from_stellar(): ERROR: {e:?}"); @@ -172,18 +144,22 @@ async fn read_message_from_stellar(stream_clone: Arc>) -> Resul /// This reads a number of bytes based on the expected message length. /// /// # Arguments -/// * `stream` - the TcpStream for reading the xdr stellar message +/// * `connector` - a ref struct that contains the config and necessary info for connecting to +/// Stellar Node /// * `lack_bytes_from_prev` - the number of bytes remaining, to complete the previous message /// * `readbuf` - the buffer that holds the bytes of the previous and incomplete message /// * `xpect_msg_len` - the expected # of bytes of the Stellar message -fn read_message( - stream: Arc>, +async fn read_message( + connector: &mut Connector, lack_bytes_from_prev: &mut usize, readbuf: &mut Vec, xpect_msg_len: usize, ) -> Result, Error> { - let mut stream = stream.lock().unwrap(); - let actual_msg_len = stream.read(readbuf).map_err(|e| Error::ReadFailed(e.to_string()))?; + let actual_msg_len = connector + .tcp_stream + .read(readbuf) + .await + .map_err(|e| Error::ReadFailed(e.to_string()))?; // only when the message has the exact expected size bytes, should we send to user. if actual_msg_len == xpect_msg_len { @@ -205,21 +181,22 @@ fn read_message( /// Reads a continuation of bytes that belong to the previous message /// /// # Arguments -/// * `stream` - the TcpStream for reading the xdr stellar message +/// * `connector` - a ref struct that contains the config and necessary info for connecting to +/// Stellar Node /// * `lack_bytes_from_prev` - the number of bytes remaining, to complete the previous message /// * `readbuf` - the buffer that holds the bytes of the previous and incomplete message -fn read_unfinished_message( - stream: Arc>, +async fn read_unfinished_message( + connector: &mut Connector, lack_bytes_from_prev: &mut usize, readbuf: &mut Vec, ) -> Result, Error> { // let's read the continuation number of bytes from the previous message. let mut cont_buf = vec![0; *lack_bytes_from_prev]; - let actual_msg_len = stream - .lock() - .unwrap() + let actual_msg_len = connector + .tcp_stream .read(&mut cont_buf) + .await .map_err(|e| Error::ReadFailed(e.to_string()))?; // this partial message completes the previous message. diff --git a/clients/stellar-relay-lib/src/connection/connector/message_sender.rs b/clients/stellar-relay-lib/src/connection/connector/message_sender.rs index 1f1a0916a..1145dddb6 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_sender.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_sender.rs @@ -1,5 +1,7 @@ -use std::io::Write; +use async_std::io::WriteExt; +use std::time::Duration; use substrate_stellar_sdk::types::{MessageType, SendMore, StellarMessage}; +use tokio::time::timeout; use crate::connection::{ flow_controller::MAX_FLOOD_MSG_CAP, @@ -13,25 +15,14 @@ impl Connector { // Create the XDR message outside the closure let xdr_msg = self.create_xdr_message(msg)?; - // Clone the TcpStream (or its Arc> wrapper) - let stream_clone = self.tcp_stream.clone(); - - // this may really not be necessary - let write_result = tokio::task::spawn_blocking(move || { - let mut stream = stream_clone.lock().unwrap(); - stream.write_all(&xdr_msg).map_err(|e| { - log::error!("send_to_node(): Failed to send message to node: {e:?}"); - Error::WriteFailed(e.to_string()) - }) - }); - - // Await the result of the blocking task - match write_result.await { - Ok(result) => result, - Err(e) => { - log::error!("send_to_node(): Error occurred in blocking task: {e:?}"); - Err(Error::WriteFailed(e.to_string())) - }, + match timeout( + Duration::from_secs(self.timeout_in_secs), + self.tcp_stream.write_all(&xdr_msg), + ) + .await + { + Ok(res) => res.map_err(|e| Error::WriteFailed(e.to_string())), + Err(_) => Err(Error::Timeout), } } diff --git a/clients/stellar-relay-lib/src/connection/error.rs b/clients/stellar-relay-lib/src/connection/error.rs index ba3438e71..334600383 100644 --- a/clients/stellar-relay-lib/src/connection/error.rs +++ b/clients/stellar-relay-lib/src/connection/error.rs @@ -51,9 +51,6 @@ pub enum Error { #[error(display = "{:?}", _0)] XDRConversionError(XDRError), - #[error(display = "{:?}", _0)] - StdIOError(std::io::Error), - #[error(display = "{:?}", _0)] StellarSdkError(StellarSdkError), @@ -73,12 +70,6 @@ pub enum Error { VersionStrTooLong, } -impl From for Error { - fn from(e: std::io::Error) -> Self { - Error::StdIOError(e) - } -} - impl From for Error { fn from(e: XDRError) -> Self { Error::XDRConversionError(e) diff --git a/clients/stellar-relay-lib/src/overlay.rs b/clients/stellar-relay-lib/src/overlay.rs index fcf0e440d..c311fa29e 100644 --- a/clients/stellar-relay-lib/src/overlay.rs +++ b/clients/stellar-relay-lib/src/overlay.rs @@ -58,7 +58,6 @@ impl StellarOverlayConnection { pub fn listen(&mut self) -> Result, Error> { loop { if !self.is_alive() { - self.disconnect(); return Err(Error::Disconnected) } @@ -82,20 +81,20 @@ impl StellarOverlayConnection { let is_closed = self.sender.is_closed(); if is_closed { - self.disconnect(); + self.stop(); } !is_closed } - pub fn disconnect(&mut self) { - log::info!("disconnect(): closing connection to overlay network"); + pub fn stop(&mut self) { + log::info!("stop(): closing connection to overlay network"); self.receiver.close(); } } impl Drop for StellarOverlayConnection { fn drop(&mut self) { - self.disconnect(); + self.stop(); } } diff --git a/clients/stellar-relay-lib/src/tests/mod.rs b/clients/stellar-relay-lib/src/tests/mod.rs index 0bd51ff45..26e77bf72 100644 --- a/clients/stellar-relay-lib/src/tests/mod.rs +++ b/clients/stellar-relay-lib/src/tests/mod.rs @@ -1,13 +1,13 @@ use crate::{ connection::ConnectionInfo, node::NodeInfo, StellarOverlayConfig, StellarOverlayConnection, }; +use async_std::{future::timeout, sync::Mutex}; use serial_test::serial; use std::{sync::Arc, thread::sleep, time::Duration}; use substrate_stellar_sdk::{ types::{ScpStatementExternalize, ScpStatementPledges, StellarMessage}, Hash, IntoHash, }; -use tokio::{sync::Mutex, time::timeout}; fn secret_key(is_mainnet: bool) -> String { let path = if is_mainnet { @@ -59,7 +59,7 @@ async fn stellar_overlay_should_receive_scp_messages() { if let Ok(Some(msg)) = ov_conn_locked.listen() { scps_vec_clone.lock().await.push(msg); - ov_conn_locked.disconnect(); + ov_conn_locked.stop(); } }) .await @@ -107,15 +107,11 @@ async fn stellar_overlay_should_receive_tx_set() { StellarMessage::TxSet(set) => { let tx_set_hash = set.into_hash().expect("should return a hash"); actual_tx_set_hashes_clone.lock().await.push(tx_set_hash); - - ov_conn_locked.disconnect(); break }, StellarMessage::GeneralizedTxSet(set) => { let tx_set_hash = set.into_hash().expect("should return a hash"); actual_tx_set_hashes_clone.lock().await.push(tx_set_hash); - - ov_conn_locked.disconnect(); break }, _ => {}, @@ -145,7 +141,7 @@ async fn stellar_overlay_disconnect_works() { // let it run for a second, before disconnecting. sleep(Duration::from_secs(1)); - overlay_connection.disconnect(); + overlay_connection.stop(); // let the disconnection call pass for a few seconds, before checking its status. sleep(Duration::from_secs(3)); diff --git a/clients/vault/Cargo.toml b/clients/vault/Cargo.toml index f91f12b06..4dfd5c6a7 100644 --- a/clients/vault/Cargo.toml +++ b/clients/vault/Cargo.toml @@ -20,6 +20,7 @@ parachain-metadata-foucoco = ["runtime/parachain-metadata-foucoco"] integration-test = ["runtime/standalone-metadata", "integration"] [dependencies] +async-std = "1.12.0" async-trait = "0.1.40" base64 = { version = '0.13.0', default-features = false, features = ['alloc'] } bincode = "1.3.3" diff --git a/clients/vault/resources/config/testnet/stellar_relay_config_sdftest1.json b/clients/vault/resources/config/testnet/stellar_relay_config_sdftest1.json index cee8b5f6b..d82db5ad8 100644 --- a/clients/vault/resources/config/testnet/stellar_relay_config_sdftest1.json +++ b/clients/vault/resources/config/testnet/stellar_relay_config_sdftest1.json @@ -7,7 +7,7 @@ "ledger_version": 20, "overlay_version": 31, "overlay_min_version": 27, - "version_str": "stellar-core 20.1.0 (114b833e755400178a57142f45b7fb892ddb034f)", + "version_str": "stellar-core 20.2.0.rc1 (3076c138d77735c6ce8230886a540f4d54d85c59)", "is_pub_net": false }, "stellar_history_archive_urls": [] diff --git a/clients/vault/resources/config/testnet/stellar_relay_config_sdftest2.json b/clients/vault/resources/config/testnet/stellar_relay_config_sdftest2.json index 894a57110..057dd44cf 100644 --- a/clients/vault/resources/config/testnet/stellar_relay_config_sdftest2.json +++ b/clients/vault/resources/config/testnet/stellar_relay_config_sdftest2.json @@ -7,7 +7,7 @@ "ledger_version": 20, "overlay_version": 31, "overlay_min_version": 27, - "version_str": "stellar-core 20.1.0 (114b833e755400178a57142f45b7fb892ddb034f)", + "version_str": "stellar-core 20.2.0.rc1 (3076c138d77735c6ce8230886a540f4d54d85c59)", "is_pub_net": false }, "stellar_history_archive_urls": [] diff --git a/clients/vault/resources/config/testnet/stellar_relay_config_sdftest3.json b/clients/vault/resources/config/testnet/stellar_relay_config_sdftest3.json index 1f0a7291f..c9334f817 100644 --- a/clients/vault/resources/config/testnet/stellar_relay_config_sdftest3.json +++ b/clients/vault/resources/config/testnet/stellar_relay_config_sdftest3.json @@ -7,7 +7,7 @@ "ledger_version": 20, "overlay_version": 31, "overlay_min_version": 27, - "version_str": "stellar-core 20.1.0 (114b833e755400178a57142f45b7fb892ddb034f)", + "version_str": "stellar-core 20.2.0.rc1 (3076c138d77735c6ce8230886a540f4d54d85c59)", "is_pub_net": false }, "stellar_history_archive_urls": [] diff --git a/clients/vault/src/oracle/agent.rs b/clients/vault/src/oracle/agent.rs index 8945bf878..77ff5fff5 100644 --- a/clients/vault/src/oracle/agent.rs +++ b/clients/vault/src/oracle/agent.rs @@ -91,7 +91,6 @@ pub async fn start_oracle_agent( // if a disconnect signal was sent, disconnect from Stellar. Ok(_) | Err(TryRecvError::Disconnected) => { tracing::info!("start_oracle_agent(): disconnect overlay..."); - overlay_conn.disconnect(); break }, Err(TryRecvError::Empty) => {}, @@ -112,7 +111,6 @@ pub async fn start_oracle_agent( Ok(None) => {}, // connection got lost Err(e) => { - overlay_conn.disconnect(); tracing::error!("start_oracle_agent(): encounter error in overlay: {e:?}"); if let Err(e) = shutdown_sender_clone2.send(()) { @@ -124,10 +122,13 @@ pub async fn start_oracle_agent( }, } } + + // shutdown the overlay connection + overlay_conn.stop(); }); tokio::spawn(on_shutdown(shutdown_sender.clone(), async move { - tracing::info!("start_oracle_agent(): sending signal to shutdown overlay connection..."); + tracing::debug!("start_oracle_agent(): sending signal to shutdown overlay connection..."); if let Err(e) = disconnect_signal_sender.send(()).await { tracing::warn!("start_oracle_agent(): failed to send disconnect signal: {e:?}"); } @@ -141,6 +142,12 @@ pub async fn start_oracle_agent( }) } +impl Drop for OracleAgent { + fn drop(&mut self) { + self.stop(); + } +} + impl OracleAgent { /// This method returns the proof for a given slot or an error if the proof cannot be provided. /// The agent will try every possible way to get the proof before returning an error. @@ -192,19 +199,18 @@ impl OracleAgent { } /// Stops listening for new SCP messages. - pub fn stop(&self) -> Result<(), Error> { - tracing::info!("stop(): Shutting down OracleAgent..."); + pub fn stop(&self) { + tracing::debug!("stop(): Shutting down OracleAgent..."); if let Err(e) = self.shutdown_sender.send(()) { tracing::error!("stop(): Failed to send shutdown signal in OracleAgent: {:?}", e); } - Ok(()) } } #[cfg(test)] mod tests { use crate::oracle::{ - get_random_secret_key, get_test_secret_key, get_test_stellar_relay_config, + get_random_secret_key, get_test_secret_key, specific_stellar_relay_config, traits::ArchiveStorage, ScpArchiveStorage, TransactionsArchiveStorage, }; @@ -215,11 +221,15 @@ mod tests { #[ntest::timeout(1_800_000)] // timeout at 30 minutes #[serial] async fn test_get_proof_for_current_slot() { + // let it run for a few seconds, making sure that the other tests have successfully shutdown + // their connection to Stellar Node + sleep(Duration::from_secs(2)).await; + let shutdown_sender = ShutdownSender::new(); // We use a random secret key to avoid conflicts with other tests. let agent = start_oracle_agent( - get_test_stellar_relay_config(true), + specific_stellar_relay_config(true, 0), &get_random_secret_key(), shutdown_sender, ) @@ -244,12 +254,16 @@ mod tests { #[tokio::test(flavor = "multi_thread")] #[serial] async fn test_get_proof_for_archived_slot() { + // let it run for a few seconds, making sure that the other tests have successfully shutdown + // their connection to Stellar Node + sleep(Duration::from_secs(2)).await; + let scp_archive_storage = ScpArchiveStorage::default(); let tx_archive_storage = TransactionsArchiveStorage::default(); let shutdown_sender = ShutdownSender::new(); let agent = start_oracle_agent( - get_test_stellar_relay_config(true), + specific_stellar_relay_config(true, 1), &get_test_secret_key(true), shutdown_sender, ) @@ -266,17 +280,19 @@ mod tests { // These might return an error if the file does not exist, but that's fine. let _ = scp_archive_storage.remove_file(target_slot); let _ = tx_archive_storage.remove_file(target_slot); - - agent.stop().expect("Failed to stop the agent"); } #[tokio::test(flavor = "multi_thread")] #[serial] async fn test_get_proof_for_archived_slot_with_fallback() { + // let it run for a few seconds, making sure that the other tests have successfully shutdown + // their connection to Stellar Node + sleep(Duration::from_secs(2)).await; + let scp_archive_storage = ScpArchiveStorage::default(); let tx_archive_storage = TransactionsArchiveStorage::default(); - let base_config = get_test_stellar_relay_config(true); + let base_config = specific_stellar_relay_config(true, 2); // We add two fake archive urls to the config to make sure that the agent will actually fall // back to other archives. let mut archive_urls = base_config.stellar_history_archive_urls().clone(); @@ -302,18 +318,15 @@ mod tests { // These might return an error if the file does not exist, but that's fine. let _ = scp_archive_storage.remove_file(target_slot); let _ = tx_archive_storage.remove_file(target_slot); - - agent.stop().expect("Failed to stop the agent"); } #[tokio::test(flavor = "multi_thread")] #[serial] async fn test_get_proof_for_archived_slot_fails_without_archives() { - env_logger::init(); let scp_archive_storage = ScpArchiveStorage::default(); let tx_archive_storage = TransactionsArchiveStorage::default(); - let base_config = get_test_stellar_relay_config(true); + let base_config = specific_stellar_relay_config(true, 0); let modified_config: StellarOverlayConfig = StellarOverlayConfig { stellar_history_archive_urls: vec![], ..base_config }; @@ -324,6 +337,8 @@ mod tests { // This slot should be archived on the public network let target_slot = 44041116; + tracing::info!("let's sleep for 3 seconds,to get the network up and running"); + sleep(Duration::from_secs(3)).await; let proof_result = agent.get_proof(target_slot).await; assert!(matches!(proof_result, Err(Error::ProofTimeout(_)))); @@ -331,8 +346,5 @@ mod tests { // These might return an error if the file does not exist, but that's fine. let _ = scp_archive_storage.remove_file(target_slot); let _ = tx_archive_storage.remove_file(target_slot); - - println!("HOY PLEAAASE"); - agent.stop().expect("Failed to stop the agent"); } } diff --git a/clients/vault/src/oracle/collector/collector.rs b/clients/vault/src/oracle/collector/collector.rs index 3a9e3aac9..97f014c98 100644 --- a/clients/vault/src/oracle/collector/collector.rs +++ b/clients/vault/src/oracle/collector/collector.rs @@ -234,7 +234,7 @@ mod test { use crate::oracle::{ collector::{collector::AddTxSet, ScpMessageCollector}, - get_test_stellar_relay_config, + random_stellar_relay_config, traits::FileHandler, EnvelopesFileHandler, }; @@ -273,7 +273,7 @@ mod test { } fn stellar_history_archive_urls() -> Vec { - get_test_stellar_relay_config(true).stellar_history_archive_urls() + random_stellar_relay_config(true).stellar_history_archive_urls() } #[test] diff --git a/clients/vault/src/oracle/storage/impls.rs b/clients/vault/src/oracle/storage/impls.rs index 28be05d8a..e4dd33677 100644 --- a/clients/vault/src/oracle/storage/impls.rs +++ b/clients/vault/src/oracle/storage/impls.rs @@ -160,8 +160,8 @@ mod test { use crate::oracle::{ constants::MAX_SLOTS_PER_FILE, errors::Error, - get_test_stellar_relay_config, impls::ArchiveStorage, + random_stellar_relay_config, storage::{ traits::{FileHandler, FileHandlerExt}, EnvelopesFileHandler, @@ -174,7 +174,7 @@ mod test { impl Default for ScpArchiveStorage { fn default() -> Self { - let cfg = get_test_stellar_relay_config(true); + let cfg = random_stellar_relay_config(true); let archive_urls = cfg.stellar_history_archive_urls(); let archive_url = archive_urls.first().expect("should have an archive url"); ScpArchiveStorage(archive_url.clone()) @@ -183,7 +183,7 @@ mod test { impl Default for TransactionsArchiveStorage { fn default() -> Self { - let cfg = get_test_stellar_relay_config(true); + let cfg = random_stellar_relay_config(true); let archive_urls = cfg.stellar_history_archive_urls(); let archive_url = archive_urls.first().expect("should have an archive url"); TransactionsArchiveStorage(archive_url.clone()) diff --git a/clients/vault/src/oracle/testing_utils.rs b/clients/vault/src/oracle/testing_utils.rs index e5e10760b..6f245cd6e 100644 --- a/clients/vault/src/oracle/testing_utils.rs +++ b/clients/vault/src/oracle/testing_utils.rs @@ -1,19 +1,43 @@ use stellar_relay_lib::sdk::SecretKey; -pub fn get_test_stellar_relay_config(is_mainnet: bool) -> stellar_relay_lib::StellarOverlayConfig { +pub fn random_stellar_relay_config(is_mainnet: bool) -> stellar_relay_lib::StellarOverlayConfig { use rand::seq::SliceRandom; - let stellar_node_points: Vec<&str> = if is_mainnet { + let (stellar_node_points, dir) = stellar_relay_config_choices(is_mainnet); + + let node_point = stellar_node_points + .choose(&mut rand::thread_rng()) + .expect("should return a value"); + + stellar_relay_config_abs_path(dir, node_point) +} + +pub fn specific_stellar_relay_config( + is_mainnet: bool, + index: usize, +) -> stellar_relay_lib::StellarOverlayConfig { + let (stellar_node_points, dir) = stellar_relay_config_choices(is_mainnet); + + let node_point = stellar_node_points.get(index).expect("should return a value"); + + stellar_relay_config_abs_path(dir, node_point) +} + +fn stellar_relay_config_choices(is_mainnet: bool) -> (Vec<&'static str>, &'static str) { + let node_points = if is_mainnet { vec!["frankfurt", "iowa", "singapore"] } else { vec!["sdftest1", "sdftest2", "sdftest3"] }; - let dir = if is_mainnet { "mainnet" } else { "testnet" }; - let res = stellar_node_points - .choose(&mut rand::thread_rng()) - .expect("should return a value"); - let path_string = format!("./resources/config/{dir}/stellar_relay_config_{res}.json"); + let dir = if is_mainnet { "mainnet" } else { "testnet" }; + (node_points, dir) +} +fn stellar_relay_config_abs_path( + dir: &str, + node_point: &str, +) -> stellar_relay_lib::StellarOverlayConfig { + let path_string = format!("./resources/config/{dir}/stellar_relay_config_{node_point}.json"); stellar_relay_lib::StellarOverlayConfig::try_from_path(path_string.as_str()) .expect("should be able to extract config") diff --git a/clients/vault/tests/helper/mod.rs b/clients/vault/tests/helper/mod.rs index 7747f0b10..caa17e9b5 100644 --- a/clients/vault/tests/helper/mod.rs +++ b/clients/vault/tests/helper/mod.rs @@ -20,7 +20,7 @@ use std::{future::Future, sync::Arc}; use stellar_relay_lib::StellarOverlayConfig; use tokio::sync::RwLock; use vault::{ - oracle::{get_test_secret_key, get_test_stellar_relay_config, start_oracle_agent, OracleAgent}, + oracle::{get_test_secret_key, random_stellar_relay_config, start_oracle_agent, OracleAgent}, ArcRwLock, }; use wallet::StellarWallet; @@ -28,7 +28,7 @@ use wallet::StellarWallet; pub type StellarPublicKey = [u8; 32]; lazy_static! { - pub static ref CFG: StellarOverlayConfig = get_test_stellar_relay_config(false); + pub static ref CFG: StellarOverlayConfig = random_stellar_relay_config(false); pub static ref SECRET_KEY: String = get_test_secret_key(false); // TODO clean this up by extending the `get_test_secret_key()` function pub static ref DESTINATION_SECRET_KEY: String = "SDNQJEIRSA6YF5JNS6LQLCBF2XVWZ2NJV3YLC322RGIBJIJRIRGWKLEF".to_string();