From c67fff1107c3da527301861316f572d146327918 Mon Sep 17 00:00:00 2001 From: Nicolas Menard Date: Tue, 31 Oct 2023 12:15:40 -0400 Subject: [PATCH] Support for priority (#106) --- examples/sdkconfig.defaults | 3 +-- src/common/conn/server.rs | 33 +++++++++++++++++++++------------ src/common/webrtc/api.rs | 28 +++++++++++++++++++++++----- 3 files changed, 45 insertions(+), 19 deletions(-) diff --git a/examples/sdkconfig.defaults b/examples/sdkconfig.defaults index 928a028f4..eaf5df31a 100644 --- a/examples/sdkconfig.defaults +++ b/examples/sdkconfig.defaults @@ -26,8 +26,7 @@ CONFIG_ESP32S3_SPIRAM_SUPPORT=n CONFIG_SPIRAM_SPEED_80M=y CONFIG_SPIRAM_IGNORE_NOTFOUND=y CONFIG_SPIRAM_USE_MALLOC=y -CONFIG_HEAP_ABORT_WHEN_ALLOCATION_FAILS=y -CONFIG_SPIRAM_MALLOC_ALWAYSINTERNAL=4096 +CONFIG_SPIRAM_MALLOC_ALWAYSINTERNAL=4090 #CONFIG_SPIRAM_TRY_ALLOCATE_WIFI_LWIP=y #CONFIG_MBEDTLS_DEBUG=y diff --git a/src/common/conn/server.rs b/src/common/conn/server.rs index 4347c359a..e7ee8b0d8 100755 --- a/src/common/conn/server.rs +++ b/src/common/conn/server.rs @@ -311,9 +311,9 @@ where .unwrap(); let _ = self.app_client.insert(app_client); let cloned_robot = robot.clone(); - + let mut current_prio = None; loop { - let _ = smol::Timer::after(std::time::Duration::from_millis(100)).await; + let _ = smol::Timer::after(std::time::Duration::from_millis(300)).await; let sig = if let Some(webrtc_config) = self.webrtc_config.as_ref() { let ip = self.app_config.get_ip(); @@ -345,22 +345,31 @@ where }, async { let mut api = sig.await.map_err(|e| ServerError::Other(Box::new(e)))?; - if let Some(task) = self.webtrc_conn.as_ref() { - if !task.is_finished() { - log::info!( - "a webrtc connection is active ignoring further signaling requests" - ); - return Err(ServerError::ServerConnectionNotConfigured); - } - } + + let prio = self + .webtrc_conn + .as_ref() + .and_then(|f| (!f.is_finished()).then_some(¤t_prio)) + .unwrap_or(&None); + let sdp = api - .answer() + .answer(prio) .await .map_err(|e| ServerError::Other(Box::new(e)))?; + // When the current priority is lower than the priority of the incoming connection then + // we cancel and close the current webrtc connection (if any) + if let Some(task) = self.webtrc_conn.take() { + if !task.is_finished() { + let _ = task.cancel().await; + } + } + + let _ = current_prio.insert(sdp.1); + Ok(IncomingConnection::WebRtcConnection(WebRTCConnection { webrtc_api: api, - sdp, + sdp: sdp.0, server: None, robot: cloned_robot.clone(), })) diff --git a/src/common/webrtc/api.rs b/src/common/webrtc/api.rs index 265a4fe46..ff05ff41e 100755 --- a/src/common/webrtc/api.rs +++ b/src/common/webrtc/api.rs @@ -71,6 +71,8 @@ pub enum WebRtcError { Other(#[from] anyhow::Error), #[error(transparent)] DtlsError(#[from] Box), + #[error("the active webrtc connection has a higher priority")] + CurrentConnectionHigherPrority(), } pub(crate) struct WebRtcSignalingChannel { @@ -410,7 +412,10 @@ where Err(WebRtcError::DataChannelOpenError()) } - pub async fn answer(&mut self) -> Result, WebRtcError> { + pub async fn answer( + &mut self, + current_prio: &Option, + ) -> Result<(Box, u32), WebRtcError> { let offer = self .signaling .as_mut() @@ -426,6 +431,19 @@ where .get(0) .ok_or_else(|| WebRtcError::InvalidSDPOffer("no media description".to_owned()))?; + let caller_prio = attribute + .attribute("x-priority") + .flatten() + .map_or(Ok(u32::MAX), |a| a.parse::()) + .unwrap_or(u32::MAX); + + let current_prio = current_prio.unwrap_or(0); + + // TODO use is_some_then when rust min version reach 1.70 + if current_prio >= caller_prio { + return Err(WebRtcError::CurrentConnectionHigherPrority()); + } + let remote_creds = ICECredentials::new( attribute .attribute("ice-ufrag") @@ -486,9 +504,9 @@ where let answer = answer.with_media(media); - Ok(Box::new(WebRtcSdp::new( - answer, - self.uuid.as_ref().unwrap().clone(), - ))) + Ok(( + Box::new(WebRtcSdp::new(answer, self.uuid.as_ref().unwrap().clone())), + caller_prio, + )) } }