Skip to content

Commit

Permalink
Folds uds functionality into existing http listener as a new transpor…
Browse files Browse the repository at this point in the history
…t type
  • Loading branch information
scottopell committed Jul 9, 2024
1 parent c4c4848 commit 2bf0936
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 169 deletions.
47 changes: 24 additions & 23 deletions metrics-exporter-prometheus/src/exporter/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ pub struct PrometheusBuilder {
exporter_config: ExporterConfig,
#[cfg(feature = "http-listener")]
allowed_addresses: Option<Vec<IpNet>>,
#[cfg(feature = "uds-listener")]
listen_path: std::path::PathBuf,
quantiles: Vec<Quantile>,
bucket_duration: Option<Duration>,
bucket_count: Option<NonZeroU32>,
Expand All @@ -58,22 +56,20 @@ impl PrometheusBuilder {

#[cfg(feature = "http-listener")]
let exporter_config = ExporterConfig::HttpListener {
listen_address: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 9000),
destination: super::ListenDestination::Tcp(SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
9000,
)),
};
#[cfg(not(feature = "http-listener"))]
let exporter_config = ExporterConfig::Unconfigured;

#[cfg(feature = "uds-listener")]
let listen_path = std::path::PathBuf::from("/tmp/metrics.sock");

let upkeep_timeout = Duration::from_secs(5);

Self {
exporter_config,
#[cfg(feature = "http-listener")]
allowed_addresses: None,
#[cfg(feature = "uds-listener")]
listen_path,
quantiles,
bucket_duration: None,
bucket_count: None,
Expand All @@ -100,7 +96,9 @@ impl PrometheusBuilder {
#[cfg_attr(docsrs, doc(cfg(feature = "http-listener")))]
#[must_use]
pub fn with_http_listener(mut self, addr: impl Into<SocketAddr>) -> Self {
self.exporter_config = ExporterConfig::HttpListener { listen_address: addr.into() };
self.exporter_config = ExporterConfig::HttpListener {
destination: super::ListenDestination::Tcp(addr.into()),
};
self
}

Expand Down Expand Up @@ -147,14 +145,16 @@ impl PrometheusBuilder {
/// Running in HTTP listener mode is mutually exclusive with the push gateway i.e. enabling the
/// HTTP listener will disable the push gateway, and vise versa.
///
/// Defaults to disabled, if enabled, default listening at `/tmp/metrics.sock`
/// Defaults to disabled, if enabled, listens on the specified path
///
/// [scrape endpoint]: https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format
#[cfg(feature = "uds-listener")]
#[cfg_attr(docsrs, doc(cfg(feature = "uds-listener")))]
#[must_use]
pub fn with_http_uds_listener(mut self, addr: impl Into<std::path::PathBuf>) -> Self {
self.exporter_config = ExporterConfig::UdsListener { listen_path: addr.into() };
self.exporter_config = ExporterConfig::HttpListener {
destination: super::ListenDestination::Uds(addr.into()),
};
self
}

Expand Down Expand Up @@ -468,25 +468,26 @@ impl PrometheusBuilder {
ExporterConfig::Unconfigured => Err(BuildError::MissingExporterConfiguration)?,

#[cfg(feature = "http-listener")]
ExporterConfig::HttpListener { listen_address } => {
super::http_listener::new_http_listener(
handle,
listen_address,
allowed_addresses,
)?
}
ExporterConfig::HttpListener { destination } => match destination {
super::ListenDestination::Tcp(listen_address) => {
super::http_listener::new_http_listener(
handle,
listen_address,
allowed_addresses,
)?
}
#[cfg(feature = "uds-listener")]
super::ListenDestination::Uds(listen_path) => {
super::http_listener::new_http_uds_listener(handle, listen_path)?
}
},

#[cfg(feature = "push-gateway")]
ExporterConfig::PushGateway { endpoint, interval, username, password } => {
super::push_gateway::new_push_gateway(
endpoint, interval, username, password, handle,
)
}

#[cfg(feature = "uds-listener")]
ExporterConfig::UdsListener { listen_path } => {
super::uds_listener::new_http_uds_listener(handle, listen_path)?
}
},
))
}
Expand Down
136 changes: 96 additions & 40 deletions metrics-exporter-prometheus/src/exporter/http_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,47 @@ use hyper::{
};
use hyper_util::rt::TokioIo;
use ipnet::IpNet;
#[cfg(feature = "uds-listener")]
use std::path::PathBuf;
use tokio::net::{TcpListener, TcpStream};
#[cfg(feature = "uds-listener")]
use tokio::net::{UnixListener, UnixStream};
use tracing::warn;

