Skip to content

Commit

Permalink
Log SDK optimization - Improve perf 15%-30% (#2043)
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb authored Aug 22, 2024
1 parent 90ca577 commit 673c328
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 85 deletions.
4 changes: 2 additions & 2 deletions opentelemetry-appender-tracing/benches/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use opentelemetry::logs::LogResult;
use opentelemetry::{InstrumentationLibrary, KeyValue};
use opentelemetry_appender_tracing::layer as tracing_layer;
use opentelemetry_sdk::export::logs::LogExporter;
use opentelemetry_sdk::logs::{LogData, LogProcessor, LogRecord, LoggerProvider};
use opentelemetry_sdk::logs::{LogProcessor, LogRecord, LoggerProvider};
use opentelemetry_sdk::Resource;
use pprof::criterion::{Output, PProfProfiler};
use tracing::error;
Expand Down Expand Up @@ -55,7 +55,7 @@ impl NoopProcessor {
}

impl LogProcessor for NoopProcessor {
fn emit(&self, _: &mut LogData) {
fn emit(&self, _: &mut LogRecord, _: &InstrumentationLibrary) {
// no-op
}

Expand Down
6 changes: 3 additions & 3 deletions opentelemetry-sdk/benches/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ use opentelemetry::logs::{
};
use opentelemetry::trace::Tracer;
use opentelemetry::trace::TracerProvider as _;
use opentelemetry::Key;
use opentelemetry_sdk::logs::LogData;
use opentelemetry::{InstrumentationLibrary, Key};
use opentelemetry_sdk::logs::LogProcessor;
use opentelemetry_sdk::logs::LogRecord;
use opentelemetry_sdk::logs::{Logger, LoggerProvider};
use opentelemetry_sdk::trace;
use opentelemetry_sdk::trace::{Sampler, TracerProvider};
Expand All @@ -36,7 +36,7 @@ use opentelemetry_sdk::trace::{Sampler, TracerProvider};
struct NoopProcessor;

impl LogProcessor for NoopProcessor {
fn emit(&self, _data: &mut LogData) {}
fn emit(&self, _data: &mut LogRecord, _library: &InstrumentationLibrary) {}

fn force_flush(&self) -> LogResult<()> {
Ok(())
Expand Down
19 changes: 10 additions & 9 deletions opentelemetry-sdk/benches/log_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ use criterion::{criterion_group, criterion_main, Criterion};

use opentelemetry::logs::{LogRecord as _, LogResult, Logger as _, LoggerProvider as _, Severity};

use opentelemetry_sdk::logs::LogData;
use opentelemetry::InstrumentationLibrary;
use opentelemetry_sdk::logs::LogProcessor;
use opentelemetry_sdk::logs::LogRecord;
use opentelemetry_sdk::logs::LoggerProvider;
use pprof::criterion::{Output, PProfProfiler};
use std::fmt::Debug;
Expand All @@ -28,25 +29,25 @@ use std::fmt::Debug;
// cargo bench --bench log_exporter
#[async_trait]
pub trait LogExporterWithFuture: Send + Sync + Debug {
async fn export(&mut self, batch: Vec<LogData>);
async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>);
}

pub trait LogExporterWithoutFuture: Send + Sync + Debug {
fn export(&mut self, batch: Vec<LogData>);
fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>);
}

#[derive(Debug)]
struct NoOpExporterWithFuture {}

#[async_trait]
impl LogExporterWithFuture for NoOpExporterWithFuture {
async fn export(&mut self, _batch: Vec<LogData>) {}
async fn export(&mut self, _batch: Vec<(&LogRecord, &InstrumentationLibrary)>) {}
}

#[derive(Debug)]
struct NoOpExporterWithoutFuture {}
impl LogExporterWithoutFuture for NoOpExporterWithoutFuture {
fn export(&mut self, _batch: Vec<LogData>) {}
fn export(&mut self, _batch: Vec<(&LogRecord, &InstrumentationLibrary)>) {}
}

#[derive(Debug)]
Expand All @@ -63,9 +64,9 @@ impl ExportingProcessorWithFuture {
}

impl LogProcessor for ExportingProcessorWithFuture {
fn emit(&self, data: &mut LogData) {
fn emit(&self, record: &mut LogRecord, library: &InstrumentationLibrary) {
let mut exporter = self.exporter.lock().expect("lock error");
futures_executor::block_on(exporter.export(vec![data.clone()]));
futures_executor::block_on(exporter.export(vec![(record, library)]));
}

fn force_flush(&self) -> LogResult<()> {
Expand All @@ -91,11 +92,11 @@ impl ExportingProcessorWithoutFuture {
}

impl LogProcessor for ExportingProcessorWithoutFuture {
fn emit(&self, data: &mut LogData) {
fn emit(&self, record: &mut LogRecord, library: &InstrumentationLibrary) {
self.exporter
.lock()
.expect("lock error")
.export(vec![data.clone()]);
.export(vec![(record, library)]);
}

fn force_flush(&self) -> LogResult<()> {
Expand Down
19 changes: 8 additions & 11 deletions opentelemetry-sdk/benches/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@ use std::{

use criterion::{criterion_group, criterion_main, Criterion};
use opentelemetry::logs::{LogRecord as _, LogResult, Logger as _, LoggerProvider as _, Severity};
use opentelemetry_sdk::{
logs::LogData,
logs::{LogProcessor, LogRecord, Logger, LoggerProvider},
};
use opentelemetry::InstrumentationLibrary;
use opentelemetry_sdk::logs::{LogProcessor, LogRecord, Logger, LoggerProvider};

// Run this benchmark with:
// cargo bench --bench log_processor
Expand All @@ -45,7 +43,7 @@ fn create_log_record(logger: &Logger) -> LogRecord {
struct NoopProcessor;

impl LogProcessor for NoopProcessor {
fn emit(&self, _data: &mut LogData) {}
fn emit(&self, _data: &mut LogRecord, _library: &InstrumentationLibrary) {}

fn force_flush(&self) -> LogResult<()> {
Ok(())
Expand All @@ -60,7 +58,7 @@ impl LogProcessor for NoopProcessor {
struct CloningProcessor;

impl LogProcessor for CloningProcessor {
fn emit(&self, data: &mut LogData) {
fn emit(&self, data: &mut LogRecord, _library: &InstrumentationLibrary) {
let _data_cloned = data.clone();
}

Expand All @@ -75,8 +73,8 @@ impl LogProcessor for CloningProcessor {

#[derive(Debug)]
struct SendToChannelProcessor {
sender: std::sync::mpsc::Sender<LogData>,
receiver: Arc<Mutex<std::sync::mpsc::Receiver<LogData>>>,
sender: std::sync::mpsc::Sender<(LogRecord, InstrumentationLibrary)>,
receiver: Arc<Mutex<std::sync::mpsc::Receiver<(LogRecord, InstrumentationLibrary)>>>,
}

impl SendToChannelProcessor {
Expand All @@ -103,9 +101,8 @@ impl SendToChannelProcessor {
}

impl LogProcessor for SendToChannelProcessor {
fn emit(&self, data: &mut LogData) {
let data_cloned = data.clone();
let res = self.sender.send(data_cloned);
fn emit(&self, record: &mut LogRecord, library: &InstrumentationLibrary) {
let res = self.sender.send((record.clone(), library.clone()));
if res.is_err() {
println!("Error sending log data to channel {0}", res.err().unwrap());
}
Expand Down
24 changes: 10 additions & 14 deletions opentelemetry-sdk/src/logs/log_emitter.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{BatchLogProcessor, LogProcessor, LogRecord, SimpleLogProcessor, TraceContext};
use crate::{export::logs::LogExporter, logs::LogData, runtime::RuntimeChannel, Resource};
use crate::{export::logs::LogExporter, runtime::RuntimeChannel, Resource};
use opentelemetry::{
global,
logs::{LogError, LogResult},
Expand Down Expand Up @@ -254,28 +254,24 @@ impl opentelemetry::logs::Logger for Logger {
}

/// Emit a `LogRecord`.
fn emit(&self, record: Self::LogRecord) {
fn emit(&self, mut record: Self::LogRecord) {
let provider = self.provider();
let processors = provider.log_processors();
let trace_context = Context::map_current(|cx| {
cx.has_active_span()
.then(|| TraceContext::from(cx.span().span_context()))
});
let mut log_record = record;

//let mut log_record = record;
if let Some(ref trace_context) = trace_context {
log_record.trace_context = Some(trace_context.clone());
record.trace_context = Some(trace_context.clone());
}
if log_record.observed_timestamp.is_none() {
log_record.observed_timestamp = Some(SystemTime::now());
if record.observed_timestamp.is_none() {
record.observed_timestamp = Some(SystemTime::now());
}

let mut data = LogData {
record: log_record,
instrumentation: self.instrumentation_library().clone(),
};

for p in processors {
p.emit(&mut data);
p.emit(&mut record, self.instrumentation_library());
}
}

Expand Down Expand Up @@ -332,7 +328,7 @@ mod tests {
}

impl LogProcessor for ShutdownTestLogProcessor {
fn emit(&self, _data: &mut LogData) {
fn emit(&self, _data: &mut LogRecord, _library: &InstrumentationLibrary) {
self.is_shutdown
.lock()
.map(|is_shutdown| {
Expand Down Expand Up @@ -562,7 +558,7 @@ mod tests {
}

impl LogProcessor for LazyLogProcessor {
fn emit(&self, _data: &mut LogData) {
fn emit(&self, _data: &mut LogRecord, _library: &InstrumentationLibrary) {
// nothing to do.
}

Expand Down
Loading

0 comments on commit 673c328

Please sign in to comment.