From e0c3c5f53f5ef0c5f8996579799438fb0f54987a Mon Sep 17 00:00:00 2001 From: tison Date: Thu, 1 Aug 2024 22:00:58 +0800 Subject: [PATCH] feat: implement otel log appender (#12) Signed-off-by: tison --- Cargo.toml | 16 ++++++- src/append/mod.rs | 23 +++++++-- src/append/opentelemetry.rs | 95 +++++++++++++++++++++++++++++++++++++ 3 files changed, 129 insertions(+), 5 deletions(-) create mode 100644 src/append/opentelemetry.rs diff --git a/Cargo.toml b/Cargo.toml index f81c48f..d9f0f50 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,13 +23,18 @@ license = "Apache-2.0" readme = "README.md" repository = "https://github.com/tisonkun/logforth" rust-version = "1.71.0" -version = "0.2.0" +version = "0.3.0" [features] fastrace = ["dep:fastrace"] file = ["dep:crossbeam-channel", "dep:parking_lot", "dep:time"] json = ["dep:serde_json", "dep:serde"] no-color = ["colored/no-color"] +opentelemetry = [ + "dep:opentelemetry", + "dep:opentelemetry-otlp", + "dep:opentelemetry_sdk", +] [dependencies] anyhow = { version = "1.0" } @@ -38,6 +43,15 @@ crossbeam-channel = { version = "0.5", optional = true } fastrace = { version = "0.6", optional = true } humantime = { version = "2.1" } log = { version = "0.4", features = ["std", "kv_unstable"] } +opentelemetry = { version = "0.24", features = ["logs"], optional = true } +opentelemetry-otlp = { version = "0.17", features = [ + "logs", + "grpc-tonic", +], optional = true } +opentelemetry_sdk = { version = "0.24", features = [ + "logs", + "rt-tokio", +], optional = true } parking_lot = { version = "0.12", optional = true } paste = { version = "1.0" } serde = { version = "1.0", features = ["derive"], optional = true } diff --git a/src/append/mod.rs b/src/append/mod.rs index 1713e78..92cbe51 100644 --- a/src/append/mod.rs +++ b/src/append/mod.rs @@ -12,22 +12,27 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::dynlog::DynLog; -use crate::filter::FilterImpl; -use crate::layout; -use crate::layout::Layout; pub use boxdyn::*; #[cfg(feature = "fastrace")] pub use fastrace::*; #[cfg(feature = "file")] pub use file::*; +#[cfg(feature = "opentelemetry")] +pub use opentelemetry::*; pub use stdio::*; +use crate::dynlog::DynLog; +use crate::filter::FilterImpl; +use crate::layout; +use crate::layout::Layout; + mod boxdyn; #[cfg(feature = "fastrace")] mod fastrace; #[cfg(feature = "file")] mod file; +#[cfg(feature = "opentelemetry")] +mod opentelemetry; mod stdio; pub trait Append { @@ -56,6 +61,8 @@ pub enum AppendImpl { DynLog(DynLog), #[cfg(feature = "fastrace")] Fastrace(Fastrace), + #[cfg(feature = "opentelemetry")] + OpenTelemetryLog(OpenTelemetryLog), #[cfg(feature = "file")] RollingFile(RollingFile), Stdout(Stdout), @@ -69,6 +76,8 @@ impl Append for AppendImpl { AppendImpl::DynLog(append) => append.try_append(record), #[cfg(feature = "fastrace")] AppendImpl::Fastrace(append) => append.try_append(record), + #[cfg(feature = "opentelemetry")] + AppendImpl::OpenTelemetryLog(append) => append.try_append(record), #[cfg(feature = "file")] AppendImpl::RollingFile(append) => append.try_append(record), AppendImpl::Stdout(append) => append.try_append(record), @@ -82,6 +91,8 @@ impl Append for AppendImpl { AppendImpl::DynLog(append) => append.flush(), #[cfg(feature = "fastrace")] AppendImpl::Fastrace(append) => append.flush(), + #[cfg(feature = "opentelemetry")] + AppendImpl::OpenTelemetryLog(append) => append.flush(), #[cfg(feature = "file")] AppendImpl::RollingFile(append) => append.flush(), AppendImpl::Stdout(append) => append.flush(), @@ -95,6 +106,8 @@ impl Append for AppendImpl { AppendImpl::DynLog(append) => append.default_layout(), #[cfg(feature = "fastrace")] AppendImpl::Fastrace(append) => append.default_layout(), + #[cfg(feature = "opentelemetry")] + AppendImpl::OpenTelemetryLog(append) => append.default_layout(), #[cfg(feature = "file")] AppendImpl::RollingFile(append) => append.default_layout(), AppendImpl::Stdout(append) => append.default_layout(), @@ -108,6 +121,8 @@ impl Append for AppendImpl { AppendImpl::DynLog(append) => append.default_filters(), #[cfg(feature = "fastrace")] AppendImpl::Fastrace(append) => append.default_filters(), + #[cfg(feature = "opentelemetry")] + AppendImpl::OpenTelemetryLog(append) => append.default_filters(), #[cfg(feature = "file")] AppendImpl::RollingFile(append) => append.default_filters(), AppendImpl::Stdout(append) => append.default_filters(), diff --git a/src/append/opentelemetry.rs b/src/append/opentelemetry.rs new file mode 100644 index 0000000..f0934a2 --- /dev/null +++ b/src/append/opentelemetry.rs @@ -0,0 +1,95 @@ +use std::sync::Arc; +use std::time::Duration; +use std::time::SystemTime; + +use log::Record; +use opentelemetry::logs::{AnyValue, LoggerProvider as ILoggerProvider}; +use opentelemetry::logs::{Logger, Severity}; +use opentelemetry::InstrumentationLibrary; +use opentelemetry_otlp::WithExportConfig; +use opentelemetry_sdk::logs::LoggerProvider; + +use crate::append::Append; + +#[derive(Debug)] +pub struct OpenTelemetryLog { + name: String, + category: String, + library: Arc, + provider: LoggerProvider, +} + +impl OpenTelemetryLog { + pub fn new( + name: impl Into, + category: impl Into, + otlp_endpoint: impl Into, + ) -> Self { + let name = name.into(); + let category = category.into(); + let otlp_endpoint = otlp_endpoint.into(); + + let exporter = opentelemetry_otlp::new_exporter() + .tonic() + .with_endpoint(otlp_endpoint) + .with_protocol(opentelemetry_otlp::Protocol::Grpc) + .with_timeout(Duration::from_secs( + opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT, + )) + .build_log_exporter() + .expect("failed to initialize oltp exporter"); + + let provider = LoggerProvider::builder() + .with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio) + .build(); + + let library = Arc::new(InstrumentationLibrary::builder(name.clone()).build()); + + Self { + name, + category, + library, + provider, + } + } +} + +impl Append for OpenTelemetryLog { + fn try_append(&self, log_record: &Record) -> anyhow::Result<()> { + let provider = self.provider.clone(); + let logger = provider.library_logger(self.library.clone()); + + let mut record = opentelemetry_sdk::logs::LogRecord::default(); + record.observed_timestamp = Some(SystemTime::now()); + record.severity_number = Some(log_level_to_otel_severity(log_record.level())); + record.severity_text = Some(log_record.level().as_str().into()); + record.body = Some(AnyValue::from(log_record.args().to_string())); + + logger.emit(record); + Ok(()) + } + + fn flush(&self) { + for err in self + .provider + .force_flush() + .into_iter() + .filter_map(|r| r.err()) + { + eprintln!( + "failed to flush logger ({}@{}): {}", + self.name, self.category, err + ); + } + } +} + +fn log_level_to_otel_severity(level: log::Level) -> Severity { + match level { + log::Level::Error => Severity::Error, + log::Level::Warn => Severity::Warn, + log::Level::Info => Severity::Info, + log::Level::Debug => Severity::Debug, + log::Level::Trace => Severity::Trace, + } +}