From 49fa63ee821d71edf48fe9d72dea1d308741e5ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Thu, 6 Apr 2023 08:45:50 -0300 Subject: [PATCH 1/7] stream: sink: Fix wrong error messages --- src/stream/sink/image_sink.rs | 4 ++-- src/stream/sink/udp_sink.rs | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/stream/sink/image_sink.rs b/src/stream/sink/image_sink.rs index 93a84b7e..2c9684e0 100644 --- a/src/stream/sink/image_sink.rs +++ b/src/stream/sink/image_sink.rs @@ -131,7 +131,7 @@ impl SinkInterface for ImageSink { .expect("No sink pad found on ProxySink"); if let Err(link_err) = queue_src_pad.link(proxysink_sink_pad) { let msg = - format!("Failed to link Queue's src pad with WebRTCBin's sink pad: {link_err:?}"); + format!("Failed to link Queue's src pad with ProxySink's sink pad: {link_err:?}"); error!(msg); if let Some(parent) = tee_src_pad.parent_element() { @@ -175,7 +175,7 @@ impl SinkInterface for ImageSink { error!(msg); if let Err(unlink_err) = queue_src_pad.unlink(proxysink_sink_pad) { - error!("Failed to unlink Queue's src pad and ProxySink's sink pad: {unlink_err:?}"); + error!("Failed to unlink Tee's src pad and Queue's sink pad: {unlink_err:?}"); } if let Err(unlink_err) = queue_src_pad.unlink(proxysink_sink_pad) { diff --git a/src/stream/sink/udp_sink.rs b/src/stream/sink/udp_sink.rs index c203a25b..d2f3f33d 100644 --- a/src/stream/sink/udp_sink.rs +++ b/src/stream/sink/udp_sink.rs @@ -79,7 +79,7 @@ impl SinkInterface for UdpSink { .expect("No sink pad found on ProxySink"); if let Err(link_err) = queue_src_pad.link(proxysink_sink_pad) { let msg = - format!("Failed to link Queue's src pad with WebRTCBin's sink pad: {link_err:?}"); + format!("Failed to link Queue's src pad with ProxySink's sink pad: {link_err:?}"); error!(msg); if let Some(parent) = tee_src_pad.parent_element() { @@ -122,8 +122,8 @@ impl SinkInterface for UdpSink { let msg = format!("Failed to synchronize children states: {sync_err:?}"); error!(msg); - if let Err(unlink_err) = queue_src_pad.unlink(proxysink_sink_pad) { - error!("Failed to unlink Queue's src pad and ProxySink's sink pad: {unlink_err:?}"); + if let Err(unlink_err) = tee_src_pad.unlink(queue_sink_pad) { + error!("Failed to unlink Tee's src pad and Queue's sink pad: {unlink_err:?}"); } if let Err(unlink_err) = queue_src_pad.unlink(proxysink_sink_pad) { From cbe74d4e2514fbf1c6317da910e464bf5420d7d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Tue, 18 Apr 2023 11:24:11 -0300 Subject: [PATCH 2/7] src: stream: Add Custom RTSP Media Factory --- src/stream/rtsp/mod.rs | 1 + src/stream/rtsp/rtsp_media_factory.rs | 78 +++++++++++++++++++++++++++ 2 files changed, 79 insertions(+) create mode 100644 src/stream/rtsp/rtsp_media_factory.rs diff --git a/src/stream/rtsp/mod.rs b/src/stream/rtsp/mod.rs index 598792ed..044a3618 100644 --- a/src/stream/rtsp/mod.rs +++ b/src/stream/rtsp/mod.rs @@ -1 +1,2 @@ +mod rtsp_media_factory; pub mod rtsp_server; diff --git a/src/stream/rtsp/rtsp_media_factory.rs b/src/stream/rtsp/rtsp_media_factory.rs new file mode 100644 index 00000000..3c70fe8e --- /dev/null +++ b/src/stream/rtsp/rtsp_media_factory.rs @@ -0,0 +1,78 @@ +/// /References: +/// 1 - https://gitlab.freedesktop.org/slomo/rtp-rapid-sync-example +/// 2 - https://github.com/gtk-rs/gtk3-rs/blob/master/examples/list_box_model/row_data/imp.rs +mod imp { + use std::sync::{Arc, Mutex}; + + use once_cell::sync::Lazy; + + use gst::{ + glib::{self, subclass::prelude::*, *}, + prelude::*, + }; + use gst_rtsp_server::subclass::prelude::*; + + // The actual data structure that stores our values. This is not accessible + // directly from the outside. + #[derive(Default)] + pub struct Factory { + bin: Arc>, + } + + // Basic declaration of our type for the GObject type system + #[glib::object_subclass] + impl ObjectSubclass for Factory { + const NAME: &'static str = "RTSPMediaFactoryFromBin"; + type Type = super::Factory; + type ParentType = gst_rtsp_server::RTSPMediaFactory; + } + + // The ObjectImpl trait provides the setters/getters for GObject properties. + // Here we need to provide the values that are internally stored back to the + // caller, or store whatever new value the caller is providing. + // + // This maps between the GObject properties and our internal storage of the + // corresponding values of the properties. + impl ObjectImpl for Factory { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![glib::ParamSpecObject::builder::("bin") + .construct_only() + .build()] + }); + + PROPERTIES.as_ref() + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + match pspec.name() { + "bin" => { + let bin = value.get().unwrap(); + self.bin.set(bin); + } + _ => unimplemented!(), + } + } + } + + impl RTSPMediaFactoryImpl for Factory { + // Create the custom stream producer bin. + fn create_element(&self, _url: &gst_rtsp::RTSPUrl) -> Option { + let bin = self.bin.lock().unwrap(); + bin.debug_to_dot_file_with_ts(gst::DebugGraphDetails::all(), "rtsp-bin-created"); + + Some(bin.to_owned().upcast()) + } + } +} + +gst::glib::wrapper! { + pub struct Factory(ObjectSubclass) @extends gst_rtsp_server::RTSPMediaFactory; +} + +// Trivial constructor for the media factory. +impl Factory { + pub fn new(bin: gst::Bin) -> Self { + gst::glib::Object::builder().property("bin", bin).build() + } +} From a85b2d3d425bdb5c757f98317f12c418ba50328f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Tue, 18 Apr 2023 11:25:48 -0300 Subject: [PATCH 3/7] src: stream: Move from shm to proxy pair --- src/stream/pipeline/mod.rs | 15 ++++- src/stream/rtsp/rtsp_server.rs | 104 ++++++++++++++------------------- src/stream/sink/rtsp_sink.rs | 38 +++++------- 3 files changed, 72 insertions(+), 85 deletions(-) diff --git a/src/stream/pipeline/mod.rs b/src/stream/pipeline/mod.rs index 13e6faca..090b5e72 100644 --- a/src/stream/pipeline/mod.rs +++ b/src/stream/pipeline/mod.rs @@ -206,10 +206,23 @@ impl PipelineState { .expect("No static sink pad found on capsfilter") .current_caps() .context("Failed to get caps from capsfilter sink pad")?; + let Some(encode) = caps + .iter() + .find_map(|structure| { + structure.iter().find_map(|(key, sendvalue)| { + if key == "encoding-name" { + Some(sendvalue.to_value().get::().expect("Failed accessing encoding-name parameter")) + } else { + None + } + }) + }) else { + return Err(anyhow!("Cannot find 'media' in caps")); + }; debug!("caps: {:#?}", caps.to_string()); - RTSPServer::add_pipeline(&sink.path(), &sink.socket_path(), caps)?; + RTSPServer::add_pipeline(&sink.path(), sink.proxysink(), &encode)?; RTSPServer::start_pipeline(&sink.path())?; } diff --git a/src/stream/rtsp/rtsp_server.rs b/src/stream/rtsp/rtsp_server.rs index 0e8f23fe..b2f347a4 100644 --- a/src/stream/rtsp/rtsp_server.rs +++ b/src/stream/rtsp/rtsp_server.rs @@ -7,13 +7,15 @@ use gst_rtsp::RTSPLowerTrans; use gst_rtsp_server::{prelude::*, RTSPTransportMode}; use tracing::*; +use crate::stream::rtsp::rtsp_media_factory; + #[allow(dead_code)] pub struct RTSPServer { pub server: gst_rtsp_server::RTSPServer, host: String, port: u16, run: bool, - pub path_to_factory: HashMap, + pub path_to_factory: HashMap, main_loop_thread: Option>, main_loop_thread_rx_channel: std::sync::mpsc::Receiver, } @@ -105,70 +107,30 @@ impl RTSPServer { } #[instrument(level = "debug")] - pub fn add_pipeline(path: &str, socket_path: &str, rtp_caps: &gst::Caps) -> Result<()> { - // Initialize the singleton before calling gst factory - let mut rtsp_server = RTSP_SERVER.as_ref().lock().unwrap(); - - let factory = gst_rtsp_server::RTSPMediaFactory::new(); - factory.set_shared(true); - factory.set_buffer_size(0); - factory.set_latency(0u32); - factory.set_transport_mode(RTSPTransportMode::PLAY); - factory.set_protocols(RTSPLowerTrans::UDP | RTSPLowerTrans::UDP_MCAST); - - let Some(encode) = rtp_caps - .iter() - .find_map(|structure| { - structure.iter().find_map(|(key, sendvalue)| { - if key == "encoding-name" { - Some(sendvalue.to_value().get::().expect("Failed accessing encoding-name parameter")) - } else { - None - } - }) - }) else { - return Err(anyhow!("Cannot find 'media' in caps")); - }; - - let rtp_caps = rtp_caps.to_string(); - let description = match encode.as_str() { + fn create_rtsp_bin(proxysink: &gst::Element, encode: &str) -> Result { + let description = match encode { "H264" => { - format!( - concat!( - "shmsrc socket-path={socket_path} do-timestamp=true", - " ! queue leaky=downstream flush-on-eos=true max-size-buffers=0", - " ! capsfilter caps={rtp_caps:?}", - " ! rtph264depay", - " ! rtph264pay name=pay0 aggregate-mode=zero-latency config-interval=10 pt=96", - ), - socket_path = socket_path, - rtp_caps = rtp_caps, + concat!( + "proxysrc name=ProxySrc message-forward=true", + " ! queue leaky=downstream flush-on-eos=true max-size-buffers=0", + " ! rtph264depay", + " ! rtph264pay name=pay0 aggregate-mode=zero-latency config-interval=10 pt=96", ) } "RAW" => { - format!( - concat!( - "shmsrc socket-path={socket_path} do-timestamp=true", - " ! queue leaky=downstream flush-on-eos=true max-size-buffers=0", - " ! capsfilter caps={rtp_caps:?}", - " ! rtpvrawdepay", - " ! rtpvrawpay name=pay0 pt=96", - ), - socket_path = socket_path, - rtp_caps = rtp_caps, + concat!( + "proxysrc name=ProxySrc", + " ! queue leaky=downstream flush-on-eos=true max-size-buffers=0", + " ! rtpvrawdepay", + " ! rtpvrawpay name=pay0 pt=96", ) } "JPEG" => { - format!( - concat!( - "shmsrc socket-path={socket_path} do-timestamp=true", - " ! queue leaky=downstream flush-on-eos=true max-size-buffers=0", - " ! capsfilter caps={rtp_caps:?}", - " ! rtpjpegdepay", - " ! rtpjpegpay name=pay0 pt=96", - ), - socket_path = socket_path, - rtp_caps = rtp_caps, + concat!( + "proxysrc name=ProxySrc", + " ! queue leaky=downstream flush-on-eos=true max-size-buffers=0", + " ! rtpjpegdepay", + " ! rtpjpegpay name=pay0 pt=96", ) } unsupported => { @@ -180,7 +142,31 @@ impl RTSPServer { debug!("RTSP Server description: {description:#?}"); - factory.set_launch(&description); + let rtsp_bin = gst::parse_bin_from_description(&description, true)?; + { + let proxysrc = rtsp_bin + .by_name("ProxySrc") + .expect("Failed to find proxysrc by name: wrong name?"); + proxysrc.set_property("proxysink", proxysink); + let _ = rtsp_bin.set_state(gst::State::Playing); + } + + Ok(rtsp_bin) + } + + #[instrument(level = "debug")] + pub fn add_pipeline(path: &str, proxysink: &gst::Element, encode: &str) -> Result<()> { + // Initialize the singleton before calling gst factory + let mut rtsp_server = RTSP_SERVER.as_ref().lock().unwrap(); + + let rtsp_bin = Self::create_rtsp_bin(proxysink, encode)?; + + let factory = rtsp_media_factory::Factory::new(rtsp_bin); + factory.set_shared(true); + factory.set_buffer_size(0); + factory.set_latency(0u32); + factory.set_transport_mode(RTSPTransportMode::PLAY); + factory.set_protocols(RTSPLowerTrans::UDP | RTSPLowerTrans::UDP_MCAST); if let Some(server) = rtsp_server .path_to_factory diff --git a/src/stream/sink/rtsp_sink.rs b/src/stream/sink/rtsp_sink.rs index 914ec6b7..0a2e22be 100644 --- a/src/stream/sink/rtsp_sink.rs +++ b/src/stream/sink/rtsp_sink.rs @@ -10,11 +10,10 @@ use super::SinkInterface; pub struct RtspSink { sink_id: uuid::Uuid, queue: gst::Element, - sink: gst::Element, + proxysink: gst::Element, sink_sink_pad: gst::Pad, tee_src_pad: Option, path: String, - socket_path: String, } impl SinkInterface for RtspSink { #[instrument(level = "debug", skip(self))] @@ -26,8 +25,6 @@ impl SinkInterface for RtspSink { ) -> Result<()> { let sink_id = &self.get_id(); - let _ = std::fs::remove_file(&self.socket_path); // Remove if already exists - // Set Tee's src pad if self.tee_src_pad.is_some() { return Err(anyhow!( @@ -55,7 +52,7 @@ impl SinkInterface for RtspSink { }; // Add the Sink elements to the Pipeline - let elements = &[&self.queue, &self.sink]; + let elements = &[&self.queue, &self.proxysink]; if let Err(add_err) = pipeline.add_many(elements) { let msg = format!("Failed to add WebRTCSink's elements to the Pipeline: {add_err:?}"); error!(msg); @@ -143,10 +140,6 @@ impl SinkInterface for RtspSink { #[instrument(level = "debug", skip(self))] fn unlink(&self, pipeline: &gst::Pipeline, pipeline_id: &uuid::Uuid) -> Result<()> { - if let Err(error) = std::fs::remove_file(&self.socket_path) { - warn!("Failed removing the RTSP Sink socket file. Reason: {error:?}"); - } - let Some(tee_src_pad) = &self.tee_src_pad else { warn!("Tried to unlink Sink from a pipeline without a Tee src pad."); return Ok(()); @@ -180,7 +173,7 @@ impl SinkInterface for RtspSink { } // Remove the Sink's elements from the Source's pipeline - let elements = &[&self.queue, &self.sink]; + let elements = &[&self.queue, &self.proxysink]; if let Err(remove_err) = pipeline.remove_many(elements) { warn!("Failed removing RtspSink's elements from pipeline: {remove_err:?}"); } @@ -191,7 +184,7 @@ impl SinkInterface for RtspSink { } // Set Sink to null - if let Err(state_err) = self.sink.set_state(gst::State::Null) { + if let Err(state_err) = self.proxysink.set_state(gst::State::Null) { warn!("Failed to set RtspSink's to NULL: {state_err:#?}"); } @@ -227,34 +220,29 @@ impl RtspSink { "Failed to find RTSP compatible address. Example: \"rtsp://0.0.0.0:8554/test\"", )?; - let socket_path = format!("/tmp/{id}"); - let sink = gst::ElementFactory::make("shmsink") - .property_from_str("socket-path", &socket_path) - .property("sync", true) - .property("wait-for-connection", false) - .property("shm-size", 10_000_000u32) - .build()?; + let proxysink = gst::ElementFactory::make("proxysink").build()?; - let sink_sink_pad = sink.static_pad("sink").context("Failed to get Sink Pad")?; + let sink_sink_pad = proxysink + .static_pad("sink") + .context("Failed to get Sink Pad")?; Ok(Self { sink_id: id, queue, - sink, + proxysink, sink_sink_pad, path, - socket_path, tee_src_pad: Default::default(), }) } #[instrument(level = "trace", skip(self))] - pub fn path(&self) -> String { - self.path.clone() + pub fn proxysink(&self) -> &gst::Element { + &self.proxysink } #[instrument(level = "trace", skip(self))] - pub fn socket_path(&self) -> String { - self.socket_path.clone() + pub fn path(&self) -> String { + self.path.clone() } } From 2f901ac855de42e9cfff6fce751eb6f78072081f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Wed, 19 Apr 2023 22:45:16 -0300 Subject: [PATCH 4/7] src: stream: sink: Store the video encoding in RTSP sink --- src/stream/sink/mod.rs | 12 +++++++++++- src/stream/sink/rtsp_sink.rs | 15 ++++++++++++++- 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/src/stream/sink/mod.rs b/src/stream/sink/mod.rs index b5c500f3..5106f5c0 100644 --- a/src/stream/sink/mod.rs +++ b/src/stream/sink/mod.rs @@ -16,6 +16,8 @@ use anyhow::{anyhow, Result}; use tracing::*; +use super::types::CaptureConfiguration; + #[enum_dispatch] pub trait SinkInterface { /// Link this Sink's sink pad to the given Pipelines's Tee element's src pad. @@ -72,7 +74,15 @@ pub fn create_rtsp_sink( .endpoints .clone(); - Ok(Sink::Rtsp(RtspSink::try_new(id, addresses)?)) + let encoding = match &video_and_stream_information + .stream_information + .configuration + { + CaptureConfiguration::Video(configuration) => configuration.encode.clone(), + _ => unreachable!(), + }; + + Ok(Sink::Rtsp(RtspSink::try_new(id, addresses, encoding)?)) } #[instrument(level = "debug")] diff --git a/src/stream/sink/rtsp_sink.rs b/src/stream/sink/rtsp_sink.rs index 0a2e22be..53eefc17 100644 --- a/src/stream/sink/rtsp_sink.rs +++ b/src/stream/sink/rtsp_sink.rs @@ -4,6 +4,8 @@ use tracing::*; use gst::prelude::*; +use crate::video::types::VideoEncodeType; + use super::SinkInterface; #[derive(Debug)] @@ -13,6 +15,7 @@ pub struct RtspSink { proxysink: gst::Element, sink_sink_pad: gst::Pad, tee_src_pad: Option, + encoding: VideoEncodeType, path: String, } impl SinkInterface for RtspSink { @@ -206,7 +209,11 @@ impl SinkInterface for RtspSink { impl RtspSink { #[instrument(level = "debug")] - pub fn try_new(id: uuid::Uuid, addresses: Vec) -> Result { + pub fn try_new( + id: uuid::Uuid, + addresses: Vec, + encoding: VideoEncodeType, + ) -> Result { let queue = gst::ElementFactory::make("queue") .property_from_str("leaky", "downstream") // Throw away any data .property("flush-on-eos", true) @@ -231,6 +238,7 @@ impl RtspSink { queue, proxysink, sink_sink_pad, + encoding, path, tee_src_pad: Default::default(), }) @@ -241,6 +249,11 @@ impl RtspSink { &self.proxysink } + #[instrument(level = "trace", skip(self))] + pub fn encoding(&self) -> VideoEncodeType { + self.encoding.clone() + } + #[instrument(level = "trace", skip(self))] pub fn path(&self) -> String { self.path.clone() From 6b03aca52eb1c202cc0811e9874d6814458e4182 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Wed, 19 Apr 2023 22:46:59 -0300 Subject: [PATCH 5/7] src: stream: pipeline: Ask encoding to the sink --- src/stream/pipeline/mod.rs | 24 +----------------------- 1 file changed, 1 insertion(+), 23 deletions(-) diff --git a/src/stream/pipeline/mod.rs b/src/stream/pipeline/mod.rs index 090b5e72..cd2831f4 100644 --- a/src/stream/pipeline/mod.rs +++ b/src/stream/pipeline/mod.rs @@ -200,29 +200,7 @@ impl PipelineState { ); if let Sink::Rtsp(sink) = &sink { - let caps = &self - .sink_tee - .static_pad("sink") - .expect("No static sink pad found on capsfilter") - .current_caps() - .context("Failed to get caps from capsfilter sink pad")?; - let Some(encode) = caps - .iter() - .find_map(|structure| { - structure.iter().find_map(|(key, sendvalue)| { - if key == "encoding-name" { - Some(sendvalue.to_value().get::().expect("Failed accessing encoding-name parameter")) - } else { - None - } - }) - }) else { - return Err(anyhow!("Cannot find 'media' in caps")); - }; - - debug!("caps: {:#?}", caps.to_string()); - - RTSPServer::add_pipeline(&sink.path(), sink.proxysink(), &encode)?; + RTSPServer::add_pipeline(&sink.path(), sink.proxysink(), &sink.encoding())?; RTSPServer::start_pipeline(&sink.path())?; } From fc6eb2f16d955eacad32f87586a740355dd54733 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Wed, 19 Apr 2023 22:48:07 -0300 Subject: [PATCH 6/7] src: stream: rtsp_server: Use VideoEncodeType for encoding instead of str --- src/stream/rtsp/rtsp_server.rs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/stream/rtsp/rtsp_server.rs b/src/stream/rtsp/rtsp_server.rs index b2f347a4..dc1d4569 100644 --- a/src/stream/rtsp/rtsp_server.rs +++ b/src/stream/rtsp/rtsp_server.rs @@ -8,6 +8,7 @@ use gst_rtsp_server::{prelude::*, RTSPTransportMode}; use tracing::*; use crate::stream::rtsp::rtsp_media_factory; +use crate::video::types::VideoEncodeType; #[allow(dead_code)] pub struct RTSPServer { @@ -107,9 +108,9 @@ impl RTSPServer { } #[instrument(level = "debug")] - fn create_rtsp_bin(proxysink: &gst::Element, encode: &str) -> Result { + fn create_rtsp_bin(proxysink: &gst::Element, encode: &VideoEncodeType) -> Result { let description = match encode { - "H264" => { + VideoEncodeType::H264 => { concat!( "proxysrc name=ProxySrc message-forward=true", " ! queue leaky=downstream flush-on-eos=true max-size-buffers=0", @@ -117,7 +118,7 @@ impl RTSPServer { " ! rtph264pay name=pay0 aggregate-mode=zero-latency config-interval=10 pt=96", ) } - "RAW" => { + VideoEncodeType::Yuyv => { concat!( "proxysrc name=ProxySrc", " ! queue leaky=downstream flush-on-eos=true max-size-buffers=0", @@ -125,7 +126,7 @@ impl RTSPServer { " ! rtpvrawpay name=pay0 pt=96", ) } - "JPEG" => { + VideoEncodeType::Mjpg => { concat!( "proxysrc name=ProxySrc", " ! queue leaky=downstream flush-on-eos=true max-size-buffers=0", @@ -155,11 +156,15 @@ impl RTSPServer { } #[instrument(level = "debug")] - pub fn add_pipeline(path: &str, proxysink: &gst::Element, encode: &str) -> Result<()> { + pub fn add_pipeline( + path: &str, + proxysink: &gst::Element, + encoding: &VideoEncodeType, + ) -> Result<()> { // Initialize the singleton before calling gst factory let mut rtsp_server = RTSP_SERVER.as_ref().lock().unwrap(); - let rtsp_bin = Self::create_rtsp_bin(proxysink, encode)?; + let rtsp_bin = Self::create_rtsp_bin(proxysink, encoding)?; let factory = rtsp_media_factory::Factory::new(rtsp_bin); factory.set_shared(true); From bc7fa4500a2d346d2cb988bf25673a7c426641b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Thu, 20 Apr 2023 12:23:23 -0300 Subject: [PATCH 7/7] src: stream: rtsp_server: Partially fix RTSP creation (it's still problematic) --- src/stream/rtsp/rtsp_server.rs | 54 +++++++++++++++++++--------------- 1 file changed, 31 insertions(+), 23 deletions(-) diff --git a/src/stream/rtsp/rtsp_server.rs b/src/stream/rtsp/rtsp_server.rs index dc1d4569..7df034e5 100644 --- a/src/stream/rtsp/rtsp_server.rs +++ b/src/stream/rtsp/rtsp_server.rs @@ -109,29 +109,39 @@ impl RTSPServer { #[instrument(level = "debug")] fn create_rtsp_bin(proxysink: &gst::Element, encode: &VideoEncodeType) -> Result { + let proxysrc_name = format!("proxysrc-{}", uuid::Uuid::new_v4()); let description = match encode { VideoEncodeType::H264 => { - concat!( - "proxysrc name=ProxySrc message-forward=true", - " ! queue leaky=downstream flush-on-eos=true max-size-buffers=0", - " ! rtph264depay", - " ! rtph264pay name=pay0 aggregate-mode=zero-latency config-interval=10 pt=96", + format!( + concat!( + "proxysrc name={proxysrc_name}", + " ! queue leaky=downstream flush-on-eos=true max-size-buffers=0", + " ! rtph264depay", + " ! rtph264pay name=pay0 aggregate-mode=zero-latency config-interval=10 pt=96", + ), + proxysrc_name=proxysrc_name ) } VideoEncodeType::Yuyv => { - concat!( - "proxysrc name=ProxySrc", - " ! queue leaky=downstream flush-on-eos=true max-size-buffers=0", - " ! rtpvrawdepay", - " ! rtpvrawpay name=pay0 pt=96", + format!( + concat!( + "proxysrc name={proxysrc_name}", + " ! queue leaky=downstream flush-on-eos=true max-size-buffers=0", + " ! rtpvrawdepay", + " ! rtpvrawpay name=pay0 pt=96", + ), + proxysrc_name = proxysrc_name ) } VideoEncodeType::Mjpg => { - concat!( - "proxysrc name=ProxySrc", - " ! queue leaky=downstream flush-on-eos=true max-size-buffers=0", - " ! rtpjpegdepay", - " ! rtpjpegpay name=pay0 pt=96", + format!( + concat!( + "proxysrc name={proxysrc_name}", + " ! queue leaky=downstream flush-on-eos=true max-size-buffers=0", + " ! rtpjpegdepay", + " ! rtpjpegpay name=pay0 pt=96", + ), + proxysrc_name = proxysrc_name ) } unsupported => { @@ -143,14 +153,11 @@ impl RTSPServer { debug!("RTSP Server description: {description:#?}"); - let rtsp_bin = gst::parse_bin_from_description(&description, true)?; - { - let proxysrc = rtsp_bin - .by_name("ProxySrc") - .expect("Failed to find proxysrc by name: wrong name?"); - proxysrc.set_property("proxysink", proxysink); - let _ = rtsp_bin.set_state(gst::State::Playing); - } + let rtsp_bin = gst::parse_bin_from_description(&description, false)?; + let proxysrc = rtsp_bin + .by_name(&proxysrc_name) + .expect("Failed to find proxysrc by name: wrong name?"); + proxysrc.set_property("proxysink", proxysink); Ok(rtsp_bin) } @@ -170,6 +177,7 @@ impl RTSPServer { factory.set_shared(true); factory.set_buffer_size(0); factory.set_latency(0u32); + factory.set_do_retransmission(false); factory.set_transport_mode(RTSPTransportMode::PLAY); factory.set_protocols(RTSPLowerTrans::UDP | RTSPLowerTrans::UDP_MCAST);