use crate::{common::BuildError, ExporterFuture, PrometheusHandle};

struct HttpListeningExporter {
pub struct HttpListeningExporter {
handle: PrometheusHandle,
allowed_addresses: Option<Vec<IpNet>>,
listener_type: ListenerType,
}

enum ListenerType {
Tcp(TcpListener),
#[cfg(feature = "uds-listener")]
Uds(UnixListener),
}

/// Error type for HTTP listening.
pub enum HttpListeningError {
Hyper(hyper::Error),
Io(std::io::Error),
}

impl HttpListeningExporter {
async fn serve(&self, listener: tokio::net::TcpListener) -> Result<(), hyper::Error> {
pub async fn serve(&self) -> Result<(), HttpListeningError> {
match &self.listener_type {
ListenerType::Tcp(listener) => {
self.serve_tcp(listener).await.map_err(HttpListeningError::Hyper)
}
#[cfg(feature = "uds-listener")]
ListenerType::Uds(listener) => {
self.serve_uds(listener).await.map_err(HttpListeningError::Io)
}
}
}

async fn serve_tcp(&self, listener: &TcpListener) -> Result<(), hyper::Error> {
loop {
let stream = match listener.accept().await {
Ok((stream, _)) => stream,
Expand All @@ -29,31 +58,52 @@ impl HttpListeningExporter {
continue;
}
};

let is_allowed = self.allowed_addresses.as_ref().map_or(true, |addrs| {
stream.peer_addr().map_or_else(
|e| {
warn!(error = ?e, "Error obtaining remote address.");
false
},
|peer_addr| {
let remote_ip = peer_addr.ip();
addrs.iter().any(|addr| addr.contains(&remote_ip))
},
)
});

self.process_stream(stream, is_allowed).await;
self.process_tcp_stream(stream).await;
}
}

async fn process_stream(&self, stream: TcpStream, is_allowed: bool) {
async fn process_tcp_stream(&self, stream: TcpStream) {
let is_allowed = self.check_tcp_allowed(&stream);
let handle = self.handle.clone();
let service = service_fn(move |req: Request<body::Incoming>| {
let handle = handle.clone();
async move { Ok::<_, hyper::Error>(Self::handle_http_request(is_allowed, &handle, &req)) }
});

tokio::spawn(async move {
if let Err(err) =
HyperHttpBuilder::new().serve_connection(TokioIo::new(stream), service).await
{
warn!(error = ?err, "Error serving connection.");
}
});
}

fn check_tcp_allowed(&self, stream: &TcpStream) -> bool {
if let Some(addrs) = &self.allowed_addresses {
if let Ok(peer_addr) = stream.peer_addr() {
return addrs.iter().any(|addr| addr.contains(&peer_addr.ip()));
}
}
true
}

#[cfg(feature = "uds-listener")]
async fn serve_uds(&self, listener: &UnixListener) -> Result<(), std::io::Error> {
loop {
let (stream, _) = listener.accept().await?;
self.process_uds_stream(stream).await;
}
}

#[cfg(feature = "uds-listener")]
async fn process_uds_stream(&self, stream: UnixStream) {
let handle = self.handle.clone();
let service = service_fn(move |req: Request<body::Incoming>| {
let handle = handle.clone();
async move { Ok::<_, hyper::Error>(Self::handle_http_request(true, &handle, &req)) }
});

tokio::spawn(async move {
if let Err(err) =
HyperHttpBuilder::new().serve_connection(TokioIo::new(stream), service).await
Expand All @@ -74,23 +124,15 @@ impl HttpListeningExporter {
_ => handle.render().into(),
})
} else {
Self::new_forbidden_response()
Response::builder()
.status(StatusCode::FORBIDDEN)
.body(Full::<Bytes>::default())
.unwrap()
}
}

fn new_forbidden_response() -> Response<Full<Bytes>> {
// This unwrap should not fail because we don't use any function that
// can assign an Err to it's inner such as `Builder::header``. A unit test
// will have to suffice to detect if this fails to hold true.
Response::builder().status(StatusCode::FORBIDDEN).body(Full::<Bytes>::default()).unwrap()
}
}

/// Creates an `ExporterFuture` implementing a http listener that servies prometheus metrics.
///
/// # Errors
/// Will return Err if it cannot bind to the listen address
pub(crate) fn new_http_listener(
pub fn new_http_listener(
handle: PrometheusHandle,
listen_address: SocketAddr,
allowed_addresses: Option<Vec<IpNet>>,
Expand All @@ -103,17 +145,31 @@ pub(crate) fn new_http_listener(
.map_err(|e| BuildError::FailedToCreateHTTPListener(e.to_string()))?;
let listener = TcpListener::from_std(listener).unwrap();

let exporter = HttpListeningExporter { handle, allowed_addresses };
let exporter = HttpListeningExporter {
handle,
allowed_addresses,
listener_type: ListenerType::Tcp(listener),
};

Ok(Box::pin(async move { exporter.serve(listener).await }))
Ok(Box::pin(async move { exporter.serve().await }))
}

#[cfg(test)]
mod tests {
use crate::exporter::http_listener::HttpListeningExporter;

#[test]
fn new_forbidden_response_always_succeeds() {
HttpListeningExporter::new_forbidden_response(); // doesn't panic
#[cfg(feature = "uds-listener")]
pub fn new_http_uds_listener(
handle: PrometheusHandle,
listen_path: PathBuf,
) -> Result<ExporterFuture, BuildError> {
if listen_path.exists() {
std::fs::remove_file(&listen_path)
.map_err(|e| BuildError::FailedToCreateHTTPListener(e.to_string()))?;
}
let listener = UnixListener::bind(listen_path)
.map_err(|e| BuildError::FailedToCreateHTTPListener(e.to_string()))?;
let exporter = HttpListeningExporter {
handle,
allowed_addresses: None,
listener_type: ListenerType::Uds(listener),
};

Ok(Box::pin(async move { exporter.serve().await }))
}
23 changes: 13 additions & 10 deletions metrics-exporter-prometheus/src/exporter/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#[cfg(feature = "http-listener")]
use http_listener::HttpListeningError;
#[cfg(any(feature = "http-listener", feature = "push-gateway"))]
use std::future::Future;
#[cfg(feature = "http-listener")]
Expand All @@ -12,16 +14,22 @@ use hyper::Uri;

/// Convenience type for Future implementing an exporter.
#[cfg(any(feature = "http-listener", feature = "push-gateway"))]
pub type ExporterFuture = Pin<Box<dyn Future<Output = Result<(), hyper::Error>> + Send + 'static>>;
pub type ExporterFuture =
Pin<Box<dyn Future<Output = Result<(), HttpListeningError>> + Send + 'static>>;

#[cfg(feature = "http-listener")]
#[derive(Clone)]
enum ListenDestination {
Tcp(SocketAddr),
#[cfg(feature = "uds-listener")]
Uds(std::path::PathBuf),
}

#[derive(Clone)]
enum ExporterConfig {
// Run an HTTP listener on the given `listen_address`.
#[cfg(feature = "http-listener")]
HttpListener { listen_address: SocketAddr },

#[cfg(feature = "uds-listener")]
UdsListener { listen_path: std::path::PathBuf },
HttpListener { destination: ListenDestination },

// Run a push gateway task sending to the given `endpoint` after `interval` time has elapsed,
// infinitely.
Expand All @@ -46,8 +54,6 @@ impl ExporterConfig {
#[cfg(feature = "push-gateway")]
Self::PushGateway { .. } => "push-gateway",
Self::Unconfigured => "unconfigured,",
#[cfg(feature = "uds-listener")]
Self::UdsListener { .. } => "uds-listener",
}
}
}
Expand All @@ -58,7 +64,4 @@ mod http_listener;
#[cfg(feature = "push-gateway")]
mod push_gateway;

#[cfg(feature = "uds-listener")]
mod uds_listener;

pub(crate) mod builder;
Loading

0 comments on commit 2bf0936

Please sign in to comment.