Skip to content

Commit

Permalink
Observability improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiimk committed Sep 12, 2024
1 parent 9fedd65 commit d3bb067
Show file tree
Hide file tree
Showing 14 changed files with 210 additions and 84 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
- Oracle Provider Prometheus metrics improvements

## [0.36.0] - 2024-09-10
### Changed
- Oracle Provider: Updated to use V2 `/query` REST API
Expand Down
10 changes: 6 additions & 4 deletions deny.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[bans]
# Forbid multiple versions of same dependency (with some exceptions)
# TODO: Change to "deny" once we crack down on duplication
multiple-versions = "warn"
multiple-versions = "allow"

# Avoid adding dependencies to this list as this slows down compilation.
# Find another ways to avoid duplication.
Expand All @@ -11,8 +11,11 @@ skip-tree = []
wildcards = "deny"

# We specify features explicitly to avoid bloat
workspace-default-features = "deny"
features = []
# TODO: https://github.com/EmbarkStudios/cargo-deny/issues/699
# external-default-features = "deny"

# TODO: https://github.com/EmbarkStudios/cargo-deny/issues/700
# workspace-default-features = "deny"

deny = [
### Crates we shouldn't use ####
Expand Down Expand Up @@ -85,4 +88,3 @@ ignore = [
"RUSTSEC-2023-0071", # https://rustsec.org/advisories/RUSTSEC-2023-0071.html,
"RUSTSEC-2024-0370", # https://rustsec.org/advisories/RUSTSEC-2024-0370.html
]

5 changes: 4 additions & 1 deletion src/app/api-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ messaging-outbox = { workspace = true }
graceful-shutdown = { workspace = true }
http-common = { workspace = true }
internal-error = { workspace = true }
observability = { workspace = true }
observability = { workspace = true, default-features = false, features = [
"opentelemetry",
"prometheus",
] }
opendatafabric = { workspace = true }
time-source = { workspace = true }
database-common = { workspace = true }
Expand Down
5 changes: 4 additions & 1 deletion src/app/oracle-provider/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ publish = { workspace = true }

[dependencies]
graceful-shutdown = { workspace = true, default-features = false }
observability = { workspace = true, default-features = false }
observability = { workspace = true, default-features = false, features = [
"opentelemetry",
"prometheus",
] }
opendatafabric = { workspace = true, default-features = false }

alloy = { version = "0.2", default-features = false, features = [
Expand Down
25 changes: 12 additions & 13 deletions src/app/oracle-provider/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,23 @@ const DEFAULT_RUST_LOG: &str = "debug,kamu=trace,alloy_transport_http=info,alloy
pub async fn run(args: Cli, config: Config) -> Result<(), InternalError> {
tracing::info!(?args, ?config, "Starting ODF Oracle provider");

let metrics_reg = prometheus::Registry::new();

let http_server = build_http_server(
config.http_address.parse().unwrap(),
config.http_port,
metrics_reg.clone(),
);
let http_address = config.http_address.parse().unwrap();
let http_port = config.http_port;

let rpc_client = init_rpc_client(&config).await?;
let api_client = init_api_client(&config).await?;

let metrics = OdfOracleProviderMetrics::new();
let metrics_reg = prometheus::Registry::new();
let metrics =
OdfOracleProviderMetrics::new(config.chain_id, config.api_url.host_str().unwrap());
metrics.register(&metrics_reg).int_err()?;

let provider = OdfOracleProvider::new(config, rpc_client, api_client, metrics);

let catalog = dill::CatalogBuilder::new().add_value(metrics_reg).build();

let http_server = build_http_server(http_address, http_port, catalog);

tracing::info!("HTTP API is listening on {}", http_server.local_addr());

let shutdown_requested = graceful_shutdown::trap_signals();
Expand Down Expand Up @@ -126,7 +128,7 @@ pub async fn init_api_client(config: &Config) -> Result<Arc<dyn OdfApiClient>, I
fn build_http_server(
address: std::net::IpAddr,
http_port: u16,
metrics_reg: prometheus::Registry,
catalog: dill::Catalog,
) -> axum::Server<hyper::server::conn::AddrIncoming, axum::routing::IntoMakeService<axum::Router>> {
let app = axum::Router::new()
.route(
Expand All @@ -137,10 +139,7 @@ fn build_http_server(
"/system/metrics",
axum::routing::get(observability::metrics::metrics_handler),
)
.layer(axum::extract::Extension(
dill::CatalogBuilder::new().build(),
))
.layer(axum::extract::Extension(metrics_reg));
.layer(axum::extract::Extension(catalog));

let addr = std::net::SocketAddr::from((address, http_port));

Expand Down
69 changes: 39 additions & 30 deletions src/app/oracle-provider/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use alloy::transports::BoxTransport;
use chrono::{DateTime, Utc};
use internal_error::*;
use opendatafabric::{DatasetID, Multihash};
use tracing::Instrument;

use crate::api_client::*;
use crate::Config;
Expand Down Expand Up @@ -148,21 +149,23 @@ pub struct OdfOracleProviderMetrics {
}

impl OdfOracleProviderMetrics {
pub fn new() -> Self {
pub fn new(chain_id: u64, node_host: &str) -> Self {
use prometheus::*;

Self {
wallet_balance: prometheus::Gauge::new(
"wallet_balance",
"Balance of the provider's wallet",
wallet_balance: Gauge::with_opts(
Opts::new("wallet_balance", "Balance of the provider's wallet")
.const_label("chain_id", chain_id.to_string()),
)
.unwrap(),
api_queries_num: prometheus::IntCounter::new(
"api_queries_num",
"ODF API queries executed",
api_queries_num: IntCounter::with_opts(
Opts::new("api_queries_num", "ODF API queries executed")
.const_label("node_host", node_host),
)
.unwrap(),
transactions_num: prometheus::IntCounter::new(
"transactions_num",
"Chain transactions submitted",
transactions_num: IntCounter::with_opts(
Opts::new("transactions_num", "Chain transactions submitted")
.const_label("chain_id", chain_id.to_string()),
)
.unwrap(),
}
Expand All @@ -176,12 +179,6 @@ impl OdfOracleProviderMetrics {
}
}

impl Default for OdfOracleProviderMetrics {
fn default() -> Self {
Self::new()
}
}

/////////////////////////////////////////////////////////////////////////////////////////

pub struct OdfOracleProvider<P: Provider> {
Expand Down Expand Up @@ -338,10 +335,10 @@ impl<P: Provider + Clone> OdfOracleProvider<P> {
loop {
// TODO: Operate on blocks that have >N confirmations to avoid running into too
// many reorgs?
let last_block = self.rpc_client.get_block_number().await.int_err()?;
let to_block = self.rpc_client.get_block_number().await.int_err()?;

// TODO: Reorg resistance
if from_block > last_block {
if from_block > to_block {
if idle_start.is_none() {
tracing::debug!("Waiting for new blocks");
idle_start = Some(std::time::Instant::now());
Expand All @@ -357,17 +354,15 @@ impl<P: Provider + Clone> OdfOracleProvider<P> {
idle_start = None;
}

// TODO: Refactor towards concurrent streams model where blockchain scanning
// continues independently from execution and submitting
// transactions
let pending_requests = self.scan_block_range(from_block, last_block).await?;
if !pending_requests.is_empty() {
let results = self.process_request_batch(pending_requests).await?;
self.send_results(results).await?;
}
let span =
observability::tracing::root_span!("process_block_range", from_block, to_block);

self.process_block_range(from_block, to_block)
.instrument(span)
.await?;

// TODO: Chain reorg resistance
from_block = last_block + 1;
from_block = to_block + 1;
}
}

Expand All @@ -393,9 +388,7 @@ impl<P: Provider + Clone> OdfOracleProvider<P> {
return Ok(());
}

let pending_requests = self.scan_block_range(from_block, to_block).await?;
let results = self.process_request_batch(pending_requests).await?;
self.send_results(results).await?;
self.process_block_range(from_block, to_block).await?;

Ok(())
}
Expand Down Expand Up @@ -433,6 +426,22 @@ impl<P: Provider + Clone> OdfOracleProvider<P> {
Ok(())
}

async fn process_block_range(
&self,
from_block: u64,
to_block: u64,
) -> Result<(), InternalError> {
// TODO: Refactor towards concurrent streams model where blockchain scanning
// continues independently from execution and submitting
// transactions
let pending_requests = self.scan_block_range(from_block, to_block).await?;
if !pending_requests.is_empty() {
let results = self.process_request_batch(pending_requests).await?;
self.send_results(results).await?;
}
Ok(())
}

// TODO: This code is much more complex than it should be because of the issue
// in ethers where calling `contract.events()` sets the address filter but
// not the topic0 filter, thus when used on an interface rather than final
Expand Down
2 changes: 1 addition & 1 deletion src/app/oracle-provider/tests/tests/test_e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ async fn test_oracle_e2e() {
config,
rpc_client.clone(),
api_client,
provider::OdfOracleProviderMetrics::new(),
provider::OdfOracleProviderMetrics::new(0, "localhost"),
);

provider.run_once(Some(0), None).await.unwrap();
Expand Down
40 changes: 28 additions & 12 deletions src/utils/observability/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "observability"
description = "Utilities for tracing and structured logging"
description = "Utilities for tracing, structured logging, and metrics"
version = { workspace = true }
homepage = { workspace = true }
repository = { workspace = true }
Expand All @@ -21,6 +21,20 @@ workspace = true
doctest = false


[features]
default = []

opentelemetry = [
"dep:opentelemetry",
"dep:opentelemetry_sdk",
"dep:opentelemetry-otlp",
"dep:opentelemetry-semantic-conventions",
"dep:tracing-opentelemetry",
]

prometheus = ["dep:prometheus"]


[dependencies]
async-trait = { version = "0.1" }
axum = { version = "0.6", default-features = false, features = [
Expand All @@ -30,25 +44,27 @@ axum = { version = "0.6", default-features = false, features = [
] }
dill = { version = "0.9", default-features = false }
http = { version = "0.2", default-features = false }
opentelemetry = { version = "0.23", default-features = false }
opentelemetry_sdk = { version = "0.23", default-features = false, features = [
"rt-tokio",
] }
opentelemetry-otlp = { version = "0.16", default-features = false, features = [
"trace",
"grpc-tonic",
] }
opentelemetry-semantic-conventions = { version = "0.16", default-features = false }
prometheus = { version = "0.13", default-features = false }
serde = { version = "1", default-features = false, features = ["derive"] }
serde_json = { version = "1", default-features = false }
thiserror = { version = "1", default-features = false }
tracing = { version = "0.1", default-features = false }
tracing-appender = { version = "0.2", default-features = false }
tracing-opentelemetry = { version = "0.24", default-features = false }
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
tower = { version = "0.4", default-features = false }
tower-http = { version = "0.4", default-features = false, features = ["trace"] }

opentelemetry = { optional = true, version = "0.23", default-features = false }
opentelemetry_sdk = { optional = true, version = "0.23", default-features = false, features = [
"rt-tokio",
] }
opentelemetry-otlp = { optional = true, version = "0.16", default-features = false, features = [
"trace",
"grpc-tonic",
] }
opentelemetry-semantic-conventions = { optional = true, version = "0.16", default-features = false }
tracing-opentelemetry = { optional = true, version = "0.24", default-features = false }

prometheus = { optional = true, version = "0.13", default-features = false }


[dev-dependencies]
18 changes: 5 additions & 13 deletions src/utils/observability/src/axum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,30 +75,22 @@ pub struct MakeSpan;

impl<B> tower_http::trace::MakeSpan<B> for MakeSpan {
fn make_span(&mut self, request: &http::Request<B>) -> tracing::Span {
use opentelemetry::trace::TraceContextExt as _;
use tracing_opentelemetry::OpenTelemetrySpanExt as _;

// TODO: Extract parent context from the request

let method = request.method();
let route = RouteOrUri::from(request);

let span = tracing::info_span!(
let span = crate::tracing::root_span!(
"http_request",
%method,
%route,
// Placeholders for OTEL fileds that will be populated after span creation
trace_id = tracing::field::Empty,
"otel.name" = tracing::field::Empty,
);

// Extract trace ID from the OTEL context and add it to the tracing span
let context = span.context();
let otel_span = context.span();
let span_context = otel_span.span_context();
let trace_id = span_context.trace_id();
if span_context.is_valid() {
span.record("trace_id", tracing::field::display(trace_id));
#[cfg(feature = "opentelemetry")]
{
crate::tracing::include_otel_trace_id(&span);

span.record(
"otel.name",
tracing::field::display(SpanName::new(method, route)),
Expand Down
16 changes: 12 additions & 4 deletions src/utils/observability/src/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,20 @@ pub trait HealthCheck: Send + Sync {
pub async fn health_handler(
axum::Extension(catalog): axum::Extension<dill::Catalog>,
axum::extract::Query(args): axum::extract::Query<CheckArgs>,
) -> Result<axum::Json<serde_json::Value>, CheckError> {
) -> Result<axum::Json<CheckSuccess>, CheckError> {
for checker in catalog.get::<dill::AllOf<dyn HealthCheck>>().unwrap() {
checker.check(args.r#type).await?;
}

Ok(axum::Json(serde_json::json!({
"ok": true,
})))
Ok(axum::Json(CheckSuccess { ok: true }))
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[derive(Debug, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CheckSuccess {
pub ok: bool,
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -77,3 +83,5 @@ pub struct CheckArgs {
#[serde(default)]
pub r#type: CheckType,
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Loading

0 comments on commit d3bb067

Please sign in to comment.