diff --git a/Cargo.toml b/Cargo.toml index 56705b3..c14f664 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ workspace = { members = ["rszlm-sys"] } [package] name = "rszlm" -version = "0.1.2" +version = "0.1.3" edition = "2021" authors = ["shiben. "] description = "ZLMediaKit rust api" @@ -19,6 +19,11 @@ anyhow = "1" [dev-dependencies] tokio = { version = "1", features = ["full"] } tokio-util = { version = "0.7", features = ["full"] } +axum = "0.7" +hyper = { version = "1", features = ["full"] } +hyper-util = { version = "0.1", features = ["client-legacy"] } +http-body-util = { version = "0.1" } +futures-util = { version = "0.3", features = ["io"] } [features] default = [] diff --git a/README.md b/README.md index 2e7072e..d1bc0dc 100644 --- a/README.md +++ b/README.md @@ -6,14 +6,17 @@ ZLMediaKit rust api - 依赖 - 1. [rust-bindgen](https://github.com/rust-lang/rust-bindgen)需要安装`Clang` + 1. [rust-bindgen](https://github.com/rust-lang/rust-bindgen)需要安装`Clang`环境 - 参考链接:https://rust-lang.github.io/rust-bindgen/requirements.html 2. [ZLMediaKit](https://github.com/ZLMediaKit/ZLMediaKit) - - 参考链接:https://github.com/ZLMediaKit/ZLMediaKit/wiki/%E5%BF%AB%E9%80%9F%E5%BC%80%E5%A7%8B +- 编译好并且 install`ZLMediaKit`之后,设置环境变量`ZLM_DIR`,该路径包含 ZLMediaKit 的`include`和`lib`以及`bin`文件夹,并将`bin`设置到`PATH`环境变量中 + + 如果不设置`ZLM_DIR`环境变量,rszlm-sys 会拉取`ZLMediaKit`git 代码进行编译 + - 编译 ```shell diff --git a/examples/server.rs b/examples/server.rs index 18bbf6c..21f6eda 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -1,5 +1,9 @@ use std::collections::HashMap; +use axum::response::IntoResponse; +use axum::{body::Body, routing::get, Router}; +use futures_util::StreamExt; +use hyper_util::{client::legacy::connect::HttpConnector, rt::TokioExecutor}; use once_cell::sync::{Lazy, OnceCell}; use rszlm::{ event::EVENTS, @@ -7,7 +11,7 @@ use rszlm::{ player::ProxyPlayerBuilder, server::{http_server_start, rtmp_server_start, rtsp_server_start, stop_all_server}, }; -use tokio::sync::RwLock; +use tokio::{runtime::Handle, sync::RwLock}; use tokio_util::sync::CancellationToken; static PULL_PROXY_MESSAGE: OnceCell> = OnceCell::new(); @@ -21,6 +25,15 @@ pub(crate) async fn pull_proxy_message(msg: ProxyMessageCmd) { } } +const AXUM_PORT: u16 = 8552; + +type Client = hyper_util::client::legacy::Client; + +static CLIENT: Lazy = Lazy::new(|| { + hyper_util::client::legacy::Client::<(), ()>::builder(TokioExecutor::new()) + .build(HttpConnector::new()) +}); + #[tokio::main] async fn main() { let cancel = CancellationToken::new(); @@ -32,6 +45,9 @@ async fn main() { let cancel_clone = cancel.clone(); tokio::spawn(start(cancel_clone)); + let cancel_clone = cancel.clone(); + tokio::spawn(axum_start(AXUM_PORT, cancel_clone)); + tokio::signal::ctrl_c().await.unwrap(); cancel.cancel(); } @@ -154,7 +170,8 @@ async fn zlm_start(cancel: CancellationToken) -> anyhow::Result<()> { let cancel_clone = cancel.clone(); let (tx, mut rx) = tokio::sync::mpsc::channel::(100); - let _ = start_zlm_background(cancel_clone, tx); + let runtime = Handle::current(); + let _ = start_zlm_background(cancel_clone, tx, runtime); loop { tokio::select! { @@ -173,6 +190,7 @@ async fn zlm_start(cancel: CancellationToken) -> anyhow::Result<()> { fn start_zlm_background( cancel: CancellationToken, tx: tokio::sync::mpsc::Sender, + runtime: tokio::runtime::Handle, ) -> anyhow::Result<()> { tokio::task::spawn_blocking(move || { EnvInitBuilder::default() @@ -230,6 +248,64 @@ fn start_zlm_background( stream: msg.sender.stream(), })); }); + + events.on_http_request(move |msg| { + let url = msg.parser.url(); + + if url.starts_with("/test") { + let headers = vec!["Content-Type".to_string(), "text/plain".to_string()]; + let body = "hello world"; + msg.invoker.invoke(200, headers, body); + return true; + } else if url.starts_with("/proxy") { + let path = &url["/proxy".len()..]; + let query = msg.parser.query_str(); + let path_query = if query.is_empty() { + path.to_owned() + } else { + format!("{}?{}", path, query) + }; + + let uri = format!("http://127.0.0.1:{}{}", AXUM_PORT, path_query); + if let Ok(req) = hyper::Request::builder() + .method(msg.parser.method().as_str()) + .uri(uri) + .body(Body::from(msg.parser.body())) + { + // TODO copy request headers + let resp = runtime.block_on(async move { + CLIENT + .request(req) + .await + .map_err(|_| hyper::StatusCode::BAD_REQUEST) + .into_response() + }); + let status = resp.status(); + let header = resp + .headers() + .iter() + .map(|(k, v)| vec![k.to_string(), v.to_str().unwrap().to_string()]) + .flatten() + .collect::>(); + let body = resp.into_body(); + let body_str = runtime + .block_on(async move { + let mut b = body.into_data_stream(); + let mut data = String::new(); + while let Some(chunk) = b.next().await { + data.push_str(&String::from_utf8_lossy(&chunk?)); + } + anyhow::Ok(data) + }) + .unwrap(); + + msg.invoker + .invoke(status.as_u16() as i32, header, &body_str); + return true; + } + } + false + }); } loop { @@ -246,3 +322,22 @@ fn start_zlm_background( Ok(()) } + +async fn axum_start(port: u16, cancel: CancellationToken) -> anyhow::Result<()> { + let app = Router::new() + .route("/", get(|| async { "Hello, axum world!" })) + .route("/test", get(|| async { "Hello, axum test!" })); + + let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)).await?; + println!("listening on {}", listener.local_addr()?); + axum::serve(listener, app) + .with_graceful_shutdown(async move { + tokio::select! { + _ = cancel.cancelled() => { + println!("cancel"); + }, + } + }) + .await + .map_err(|e| e.into()) +} diff --git a/rszlm-sys/Cargo.toml b/rszlm-sys/Cargo.toml index da817f7..1212121 100644 --- a/rszlm-sys/Cargo.toml +++ b/rszlm-sys/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rszlm-sys" -version = "0.1.1" +version = "0.1.2" edition = "2021" authors = ["shiben. "] description = "ZLMediaKit rust ffi api" @@ -8,6 +8,9 @@ repository = "https://github.com/BenLocal/rszlm" keywords = ["zlm", "zlmediakit", "rszlm", "rszlm-sys"] license = "MIT" +[lib] +doctest = false + [dependencies] openssl-sys = { version = "0.9", optional = true } diff --git a/src/lib.rs b/src/lib.rs index e6fe15f..ac9d128 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -44,12 +44,11 @@ macro_rules! box_to_mut_void_ptr { pub(crate) fn as_cstr_array> + Clone>( arr: &[T], ) -> *mut *const ::std::os::raw::c_char { - let c_strings = arr + let mut tmp: Vec<_> = arr .iter() .map(|s| std::ffi::CString::new::(s.to_owned()).unwrap()) + .map(|f| f.as_ptr()) .collect::>(); - let c_char_pointers = c_strings.iter().map(|s| s.as_ptr()).collect::>(); - let mut c_char_pointers_with_null = c_char_pointers.clone(); - c_char_pointers_with_null.push(std::ptr::null()); - c_char_pointers_with_null.as_mut_ptr() + tmp.push(std::ptr::null_mut()); + tmp.as_mut_ptr() }