diff --git a/opentelemetry-appender-tracing/benches/logs.rs b/opentelemetry-appender-tracing/benches/logs.rs
index aef5e7bdf2..69d6cdabdd 100644
--- a/opentelemetry-appender-tracing/benches/logs.rs
+++ b/opentelemetry-appender-tracing/benches/logs.rs
@@ -19,7 +19,7 @@ use opentelemetry::logs::LogResult;
 use opentelemetry::{InstrumentationLibrary, KeyValue};
 use opentelemetry_appender_tracing::layer as tracing_layer;
 use opentelemetry_sdk::export::logs::LogExporter;
-use opentelemetry_sdk::logs::{LogData, LogProcessor, LogRecord, LoggerProvider};
+use opentelemetry_sdk::logs::{LogProcessor, LogRecord, LoggerProvider};
 use opentelemetry_sdk::Resource;
 use pprof::criterion::{Output, PProfProfiler};
 use tracing::error;
@@ -55,7 +55,7 @@ impl NoopProcessor {
 }
 
 impl LogProcessor for NoopProcessor {
-    fn emit(&self, _: &mut LogData) {
+    fn emit(&self, _: &mut LogRecord, _: &InstrumentationLibrary) {
         // no-op
     }
diff --git a/opentelemetry-sdk/benches/log.rs b/opentelemetry-sdk/benches/log.rs
index 840560a1f4..a81eaabebc 100644
--- a/opentelemetry-sdk/benches/log.rs
+++ b/opentelemetry-sdk/benches/log.rs
@@ -25,9 +25,9 @@ use opentelemetry::logs::{
 };
 use opentelemetry::trace::Tracer;
 use opentelemetry::trace::TracerProvider as _;
-use opentelemetry::Key;
-use opentelemetry_sdk::logs::LogData;
+use opentelemetry::{InstrumentationLibrary, Key};
 use opentelemetry_sdk::logs::LogProcessor;
+use opentelemetry_sdk::logs::LogRecord;
 use opentelemetry_sdk::logs::{Logger, LoggerProvider};
 use opentelemetry_sdk::trace;
 use opentelemetry_sdk::trace::{Sampler, TracerProvider};
@@ -36,7 +36,7 @@ use opentelemetry_sdk::trace::{Sampler, TracerProvider};
 struct NoopProcessor;
 
 impl LogProcessor for NoopProcessor {
-    fn emit(&self, _data: &mut LogData) {}
+    fn emit(&self, _data: &mut LogRecord, _library: &InstrumentationLibrary) {}
 
     fn force_flush(&self) -> LogResult<()> {
         Ok(())
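The two benchmark files above show the whole migration for a processor: `LogProcessor::emit` no longer receives a `LogData` wrapper but the `LogRecord` and its `InstrumentationLibrary` as two separate arguments. Below is a minimal, self-contained sketch of a custom processor written against the new signature; the `EmitCountingProcessor` name is hypothetical, and it assumes the trait surface visible in this diff (`emit`, `force_flush`, and a `shutdown(&self)` returning `LogResult<()>`, with the `Debug` bound on `LogProcessor`).

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

use opentelemetry::logs::LogResult;
use opentelemetry::InstrumentationLibrary;
use opentelemetry_sdk::logs::{LogProcessor, LogRecord};

/// Hypothetical processor that only counts how many records pass through it.
#[derive(Debug, Default)]
struct EmitCountingProcessor {
    seen: AtomicUsize,
}

impl LogProcessor for EmitCountingProcessor {
    // The record and its instrumentation library now arrive as two borrowed
    // arguments instead of a single `LogData` value.
    fn emit(&self, _record: &mut LogRecord, _library: &InstrumentationLibrary) {
        self.seen.fetch_add(1, Ordering::Relaxed);
    }

    fn force_flush(&self) -> LogResult<()> {
        Ok(())
    }

    fn shutdown(&self) -> LogResult<()> {
        Ok(())
    }
}
```

Processors that never store the record, like the benchmark no-ops above, need nothing beyond this signature change.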
diff --git a/opentelemetry-sdk/benches/log_exporter.rs b/opentelemetry-sdk/benches/log_exporter.rs
index 295f554940..40e8dda846 100644
--- a/opentelemetry-sdk/benches/log_exporter.rs
+++ b/opentelemetry-sdk/benches/log_exporter.rs
@@ -18,8 +18,9 @@ use criterion::{criterion_group, criterion_main, Criterion};
 use opentelemetry::logs::{LogRecord as _, LogResult, Logger as _, LoggerProvider as _, Severity};
-use opentelemetry_sdk::logs::LogData;
+use opentelemetry::InstrumentationLibrary;
 use opentelemetry_sdk::logs::LogProcessor;
+use opentelemetry_sdk::logs::LogRecord;
 use opentelemetry_sdk::logs::LoggerProvider;
 use pprof::criterion::{Output, PProfProfiler};
 use std::fmt::Debug;
@@ -28,11 +29,11 @@ use std::fmt::Debug;
 // cargo bench --bench log_exporter
 #[async_trait]
 pub trait LogExporterWithFuture: Send + Sync + Debug {
-    async fn export(&mut self, batch: Vec<LogData>);
+    async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>);
 }
 
 pub trait LogExporterWithoutFuture: Send + Sync + Debug {
-    fn export(&mut self, batch: Vec<LogData>);
+    fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>);
 }
 
 #[derive(Debug)]
@@ -40,13 +41,13 @@ struct NoOpExporterWithFuture {}
 #[async_trait]
 impl LogExporterWithFuture for NoOpExporterWithFuture {
-    async fn export(&mut self, _batch: Vec<LogData>) {}
+    async fn export(&mut self, _batch: Vec<(&LogRecord, &InstrumentationLibrary)>) {}
 }
 
 #[derive(Debug)]
 struct NoOpExporterWithoutFuture {}
 
 impl LogExporterWithoutFuture for NoOpExporterWithoutFuture {
-    fn export(&mut self, _batch: Vec<LogData>) {}
+    fn export(&mut self, _batch: Vec<(&LogRecord, &InstrumentationLibrary)>) {}
 }
 
 #[derive(Debug)]
@@ -63,9 +64,9 @@ impl ExportingProcessorWithFuture {
 }
 
 impl LogProcessor for ExportingProcessorWithFuture {
-    fn emit(&self, data: &mut LogData) {
+    fn emit(&self, record: &mut LogRecord, library: &InstrumentationLibrary) {
         let mut exporter = self.exporter.lock().expect("lock error");
-        futures_executor::block_on(exporter.export(vec![data.clone()]));
+        futures_executor::block_on(exporter.export(vec![(record, library)]));
     }
 
     fn force_flush(&self) -> LogResult<()> {
@@ -91,11 +92,11 @@ impl ExportingProcessorWithoutFuture {
 }
 
 impl LogProcessor for ExportingProcessorWithoutFuture {
-    fn emit(&self, data: &mut LogData) {
+    fn emit(&self, record: &mut LogRecord, library: &InstrumentationLibrary) {
         self.exporter
             .lock()
             .expect("lock error")
-            .export(vec![data.clone()]);
+            .export(vec![(record, library)]);
     }
 
     fn force_flush(&self) -> LogResult<()> {
diff --git a/opentelemetry-sdk/benches/log_processor.rs b/opentelemetry-sdk/benches/log_processor.rs
index 7bf279c219..1bc0a130b4 100644
--- a/opentelemetry-sdk/benches/log_processor.rs
+++ b/opentelemetry-sdk/benches/log_processor.rs
@@ -19,10 +19,8 @@ use std::{
 
 use criterion::{criterion_group, criterion_main, Criterion};
 use opentelemetry::logs::{LogRecord as _, LogResult, Logger as _, LoggerProvider as _, Severity};
-use opentelemetry_sdk::{
-    logs::LogData,
-    logs::{LogProcessor, LogRecord, Logger, LoggerProvider},
-};
+use opentelemetry::InstrumentationLibrary;
+use opentelemetry_sdk::logs::{LogProcessor, LogRecord, Logger, LoggerProvider};
 
 // Run this benchmark with:
 // cargo bench --bench log_processor
@@ -45,7 +43,7 @@ fn create_log_record(logger: &Logger) -> LogRecord {
 struct NoopProcessor;
 
 impl LogProcessor for NoopProcessor {
-    fn emit(&self, _data: &mut LogData) {}
+    fn emit(&self, _data: &mut LogRecord, _library: &InstrumentationLibrary) {}
 
     fn force_flush(&self) -> LogResult<()> {
         Ok(())
@@ -60,7 +58,7 @@ impl LogProcessor for NoopProcessor {
 struct CloningProcessor;
 
 impl LogProcessor for CloningProcessor {
-    fn emit(&self, data: &mut LogData) {
+    fn emit(&self, data: &mut LogRecord, _library: &InstrumentationLibrary) {
         let _data_cloned = data.clone();
     }
 
@@ -75,8 +73,8 @@ impl LogProcessor for CloningProcessor {
 #[derive(Debug)]
 struct SendToChannelProcessor {
-    sender: std::sync::mpsc::Sender<LogData>,
-    receiver: Arc<Mutex<std::sync::mpsc::Receiver<LogData>>>,
+    sender: std::sync::mpsc::Sender<(LogRecord, InstrumentationLibrary)>,
+    receiver: Arc<Mutex<std::sync::mpsc::Receiver<(LogRecord, InstrumentationLibrary)>>>,
 }
 
 impl SendToChannelProcessor {
@@ -103,9 +101,8 @@ impl SendToChannelProcessor {
 }
 impl LogProcessor for SendToChannelProcessor {
-    fn emit(&self, data: &mut LogData) {
-        let data_cloned = data.clone();
-        let res = self.sender.send(data_cloned);
+    fn emit(&self, record: &mut LogRecord, library: &InstrumentationLibrary) {
+        let res = self.sender.send((record.clone(), library.clone()));
         if res.is_err() {
             println!("Error sending log data to channel {0}", res.err().unwrap());
         }
     }
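As `SendToChannelProcessor` above illustrates, a processor that keeps data beyond the `emit` call can no longer stash a `LogData`; it clones the two parts into an owned `(LogRecord, InstrumentationLibrary)` tuple. Here is a small sketch of that pattern with an in-memory buffer instead of a channel; the `BufferingProcessor` name is hypothetical, and it only relies on `LogRecord` and `InstrumentationLibrary` being `Clone`, which the code in this diff already depends on.

```rust
use std::sync::Mutex;

use opentelemetry::logs::LogResult;
use opentelemetry::InstrumentationLibrary;
use opentelemetry_sdk::logs::{LogProcessor, LogRecord};

/// Hypothetical processor that buffers owned copies of everything it sees.
#[derive(Debug, Default)]
struct BufferingProcessor {
    buffer: Mutex<Vec<(LogRecord, InstrumentationLibrary)>>,
}

impl LogProcessor for BufferingProcessor {
    fn emit(&self, record: &mut LogRecord, library: &InstrumentationLibrary) {
        // The parameters are borrows owned by the caller, so anything kept
        // past this call has to be cloned into an owned tuple.
        self.buffer
            .lock()
            .expect("lock poisoned")
            .push((record.clone(), library.clone()));
    }

    fn force_flush(&self) -> LogResult<()> {
        // A real processor would hand the drained tuples to an exporter here.
        self.buffer.lock().expect("lock poisoned").clear();
        Ok(())
    }

    fn shutdown(&self) -> LogResult<()> {
        self.force_flush()
    }
}
```

The clone happens only in processors that genuinely need ownership; pass-through processors keep working on the borrowed record.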
diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs
index 3af118cc98..6f716908db 100644
--- a/opentelemetry-sdk/src/logs/log_emitter.rs
+++ b/opentelemetry-sdk/src/logs/log_emitter.rs
@@ -1,5 +1,5 @@
 use super::{BatchLogProcessor, LogProcessor, LogRecord, SimpleLogProcessor, TraceContext};
-use crate::{export::logs::LogExporter, logs::LogData, runtime::RuntimeChannel, Resource};
+use crate::{export::logs::LogExporter, runtime::RuntimeChannel, Resource};
 use opentelemetry::{
     global,
     logs::{LogError, LogResult},
@@ -254,28 +254,22 @@ impl opentelemetry::logs::Logger for Logger {
     }
 
     /// Emit a `LogRecord`.
-    fn emit(&self, record: Self::LogRecord) {
+    fn emit(&self, mut record: Self::LogRecord) {
         let provider = self.provider();
         let processors = provider.log_processors();
         let trace_context = Context::map_current(|cx| {
             cx.has_active_span()
                 .then(|| TraceContext::from(cx.span().span_context()))
         });
-        let mut log_record = record;
         if let Some(ref trace_context) = trace_context {
-            log_record.trace_context = Some(trace_context.clone());
+            record.trace_context = Some(trace_context.clone());
         }
-        if log_record.observed_timestamp.is_none() {
-            log_record.observed_timestamp = Some(SystemTime::now());
+        if record.observed_timestamp.is_none() {
+            record.observed_timestamp = Some(SystemTime::now());
         }
 
-        let mut data = LogData {
-            record: log_record,
-            instrumentation: self.instrumentation_library().clone(),
-        };
-
         for p in processors {
-            p.emit(&mut data);
+            p.emit(&mut record, self.instrumentation_library());
         }
     }
@@ -332,7 +328,7 @@ mod tests {
     }
 
     impl LogProcessor for ShutdownTestLogProcessor {
-        fn emit(&self, _data: &mut LogData) {
+        fn emit(&self, _data: &mut LogRecord, _library: &InstrumentationLibrary) {
             self.is_shutdown
                 .lock()
                 .map(|is_shutdown| {
@@ -562,7 +558,7 @@ mod tests {
     }
 
     impl LogProcessor for LazyLogProcessor {
-        fn emit(&self, _data: &mut LogData) {
+        fn emit(&self, _data: &mut LogRecord, _library: &InstrumentationLibrary) {
            // nothing to do.
         }
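`Logger::emit` now threads a single `&mut LogRecord` through every registered processor and passes the logger's `InstrumentationLibrary` by reference, so a processor can enrich the record in place and processors registered after it observe the change; the `FirstProcessor`/`SecondProcessor` tests further down exercise exactly this. A sketch of such an enriching processor follows. The type name is hypothetical, and it assumes the `attributes` and `body` fields of the SDK `LogRecord` stay public and that `AnyValue` is importable from `opentelemetry::logs`, as the test code in this patch suggests.

```rust
use opentelemetry::logs::{AnyValue, LogResult};
use opentelemetry::{InstrumentationLibrary, Key};
use opentelemetry_sdk::logs::{LogProcessor, LogRecord};

/// Hypothetical processor that stamps every record before it reaches the exporter.
#[derive(Debug)]
struct EnrichingProcessor;

impl LogProcessor for EnrichingProcessor {
    fn emit(&self, record: &mut LogRecord, _library: &InstrumentationLibrary) {
        // `record` is a mutable borrow of the record the logger owns, so edits
        // made here are visible to every processor registered after this one.
        record.attributes.get_or_insert(vec![]).push((
            Key::from_static_str("enriched_by"),
            AnyValue::String("EnrichingProcessor".into()),
        ));
        if record.body.is_none() {
            record.body = Some("(no body provided)".into());
        }
    }

    fn force_flush(&self) -> LogResult<()> {
        Ok(())
    }

    fn shutdown(&self) -> LogResult<()> {
        Ok(())
    }
}
```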
diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs
index 16e15c5f1c..8a5aca4800 100644
--- a/opentelemetry-sdk/src/logs/log_processor.rs
+++ b/opentelemetry-sdk/src/logs/log_processor.rs
@@ -1,6 +1,6 @@
 use crate::{
     export::logs::{ExportResult, LogExporter},
-    logs::LogData,
+    logs::LogRecord,
     runtime::{RuntimeChannel, TrySend},
     Resource,
 };
@@ -14,8 +14,8 @@ use opentelemetry::logs::Severity;
 use opentelemetry::{
     global,
     logs::{LogError, LogResult},
+    InstrumentationLibrary,
 };
-use std::borrow::Cow;
 use std::sync::atomic::AtomicBool;
 use std::{cmp::min, env, sync::Mutex};
 use std::{
@@ -55,8 +55,9 @@ pub trait LogProcessor: Send + Sync + Debug {
     /// processor in the chain.
     ///
     /// # Parameters
-    /// - `data`: A mutable reference to `LogData` representing the log record.
-    fn emit(&self, data: &mut LogData);
+    /// - `data`: A mutable reference to `LogRecord` representing the log record.
+    /// - `instrumentation`: The instrumentation library associated with the log record.
+    fn emit(&self, data: &mut LogRecord, instrumentation: &InstrumentationLibrary);
     /// Force the logs lying in the cache to be exported.
     fn force_flush(&self) -> LogResult<()>;
     /// Shuts down the processor.
@@ -94,7 +95,7 @@ impl SimpleLogProcessor {
 }
 
 impl LogProcessor for SimpleLogProcessor {
-    fn emit(&self, data: &mut LogData) {
+    fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationLibrary) {
         // noop after shutdown
         if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
             return;
@@ -105,9 +106,7 @@ impl LogProcessor for SimpleLogProcessor {
             .lock()
             .map_err(|_| LogError::Other("simple logprocessor mutex poison".into()))
             .and_then(|mut exporter| {
-                futures_executor::block_on(
-                    exporter.export(vec![(&data.record, &data.instrumentation)]),
-                )
+                futures_executor::block_on(exporter.export(vec![(record, instrumentation)]))
             });
         if let Err(err) = result {
             global::handle_error(err);
@@ -153,10 +152,11 @@ impl Debug for BatchLogProcessor {
 }
 
 impl LogProcessor for BatchLogProcessor {
-    fn emit(&self, data: &mut LogData) {
-        let result = self
-            .message_sender
-            .try_send(BatchMessage::ExportLog(data.clone()));
+    fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationLibrary) {
+        let result = self.message_sender.try_send(BatchMessage::ExportLog((
+            record.clone(),
+            instrumentation.clone(),
+        )));
 
         if let Err(err) = result {
             global::handle_error(LogError::Other(err.into()));
@@ -215,7 +215,7 @@ impl BatchLogProcessor {
                     match message {
                         // Log has finished, add to buffer of pending logs.
                         BatchMessage::ExportLog(log) => {
-                            logs.push(Cow::Owned(log));
+                            logs.push(log);
 
                             if logs.len() == config.max_export_batch_size {
                                 let result = export_with_timeout(
@@ -299,11 +299,11 @@ impl BatchLogProcessor {
     }
 }
 
-async fn export_with_timeout<'a, R, E>(
+async fn export_with_timeout<R, E>(
     time_out: Duration,
     exporter: &mut E,
     runtime: &R,
-    batch: Vec<Cow<'a, LogData>>,
+    batch: Vec<(LogRecord, InstrumentationLibrary)>,
 ) -> ExportResult
 where
     R: RuntimeChannel,
@@ -313,9 +313,10 @@ where
         return Ok(());
     }
-    // Convert the Vec<&LogData> to Vec<(&LogRecord, &InstrumentationLibrary)>
+    // Convert the Vec<(LogRecord, InstrumentationLibrary)> to Vec<(&LogRecord, &InstrumentationLibrary)>
+    // TBD - Can we avoid this conversion as it involves heap allocation with new vector?
     let export_batch = batch
         .iter()
-        .map(|log_data| (&log_data.record, &log_data.instrumentation))
+        .map(|log_data| (&log_data.0, &log_data.1))
         .collect();
     let export = exporter.export(export_batch);
@@ -498,7 +499,7 @@ where
 #[derive(Debug)]
 enum BatchMessage {
     /// Export logs, usually called when the log is emitted.
-    ExportLog(LogData),
+    ExportLog((LogRecord, InstrumentationLibrary)),
     /// Flush the current buffer to the backend, it can be triggered by
     /// pre configured interval or a call to `force_push` function.
     Flush(Option<oneshot::Sender<ExportResult>>),
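The `TBD` comment above concerns the extra `Vec` allocated when the owned batch is reshaped into the borrowed `(&LogRecord, &InstrumentationLibrary)` pairs that the exporter-facing call expects. The reshaping itself is a plain iterator map over the owned tuples; a standalone sketch of the same conversion (the helper name is hypothetical):

```rust
use opentelemetry::InstrumentationLibrary;
use opentelemetry_sdk::logs::LogRecord;

/// Borrow an owned batch in the tuple-of-references shape used for export.
/// The collected `Vec` is the only extra allocation; no record is cloned.
fn as_export_batch(
    batch: &[(LogRecord, InstrumentationLibrary)],
) -> Vec<(&LogRecord, &InstrumentationLibrary)> {
    batch
        .iter()
        .map(|(record, library)| (record, library))
        .collect()
}
```

Avoiding that allocation entirely would require the exporter side to accept a slice of owned pairs or an iterator instead of a `Vec`, which is exactly the open question the comment records.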
@@ -523,8 +524,7 @@ mod tests {
             OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
             OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BLRP_SCHEDULE_DELAY_DEFAULT,
         },
-        BatchConfig, BatchConfigBuilder, LogData, LogProcessor, LoggerProvider,
-        SimpleLogProcessor,
+        BatchConfig, BatchConfigBuilder, LogProcessor, LoggerProvider, SimpleLogProcessor,
     },
     runtime,
     testing::logs::InMemoryLogsExporter,
@@ -778,15 +778,15 @@ mod tests {
             BatchConfig::default(),
             runtime::Tokio,
         );
-        let mut log_data = LogData {
-            record: Default::default(),
-            instrumentation: Default::default(),
-        };
-        processor.emit(&mut log_data);
+
+        let mut record: LogRecord = Default::default();
+        let instrumentation: InstrumentationLibrary = Default::default();
+
+        processor.emit(&mut record, &instrumentation);
 
         processor.force_flush().unwrap();
         processor.shutdown().unwrap();
         // todo: expect to see errors here. How should we assert this?
-        processor.emit(&mut log_data);
+        processor.emit(&mut record, &instrumentation);
 
         assert_eq!(1, exporter.get_emitted_logs().unwrap().len())
     }
@@ -797,12 +797,10 @@ mod tests {
             .build();
         let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
 
-        let mut log_data = LogData {
-            record: Default::default(),
-            instrumentation: Default::default(),
-        };
+        let mut record: LogRecord = Default::default();
+        let instrumentation: InstrumentationLibrary = Default::default();
 
-        processor.emit(&mut log_data);
+        processor.emit(&mut record, &instrumentation);
 
         processor.shutdown().unwrap();
@@ -811,27 +809,30 @@ mod tests {
             .load(std::sync::atomic::Ordering::Relaxed);
         assert!(is_shutdown);
 
-        processor.emit(&mut log_data);
+        processor.emit(&mut record, &instrumentation);
 
         assert_eq!(1, exporter.get_emitted_logs().unwrap().len())
     }
 
     #[derive(Debug)]
     struct FirstProcessor {
-        pub(crate) logs: Arc<Mutex<Vec<LogData>>>,
+        pub(crate) logs: Arc<Mutex<Vec<(LogRecord, InstrumentationLibrary)>>>,
     }
 
     impl LogProcessor for FirstProcessor {
-        fn emit(&self, data: &mut LogData) {
+        fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationLibrary) {
             // add attribute
-            data.record.attributes.get_or_insert(vec![]).push((
+            record.attributes.get_or_insert(vec![]).push((
                 Key::from_static_str("processed_by"),
                 AnyValue::String("FirstProcessor".into()),
             ));
             // update body
-            data.record.body = Some("Updated by FirstProcessor".into());
+            record.body = Some("Updated by FirstProcessor".into());
 
-            self.logs.lock().unwrap().push(data.clone()); //clone as the LogProcessor is storing the data.
+            self.logs
+                .lock()
+                .unwrap()
+                .push((record.clone(), instrumentation.clone())); //clone as the LogProcessor is storing the data.
         }
 
         fn force_flush(&self) -> LogResult<()> {
@@ -845,22 +846,25 @@ mod tests {
 
     #[derive(Debug)]
     struct SecondProcessor {
-        pub(crate) logs: Arc<Mutex<Vec<LogData>>>,
+        pub(crate) logs: Arc<Mutex<Vec<(LogRecord, InstrumentationLibrary)>>>,
     }
 
     impl LogProcessor for SecondProcessor {
-        fn emit(&self, data: &mut LogData) {
-            assert!(data.record.attributes.as_ref().map_or(false, |attrs| {
+        fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationLibrary) {
+            assert!(record.attributes.as_ref().map_or(false, |attrs| {
                 attrs.iter().any(|(key, value)| {
                     key.as_str() == "processed_by"
                         && value == &AnyValue::String("FirstProcessor".into())
                 })
             }));
             assert!(
-                data.record.body.clone().unwrap()
+                record.body.clone().unwrap()
                     == AnyValue::String("Updated by FirstProcessor".into())
             );
-            self.logs.lock().unwrap().push(data.clone());
+            self.logs
+                .lock()
+                .unwrap()
+                .push((record.clone(), instrumentation.clone()));
         }
 
         fn force_flush(&self) -> LogResult<()> {
@@ -900,25 +904,25 @@ mod tests {
         let first_log = &first_processor_logs.lock().unwrap()[0];
         let second_log = &second_processor_logs.lock().unwrap()[0];
 
-        assert!(first_log.record.attributes.iter().any(|attrs| {
+        assert!(first_log.0.attributes.iter().any(|attrs| {
             attrs.iter().any(|(key, value)| {
                 key.as_str() == "processed_by"
                     && value == &AnyValue::String("FirstProcessor".into())
             })
         }));
 
-        assert!(second_log.record.attributes.iter().any(|attrs| {
+        assert!(second_log.0.attributes.iter().any(|attrs| {
             attrs.iter().any(|(key, value)| {
                 key.as_str() == "processed_by"
                     && value == &AnyValue::String("FirstProcessor".into())
             })
         }));
 
         assert!(
-            first_log.record.body.clone().unwrap()
+            first_log.0.body.clone().unwrap()
                 == AnyValue::String("Updated by FirstProcessor".into())
         );
         assert!(
-            second_log.record.body.clone().unwrap()
+            second_log.0.body.clone().unwrap()
                 == AnyValue::String("Updated by FirstProcessor".into())
         );
     }
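With processors now storing owned `(LogRecord, InstrumentationLibrary)` tuples, the tests address the record as field `.0` of each stored pair. A small helper in the same spirit as the assertions above can keep that tuple indexing in one place; it is illustrative only and assumes the public `attributes` field and the `opentelemetry::logs::AnyValue` path used by these tests.

```rust
use opentelemetry::logs::AnyValue;
use opentelemetry::InstrumentationLibrary;
use opentelemetry_sdk::logs::LogRecord;

/// Returns true when the stored record carries the given attribute key/value pair.
fn has_attribute(
    entry: &(LogRecord, InstrumentationLibrary),
    key: &str,
    expected: &AnyValue,
) -> bool {
    entry.0.attributes.as_ref().map_or(false, |attrs| {
        attrs
            .iter()
            .any(|(k, v)| k.as_str() == key && v == expected)
    })
}
```

A test would then read, for example, `assert!(has_attribute(first_log, "processed_by", &AnyValue::String("FirstProcessor".into())));`.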
diff --git a/stress/src/logs.rs b/stress/src/logs.rs
index 1798401e32..ea813b7138 100644
--- a/stress/src/logs.rs
+++ b/stress/src/logs.rs
@@ -9,6 +9,7 @@
     ~38 M /sec
 */
 
+use opentelemetry::InstrumentationLibrary;
 use opentelemetry_appender_tracing::layer;
 use opentelemetry_sdk::logs::{LogProcessor, LoggerProvider};
 use tracing::error;
@@ -20,7 +21,12 @@ mod throughput;
 pub struct NoOpLogProcessor;
 
 impl LogProcessor for NoOpLogProcessor {
-    fn emit(&self, _data: &mut opentelemetry_sdk::logs::LogData) {}
+    fn emit(
+        &self,
+        _record: &mut opentelemetry_sdk::logs::LogRecord,
+        _library: &InstrumentationLibrary,
+    ) {
+    }
 
     fn force_flush(&self) -> opentelemetry::logs::LogResult<()> {
         Ok(())
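Registration of a processor is untouched by this diff; only the `emit` signature differs. The sketch below wires the stress test's `NoOpLogProcessor` into a provider and pushes one record through it. It assumes the `LoggerProvider::builder().with_log_processor(...)` builder and the `create_log_record`/`set_body`/`emit` methods of the logs API traits, none of which are shown in this patch.

```rust
use opentelemetry::logs::{LogRecord as _, Logger as _, LoggerProvider as _};
use opentelemetry_sdk::logs::LoggerProvider;

fn main() {
    // Assumed builder API: the processor is registered exactly as before the change.
    let provider = LoggerProvider::builder()
        .with_log_processor(NoOpLogProcessor)
        .build();

    // Assumed logs API surface: create a record, fill it, emit it; the record
    // then reaches NoOpLogProcessor::emit with the new two-argument signature.
    let logger = provider.logger("stress-example");
    let mut record = logger.create_log_record();
    record.set_body("hello from the new LogProcessor signature".into());
    logger.emit(record);
}
```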