diff --git a/rustyrpc/examples/client.rs b/rustyrpc/examples/client.rs index 14e70fb..fd466ca 100644 --- a/rustyrpc/examples/client.rs +++ b/rustyrpc/examples/client.rs @@ -50,7 +50,7 @@ async fn main() { tokio::time::sleep(Duration::from_secs(2)).await; // Waiting to allow HelloService deallocation request to be sent. } -async fn start_healthcheck( +async fn start_healthcheck( hello_service_client: HelloServiceClient, ) where for<'a> RequestKind<'a>: Encode, diff --git a/rustyrpc/examples/common/auth_service.rs b/rustyrpc/examples/common/auth_service.rs index 52f6d35..2158cd8 100644 --- a/rustyrpc/examples/common/auth_service.rs +++ b/rustyrpc/examples/common/auth_service.rs @@ -104,13 +104,13 @@ where } #[derive_where(Clone)] -pub struct AuthServiceClient { +pub struct AuthServiceClient { service_kind: ServiceKind, service_id: usize, rpc_client: Arc>, } -impl +impl AuthServiceClient where for<'a> RequestKind<'a>: Encode, @@ -142,8 +142,8 @@ where } } -impl ServiceClient - for AuthServiceClient +impl + ServiceClient for AuthServiceClient { const SERVICE_NAME: &'static str = SERVICE_NAME; const SERVICE_CHECKSUM: &'static [u8] = SERVICE_CHECKSUM; diff --git a/rustyrpc/examples/common/hello_service.rs b/rustyrpc/examples/common/hello_service.rs index dfc442e..68385f1 100644 --- a/rustyrpc/examples/common/hello_service.rs +++ b/rustyrpc/examples/common/hello_service.rs @@ -71,7 +71,7 @@ where } } -pub struct HelloServiceClient +pub struct HelloServiceClient where for<'a> RequestKind<'a>: Encode, ServiceCallRequestResult: Decode, @@ -82,8 +82,8 @@ where rpc_client: Arc>, } -impl ServiceClient - for HelloServiceClient +impl + ServiceClient for HelloServiceClient where for<'a> RequestKind<'a>: Encode, ServiceCallRequestResult: Decode, @@ -105,7 +105,7 @@ where } } -impl +impl HelloServiceClient where for<'a> RequestKind<'a>: Encode, @@ -128,7 +128,7 @@ where } } -impl Drop +impl Drop for HelloServiceClient where for<'a> RequestKind<'a>: Encode, diff --git a/rustyrpc/src/client.rs b/rustyrpc/src/client.rs index 6686506..cd21550 100644 --- a/rustyrpc/src/client.rs +++ b/rustyrpc/src/client.rs @@ -16,12 +16,13 @@ use crate::{ }; /// RPC client for calling remote services. -pub struct Client { +pub struct Client { connection: Mutex>>, _format: PhantomData, } -impl Client +impl + Client where for<'a> RequestKind<'a>: Encode, { @@ -125,7 +126,7 @@ where } } -impl From +impl From for Client { fn from(connection: Connection) -> Self { diff --git a/rustyrpc/src/server/client_connection.rs b/rustyrpc/src/server/client_connection.rs index d671b56..0da0968 100644 --- a/rustyrpc/src/server/client_connection.rs +++ b/rustyrpc/src/server/client_connection.rs @@ -8,12 +8,13 @@ use crate::{ use super::call_stream::CallStream; -pub(crate) struct ClientConnection { +pub(crate) struct ClientConnection +{ connection: Connection, _format: PhantomData, } -impl +impl ClientConnection { pub(crate) async fn accept_call_stream( @@ -23,7 +24,7 @@ impl } } -impl From +impl From for ClientConnection { fn from(connection: Connection) -> Self { diff --git a/rustyrpc/src/server/private_service/service_ref.rs b/rustyrpc/src/server/private_service/service_ref.rs index 9cd146b..7377cdc 100644 --- a/rustyrpc/src/server/private_service/service_ref.rs +++ b/rustyrpc/src/server/private_service/service_ref.rs @@ -14,7 +14,7 @@ impl ServiceRef { /// Creates service client from reference and [rpc client][Client] pub fn into_client< ServiceClient: service::ServiceClient, - Connection: transport::Connection, + Connection: transport::ClientConnection, Format: EncodingFormat, >( self, diff --git a/rustyrpc/src/service.rs b/rustyrpc/src/service.rs index 53a6ed2..5a8d7dd 100644 --- a/rustyrpc/src/service.rs +++ b/rustyrpc/src/service.rs @@ -11,7 +11,7 @@ use crate::{ }; /// Service client for interaction with specific remote service. -pub trait ServiceClient +pub trait ServiceClient where Self: Sized, { diff --git a/rustyrpc/src/transport.rs b/rustyrpc/src/transport.rs index e80c1c8..570192a 100644 --- a/rustyrpc/src/transport.rs +++ b/rustyrpc/src/transport.rs @@ -40,23 +40,34 @@ impl T { } } -/// Transport specific incoming connection. +/// Transport specific connection. pub trait Connection: Send + 'static { + /// Close connection. + fn close(self) -> impl Future> + Send; +} + +/// Transport specific connection on client side. +pub trait ClientConnection: Connection { /// Stream produced by connection. type Stream: Stream + 'static; /// Create new stream and notify other side of connection about it. fn new_stream(&mut self) -> impl Future> + Send; +} + +/// Transport specific connection on server side. +pub trait ServerConnection: Connection { + /// Stream produced by connection. + type Stream: Stream + 'static; + /// Accept new stream created by other side of connection. fn accept_stream(&mut self) -> impl Future> + Send; - /// Close connection. - fn close(self) -> impl Future> + Send; } /// Transport specific incoming connections listener like a [`TcpListener`][`std::net::TcpListener`] or others pub trait ConnectionListener: Send { /// Connection produced by listener - type Connection: Connection; + type Connection: ServerConnection; /// Accepts a new connection fn accept_connection(&mut self) -> impl Future>; diff --git a/rustyrpc/src/transport/quic.rs b/rustyrpc/src/transport/quic.rs index 2bc9723..f3c30cd 100644 --- a/rustyrpc/src/transport/quic.rs +++ b/rustyrpc/src/transport/quic.rs @@ -1,16 +1,15 @@ +mod connection; mod listener; -use core::{net::SocketAddr, num::TryFromIntError}; -use quinn::{ClientConfig, Endpoint, RecvStream, SendStream, StoppedError, VarInt}; +use core::num::TryFromIntError; +use quinn::{RecvStream, SendStream, StoppedError}; use std::io; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}; +pub use connection::Connection; pub use listener::ConnectionListener; use thiserror::Error; -/// Connection via QUIC protocol. -pub struct Connection(quinn::Connection); - /// Stream via QUIC protocol. pub struct Stream { send_stream: BufWriter, @@ -92,49 +91,3 @@ impl From<(SendStream, RecvStream)> for Stream { } } } - -impl super::Connection for Connection { - type Stream = Stream; - - async fn new_stream(&mut self) -> io::Result { - Ok(self.0.open_bi().await?.into()) - } - - async fn accept_stream(&mut self) -> io::Result { - Ok(self.0.accept_bi().await?.into()) - } - - async fn close(self) -> io::Result<()> { - self.0.close(VarInt::from_u32(0), b"Client is closed"); - Ok(()) - } -} - -impl Connection { - /// Establishes connection to server via QUIC protocol. - /// - /// # Errors - /// Returns error on fail of connection establishment. - pub async fn connect( - client_config: ClientConfig, - local_address: SocketAddr, - address: SocketAddr, - server_name: &str, - ) -> io::Result { - let mut endpoint = Endpoint::client(local_address)?; - endpoint.set_default_client_config(client_config); - - let connection = endpoint - .connect(address, server_name) - .map_err(|err| io::Error::new(io::ErrorKind::Other, err))? - .await?; - - Ok(connection.into()) - } -} - -impl From for Connection { - fn from(connection: quinn::Connection) -> Self { - Self(connection) - } -} diff --git a/rustyrpc/src/transport/quic/connection.rs b/rustyrpc/src/transport/quic/connection.rs new file mode 100644 index 0000000..1894feb --- /dev/null +++ b/rustyrpc/src/transport/quic/connection.rs @@ -0,0 +1,63 @@ +use core::net::SocketAddr; +use std::io; + +use quinn::{ClientConfig, Endpoint, VarInt}; + +use crate::transport; + +use super::Stream; + +/// Connection via QUIC protocol. +pub struct Connection(quinn::Connection); + +impl transport::Connection for Connection { + async fn close(self) -> io::Result<()> { + self.0.close(VarInt::from_u32(0), b"Client is closed"); + Ok(()) + } +} + +impl transport::ServerConnection for Connection { + type Stream = Stream; + + async fn accept_stream(&mut self) -> io::Result { + Ok(self.0.accept_bi().await?.into()) + } +} + +impl transport::ClientConnection for Connection { + type Stream = Stream; + + async fn new_stream(&mut self) -> io::Result { + Ok(self.0.open_bi().await?.into()) + } +} + +impl Connection { + /// Establishes connection to server via QUIC protocol. + /// + /// # Errors + /// Returns error on fail of connection establishment. + pub async fn connect( + client_config: ClientConfig, + local_address: SocketAddr, + address: SocketAddr, + server_name: &str, + ) -> io::Result { + let mut endpoint = Endpoint::client(local_address)?; + endpoint.set_default_client_config(client_config); + + let connection = endpoint + .connect(address, server_name) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))? + .await?; + + Ok(connection.into()) + } +} + +impl From for Connection { + fn from(connection: quinn::Connection) -> Self { + Self(connection) + } +} diff --git a/rustyrpc/src/transport/quic/listener.rs b/rustyrpc/src/transport/quic/listener.rs index 54cfc4f..5af53cc 100644 --- a/rustyrpc/src/transport/quic/listener.rs +++ b/rustyrpc/src/transport/quic/listener.rs @@ -3,7 +3,7 @@ use std::io; use quinn::{Endpoint, ServerConfig}; -use super::Connection; +use super::connection::Connection; /// Listener for incoming connections via QUIC protocol. pub struct ConnectionListener(quinn::Endpoint); @@ -18,7 +18,7 @@ impl crate::transport::ConnectionListener for ConnectionListener { .await .ok_or_else(|| io::Error::new(io::ErrorKind::NotConnected, "Endpoint is closed"))? .await - .map(Connection)?) + .map(Into::into)?) } }