From cde93d8897c576f1e349b86c163263ec933d38a7 Mon Sep 17 00:00:00 2001 From: Cheuk <90270663+cheukt@users.noreply.github.com> Date: Mon, 21 Nov 2022 13:25:03 -0500 Subject: [PATCH] Update CODEOWNERS (#11) Co-authored-by: Ethan --- .github/CODEOWNERS | 1 + Cargo.toml | 2 +- src/rpc/base_channel.rs | 6 +-- src/rpc/client_channel.rs | 28 +++++++------- src/rpc/dial.rs | 20 +++++----- src/rpc/webrtc.rs | 80 +++++++++++++++++++-------------------- 6 files changed, 64 insertions(+), 73 deletions(-) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 4900e54..d315c76 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1 +1,2 @@ * @npmenard @stuqdog +* @viamrobotics/sdk-netcode diff --git a/Cargo.toml b/Cargo.toml index 5458f20..ac9d9d7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,7 +43,7 @@ tower-http = { version = "0.3.3", features = ["add-extension","auth","propagate- tracing = {version = "0.1.34"} tracing-subscriber = {version = "0.3.11", features = ["env-filter"]} webpki-roots = "0.21.1" -webrtc = "0.5.0" +webrtc = "0.6.0" [build-dependencies] diff --git a/src/rpc/base_channel.rs b/src/rpc/base_channel.rs index 820634a..71b8647 100644 --- a/src/rpc/base_channel.rs +++ b/src/rpc/base_channel.rs @@ -49,8 +49,7 @@ impl WebRTCBaseChannel { pc.get_stats_id() ); }) - })) - .await; + })); let channel = Arc::new(Self { peer_connection, @@ -70,8 +69,7 @@ impl WebRTCBaseChannel { log::error!("error closing channel: {e}") } }) - })) - .await; + })); channel } diff --git a/src/rpc/client_channel.rs b/src/rpc/client_channel.rs index 0f397bd..0ffc8c1 100644 --- a/src/rpc/client_channel.rs +++ b/src/rpc/client_channel.rs @@ -75,22 +75,20 @@ impl WebRTCClientChannel { let ret_channel = channel.clone(); let channel = Arc::downgrade(&channel); - data_channel - .on_message(Box::new(move |msg: DataChannelMessage| { - let channel = channel.clone(); - Box::pin(async move { - let channel = match channel.upgrade() { - Some(channel) => channel, - None => { - return; - } - }; - if let Err(e) = channel.on_channel_message(msg).await { - log::error!("error deserializing message: {e}"); + data_channel.on_message(Box::new(move |msg: DataChannelMessage| { + let channel = channel.clone(); + Box::pin(async move { + let channel = match channel.upgrade() { + Some(channel) => channel, + None => { + return; } - }) - })) - .await; + }; + if let Err(e) = channel.on_channel_message(msg).await { + log::error!("error deserializing message: {e}"); + } + }) + })); log::debug!("Client channel created"); ret_channel } diff --git a/src/rpc/dial.rs b/src/rpc/dial.rs index b4ba1c9..3334ae1 100644 --- a/src/rpc/dial.rs +++ b/src/rpc/dial.rs @@ -494,12 +494,10 @@ async fn maybe_connect_via_webrtc( let uuid_for_ice_gathering_thread = uuid_lock.clone(); let is_open = Arc::new(AtomicBool::new(false)); let is_open_read = is_open.clone(); - data_channel - .on_open(Box::new(move || { - is_open.store(true, Ordering::Release); - Box::pin(async move {}) - })) - .await; + data_channel.on_open(Box::new(move || { + is_open.store(true, Ordering::Release); + Box::pin(async move {}) + })); let exchange_done = Arc::new(AtomicBool::new(false)); let remote_description_set = Arc::new(AtomicBool::new(false)); @@ -514,8 +512,8 @@ async fn maybe_connect_via_webrtc( let exchange_done = exchange_done.clone(); let remote_description_set = remote_description_set.clone(); - peer_connection - .on_ice_candidate(Box::new(move |ice_candidate: Option| { + peer_connection.on_ice_candidate(Box::new( + move |ice_candidate: Option| { let remote_description_set = remote_description_set.clone(); if exchange_done.load(Ordering::Acquire) { return Box::pin(async move {}); @@ -569,8 +567,8 @@ async fn maybe_connect_via_webrtc( } } }) - })) - .await; + }, + )); peer_connection.set_local_description(offer).await?; } @@ -739,7 +737,7 @@ async fn maybe_connect_via_webrtc( } async fn ice_candidate_to_proto(ice_candidate: RTCIceCandidate) -> Result { - let ice_candidate = ice_candidate.to_json().await?; + let ice_candidate = ice_candidate.to_json()?; Ok(IceCandidate { candidate: ice_candidate.candidate, sdp_mid: ice_candidate.sdp_mid, diff --git a/src/rpc/webrtc.rs b/src/rpc/webrtc.rs index 1d42dc3..db860e8 100644 --- a/src/rpc/webrtc.rs +++ b/src/rpc/webrtc.rs @@ -213,19 +213,17 @@ pub(crate) async fn new_peer_connection_for_client( ..Default::default() }; - peer_connection - .on_peer_connection_state_change(Box::new(move |connection: RTCPeerConnectionState| { + peer_connection.on_peer_connection_state_change(Box::new( + move |connection: RTCPeerConnectionState| { log::info!("peer connection state change: {connection}"); Box::pin(async move {}) - })) - .await; + }, + )); - peer_connection - .on_signaling_state_change(Box::new(move |ssc: RTCSignalingState| { - log::info!("new signaling state: {ssc}"); - Box::pin(async move {}) - })) - .await; + peer_connection.on_signaling_state_change(Box::new(move |ssc: RTCSignalingState| { + log::info!("new signaling state: {ssc}"); + Box::pin(async move {}) + })); let data_channel = peer_connection .create_data_channel("data", Some(data_channel_init)) @@ -237,39 +235,37 @@ pub(crate) async fn new_peer_connection_for_client( let nc = negotiation_channel.clone(); let pc = Arc::downgrade(&peer_connection); - negotiation_channel - .on_message(Box::new(move |msg: DataChannelMessage| { - let wpc = pc.clone(); - let nc = nc.clone(); - Box::pin(async move { - let pc = match wpc.upgrade() { - Some(pc) => pc, - None => return, - }; - let sdp_vec = msg.data.to_vec(); - let maybe_err = async move { - let sdp = serde_json::from_slice::(&sdp_vec) - .map_err(create_invalid_sdp_err)?; - pc.set_remote_description(sdp).await?; - let answer = pc.create_answer(None).await?; - pc.set_local_description(answer).await?; - let local_description = pc - .local_description() - .await - .ok_or("No local description set"); - let desc = - serde_json::to_vec(&local_description).map_err(create_invalid_sdp_err)?; - let desc = Bytes::copy_from_slice(&desc); - nc.send(&desc).await - } - .await; + negotiation_channel.on_message(Box::new(move |msg: DataChannelMessage| { + let wpc = pc.clone(); + let nc = nc.clone(); + Box::pin(async move { + let pc = match wpc.upgrade() { + Some(pc) => pc, + None => return, + }; + let sdp_vec = msg.data.to_vec(); + let maybe_err = async move { + let sdp = serde_json::from_slice::(&sdp_vec) + .map_err(create_invalid_sdp_err)?; + pc.set_remote_description(sdp).await?; + let answer = pc.create_answer(None).await?; + pc.set_local_description(answer).await?; + let local_description = pc + .local_description() + .await + .ok_or("No local description set"); + let desc = + serde_json::to_vec(&local_description).map_err(create_invalid_sdp_err)?; + let desc = Bytes::copy_from_slice(&desc); + nc.send(&desc).await + } + .await; - if let Err(e) = maybe_err { - log::error!("Error processing sdp in negotiation channel: {e}"); - } - }) - })) - .await; + if let Err(e) = maybe_err { + log::error!("Error processing sdp in negotiation channel: {e}"); + } + }) + })); if disable_trickle_ice { let offer = peer_connection.create_offer(None).await?;