diff --git a/src/lib/stream/pipeline/mod.rs b/src/lib/stream/pipeline/mod.rs index 3f304d93..c4b271fa 100644 --- a/src/lib/stream/pipeline/mod.rs +++ b/src/lib/stream/pipeline/mod.rs @@ -1,4 +1,5 @@ pub mod fake_pipeline; +pub mod onvif_pipeline; pub mod qr_pipeline; pub mod redirect_pipeline; pub mod runner; @@ -23,6 +24,7 @@ use crate::{ }; use fake_pipeline::FakePipeline; +use onvif_pipeline::OnvifPipeline; use qr_pipeline::QrPipeline; use redirect_pipeline::RedirectPipeline; use runner::PipelineRunner; @@ -42,6 +44,7 @@ pub enum Pipeline { V4l(V4lPipeline), Fake(FakePipeline), QR(QrPipeline), + Onvif(OnvifPipeline), Redirect(RedirectPipeline), } @@ -52,6 +55,7 @@ impl Pipeline { Pipeline::V4l(pipeline) => &mut pipeline.state, Pipeline::Fake(pipeline) => &mut pipeline.state, Pipeline::QR(pipeline) => &mut pipeline.state, + Pipeline::Onvif(pipeline) => &mut pipeline.state, Pipeline::Redirect(pipeline) => &mut pipeline.state, } } @@ -62,6 +66,7 @@ impl Pipeline { Pipeline::V4l(pipeline) => &pipeline.state, Pipeline::Fake(pipeline) => &pipeline.state, Pipeline::QR(pipeline) => &pipeline.state, + Pipeline::Onvif(pipeline) => &pipeline.state, Pipeline::Redirect(pipeline) => &pipeline.state, } } @@ -93,6 +98,9 @@ impl Pipeline { }), #[cfg(not(target_os = "linux"))] VideoSourceType::Local(_) => unreachable!("Local is only supported on linux"), + VideoSourceType::Onvif(_) => Pipeline::Onvif(OnvifPipeline { + state: pipeline_state, + }), VideoSourceType::Redirect(_) => Pipeline::Redirect(RedirectPipeline { state: pipeline_state, }), @@ -149,6 +157,9 @@ impl PipelineState { VideoSourceType::Local(_) => { unreachable!("Local source only supported on linux"); } + VideoSourceType::Onvif(_) => { + OnvifPipeline::try_new(pipeline_id, video_and_stream_information) + } VideoSourceType::Redirect(_) => { RedirectPipeline::try_new(pipeline_id, video_and_stream_information) } diff --git a/src/lib/stream/pipeline/onvif_pipeline.rs b/src/lib/stream/pipeline/onvif_pipeline.rs new file mode 100644 index 00000000..28b5ea9c --- /dev/null +++ b/src/lib/stream/pipeline/onvif_pipeline.rs @@ -0,0 +1,128 @@ +use anyhow::{anyhow, Result}; +use gst::prelude::*; +use tracing::*; + +use crate::{ + stream::types::CaptureConfiguration, + video::{ + types::{VideoEncodeType, VideoSourceType}, + video_source_onvif::VideoSourceOnvifType, + }, + video_stream::types::VideoAndStreamInformation, +}; + +use super::{ + PipelineGstreamerInterface, PipelineState, PIPELINE_FILTER_NAME, PIPELINE_RTP_TEE_NAME, + PIPELINE_VIDEO_TEE_NAME, +}; + +#[derive(Debug)] +pub struct OnvifPipeline { + pub state: PipelineState, +} + +impl OnvifPipeline { + #[instrument(level = "debug")] + pub fn try_new( + pipeline_id: &uuid::Uuid, + video_and_stream_information: &VideoAndStreamInformation, + ) -> Result { + match &video_and_stream_information + .stream_information + .configuration + { + CaptureConfiguration::Video(configuration) => configuration, + unsupported => { + return Err(anyhow!( + "{unsupported:?} is not supported as Onvif Pipeline" + )) + } + }; + + let video_source = match &video_and_stream_information.video_source { + VideoSourceType::Onvif(source) => source, + unsupported => { + return Err(anyhow!( + "SourceType {unsupported:?} is not supported as V4l Pipeline" + )) + } + }; + + let location = { + let VideoSourceOnvifType::Onvif(url) = &video_source.source; + url.to_string() + }; + + let encode = match &video_and_stream_information + .stream_information + .configuration + { + CaptureConfiguration::Video(configuration) => Some(configuration.encode.clone()), + _unknown => None, + }; + + let filter_name = format!("{PIPELINE_FILTER_NAME}-{pipeline_id}"); + let video_tee_name = format!("{PIPELINE_VIDEO_TEE_NAME}-{pipeline_id}"); + let rtp_tee_name = format!("{PIPELINE_RTP_TEE_NAME}-{pipeline_id}"); + + let description = match encode { + Some(VideoEncodeType::H264) => { + format!( + concat!( + "rtspsrc location={location} is-live=true latency=0", + " ! application/x-rtp", + " ! rtph264depay", + // " ! h264parse", // we might want to add this in the future to expand the compatibility, since it can transform the stream format + " ! capsfilter name={filter_name} caps=video/x-h264,stream-format=avc,alignment=au", + " ! tee name={video_tee_name} allow-not-linked=true", + " ! rtph264pay aggregate-mode=zero-latency config-interval=10 pt=96", + " ! tee name={rtp_tee_name} allow-not-linked=true" + ), + location = location, + filter_name = filter_name, + video_tee_name = video_tee_name, + rtp_tee_name = rtp_tee_name, + ) + } + Some(VideoEncodeType::H265) => { + format!( + concat!( + "rtspsrc location={location} is-live=true latency=0", + " ! application/x-rtp", + " ! rtph265depay", + // " ! h265parse", // we might want to add this in the future to expand the compatibility, since it can transform the stream format + " ! capsfilter name={filter_name} caps=video/x-h265,profile={profile},stream-format=byte-stream,alignment=au", + " ! tee name={video_tee_name} allow-not-linked=true", + " ! rtph265pay aggregate-mode=zero-latency config-interval=10 pt=96", + " ! tee name={rtp_tee_name} allow-not-linked=true" + ), + location = location, + filter_name = filter_name, + video_tee_name = video_tee_name, + profile = "main", + rtp_tee_name = rtp_tee_name, + ) + } + unsupported => { + return Err(anyhow!( + "Encode {unsupported:?} is not supported for Onvif Pipeline" + )) + } + }; + + let pipeline = gst::parse::launch(&description)?; + + let pipeline = pipeline + .downcast::() + .expect("Couldn't downcast pipeline"); + + Ok(pipeline) + } +} + +impl PipelineGstreamerInterface for OnvifPipeline { + #[instrument(level = "trace")] + fn is_running(&self) -> bool { + self.state.pipeline_runner.is_running() + } +}