From c894b6ae7f042ce11aa4a306261b85dbe84472d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Tue, 18 Apr 2023 11:25:48 -0300 Subject: [PATCH] src: stream: Move from shm to proxy pair --- src/stream/pipeline/mod.rs | 15 ++++- src/stream/rtsp/rtsp_server.rs | 104 ++++++++++++++------------------- src/stream/sink/rtsp_sink.rs | 43 ++++++-------- 3 files changed, 77 insertions(+), 85 deletions(-) diff --git a/src/stream/pipeline/mod.rs b/src/stream/pipeline/mod.rs index 13e6faca..090b5e72 100644 --- a/src/stream/pipeline/mod.rs +++ b/src/stream/pipeline/mod.rs @@ -206,10 +206,23 @@ impl PipelineState { .expect("No static sink pad found on capsfilter") .current_caps() .context("Failed to get caps from capsfilter sink pad")?; + let Some(encode) = caps + .iter() + .find_map(|structure| { + structure.iter().find_map(|(key, sendvalue)| { + if key == "encoding-name" { + Some(sendvalue.to_value().get::().expect("Failed accessing encoding-name parameter")) + } else { + None + } + }) + }) else { + return Err(anyhow!("Cannot find 'media' in caps")); + }; debug!("caps: {:#?}", caps.to_string()); - RTSPServer::add_pipeline(&sink.path(), &sink.socket_path(), caps)?; + RTSPServer::add_pipeline(&sink.path(), sink.proxysink(), &encode)?; RTSPServer::start_pipeline(&sink.path())?; } diff --git a/src/stream/rtsp/rtsp_server.rs b/src/stream/rtsp/rtsp_server.rs index 0e8f23fe..b2f347a4 100644 --- a/src/stream/rtsp/rtsp_server.rs +++ b/src/stream/rtsp/rtsp_server.rs @@ -7,13 +7,15 @@ use gst_rtsp::RTSPLowerTrans; use gst_rtsp_server::{prelude::*, RTSPTransportMode}; use tracing::*; +use crate::stream::rtsp::rtsp_media_factory; + #[allow(dead_code)] pub struct RTSPServer { pub server: gst_rtsp_server::RTSPServer, host: String, port: u16, run: bool, - pub path_to_factory: HashMap, + pub path_to_factory: HashMap, main_loop_thread: Option>, main_loop_thread_rx_channel: std::sync::mpsc::Receiver, } @@ -105,70 +107,30 @@ impl RTSPServer { } #[instrument(level = "debug")] - pub fn add_pipeline(path: &str, socket_path: &str, rtp_caps: &gst::Caps) -> Result<()> { - // Initialize the singleton before calling gst factory - let mut rtsp_server = RTSP_SERVER.as_ref().lock().unwrap(); - - let factory = gst_rtsp_server::RTSPMediaFactory::new(); - factory.set_shared(true); - factory.set_buffer_size(0); - factory.set_latency(0u32); - factory.set_transport_mode(RTSPTransportMode::PLAY); - factory.set_protocols(RTSPLowerTrans::UDP | RTSPLowerTrans::UDP_MCAST); - - let Some(encode) = rtp_caps - .iter() - .find_map(|structure| { - structure.iter().find_map(|(key, sendvalue)| { - if key == "encoding-name" { - Some(sendvalue.to_value().get::().expect("Failed accessing encoding-name parameter")) - } else { - None - } - }) - }) else { - return Err(anyhow!("Cannot find 'media' in caps")); - }; - - let rtp_caps = rtp_caps.to_string(); - let description = match encode.as_str() { + fn create_rtsp_bin(proxysink: &gst::Element, encode: &str) -> Result { + let description = match encode { "H264" => { - format!( - concat!( - "shmsrc socket-path={socket_path} do-timestamp=true", - " ! queue leaky=downstream flush-on-eos=true max-size-buffers=0", - " ! capsfilter caps={rtp_caps:?}", - " ! rtph264depay", - " ! rtph264pay name=pay0 aggregate-mode=zero-latency config-interval=10 pt=96", - ), - socket_path = socket_path, - rtp_caps = rtp_caps, + concat!( + "proxysrc name=ProxySrc message-forward=true", + " ! queue leaky=downstream flush-on-eos=true max-size-buffers=0", + " ! rtph264depay", + " ! rtph264pay name=pay0 aggregate-mode=zero-latency config-interval=10 pt=96", ) } "RAW" => { - format!( - concat!( - "shmsrc socket-path={socket_path} do-timestamp=true", - " ! queue leaky=downstream flush-on-eos=true max-size-buffers=0", - " ! capsfilter caps={rtp_caps:?}", - " ! rtpvrawdepay", - " ! rtpvrawpay name=pay0 pt=96", - ), - socket_path = socket_path, - rtp_caps = rtp_caps, + concat!( + "proxysrc name=ProxySrc", + " ! queue leaky=downstream flush-on-eos=true max-size-buffers=0", + " ! rtpvrawdepay", + " ! rtpvrawpay name=pay0 pt=96", ) } "JPEG" => { - format!( - concat!( - "shmsrc socket-path={socket_path} do-timestamp=true", - " ! queue leaky=downstream flush-on-eos=true max-size-buffers=0", - " ! capsfilter caps={rtp_caps:?}", - " ! rtpjpegdepay", - " ! rtpjpegpay name=pay0 pt=96", - ), - socket_path = socket_path, - rtp_caps = rtp_caps, + concat!( + "proxysrc name=ProxySrc", + " ! queue leaky=downstream flush-on-eos=true max-size-buffers=0", + " ! rtpjpegdepay", + " ! rtpjpegpay name=pay0 pt=96", ) } unsupported => { @@ -180,7 +142,31 @@ impl RTSPServer { debug!("RTSP Server description: {description:#?}"); - factory.set_launch(&description); + let rtsp_bin = gst::parse_bin_from_description(&description, true)?; + { + let proxysrc = rtsp_bin + .by_name("ProxySrc") + .expect("Failed to find proxysrc by name: wrong name?"); + proxysrc.set_property("proxysink", proxysink); + let _ = rtsp_bin.set_state(gst::State::Playing); + } + + Ok(rtsp_bin) + } + + #[instrument(level = "debug")] + pub fn add_pipeline(path: &str, proxysink: &gst::Element, encode: &str) -> Result<()> { + // Initialize the singleton before calling gst factory + let mut rtsp_server = RTSP_SERVER.as_ref().lock().unwrap(); + + let rtsp_bin = Self::create_rtsp_bin(proxysink, encode)?; + + let factory = rtsp_media_factory::Factory::new(rtsp_bin); + factory.set_shared(true); + factory.set_buffer_size(0); + factory.set_latency(0u32); + factory.set_transport_mode(RTSPTransportMode::PLAY); + factory.set_protocols(RTSPLowerTrans::UDP | RTSPLowerTrans::UDP_MCAST); if let Some(server) = rtsp_server .path_to_factory diff --git a/src/stream/sink/rtsp_sink.rs b/src/stream/sink/rtsp_sink.rs index 914ec6b7..75221f1d 100644 --- a/src/stream/sink/rtsp_sink.rs +++ b/src/stream/sink/rtsp_sink.rs @@ -10,11 +10,10 @@ use super::SinkInterface; pub struct RtspSink { sink_id: uuid::Uuid, queue: gst::Element, - sink: gst::Element, + proxysink: gst::Element, sink_sink_pad: gst::Pad, tee_src_pad: Option, path: String, - socket_path: String, } impl SinkInterface for RtspSink { #[instrument(level = "debug", skip(self))] @@ -26,8 +25,6 @@ impl SinkInterface for RtspSink { ) -> Result<()> { let sink_id = &self.get_id(); - let _ = std::fs::remove_file(&self.socket_path); // Remove if already exists - // Set Tee's src pad if self.tee_src_pad.is_some() { return Err(anyhow!( @@ -55,7 +52,7 @@ impl SinkInterface for RtspSink { }; // Add the Sink elements to the Pipeline - let elements = &[&self.queue, &self.sink]; + let elements = &[&self.queue, &self.proxysink]; if let Err(add_err) = pipeline.add_many(elements) { let msg = format!("Failed to add WebRTCSink's elements to the Pipeline: {add_err:?}"); error!(msg); @@ -143,10 +140,6 @@ impl SinkInterface for RtspSink { #[instrument(level = "debug", skip(self))] fn unlink(&self, pipeline: &gst::Pipeline, pipeline_id: &uuid::Uuid) -> Result<()> { - if let Err(error) = std::fs::remove_file(&self.socket_path) { - warn!("Failed removing the RTSP Sink socket file. Reason: {error:?}"); - } - let Some(tee_src_pad) = &self.tee_src_pad else { warn!("Tried to unlink Sink from a pipeline without a Tee src pad."); return Ok(()); @@ -180,7 +173,7 @@ impl SinkInterface for RtspSink { } // Remove the Sink's elements from the Source's pipeline - let elements = &[&self.queue, &self.sink]; + let elements = &[&self.queue, &self.proxysink]; if let Err(remove_err) = pipeline.remove_many(elements) { warn!("Failed removing RtspSink's elements from pipeline: {remove_err:?}"); } @@ -191,7 +184,7 @@ impl SinkInterface for RtspSink { } // Set Sink to null - if let Err(state_err) = self.sink.set_state(gst::State::Null) { + if let Err(state_err) = self.proxysink.set_state(gst::State::Null) { warn!("Failed to set RtspSink's to NULL: {state_err:#?}"); } @@ -227,34 +220,34 @@ impl RtspSink { "Failed to find RTSP compatible address. Example: \"rtsp://0.0.0.0:8554/test\"", )?; - let socket_path = format!("/tmp/{id}"); - let sink = gst::ElementFactory::make("shmsink") - .property_from_str("socket-path", &socket_path) - .property("sync", true) - .property("wait-for-connection", false) - .property("shm-size", 10_000_000u32) - .build()?; + let proxysink = gst::ElementFactory::make("proxysink").build()?; - let sink_sink_pad = sink.static_pad("sink").context("Failed to get Sink Pad")?; + let sink_sink_pad = proxysink + .static_pad("sink") + .context("Failed to get Sink Pad")?; Ok(Self { sink_id: id, queue, - sink, + proxysink, sink_sink_pad, path, - socket_path, tee_src_pad: Default::default(), }) } #[instrument(level = "trace", skip(self))] - pub fn path(&self) -> String { - self.path.clone() + pub fn proxysink(&self) -> &gst::Element { + &self.proxysink } + // #[instrument(level = "trace", skip(self))] + // pub fn link_proxy(&self, proxysrc: &gst::Element) { + // proxysrc.set_property("proxysink", &self.proxysink) + // } + #[instrument(level = "trace", skip(self))] - pub fn socket_path(&self) -> String { - self.socket_path.clone() + pub fn path(&self) -> String { + self.path.clone() } }