Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom RTSP Media Factory with Proxy pair #212

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
11 changes: 1 addition & 10 deletions src/stream/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,16 +200,7 @@ impl PipelineState {
);

if let Sink::Rtsp(sink) = &sink {
let caps = &self
.sink_tee
.static_pad("sink")
.expect("No static sink pad found on capsfilter")
.current_caps()
.context("Failed to get caps from capsfilter sink pad")?;

debug!("caps: {:#?}", caps.to_string());

RTSPServer::add_pipeline(&sink.path(), &sink.socket_path(), caps)?;
RTSPServer::add_pipeline(&sink.path(), sink.proxysink(), &sink.encoding())?;

RTSPServer::start_pipeline(&sink.path())?;
}
Expand Down
1 change: 1 addition & 0 deletions src/stream/rtsp/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
mod rtsp_media_factory;
pub mod rtsp_server;
78 changes: 78 additions & 0 deletions src/stream/rtsp/rtsp_media_factory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/// /References:
/// 1 - https://gitlab.freedesktop.org/slomo/rtp-rapid-sync-example
/// 2 - https://github.com/gtk-rs/gtk3-rs/blob/master/examples/list_box_model/row_data/imp.rs
mod imp {
use std::sync::{Arc, Mutex};

use once_cell::sync::Lazy;

use gst::{
glib::{self, subclass::prelude::*, *},
prelude::*,
};
use gst_rtsp_server::subclass::prelude::*;

// The actual data structure that stores our values. This is not accessible
// directly from the outside.
#[derive(Default)]
pub struct Factory {
bin: Arc<Mutex<gst::Bin>>,
}

// Basic declaration of our type for the GObject type system
#[glib::object_subclass]
impl ObjectSubclass for Factory {
const NAME: &'static str = "RTSPMediaFactoryFromBin";
type Type = super::Factory;
type ParentType = gst_rtsp_server::RTSPMediaFactory;
}

// The ObjectImpl trait provides the setters/getters for GObject properties.
// Here we need to provide the values that are internally stored back to the
// caller, or store whatever new value the caller is providing.
//
// This maps between the GObject properties and our internal storage of the
// corresponding values of the properties.
impl ObjectImpl for Factory {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![glib::ParamSpecObject::builder::<gst::Bin>("bin")
.construct_only()
.build()]
});

PROPERTIES.as_ref()
}

fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
match pspec.name() {
"bin" => {
let bin = value.get().unwrap();
self.bin.set(bin);
}
_ => unimplemented!(),
}
}
}

impl RTSPMediaFactoryImpl for Factory {
// Create the custom stream producer bin.
fn create_element(&self, _url: &gst_rtsp::RTSPUrl) -> Option<gst::Element> {
let bin = self.bin.lock().unwrap();
bin.debug_to_dot_file_with_ts(gst::DebugGraphDetails::all(), "rtsp-bin-created");

Some(bin.to_owned().upcast())
}
}
}

gst::glib::wrapper! {
pub struct Factory(ObjectSubclass<imp::Factory>) @extends gst_rtsp_server::RTSPMediaFactory;
}

// Trivial constructor for the media factory.
impl Factory {
pub fn new(bin: gst::Bin) -> Self {
gst::glib::Object::builder().property("bin", bin).build()
}
}
87 changes: 43 additions & 44 deletions src/stream/rtsp/rtsp_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ use gst_rtsp::RTSPLowerTrans;
use gst_rtsp_server::{prelude::*, RTSPTransportMode};
use tracing::*;

use crate::stream::rtsp::rtsp_media_factory;
use crate::video::types::VideoEncodeType;

#[allow(dead_code)]
pub struct RTSPServer {
pub server: gst_rtsp_server::RTSPServer,
host: String,
port: u16,
run: bool,
pub path_to_factory: HashMap<String, gst_rtsp_server::RTSPMediaFactory>,
pub path_to_factory: HashMap<String, rtsp_media_factory::Factory>,
main_loop_thread: Option<std::thread::JoinHandle<()>>,
main_loop_thread_rx_channel: std::sync::mpsc::Receiver<String>,
}
Expand Down Expand Up @@ -105,70 +108,40 @@ impl RTSPServer {
}

