diff --git a/examples/examples/simulcast/simulcast.rs b/examples/examples/simulcast/simulcast.rs index bd804a2b9..8e652c608 100644 --- a/examples/examples/simulcast/simulcast.rs +++ b/examples/examples/simulcast/simulcast.rs @@ -160,7 +160,7 @@ async fn main() -> Result<()> { if let Some(track) = track { println!("Track has started"); - let rid = track.rid().to_owned(); + let rid = track.rid().map_or(String::from(""), String::from); let output_track = if let Some(output_track) = output_tracks.get(&rid) { Arc::clone(output_track) } else { @@ -199,7 +199,7 @@ async fn main() -> Result<()> { tokio::spawn(async move { // Read RTP packets being sent to webrtc-rs - println!("enter track loop {}", track.rid()); + println!("enter track loop {:?}", track.rid()); while let Ok((rtp, _)) = track.read_rtp().await { if let Err(err) = output_track.write_rtp(&rtp).await { if Error::ErrClosedPipe != err { @@ -210,7 +210,7 @@ async fn main() -> Result<()> { } } } - println!("exit track loop {}", track.rid()); + println!("exit track loop {:?}", track.rid()); }); } Box::pin(async {}) diff --git a/interceptor/src/stream_info.rs b/interceptor/src/stream_info.rs index 139cdbc38..95967545e 100644 --- a/interceptor/src/stream_info.rs +++ b/interceptor/src/stream_info.rs @@ -11,6 +11,7 @@ pub struct RTPHeaderExtension { #[derive(Default, Debug, Clone)] pub struct StreamInfo { pub id: String, + pub rid: Option, pub attributes: Attributes, pub ssrc: u32, pub payload_type: u8, diff --git a/webrtc/src/error.rs b/webrtc/src/error.rs index 088a695cb..97241f902 100644 --- a/webrtc/src/error.rs +++ b/webrtc/src/error.rs @@ -197,6 +197,29 @@ pub enum Error { #[error("new track must be of the same kind as previous")] ErrRTPSenderNewTrackHasIncorrectKind, + #[error("Cannot call replace on a sender with multiple tracks")] + ErrRTPSenderCannotReplaceSimulcast, + + #[error("Sender does not have track for RID")] + ErrRTPSenderNoTrackForRID, + + #[error("Sender cannot add encoding due to RID collision")] + ErrRTPSenderRIDCollision, + + #[error("Sender cannot add encoding as provided track does not match base track")] + ErrRTPSenderBaseEncodingMismatch, + + /// ErrRTPSenderStopped indicates the sender was already stopped + #[error("Sender has already been stopped")] + ErrRTPSenderStopped, + + /// ErrRTPSenderRidNil indicates that the track RID was empty + #[error("Sender cannot add encoding as rid is empty")] + ErrRTPSenderRidNil, + + #[error("Sender cannot add encoding as there is no base track")] + ErrRTPSenderNoBaseEncoding, + /// ErrUnbindFailed indicates that a TrackLocal was not able to be unbind #[error("failed to unbind TrackLocal from PeerConnection")] ErrUnbindFailed, diff --git a/webrtc/src/peer_connection/mod.rs b/webrtc/src/peer_connection/mod.rs index cd678cdbe..e16d92e51 100644 --- a/webrtc/src/peer_connection/mod.rs +++ b/webrtc/src/peer_connection/mod.rs @@ -472,7 +472,7 @@ impl RTCPeerConnection { }; let sender = match t.sender().await { - Some(s) => s.clone(), + Some(s) => s, None => { log::warn!( "RtpSender missing for transeceiver with sending direction {} for mid {}", @@ -495,7 +495,6 @@ impl RTCPeerConnection { if stream_ids.is_empty() { return true; } - // different stream id if dmsid.split_whitespace().next() != Some(&stream_ids[0]) { return true; diff --git a/webrtc/src/peer_connection/peer_connection_internal.rs b/webrtc/src/peer_connection/peer_connection_internal.rs index 5c0539ef4..34037e59a 100644 --- a/webrtc/src/peer_connection/peer_connection_internal.rs +++ b/webrtc/src/peer_connection/peer_connection_internal.rs @@ -173,9 +173,9 @@ impl PeerConnectionInternal { let mut receiver_needs_stopped = false; for t in tracks { - if !t.rid().is_empty() { + if let Some(rid) = t.rid() { if let Some(details) = - track_details_for_rid(&track_details, t.rid().to_owned()) + track_details_for_rid(&track_details, String::from(rid)) { t.set_id(details.id.clone()).await; t.set_stream_id(details.stream_id.clone()).await; @@ -1141,6 +1141,7 @@ impl PeerConnectionInternal { let stream_info = create_stream_info( "".to_owned(), ssrc, + None, params.codecs[0].payload_type, params.codecs[0].capability.clone(), ¶ms.header_extensions, @@ -1182,7 +1183,7 @@ impl PeerConnectionInternal { return receiver .receive_for_rtx( 0, - rsid, + &rsid, TrackStream { stream_info: Some(stream_info.clone()), rtp_read_stream, @@ -1196,7 +1197,7 @@ impl PeerConnectionInternal { let track = receiver .receive_for_rid( - rid, + &rid, params, TrackStream { stream_info: Some(stream_info.clone()), @@ -1535,20 +1536,25 @@ impl PeerConnectionInternal { None => continue, }; - let track_id = track.id().to_string(); let kind = match track.kind() { RTPCodecType::Unspecified => continue, RTPCodecType::Audio => "audio", RTPCodecType::Video => "video", }; - track_infos.push(TrackInfo { - track_id, - ssrc: sender.ssrc, - mid: mid.clone(), - rid: None, - kind, - }); + let encodings = sender.track_encodings.read().await; + for e in encodings.iter() { + let track_id = track.rid().map_or(track.id().to_string(), |rid| { + format!("{}-{}", track.id(), rid) + }); + track_infos.push(TrackInfo { + track_id, + ssrc: e.ssrc, + mid: mid.clone(), + rid: track.rid().map(String::from), + kind, + }); + } } let stream_stats = self @@ -1610,7 +1616,7 @@ impl PeerConnectionInternal { kind, packets_sent, mid, - rid, + rid: rid.map(|a| a.to_owned()), header_bytes_sent, bytes_sent, nack_count, diff --git a/webrtc/src/peer_connection/sdp/mod.rs b/webrtc/src/peer_connection/sdp/mod.rs index ae9576045..aed5ddc8c 100644 --- a/webrtc/src/peer_connection/sdp/mod.rs +++ b/webrtc/src/peer_connection/sdp/mod.rs @@ -535,13 +535,36 @@ pub(crate) async fn add_transceiver_sdp( for mt in transceivers { if let Some(sender) = mt.sender().await { if let Some(track) = sender.track().await { - media = media.with_media_source( - sender.ssrc, - track.stream_id().to_owned(), /* cname */ - track.stream_id().to_owned(), /* streamLabel */ - track.id().to_owned(), - ); + if mt.direction().has_send() { + let send_parameters = sender.get_parameters().await; + // Get the different encodings expressed first + for encoding in &send_parameters.encodings { + media = media.with_media_source( + encoding.ssrc, + track.stream_id().to_owned(), /* cname */ + track.stream_id().to_owned(), /* streamLabel */ + track.id().to_owned(), + ); + } + + // Then tell the world about simulcast + if send_parameters.encodings.len() > 1 { + let mut send_rids: Vec = vec![]; + for e in &send_parameters.encodings { + if let Some(rid) = &e.rid { + let mut s: String = rid.clone(); + send_rids.push(rid.clone()); + s.push_str(" send"); + media = media.with_value_attribute("rid".into(), s); + } + } + // Simulcast) + let mut s: String = "send ".to_owned(); + s.push_str(send_rids.join(";").as_ref()); + media = media.with_value_attribute("simulcast".into(), s); + } + } // Send msid based on the configured track if we haven't already // sent on this sender. If we have sent we must keep the msid line consistent, this // is handled below. @@ -553,12 +576,10 @@ pub(crate) async fn add_transceiver_sdp( track.id() )); } - sender.set_initial_track_id(track.id().to_string())?; break; } } - if !is_plan_b { if let Some(track_id) = sender.initial_track_id() { // After we have include an msid attribute in an offer it must stay the same for diff --git a/webrtc/src/rtp_transceiver/mod.rs b/webrtc/src/rtp_transceiver/mod.rs index 8e6058d52..dc94a0cf7 100644 --- a/webrtc/src/rtp_transceiver/mod.rs +++ b/webrtc/src/rtp_transceiver/mod.rs @@ -93,7 +93,7 @@ pub struct RTCRtpRtxParameters { /// #[derive(Default, Debug, Clone, Serialize, Deserialize)] pub struct RTCRtpCodingParameters { - pub rid: String, + pub rid: Option, pub ssrc: SSRC, pub payload_type: PayloadType, pub rtx: RTCRtpRtxParameters, @@ -132,6 +132,7 @@ pub struct RTCRtpTransceiverInit { pub(crate) fn create_stream_info( id: String, ssrc: SSRC, + rid: Option<&str>, payload_type: PayloadType, codec: RTCRtpCodecCapability, webrtc_header_extensions: &[RTCRtpHeaderExtensionParameters], @@ -154,6 +155,7 @@ pub(crate) fn create_stream_info( StreamInfo { id, + rid: rid.map(String::from), attributes: Attributes::new(), ssrc, payload_type, diff --git a/webrtc/src/rtp_transceiver/rtp_receiver/mod.rs b/webrtc/src/rtp_transceiver/rtp_receiver/mod.rs index e4e0a398c..28be08529 100644 --- a/webrtc/src/rtp_transceiver/rtp_receiver/mod.rs +++ b/webrtc/src/rtp_transceiver/rtp_receiver/mod.rs @@ -198,7 +198,7 @@ impl RTPReceiverInternal { let tracks = self.tracks.read().await; for t in &*tracks { - if t.track.rid() == rid { + if Some(rid) == t.track.rid() { if let Some(rtcp_interceptor) = &t.stream.rtcp_interceptor { let a = Attributes::new(); @@ -529,6 +529,7 @@ impl RTCRtpReceiver { let stream_info = create_stream_info( "".to_owned(), encoding.ssrc, + None, 0, codec.clone(), &global_params.header_extensions, @@ -586,6 +587,7 @@ impl RTCRtpReceiver { let stream_info = create_stream_info( "".to_owned(), rtx_ssrc, + None, 0, codec.clone(), &global_params.header_extensions, @@ -597,7 +599,7 @@ impl RTCRtpReceiver { self.receive_for_rtx( rtx_ssrc, - "".to_owned(), + "", TrackStream { stream_info: Some(stream_info), rtp_read_stream, @@ -654,7 +656,7 @@ impl RTCRtpReceiver { let mut encodings = vec![RTCRtpDecodingParameters::default(); encoding_size]; for (i, encoding) in encodings.iter_mut().enumerate() { if incoming.rids.len() > i { - encoding.rid = incoming.rids[i].clone(); + encoding.rid = Some(incoming.rids[i].clone()); } if incoming.ssrcs.len() > i { encoding.ssrc = incoming.ssrcs[i]; @@ -743,13 +745,13 @@ impl RTCRtpReceiver { /// It populates all the internal state for the given RID pub(crate) async fn receive_for_rid( &self, - rid: String, + rid: &str, params: RTCRtpParameters, stream: TrackStream, ) -> Result> { let mut tracks = self.internal.tracks.write().await; for t in &mut *tracks { - if t.track.rid() == rid { + if Some(rid) == t.track.rid() { t.track.set_kind(self.kind); if let Some(codec) = params.codecs.first() { t.track.set_codec(codec.clone()).await; @@ -771,13 +773,13 @@ impl RTCRtpReceiver { pub(crate) async fn receive_for_rtx( &self, ssrc: SSRC, - rsid: String, + rsid: &str, repair_stream: TrackStream, ) -> Result<()> { let mut tracks = self.internal.tracks.write().await; let l = tracks.len(); for t in &mut *tracks { - if (ssrc != 0 && l == 1) || t.track.rid() == rsid { + if (ssrc != 0 && l == 1) || Some(rsid) == t.track.rid() { t.repair_stream = repair_stream; let receive_mtu = self.receive_mtu; diff --git a/webrtc/src/rtp_transceiver/rtp_sender/mod.rs b/webrtc/src/rtp_transceiver/rtp_sender/mod.rs index eca3a4d7e..4aca6fc27 100644 --- a/webrtc/src/rtp_transceiver/rtp_sender/mod.rs +++ b/webrtc/src/rtp_transceiver/rtp_sender/mod.rs @@ -20,39 +20,35 @@ use interceptor::stream_info::StreamInfo; use interceptor::{Attributes, Interceptor, RTCPReader, RTPWriter}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Weak}; -use tokio::sync::{mpsc, Mutex, Notify}; +use tokio::sync::{mpsc, Mutex, Notify, RwLock}; pub(crate) struct RTPSenderInternal { pub(crate) send_called_rx: Mutex>, pub(crate) stop_called_rx: Arc, pub(crate) stop_called_signal: Arc, - pub(crate) rtcp_interceptor: Mutex>>, } impl RTPSenderInternal { /// read reads incoming RTCP for this RTPReceiver - async fn read(&self, b: &mut [u8]) -> Result<(usize, Attributes)> { + async fn read( + &self, + encoding: &Arc, + b: &mut [u8], + ) -> Result<(usize, Attributes)> { let mut send_called_rx = self.send_called_rx.lock().await; tokio::select! { _ = send_called_rx.recv() =>{ - let rtcp_interceptor = { - let rtcp_interceptor = self.rtcp_interceptor.lock().await; - rtcp_interceptor.clone() - }; - if let Some(rtcp_interceptor) = rtcp_interceptor{ + let rtcp_reader = encoding.rtcp_reader.clone(); let a = Attributes::new(); tokio::select! { _ = self.stop_called_rx.notified() => { Err(Error::ErrClosedPipe) } - result = rtcp_interceptor.read(b, &a) => { + result = rtcp_reader.read(b, &a) => { Ok(result?) } } - }else{ - Err(Error::ErrInterceptorNotBind) - } } _ = self.stop_called_rx.notified() =>{ Err(Error::ErrClosedPipe) @@ -63,10 +59,11 @@ impl RTPSenderInternal { /// read_rtcp is a convenience method that wraps Read and unmarshals for you. async fn read_rtcp( &self, + encoding: &Arc, receive_mtu: usize, ) -> Result<(Vec>, Attributes)> { let mut b = vec![0u8; receive_mtu]; - let (n, attributes) = self.read(&mut b).await?; + let (n, attributes) = self.read(encoding, &mut b).await?; let mut buf = &b[..n]; let pkts = rtcp::packet::unmarshal(&mut buf)?; @@ -75,19 +72,26 @@ impl RTPSenderInternal { } } -/// RTPSender allows an application to control how a given Track is encoded and transmitted to a remote peer -pub struct RTCRtpSender { +pub struct TrackEncoding { pub(crate) track: Mutex>>, pub(crate) srtp_stream: Arc, + + pub(crate) rtcp_reader: Arc, pub(crate) stream_info: Mutex, pub(crate) context: Mutex, + pub(crate) ssrc: SSRC, +} + +/// RTPSender allows an application to control how a given Track is encoded and transmitted to a remote peer +pub struct RTCRtpSender { + pub(crate) track_encodings: RwLock>>, + pub(crate) transport: Arc, pub(crate) payload_type: PayloadType, - pub(crate) ssrc: SSRC, receive_mtu: usize, /// a transceiver sender since we can just check the @@ -96,12 +100,14 @@ pub struct RTCRtpSender { pub(crate) media_engine: Arc, pub(crate) interceptor: Arc, + pub(crate) kind: RTPCodecType, pub(crate) id: String, /// The id of the initial track, even if we later change to a different /// track id should be use when negotiating. pub(crate) initial_track_id: std::sync::Mutex>, + /// AssociatedMediaStreamIds from the WebRTC specifcations pub(crate) associated_media_stream_ids: std::sync::Mutex>, @@ -140,44 +146,20 @@ impl RTCRtpSender { let (send_called_tx, send_called_rx) = mpsc::channel(1); let stop_called_tx = Arc::new(Notify::new()); let stop_called_rx = stop_called_tx.clone(); - let ssrc = rand::random::(); let stop_called_signal = Arc::new(AtomicBool::new(false)); + let stream_ids = vec![track.stream_id().to_string()]; let internal = Arc::new(RTPSenderInternal { send_called_rx: Mutex::new(send_called_rx), stop_called_rx, stop_called_signal: Arc::clone(&stop_called_signal), - rtcp_interceptor: Mutex::new(None), }); - let srtp_stream = Arc::new(SrtpWriterFuture { - closed: AtomicBool::new(false), - ssrc, - rtp_sender: Arc::downgrade(&internal), - rtp_transport: Arc::clone(&transport), - rtcp_read_stream: Mutex::new(None), - rtp_write_session: Mutex::new(None), - }); - - let srtp_rtcp_reader = Arc::clone(&srtp_stream) as Arc; - let rtcp_interceptor = interceptor.bind_rtcp_reader(srtp_rtcp_reader).await; - { - let mut internal_rtcp_interceptor = internal.rtcp_interceptor.lock().await; - *internal_rtcp_interceptor = Some(rtcp_interceptor); - } - - let stream_ids = vec![track.stream_id().to_string()]; - RTCRtpSender { - track: Mutex::new(Some(track)), - - srtp_stream, - stream_info: Mutex::new(StreamInfo::default()), - - context: Mutex::new(TrackLocalContext::default()), + let sender = RTCRtpSender { + track_encodings: RwLock::new(vec![]), transport, payload_type: 0, - ssrc, receive_mtu, negotiated: AtomicBool::new(false), @@ -188,6 +170,7 @@ impl RTCRtpSender { id, initial_track_id: std::sync::Mutex::new(None), associated_media_stream_ids: std::sync::Mutex::new(stream_ids), + kind: track.kind(), rtp_transceiver: Mutex::new(None), @@ -198,7 +181,88 @@ impl RTCRtpSender { paused: Arc::new(AtomicBool::new(start_paused)), internal, + }; + + sender.add_encoding_internal(track).await; + + // Add track + sender + } + + pub async fn add_encoding(&self, track: Arc) -> Result<()> { + if self.has_stopped().await { + return Err(Error::ErrRTPSenderStopped); + } + + if self.has_sent().await { + return Err(Error::ErrRTPSenderSendAlreadyCalled); + } + + if track.rid().is_none() { + return Err(Error::ErrRTPSenderRidNil); + } + + if let Some(rid) = track.rid() { + if self.encoding_for_rid(&rid).await.is_some() { + return Err(Error::ErrRTPSenderRIDCollision); + } + } else { + return Err(Error::ErrRTPSenderRidNil); + } + + let ref_track = if let Some(t) = self.first_encoding().await? { + let t = t.track.lock().await; + if let Some(t) = &*t { + if t.rid().is_some() { + t.clone() + } else { + return Err(Error::ErrRTPSenderNoBaseEncoding); + } + } else { + return Err(Error::ErrRTPSenderNoBaseEncoding); + } + } else { + return Err(Error::ErrRTPSenderNoBaseEncoding); + }; + + if ref_track.id() != track.id() + || ref_track.stream_id() != track.stream_id() + || ref_track.kind() != track.kind() + { + return Err(Error::ErrRTPSenderBaseEncodingMismatch); } + self.add_encoding_internal(track).await; + Ok(()) + } + + pub(crate) async fn add_encoding_internal(&self, track: Arc) { + let ssrc = rand::random::(); + let srtp_stream = Arc::new(SrtpWriterFuture { + closed: AtomicBool::new(false), + ssrc, + rtp_sender: Arc::downgrade(&self.internal), + rtp_transport: Arc::clone(&self.transport), + rtcp_read_stream: Mutex::new(None), + rtp_write_session: Mutex::new(None), + }); + + // TODO: Each encoding currently gets its own srtp_stream over the top of a shared + // transport, their own own rtcp_reader, and separate 'bind' calls. + // This is directly lifted from Pion but is perhaps the wrong way to go about things + let srtp_rtcp_reader = Arc::clone(&srtp_stream) as Arc; + let rtcp_reader = self.interceptor.bind_rtcp_reader(srtp_rtcp_reader).await; + + let track_encoding = TrackEncoding { + track: Mutex::new(Some(track)), + srtp_stream, + ssrc, + rtcp_reader, + stream_info: Mutex::new(StreamInfo::default()), + context: Mutex::new(TrackLocalContext::default()), + }; + + let mut encodings = self.track_encodings.write().await; + encodings.push(Arc::new(track_encoding)); } pub(crate) fn is_negotiated(&self) -> bool { @@ -233,27 +297,30 @@ impl RTCRtpSender { /// get_parameters describes the current configuration for the encoding and /// transmission of media on the sender's track. pub async fn get_parameters(&self) -> RTCRtpSendParameters { - let kind = { - let track = self.track.lock().await; - if let Some(t) = &*track { - t.kind() - } else { - RTPCodecType::default() - } - }; - - let mut send_parameters = { - RTCRtpSendParameters { - rtp_parameters: self - .media_engine - .get_rtp_parameters_by_kind(kind, RTCRtpTransceiverDirection::Sendonly) - .await, - encodings: vec![RTCRtpEncodingParameters { - ssrc: self.ssrc, + let encodings = { + let track_encodings = self.track_encodings.read().await; + let mut encodings: Vec = + Vec::with_capacity(track_encodings.len()); + for te in track_encodings.iter() { + let track = te.track.lock().await; + let rid = track.as_ref().and_then(|t| t.rid().map(String::from)); + + encodings.push(RTCRtpEncodingParameters { + ssrc: te.ssrc, payload_type: self.payload_type, + rid, ..Default::default() - }], + }) } + encodings + }; + + let mut send_parameters = RTCRtpSendParameters { + rtp_parameters: self + .media_engine + .get_rtp_parameters_by_kind(self.kind, RTCRtpTransceiverDirection::Sendonly) + .await, + encodings, }; let codecs = { @@ -262,10 +329,10 @@ impl RTCRtpSender { if let Some(t) = t.upgrade() { t.get_codecs().await } else { - self.media_engine.get_codecs_by_kind(kind).await + self.media_engine.get_codecs_by_kind(self.kind).await } } else { - self.media_engine.get_codecs_by_kind(kind).await + self.media_engine.get_codecs_by_kind(self.kind).await } }; send_parameters.rtp_parameters.codecs = codecs; @@ -274,9 +341,16 @@ impl RTCRtpSender { } /// track returns the RTCRtpTransceiver track, or nil + /// In the case of an RTCRtpSender with multiple encodings, this will return the track for the + /// first encoding only pub async fn track(&self) -> Option> { - let track = self.track.lock().await; - track.clone() + let encodings = self.track_encodings.read().await; + if let Some(t) = encodings.first() { + let track = t.track.lock().await; + track.clone() + } else { + None + } } /// replace_track replaces the track currently being used as the sender's source with a new TrackLocal. @@ -287,6 +361,10 @@ impl RTCRtpSender { track: Option>, ) -> Result<()> { if let Some(t) = &track { + let encodings = self.track_encodings.read().await; + if encodings.len() > 1 { + return Err(Error::ErrRTPSenderCannotReplaceSimulcast); + } let tr = self.rtp_transceiver.lock().await; if let Some(r) = &*tr { if let Some(r) = r.upgrade() { @@ -301,25 +379,33 @@ impl RTCRtpSender { } } + let re = { + let encodings = self.track_encodings.read().await; + match encodings.first() { + Some(re) => re.clone(), + None => return Ok(()), + } + }; + if self.has_sent().await { let t = { - let t = self.track.lock().await; + let t = re.track.lock().await; t.clone() }; if let Some(t) = t { - let context = self.context.lock().await; + let context = re.context.lock().await; t.unbind(&context).await?; } } if !self.has_sent().await || track.is_none() { - let mut t = self.track.lock().await; + let mut t = re.track.lock().await; *t = track; return Ok(()); } let context = { - let context = self.context.lock().await; + let context = re.context.lock().await; context.clone() }; @@ -332,7 +418,8 @@ impl RTCRtpSender { .await, ssrc: context.ssrc, write_stream: context.write_stream.clone(), - paused: self.paused.clone(), + rtcp_reader: context.rtcp_reader.clone(), + paused: context.paused.clone(), }; t.bind(&new_context).await @@ -343,7 +430,7 @@ impl RTCRtpSender { match result { Err(err) => { // Re-bind the original track - let track = self.track.lock().await; + let track = re.track.lock().await; if let Some(t) = &*track { t.bind(&context).await?; } @@ -353,12 +440,12 @@ impl RTCRtpSender { Ok(codec) => { // Codec has changed if self.payload_type != codec.payload_type { - let mut context = self.context.lock().await; + let mut context = re.context.lock().await; context.params.codecs = vec![codec]; } { - let mut t = self.track.lock().await; + let mut t = re.track.lock().await; *t = track; } @@ -373,67 +460,72 @@ impl RTCRtpSender { return Err(Error::ErrRTPSenderSendAlreadyCalled); } - let write_stream = Arc::new(InterceptorToTrackLocalWriter::new(self.paused.clone())); - let (context, stream_info) = { - let track = self.track.lock().await; - let mut context = TrackLocalContext { - id: self.id.clone(), - params: self - .media_engine - .get_rtp_parameters_by_kind( - if let Some(t) = &*track { - t.kind() - } else { - RTPCodecType::default() - }, - RTCRtpTransceiverDirection::Sendonly, - ) - .await, - ssrc: parameters.encodings[0].ssrc, - write_stream: Some( - Arc::clone(&write_stream) as Arc - ), - paused: self.paused.clone(), - }; + let encodings = self.track_encodings.read().await; + for te in encodings.iter() { + let write_stream = Arc::new(InterceptorToTrackLocalWriter::new(self.paused.clone())); + let (context, stream_info) = { + let track = te.track.lock().await; + let mut context = TrackLocalContext { + id: self.id.clone(), + params: self + .media_engine + .get_rtp_parameters_by_kind( + if let Some(t) = &*track { + t.kind() + } else { + RTPCodecType::default() + }, + RTCRtpTransceiverDirection::Sendonly, + ) + .await, + ssrc: te.ssrc, + rtcp_reader: Some(te.rtcp_reader.clone()), + write_stream: Some( + Arc::clone(&write_stream) as Arc + ), + paused: self.paused.clone(), + }; - let codec = if let Some(t) = &*track { - t.bind(&context).await? - } else { - RTCRtpCodecParameters::default() + let (codec, rid) = if let Some(t) = &*track { + let codec = t.bind(&context).await?; + (codec, t.rid()) + } else { + (RTCRtpCodecParameters::default(), None) + }; + let payload_type = codec.payload_type; + let capability = codec.capability.clone(); + context.params.codecs = vec![codec]; + let stream_info = create_stream_info( + self.id.clone(), + te.ssrc, + rid, + payload_type, + capability, + ¶meters.rtp_parameters.header_extensions, + ); + + (context, stream_info) }; - let payload_type = codec.payload_type; - let capability = codec.capability.clone(); - context.params.codecs = vec![codec]; - let stream_info = create_stream_info( - self.id.clone(), - parameters.encodings[0].ssrc, - payload_type, - capability, - ¶meters.rtp_parameters.header_extensions, - ); - - (context, stream_info) - }; - let srtp_rtp_writer = Arc::clone(&self.srtp_stream) as Arc; - let rtp_interceptor = self - .interceptor - .bind_local_stream(&stream_info, srtp_rtp_writer) - .await; - { - let mut interceptor_rtp_writer = write_stream.interceptor_rtp_writer.lock().await; - *interceptor_rtp_writer = Some(rtp_interceptor); - } + let srtp_rtp_writer = Arc::clone(&te.srtp_stream) as Arc; + let rtp_interceptor = self + .interceptor + .bind_local_stream(&stream_info, srtp_rtp_writer) + .await; + { + let mut interceptor_rtp_writer = write_stream.interceptor_rtp_writer.lock().await; + *interceptor_rtp_writer = Some(rtp_interceptor); + } - { - let mut ctx = self.context.lock().await; - *ctx = context; - } - { - let mut si = self.stream_info.lock().await; - *si = stream_info; + { + let mut ctx = te.context.lock().await; + *ctx = context; + } + { + let mut si = te.stream_info.lock().await; + *si = stream_info; + } } - { let mut send_called_tx = self.send_called_tx.lock().await; send_called_tx.take(); @@ -456,24 +548,58 @@ impl RTCRtpSender { self.replace_track(None).await?; - { - let stream_info = self.stream_info.lock().await; + let encodings = self.track_encodings.read().await; + for te in encodings.iter() { + let stream_info = te.stream_info.lock().await; self.interceptor.unbind_local_stream(&stream_info).await; + te.srtp_stream.close().await? } - - self.srtp_stream.close().await + Ok(()) } /// read reads incoming RTCP for this RTPReceiver pub async fn read(&self, b: &mut [u8]) -> Result<(usize, Attributes)> { - self.internal.read(b).await + if let Some(encoding) = self.first_encoding().await? { + self.internal.read(&encoding, b).await + } else { + Err(Error::ErrInterceptorNotBind) + } + } + + async fn encoding_for_rid(&self, rid: &str) -> Option> { + let encodings = self.track_encodings.read().await; + for e in encodings.iter() { + if let Some(track) = &*e.track.lock().await { + if Some(rid) == track.rid() { + return Some(e.clone()); + } + }; + } + None + } + + async fn first_encoding(&self) -> Result>> { + let encodings = self.track_encodings.read().await; + return Ok(encodings.first().map(|x| (*x).clone())); + } + + pub async fn read_simulcast(&self, b: &mut [u8], rid: &str) -> Result<(usize, Attributes)> { + if let Some(encoding) = self.encoding_for_rid(rid).await { + self.internal.read(&encoding, b).await + } else { + Err(Error::ErrRTPSenderNoTrackForRID) + } } /// read_rtcp is a convenience method that wraps Read and unmarshals for you. pub async fn read_rtcp( &self, ) -> Result<(Vec>, Attributes)> { - self.internal.read_rtcp(self.receive_mtu).await + if let Some(encoding) = self.first_encoding().await? { + self.internal.read_rtcp(&encoding, self.receive_mtu).await + } else { + Err(Error::ErrInterceptorNotBind) + } } /// has_sent tells if data has been ever sent for this instance diff --git a/webrtc/src/rtp_transceiver/rtp_sender/rtp_sender_test.rs b/webrtc/src/rtp_transceiver/rtp_sender/rtp_sender_test.rs index 12bcef3e4..7c7e2b23f 100644 --- a/webrtc/src/rtp_transceiver/rtp_sender/rtp_sender_test.rs +++ b/webrtc/src/rtp_transceiver/rtp_sender/rtp_sender_test.rs @@ -142,10 +142,11 @@ async fn test_rtp_sender_get_parameters() -> Result<()> { signal_pair(&mut offerer, &mut answerer).await?; if let Some(sender) = rtp_transceiver.sender().await { + let encoding = { sender.track_encodings.read().await[0].clone() }; let parameters = sender.get_parameters().await; assert_ne!(0, parameters.rtp_parameters.codecs.len()); assert_eq!(1, parameters.encodings.len()); - assert_eq!(sender.ssrc, parameters.encodings[0].ssrc); + assert_eq!(encoding.ssrc, parameters.encodings[0].ssrc); } else { assert!(false); } diff --git a/webrtc/src/track/track_local/mod.rs b/webrtc/src/track/track_local/mod.rs index c64a7077d..597824f98 100644 --- a/webrtc/src/track/track_local/mod.rs +++ b/webrtc/src/track/track_local/mod.rs @@ -9,7 +9,7 @@ use crate::rtp_transceiver::rtp_codec::*; use crate::rtp_transceiver::*; use async_trait::async_trait; -use interceptor::{Attributes, RTPWriter}; +use interceptor::{Attributes, RTCPReader, RTPWriter}; use std::any::Any; use std::fmt; use std::sync::atomic::{AtomicBool, Ordering}; @@ -29,13 +29,25 @@ pub trait TrackLocalWriter: fmt::Debug { /// TrackLocalContext is the Context passed when a TrackLocal has been Binded/Unbinded from a PeerConnection, and used /// in Interceptors. -#[derive(Default, Debug, Clone)] +#[derive(Default, Clone)] pub struct TrackLocalContext { pub(crate) id: String, pub(crate) params: RTCRtpParameters, pub(crate) ssrc: SSRC, pub(crate) write_stream: Option>, pub(crate) paused: Arc, + pub(crate) rtcp_reader: Option>, +} + +impl std::fmt::Debug for TrackLocalContext { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TrackLocalContext") + .field("id", &self.id) + .field("params", &self.params) + .field("ssrc", &self.ssrc) + .field("write_stream", &self.write_stream) + .finish() + } } impl TrackLocalContext { @@ -63,6 +75,10 @@ impl TrackLocalContext { self.write_stream.clone() } + pub fn rtcp_reader(&self) -> Option> { + self.rtcp_reader.clone() + } + /// id is a unique identifier that is used for both bind/unbind pub fn id(&self) -> String { self.id.clone() @@ -87,6 +103,9 @@ pub trait TrackLocal { /// and stream_id would be 'desktop' or 'webcam' fn id(&self) -> &str; + /// rid is the RTP identifier for this Track + fn rid(&self) -> Option<&str>; + /// stream_id is the group this track belongs too. This must be unique fn stream_id(&self) -> &str; diff --git a/webrtc/src/track/track_local/track_local_static_rtp.rs b/webrtc/src/track/track_local/track_local_static_rtp.rs index 58bd40033..24e0c4b3d 100644 --- a/webrtc/src/track/track_local/track_local_static_rtp.rs +++ b/webrtc/src/track/track_local/track_local_static_rtp.rs @@ -10,6 +10,7 @@ pub struct TrackLocalStaticRTP { pub(crate) bindings: Mutex>>, codec: RTCRtpCodecCapability, id: String, + rid: Option, stream_id: String, } @@ -20,10 +21,15 @@ impl TrackLocalStaticRTP { codec, bindings: Mutex::new(vec![]), id, + rid: None, stream_id, } } + pub fn set_rid(&mut self, rid: String) { + self.rid = Some(rid); + } + /// codec gets the Codec of the track pub fn codec(&self) -> RTCRtpCodecCapability { self.codec.clone() @@ -105,6 +111,10 @@ impl TrackLocal for TrackLocalStaticRTP { self.stream_id.as_str() } + fn rid(&self) -> Option<&str> { + self.rid.as_deref() + } + /// kind controls if this TrackLocal is audio or video fn kind(&self) -> RTPCodecType { if self.codec.mime_type.starts_with("audio/") { diff --git a/webrtc/src/track/track_local/track_local_static_sample.rs b/webrtc/src/track/track_local/track_local_static_sample.rs index 7c8237788..dfb00636a 100644 --- a/webrtc/src/track/track_local/track_local_static_sample.rs +++ b/webrtc/src/track/track_local/track_local_static_sample.rs @@ -44,6 +44,11 @@ impl TrackLocalStaticSample { self.rtp_track.codec() } + // Sets the RID for this track + pub fn set_rid(&mut self, rid: String) { + self.rtp_track.set_rid(rid); + } + /// write_sample writes a Sample to the TrackLocalStaticSample /// If one PeerConnection fails the packets will still be sent to /// all PeerConnections. The error message will contain the ID of the failed @@ -105,12 +110,6 @@ impl TrackLocalStaticSample { if sample.prev_dropped_packets > 0 { packetizer.skip_samples(samples * sample.prev_dropped_packets as u32); } - /*println!( - "clock_rate={}, samples={}, {}", - clock_rate, - samples, - sample.duration.as_secs_f64() - );*/ packetizer.packetize(&sample.data, samples).await? } else { vec![] @@ -172,6 +171,10 @@ impl TrackLocal for TrackLocalStaticSample { self.rtp_track.id() } + fn rid(&self) -> Option<&str> { + self.rtp_track.rid() + } + /// stream_id is the group this track belongs too. This must be unique fn stream_id(&self) -> &str { self.rtp_track.stream_id() diff --git a/webrtc/src/track/track_remote/mod.rs b/webrtc/src/track/track_remote/mod.rs index 0f6382af3..9ec722586 100644 --- a/webrtc/src/track/track_remote/mod.rs +++ b/webrtc/src/track/track_remote/mod.rs @@ -48,7 +48,7 @@ pub struct TrackRemote { ssrc: AtomicU32, //SSRC, codec: Mutex, pub(crate) params: Mutex, - rid: String, + rid: Option, media_engine: Arc, interceptor: Arc, @@ -79,7 +79,7 @@ impl TrackRemote { receive_mtu: usize, kind: RTPCodecType, ssrc: SSRC, - rid: String, + rid: Option, receiver: Weak, media_engine: Arc, interceptor: Arc, @@ -135,8 +135,8 @@ impl TrackRemote { /// rid gets the RTP Stream ID of this Track /// With Simulcast you will have multiple tracks with the same ID, but different RID values. /// In many cases a TrackRemote will not have an RID, so it is important to assert it is non-zero - pub fn rid(&self) -> &str { - self.rid.as_str() + pub fn rid(&self) -> Option<&str> { + self.rid.as_deref() } /// payload_type gets the PayloadType of the track