Skip to content

Commit

Permalink
add http proxy handler
Browse files Browse the repository at this point in the history
add http proxy handler
  • Loading branch information
BenLocal committed Jun 27, 2024
1 parent b4a8f77 commit 048769d
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 11 deletions.
7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ workspace = { members = ["rszlm-sys"] }

[package]
name = "rszlm"
version = "0.1.2"
version = "0.1.3"
edition = "2021"
authors = ["shiben. <[email protected]>"]
description = "ZLMediaKit rust api"
Expand All @@ -19,6 +19,11 @@ anyhow = "1"
[dev-dependencies]
tokio = { version = "1", features = ["full"] }
tokio-util = { version = "0.7", features = ["full"] }
axum = "0.7"
hyper = { version = "1", features = ["full"] }
hyper-util = { version = "0.1", features = ["client-legacy"] }
http-body-util = { version = "0.1" }
futures-util = { version = "0.3", features = ["io"] }

[features]
default = []
Expand Down
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@ ZLMediaKit rust api

- 依赖

1. [rust-bindgen](https://github.com/rust-lang/rust-bindgen)需要安装`Clang`
1. [rust-bindgen](https://github.com/rust-lang/rust-bindgen)需要安装`Clang`环境

- 参考链接:https://rust-lang.github.io/rust-bindgen/requirements.html

2. [ZLMediaKit](https://github.com/ZLMediaKit/ZLMediaKit)

- 参考链接:https://github.com/ZLMediaKit/ZLMediaKit/wiki/%E5%BF%AB%E9%80%9F%E5%BC%80%E5%A7%8B

- 编译好并且 install`ZLMediaKit`之后,设置环境变量`ZLM_DIR`,该路径包含 ZLMediaKit 的`include``lib`以及`bin`文件夹,并将`bin`设置到`PATH`环境变量中

如果不设置`ZLM_DIR`环境变量,rszlm-sys 会拉取`ZLMediaKit`git 代码进行编译

- 编译

```shell
Expand Down
99 changes: 97 additions & 2 deletions examples/server.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
use std::collections::HashMap;

use axum::response::IntoResponse;
use axum::{body::Body, routing::get, Router};
use futures_util::StreamExt;
use hyper_util::{client::legacy::connect::HttpConnector, rt::TokioExecutor};
use once_cell::sync::{Lazy, OnceCell};
use rszlm::{
event::EVENTS,
init::{EnvIni, EnvInitBuilder},
player::ProxyPlayerBuilder,
server::{http_server_start, rtmp_server_start, rtsp_server_start, stop_all_server},
};
use tokio::sync::RwLock;
use tokio::{runtime::Handle, sync::RwLock};
use tokio_util::sync::CancellationToken;

static PULL_PROXY_MESSAGE: OnceCell<tokio::sync::mpsc::Sender<ProxyMessageCmd>> = OnceCell::new();
Expand All @@ -21,6 +25,15 @@ pub(crate) async fn pull_proxy_message(msg: ProxyMessageCmd) {
}
}

const AXUM_PORT: u16 = 8552;

type Client = hyper_util::client::legacy::Client<HttpConnector, Body>;

static CLIENT: Lazy<Client> = Lazy::new(|| {
hyper_util::client::legacy::Client::<(), ()>::builder(TokioExecutor::new())
.build(HttpConnector::new())
});

#[tokio::main]
async fn main() {
let cancel = CancellationToken::new();
Expand All @@ -32,6 +45,9 @@ async fn main() {
let cancel_clone = cancel.clone();
tokio::spawn(start(cancel_clone));

let cancel_clone = cancel.clone();
tokio::spawn(axum_start(AXUM_PORT, cancel_clone));

tokio::signal::ctrl_c().await.unwrap();
cancel.cancel();
}
Expand Down Expand Up @@ -154,7 +170,8 @@ async fn zlm_start(cancel: CancellationToken) -> anyhow::Result<()> {
let cancel_clone = cancel.clone();
let (tx, mut rx) = tokio::sync::mpsc::channel::<ProxyMessageCmd>(100);

let _ = start_zlm_background(cancel_clone, tx);
let runtime = Handle::current();
let _ = start_zlm_background(cancel_clone, tx, runtime);

loop {
tokio::select! {
Expand All @@ -173,6 +190,7 @@ async fn zlm_start(cancel: CancellationToken) -> anyhow::Result<()> {
fn start_zlm_background(
cancel: CancellationToken,
tx: tokio::sync::mpsc::Sender<ProxyMessageCmd>,
runtime: tokio::runtime::Handle,
) -> anyhow::Result<()> {
tokio::task::spawn_blocking(move || {
EnvInitBuilder::default()
Expand Down Expand Up @@ -230,6 +248,64 @@ fn start_zlm_background(
stream: msg.sender.stream(),
}));
});

events.on_http_request(move |msg| {
let url = msg.parser.url();

if url.starts_with("/test") {
let headers = vec!["Content-Type".to_string(), "text/plain".to_string()];
let body = "hello world";
msg.invoker.invoke(200, headers, body);
return true;
} else if url.starts_with("/proxy") {
let path = &url["/proxy".len()..];
let query = msg.parser.query_str();
let path_query = if query.is_empty() {
path.to_owned()
} else {
format!("{}?{}", path, query)
};

let uri = format!("http://127.0.0.1:{}{}", AXUM_PORT, path_query);
if let Ok(req) = hyper::Request::builder()
.method(msg.parser.method().as_str())
.uri(uri)
.body(Body::from(msg.parser.body()))
{
// TODO copy request headers
let resp = runtime.block_on(async move {
CLIENT
.request(req)
.await
.map_err(|_| hyper::StatusCode::BAD_REQUEST)
.into_response()
});
let status = resp.status();
let header = resp
.headers()
.iter()
.map(|(k, v)| vec![k.to_string(), v.to_str().unwrap().to_string()])
.flatten()
.collect::<Vec<_>>();
let body = resp.into_body();
let body_str = runtime
.block_on(async move {
let mut b = body.into_data_stream();
let mut data = String::new();
while let Some(chunk) = b.next().await {
data.push_str(&String::from_utf8_lossy(&chunk?));
}
anyhow::Ok(data)
})
.unwrap();

msg.invoker
.invoke(status.as_u16() as i32, header, &body_str);
return true;
}
}
false
});
}

loop {
Expand All @@ -246,3 +322,22 @@ fn start_zlm_background(

Ok(())
}

async fn axum_start(port: u16, cancel: CancellationToken) -> anyhow::Result<()> {
let app = Router::new()
.route("/", get(|| async { "Hello, axum world!" }))
.route("/test", get(|| async { "Hello, axum test!" }));

let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)).await?;
println!("listening on {}", listener.local_addr()?);
axum::serve(listener, app)
.with_graceful_shutdown(async move {
tokio::select! {
_ = cancel.cancelled() => {
println!("cancel");
},
}
})
.await
.map_err(|e| e.into())
}
5 changes: 4 additions & 1 deletion rszlm-sys/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
[package]
name = "rszlm-sys"
version = "0.1.1"
version = "0.1.2"
edition = "2021"
authors = ["shiben. <[email protected]>"]
description = "ZLMediaKit rust ffi api"
repository = "https://github.com/BenLocal/rszlm"
keywords = ["zlm", "zlmediakit", "rszlm", "rszlm-sys"]
license = "MIT"

[lib]
doctest = false

[dependencies]
openssl-sys = { version = "0.9", optional = true }

Expand Down
9 changes: 4 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,11 @@ macro_rules! box_to_mut_void_ptr {
pub(crate) fn as_cstr_array<T: Into<Vec<u8>> + Clone>(
arr: &[T],
) -> *mut *const ::std::os::raw::c_char {
let c_strings = arr
let mut tmp: Vec<_> = arr
.iter()
.map(|s| std::ffi::CString::new::<T>(s.to_owned()).unwrap())
.map(|f| f.as_ptr())
.collect::<Vec<_>>();
let c_char_pointers = c_strings.iter().map(|s| s.as_ptr()).collect::<Vec<_>>();
let mut c_char_pointers_with_null = c_char_pointers.clone();
c_char_pointers_with_null.push(std::ptr::null());
c_char_pointers_with_null.as_mut_ptr()
tmp.push(std::ptr::null_mut());
tmp.as_mut_ptr()
}

0 comments on commit 048769d

Please sign in to comment.