#[instrument(level = "debug")]
pub fn add_pipeline(path: &str, socket_path: &str, rtp_caps: &gst::Caps) -> Result<()> {
// Initialize the singleton before calling gst factory
let mut rtsp_server = RTSP_SERVER.as_ref().lock().unwrap();

let factory = gst_rtsp_server::RTSPMediaFactory::new();
factory.set_shared(true);
factory.set_buffer_size(0);
factory.set_latency(0u32);
factory.set_transport_mode(RTSPTransportMode::PLAY);
factory.set_protocols(RTSPLowerTrans::UDP | RTSPLowerTrans::UDP_MCAST);

let Some(encode) = rtp_caps
.iter()
.find_map(|structure| {
structure.iter().find_map(|(key, sendvalue)| {
if key == "encoding-name" {
Some(sendvalue.to_value().get::<String>().expect("Failed accessing encoding-name parameter"))
} else {
None
}
})
}) else {
return Err(anyhow!("Cannot find 'media' in caps"));
};

let rtp_caps = rtp_caps.to_string();
let description = match encode.as_str() {
"H264" => {
fn create_rtsp_bin(proxysink: &gst::Element, encode: &VideoEncodeType) -> Result<gst::Bin> {
let proxysrc_name = format!("proxysrc-{}", uuid::Uuid::new_v4());
let description = match encode {
VideoEncodeType::H264 => {
format!(
concat!(
"shmsrc socket-path={socket_path} do-timestamp=true",
"proxysrc name={proxysrc_name}",
" ! queue leaky=downstream flush-on-eos=true max-size-buffers=0",
" ! capsfilter caps={rtp_caps:?}",
" ! rtph264depay",
" ! rtph264pay name=pay0 aggregate-mode=zero-latency config-interval=10 pt=96",
),
socket_path = socket_path,
rtp_caps = rtp_caps,
proxysrc_name=proxysrc_name
)
}
"RAW" => {
VideoEncodeType::Yuyv => {
format!(
concat!(
"shmsrc socket-path={socket_path} do-timestamp=true",
"proxysrc name={proxysrc_name}",
" ! queue leaky=downstream flush-on-eos=true max-size-buffers=0",
" ! capsfilter caps={rtp_caps:?}",
" ! rtpvrawdepay",
" ! rtpvrawpay name=pay0 pt=96",
),
socket_path = socket_path,
rtp_caps = rtp_caps,
proxysrc_name = proxysrc_name
)
}
"JPEG" => {
VideoEncodeType::Mjpg => {
format!(
concat!(
"shmsrc socket-path={socket_path} do-timestamp=true",
"proxysrc name={proxysrc_name}",
" ! queue leaky=downstream flush-on-eos=true max-size-buffers=0",
" ! capsfilter caps={rtp_caps:?}",
" ! rtpjpegdepay",
" ! rtpjpegpay name=pay0 pt=96",
),
socket_path = socket_path,
rtp_caps = rtp_caps,
proxysrc_name = proxysrc_name
)
}
unsupported => {
Expand All @@ -180,7 +153,33 @@ impl RTSPServer {

debug!("RTSP Server description: {description:#?}");

factory.set_launch(&description);
let rtsp_bin = gst::parse_bin_from_description(&description, false)?;
let proxysrc = rtsp_bin
.by_name(&proxysrc_name)
.expect("Failed to find proxysrc by name: wrong name?");
proxysrc.set_property("proxysink", proxysink);

Ok(rtsp_bin)
}

#[instrument(level = "debug")]
pub fn add_pipeline(
path: &str,
proxysink: &gst::Element,
encoding: &VideoEncodeType,
) -> Result<()> {
// Initialize the singleton before calling gst factory
let mut rtsp_server = RTSP_SERVER.as_ref().lock().unwrap();

let rtsp_bin = Self::create_rtsp_bin(proxysink, encoding)?;

let factory = rtsp_media_factory::Factory::new(rtsp_bin);
factory.set_shared(true);
factory.set_buffer_size(0);
factory.set_latency(0u32);
factory.set_do_retransmission(false);
factory.set_transport_mode(RTSPTransportMode::PLAY);
factory.set_protocols(RTSPLowerTrans::UDP | RTSPLowerTrans::UDP_MCAST);

if let Some(server) = rtsp_server
.path_to_factory
Expand Down
4 changes: 2 additions & 2 deletions src/stream/sink/image_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl SinkInterface for ImageSink {
.expect("No sink pad found on ProxySink");
if let Err(link_err) = queue_src_pad.link(proxysink_sink_pad) {
let msg =
format!("Failed to link Queue's src pad with WebRTCBin's sink pad: {link_err:?}");
format!("Failed to link Queue's src pad with ProxySink's sink pad: {link_err:?}");
error!(msg);

if let Some(parent) = tee_src_pad.parent_element() {
Expand Down Expand Up @@ -175,7 +175,7 @@ impl SinkInterface for ImageSink {
error!(msg);

if let Err(unlink_err) = queue_src_pad.unlink(proxysink_sink_pad) {
error!("Failed to unlink Queue's src pad and ProxySink's sink pad: {unlink_err:?}");
error!("Failed to unlink Tee's src pad and Queue's sink pad: {unlink_err:?}");
}

if let Err(unlink_err) = queue_src_pad.unlink(proxysink_sink_pad) {
Expand Down
12 changes: 11 additions & 1 deletion src/stream/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use anyhow::{anyhow, Result};

use tracing::*;

use super::types::CaptureConfiguration;

#[enum_dispatch]
pub trait SinkInterface {
/// Link this Sink's sink pad to the given Pipelines's Tee element's src pad.
Expand Down Expand Up @@ -72,7 +74,15 @@ pub fn create_rtsp_sink(
.endpoints
.clone();

Ok(Sink::Rtsp(RtspSink::try_new(id, addresses)?))
let encoding = match &video_and_stream_information
.stream_information
.configuration
{
CaptureConfiguration::Video(configuration) => configuration.encode.clone(),
_ => unreachable!(),
};

Ok(Sink::Rtsp(RtspSink::try_new(id, addresses, encoding)?))
}

#[instrument(level = "debug")]
Expand Down
Loading