diff --git a/src/stream/pipeline/mod.rs b/src/stream/pipeline/mod.rs index 13e6faca..cd2831f4 100644 --- a/src/stream/pipeline/mod.rs +++ b/src/stream/pipeline/mod.rs @@ -200,16 +200,7 @@ impl PipelineState { ); if let Sink::Rtsp(sink) = &sink { - let caps = &self - .sink_tee - .static_pad("sink") - .expect("No static sink pad found on capsfilter") - .current_caps() - .context("Failed to get caps from capsfilter sink pad")?; - - debug!("caps: {:#?}", caps.to_string()); - - RTSPServer::add_pipeline(&sink.path(), &sink.socket_path(), caps)?; + RTSPServer::add_pipeline(&sink.path(), sink.proxysink(), &sink.encoding())?; RTSPServer::start_pipeline(&sink.path())?; } diff --git a/src/stream/rtsp/mod.rs b/src/stream/rtsp/mod.rs index 598792ed..044a3618 100644 --- a/src/stream/rtsp/mod.rs +++ b/src/stream/rtsp/mod.rs @@ -1 +1,2 @@ +mod rtsp_media_factory; pub mod rtsp_server; diff --git a/src/stream/rtsp/rtsp_media_factory.rs b/src/stream/rtsp/rtsp_media_factory.rs new file mode 100644 index 00000000..3c70fe8e --- /dev/null +++ b/src/stream/rtsp/rtsp_media_factory.rs @@ -0,0 +1,78 @@ +/// /References: +/// 1 - https://gitlab.freedesktop.org/slomo/rtp-rapid-sync-example +/// 2 - https://github.com/gtk-rs/gtk3-rs/blob/master/examples/list_box_model/row_data/imp.rs +mod imp { + use std::sync::{Arc, Mutex}; + + use once_cell::sync::Lazy; + + use gst::{ + glib::{self, subclass::prelude::*, *}, + prelude::*, + }; + use gst_rtsp_server::subclass::prelude::*; + + // The actual data structure that stores our values. This is not accessible + // directly from the outside. + #[derive(Default)] + pub struct Factory { + bin: Arc>, + } + + // Basic declaration of our type for the GObject type system + #[glib::object_subclass] + impl ObjectSubclass for Factory { + const NAME: &'static str = "RTSPMediaFactoryFromBin"; + type Type = super::Factory; + type ParentType = gst_rtsp_server::RTSPMediaFactory; + } + + // The ObjectImpl trait provides the setters/getters for GObject properties. + // Here we need to provide the values that are internally stored back to the + // caller, or store whatever new value the caller is providing. + // + // This maps between the GObject properties and our internal storage of the + // corresponding values of the properties. + impl ObjectImpl for Factory { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![glib::ParamSpecObject::builder::("bin") + .construct_only() + .build()] + }); + + PROPERTIES.as_ref() + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + match pspec.name() { + "bin" => { + let bin = value.get().unwrap(); + self.bin.set(bin); + } + _ => unimplemented!(), + } + } + } + + impl RTSPMediaFactoryImpl for Factory { + // Create the custom stream producer bin. + fn create_element(&self, _url: &gst_rtsp::RTSPUrl) -> Option { + let bin = self.bin.lock().unwrap(); + bin.debug_to_dot_file_with_ts(gst::DebugGraphDetails::all(), "rtsp-bin-created"); + + Some(bin.to_owned().upcast()) + } + } +} + +gst::glib::wrapper! { + pub struct Factory(ObjectSubclass) @extends gst_rtsp_server::RTSPMediaFactory; +} + +// Trivial constructor for the media factory. +impl Factory { + pub fn new(bin: gst::Bin) -> Self { + gst::glib::Object::builder().property("bin", bin).build() + } +} diff --git a/src/stream/rtsp/rtsp_server.rs b/src/stream/rtsp/rtsp_server.rs index 0e8f23fe..7df034e5 100644 --- a/src/stream/rtsp/rtsp_server.rs +++ b/src/stream/rtsp/rtsp_server.rs @@ -7,13 +7,16 @@ use gst_rtsp::RTSPLowerTrans; use gst_rtsp_server::{prelude::*, RTSPTransportMode}; use tracing::*; +use crate::stream::rtsp::rtsp_media_factory; +use crate::video::types::VideoEncodeType; + #[allow(dead_code)] pub struct RTSPServer { pub server: gst_rtsp_server::RTSPServer, host: String, port: u16, run: bool, - pub path_to_factory: HashMap, + pub path_to_factory: HashMap, main_loop_thread: Option>, main_loop_thread_rx_channel: std::sync::mpsc::Receiver, } @@ -105,70 +108,40 @@ impl RTSPServer { } #[instrument(level = "debug")] - pub fn add_pipeline(path: &str, socket_path: &str, rtp_caps: &gst::Caps) -> Result<()> { - // Initialize the singleton before calling gst factory - let mut rtsp_server = RTSP_SERVER.as_ref().lock().unwrap(); - - let factory = gst_rtsp_server::RTSPMediaFactory::new(); - factory.set_shared(true); - factory.set_buffer_size(0); - factory.set_latency(0u32); - factory.set_transport_mode(RTSPTransportMode::PLAY); - factory.set_protocols(RTSPLowerTrans::UDP | RTSPLowerTrans::UDP_MCAST); - - let Some(encode) = rtp_caps - .iter() - .find_map(|structure| { - structure.iter().find_map(|(key, sendvalue)| { - if key == "encoding-name" { - Some(sendvalue.to_value().get::().expect("Failed accessing encoding-name parameter")) - } else { - None - } - }) - }) else { - return Err(anyhow!("Cannot find 'media' in caps")); - }; - - let rtp_caps = rtp_caps.to_string(); - let description = match encode.as_str() { - "H264" => { + fn create_rtsp_bin(proxysink: &gst::Element, encode: &VideoEncodeType) -> Result { + let proxysrc_name = format!("proxysrc-{}", uuid::Uuid::new_v4()); + let description = match encode { + VideoEncodeType::H264 => { format!( concat!( - "shmsrc socket-path={socket_path} do-timestamp=true", + "proxysrc name={proxysrc_name}", " ! queue leaky=downstream flush-on-eos=true max-size-buffers=0", - " ! capsfilter caps={rtp_caps:?}", " ! rtph264depay", " ! rtph264pay name=pay0 aggregate-mode=zero-latency config-interval=10 pt=96", ), - socket_path = socket_path, - rtp_caps = rtp_caps, + proxysrc_name=proxysrc_name ) } - "RAW" => { + VideoEncodeType::Yuyv => { format!( concat!( - "shmsrc socket-path={socket_path} do-timestamp=true", + "proxysrc name={proxysrc_name}", " ! queue leaky=downstream flush-on-eos=true max-size-buffers=0", - " ! capsfilter caps={rtp_caps:?}", " ! rtpvrawdepay", " ! rtpvrawpay name=pay0 pt=96", ), - socket_path = socket_path, - rtp_caps = rtp_caps, + proxysrc_name = proxysrc_name ) } - "JPEG" => { + VideoEncodeType::Mjpg => { format!( concat!( - "shmsrc socket-path={socket_path} do-timestamp=true", + "proxysrc name={proxysrc_name}", " ! queue leaky=downstream flush-on-eos=true max-size-buffers=0", - " ! capsfilter caps={rtp_caps:?}", " ! rtpjpegdepay", " ! rtpjpegpay name=pay0 pt=96", ), - socket_path = socket_path, - rtp_caps = rtp_caps, + proxysrc_name = proxysrc_name ) } unsupported => { @@ -180,7 +153,33 @@ impl RTSPServer { debug!("RTSP Server description: {description:#?}"); - factory.set_launch(&description); + let rtsp_bin = gst::parse_bin_from_description(&description, false)?; + let proxysrc = rtsp_bin + .by_name(&proxysrc_name) + .expect("Failed to find proxysrc by name: wrong name?"); + proxysrc.set_property("proxysink", proxysink); + + Ok(rtsp_bin) + } + + #[instrument(level = "debug")] + pub fn add_pipeline( + path: &str, + proxysink: &gst::Element, + encoding: &VideoEncodeType, + ) -> Result<()> { + // Initialize the singleton before calling gst factory + let mut rtsp_server = RTSP_SERVER.as_ref().lock().unwrap(); + + let rtsp_bin = Self::create_rtsp_bin(proxysink, encoding)?; + + let factory = rtsp_media_factory::Factory::new(rtsp_bin); + factory.set_shared(true); + factory.set_buffer_size(0); + factory.set_latency(0u32); + factory.set_do_retransmission(false); + factory.set_transport_mode(RTSPTransportMode::PLAY); + factory.set_protocols(RTSPLowerTrans::UDP | RTSPLowerTrans::UDP_MCAST); if let Some(server) = rtsp_server .path_to_factory diff --git a/src/stream/sink/image_sink.rs b/src/stream/sink/image_sink.rs index 93a84b7e..2c9684e0 100644 --- a/src/stream/sink/image_sink.rs +++ b/src/stream/sink/image_sink.rs @@ -131,7 +131,7 @@ impl SinkInterface for ImageSink { .expect("No sink pad found on ProxySink"); if let Err(link_err) = queue_src_pad.link(proxysink_sink_pad) { let msg = - format!("Failed to link Queue's src pad with WebRTCBin's sink pad: {link_err:?}"); + format!("Failed to link Queue's src pad with ProxySink's sink pad: {link_err:?}"); error!(msg); if let Some(parent) = tee_src_pad.parent_element() { @@ -175,7 +175,7 @@ impl SinkInterface for ImageSink { error!(msg); if let Err(unlink_err) = queue_src_pad.unlink(proxysink_sink_pad) { - error!("Failed to unlink Queue's src pad and ProxySink's sink pad: {unlink_err:?}"); + error!("Failed to unlink Tee's src pad and Queue's sink pad: {unlink_err:?}"); } if let Err(unlink_err) = queue_src_pad.unlink(proxysink_sink_pad) { diff --git a/src/stream/sink/mod.rs b/src/stream/sink/mod.rs index b5c500f3..5106f5c0 100644 --- a/src/stream/sink/mod.rs +++ b/src/stream/sink/mod.rs @@ -16,6 +16,8 @@ use anyhow::{anyhow, Result}; use tracing::*; +use super::types::CaptureConfiguration; + #[enum_dispatch] pub trait SinkInterface { /// Link this Sink's sink pad to the given Pipelines's Tee element's src pad. @@ -72,7 +74,15 @@ pub fn create_rtsp_sink( .endpoints .clone(); - Ok(Sink::Rtsp(RtspSink::try_new(id, addresses)?)) + let encoding = match &video_and_stream_information + .stream_information + .configuration + { + CaptureConfiguration::Video(configuration) => configuration.encode.clone(), + _ => unreachable!(), + }; + + Ok(Sink::Rtsp(RtspSink::try_new(id, addresses, encoding)?)) } #[instrument(level = "debug")] diff --git a/src/stream/sink/rtsp_sink.rs b/src/stream/sink/rtsp_sink.rs index 914ec6b7..53eefc17 100644 --- a/src/stream/sink/rtsp_sink.rs +++ b/src/stream/sink/rtsp_sink.rs @@ -4,17 +4,19 @@ use tracing::*; use gst::prelude::*; +use crate::video::types::VideoEncodeType; + use super::SinkInterface; #[derive(Debug)] pub struct RtspSink { sink_id: uuid::Uuid, queue: gst::Element, - sink: gst::Element, + proxysink: gst::Element, sink_sink_pad: gst::Pad, tee_src_pad: Option, + encoding: VideoEncodeType, path: String, - socket_path: String, } impl SinkInterface for RtspSink { #[instrument(level = "debug", skip(self))] @@ -26,8 +28,6 @@ impl SinkInterface for RtspSink { ) -> Result<()> { let sink_id = &self.get_id(); - let _ = std::fs::remove_file(&self.socket_path); // Remove if already exists - // Set Tee's src pad if self.tee_src_pad.is_some() { return Err(anyhow!( @@ -55,7 +55,7 @@ impl SinkInterface for RtspSink { }; // Add the Sink elements to the Pipeline - let elements = &[&self.queue, &self.sink]; + let elements = &[&self.queue, &self.proxysink]; if let Err(add_err) = pipeline.add_many(elements) { let msg = format!("Failed to add WebRTCSink's elements to the Pipeline: {add_err:?}"); error!(msg); @@ -143,10 +143,6 @@ impl SinkInterface for RtspSink { #[instrument(level = "debug", skip(self))] fn unlink(&self, pipeline: &gst::Pipeline, pipeline_id: &uuid::Uuid) -> Result<()> { - if let Err(error) = std::fs::remove_file(&self.socket_path) { - warn!("Failed removing the RTSP Sink socket file. Reason: {error:?}"); - } - let Some(tee_src_pad) = &self.tee_src_pad else { warn!("Tried to unlink Sink from a pipeline without a Tee src pad."); return Ok(()); @@ -180,7 +176,7 @@ impl SinkInterface for RtspSink { } // Remove the Sink's elements from the Source's pipeline - let elements = &[&self.queue, &self.sink]; + let elements = &[&self.queue, &self.proxysink]; if let Err(remove_err) = pipeline.remove_many(elements) { warn!("Failed removing RtspSink's elements from pipeline: {remove_err:?}"); } @@ -191,7 +187,7 @@ impl SinkInterface for RtspSink { } // Set Sink to null - if let Err(state_err) = self.sink.set_state(gst::State::Null) { + if let Err(state_err) = self.proxysink.set_state(gst::State::Null) { warn!("Failed to set RtspSink's to NULL: {state_err:#?}"); } @@ -213,7 +209,11 @@ impl SinkInterface for RtspSink { impl RtspSink { #[instrument(level = "debug")] - pub fn try_new(id: uuid::Uuid, addresses: Vec) -> Result { + pub fn try_new( + id: uuid::Uuid, + addresses: Vec, + encoding: VideoEncodeType, + ) -> Result { let queue = gst::ElementFactory::make("queue") .property_from_str("leaky", "downstream") // Throw away any data .property("flush-on-eos", true) @@ -227,34 +227,35 @@ impl RtspSink { "Failed to find RTSP compatible address. Example: \"rtsp://0.0.0.0:8554/test\"", )?; - let socket_path = format!("/tmp/{id}"); - let sink = gst::ElementFactory::make("shmsink") - .property_from_str("socket-path", &socket_path) - .property("sync", true) - .property("wait-for-connection", false) - .property("shm-size", 10_000_000u32) - .build()?; + let proxysink = gst::ElementFactory::make("proxysink").build()?; - let sink_sink_pad = sink.static_pad("sink").context("Failed to get Sink Pad")?; + let sink_sink_pad = proxysink + .static_pad("sink") + .context("Failed to get Sink Pad")?; Ok(Self { sink_id: id, queue, - sink, + proxysink, sink_sink_pad, + encoding, path, - socket_path, tee_src_pad: Default::default(), }) } #[instrument(level = "trace", skip(self))] - pub fn path(&self) -> String { - self.path.clone() + pub fn proxysink(&self) -> &gst::Element { + &self.proxysink + } + + #[instrument(level = "trace", skip(self))] + pub fn encoding(&self) -> VideoEncodeType { + self.encoding.clone() } #[instrument(level = "trace", skip(self))] - pub fn socket_path(&self) -> String { - self.socket_path.clone() + pub fn path(&self) -> String { + self.path.clone() } } diff --git a/src/stream/sink/udp_sink.rs b/src/stream/sink/udp_sink.rs index c203a25b..d2f3f33d 100644 --- a/src/stream/sink/udp_sink.rs +++ b/src/stream/sink/udp_sink.rs @@ -79,7 +79,7 @@ impl SinkInterface for UdpSink { .expect("No sink pad found on ProxySink"); if let Err(link_err) = queue_src_pad.link(proxysink_sink_pad) { let msg = - format!("Failed to link Queue's src pad with WebRTCBin's sink pad: {link_err:?}"); + format!("Failed to link Queue's src pad with ProxySink's sink pad: {link_err:?}"); error!(msg); if let Some(parent) = tee_src_pad.parent_element() { @@ -122,8 +122,8 @@ impl SinkInterface for UdpSink { let msg = format!("Failed to synchronize children states: {sync_err:?}"); error!(msg); - if let Err(unlink_err) = queue_src_pad.unlink(proxysink_sink_pad) { - error!("Failed to unlink Queue's src pad and ProxySink's sink pad: {unlink_err:?}"); + if let Err(unlink_err) = tee_src_pad.unlink(queue_sink_pad) { + error!("Failed to unlink Tee's src pad and Queue's sink pad: {unlink_err:?}"); } if let Err(unlink_err) = queue_src_pad.unlink(proxysink_sink_pad) {