diff --git a/limitador-server/src/envoy_rls/server.rs b/limitador-server/src/envoy_rls/server.rs index 8f3eaad1..d5b1747f 100644 --- a/limitador-server/src/envoy_rls/server.rs +++ b/limitador-server/src/envoy_rls/server.rs @@ -1,8 +1,13 @@ +use opentelemetry::global; +use opentelemetry::propagation::Extractor; use std::cmp::Ordering; use std::collections::HashMap; use std::sync::Arc; +use tonic::codegen::http::HeaderMap; use tonic::{transport, transport::Server, Request, Response, Status}; +use tracing::Span; +use tracing_opentelemetry::OpenTelemetrySpanExt; use limitador::counter::Counter; @@ -48,8 +53,13 @@ impl RateLimitService for MyRateLimiter { debug!("Request received: {:?}", request); let mut values: HashMap = HashMap::new(); - let req = request.into_inner(); + let (metadata, _ext, req) = request.into_parts(); let namespace = req.domain; + let rl_headers = RateLimitRequestHeaders::new(metadata.into_headers()); + let parent_context = + global::get_text_map_propagator(|propagator| propagator.extract(&rl_headers)); + let span = Span::current(); + span.set_parent(parent_context); if namespace.is_empty() { return Ok(Response::new(RateLimitResponse { @@ -195,6 +205,27 @@ pub fn to_response_header( headers } +struct RateLimitRequestHeaders { + inner: HeaderMap, +} +impl RateLimitRequestHeaders { + pub fn new(inner: HeaderMap) -> Self { + Self { inner } + } +} +impl Extractor for RateLimitRequestHeaders { + fn get(&self, key: &str) -> Option<&str> { + match self.inner.get(key) { + Some(v) => v.to_str().ok(), + None => None, + } + } + + fn keys(&self) -> Vec<&str> { + self.inner.keys().map(|k| k.as_str()).collect() + } +} + mod rls_proto { pub(crate) const RLS_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("rls"); } diff --git a/limitador-server/src/main.rs b/limitador-server/src/main.rs index 6081b324..8aff7275 100644 --- a/limitador-server/src/main.rs +++ b/limitador-server/src/main.rs @@ -32,8 +32,9 @@ use limitador::{ }; use notify::event::{ModifyKind, RenameMode}; use notify::{Error, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; -use opentelemetry::KeyValue; +use opentelemetry::{global, KeyValue}; use opentelemetry_otlp::WithExportConfig; +use opentelemetry_sdk::propagation::TraceContextPropagator; use opentelemetry_sdk::{trace, Resource}; use std::env::VarError; use std::fmt::Display; @@ -304,6 +305,7 @@ async fn main() -> Result<(), Box> { }; if !config.tracing_endpoint.is_empty() { + global::set_text_map_propagator(TraceContextPropagator::new()); let tracer = opentelemetry_otlp::new_pipeline() .tracing() .with_exporter( @@ -312,7 +314,7 @@ async fn main() -> Result<(), Box> { .with_endpoint(config.tracing_endpoint.clone()), ) .with_trace_config(trace::config().with_resource(Resource::new(vec![ - KeyValue::new("service.name", "limitador-server"), + KeyValue::new("service.name", "limitador"), ]))) .install_batch(opentelemetry_sdk::runtime::Tokio)?; let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer);