Skip to content

Commit

Permalink
Merge pull request #182 from shashitnak/std-protobuf
Browse files Browse the repository at this point in the history
feat: adding wasm compatibility, windows support and removing protoc dependency
  • Loading branch information
sunli829 authored Mar 30, 2024
2 parents a608f96 + 3eacb3c commit c61c0a8
Show file tree
Hide file tree
Showing 12 changed files with 812 additions and 867 deletions.
1,291 changes: 604 additions & 687 deletions Cargo.lock

Large diffs are not rendered by default.

34 changes: 23 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ license = "MIT"
keywords = ["async_graphql", "async", "graphql", "apollo", "studio"]
categories = ["network-programming", "asynchronous"]
edition = "2021"
resolver = "2"

[features]
default = ["tokio-comp", "compression"]
default = ["compression"]
compression = ["libflate"]
tokio-comp = ["tokio"]

[dependencies]
anyhow = "1"
Expand All @@ -25,21 +25,33 @@ cfg-if = "1"
derive_builder = "0.13"
futures = "0.3"
futures-locks = "0.7"
prost = "0.12"
prost-types = "0.12"
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }
serde_json = "1"
serde = { version = "1", features = ["derive"] }
sha2 = "0.10"
tonic = "0.10"
serde-json-wasm = "1.0.1"
tracing = "0.1"
tracing-futures = { version = "0.2.5", default-features = false, features = ["tokio", "futures-03", "std"] }
uuid = { version = "1.7", features = ["v4"] } # A library to generate and parse UUIDs.
uuid = { version = "1.7", features = ["v4", "js"] } # A library to generate and parse UUIDs.
wasm-bindgen-futures = "0.4.18"
protobuf = "3.4.0"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio = { version = "1", features = ["full"] }

# Non-feature optional dependencies
libflate = { version = "2", optional = true }
tokio = { version = "1", features = ["full"], optional = true }

[target.'cfg(all(not(target_arch = "wasm32"), not(target_os = "windows")))'.dependencies]
uname = "0.1.1"

[build-dependencies]
reqwest = { version = "0.11", default-features = false, features = ["blocking", "rustls-tls"] }
protobuf-codegen = "3.4.0"
async-std = { version = "1", default-features = false, features = ["default", "tokio1"] }
cfg-if = "1.0.0"
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"] }
prost-build = "0.12.3"
protox = "0.6.0"
tonic-build = "0.10"

[target.'cfg(not(target_arch = "wasm32"))'.build-dependencies]
tokio = { version = "1", features = ["full"] }
54 changes: 40 additions & 14 deletions build.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
// Derived from https://github.com/pellizzetti/router/blob/cc0ebcaf1d68184e1fe06f16534fddff76286b40/apollo-spaceport/build.rs
use protobuf_codegen::Customize;
use std::io::Write;
use std::path::Path;
use std::{
error::Error,
fs::File,
Expand All @@ -11,8 +14,18 @@ fn main() -> Result<(), Box<dyn Error>> {
} else {
// Retrieve a live version of the reports.proto file
let proto_url = "https://usage-reporting.api.apollographql.com/proto/reports.proto";
let response = reqwest::blocking::get(proto_url)?;
let mut content = response.text()?;
let fut = reqwest::get(proto_url);

cfg_if::cfg_if! {
if #[cfg(not(target_arch = "wasm32"))] {
let rt = tokio::runtime::Runtime::new().unwrap();
let response = rt.block_on(fut)?;
let mut content = rt.block_on(response.text())?;
} else {
let response = async_std::task::block_on(fut)?;
let mut content = async_std::task::block_on(response.text())?;
}
}

// Process the retrieved content to:
// - Insert a package Report; line after the import lines (currently only one) and before the first message definition
Expand Down Expand Up @@ -45,19 +58,32 @@ fn main() -> Result<(), Box<dyn Error>> {
}

// Process the proto files
let proto_files = vec!["proto/agents.proto", "proto/reports.proto"];
let proto_files = vec!["proto/reports.proto"];

protobuf_codegen::Codegen::new()
.pure()
.cargo_out_dir("proto")
.inputs(&proto_files)
.include(".")
.customize(Customize::default().gen_mod_rs(false))
.run_from_script();

let out_dir = std::env::var("OUT_DIR")?;
let path = Path::new(&out_dir).join("proto").join("reports.rs");
let content = std::fs::read_to_string(&path)?;

let content = content
.lines()
.filter(|line| !(line.contains("#![") || line.contains("//!")))
.fold(String::new(), |mut content, line| {
content.push_str(line);
content.push('\n');
content
});

tonic_build::configure()
.type_attribute("ContextualizedStats", "#[derive(serde::Serialize)]")
.type_attribute("StatsContext", "#[derive(serde::Serialize)]")
.type_attribute("QueryLatencyStats", "#[derive(serde::Serialize)]")
.type_attribute("TypeStat", "#[derive(serde::Serialize)]")
.type_attribute("PathErrorStats", "#[derive(serde::Serialize)]")
.type_attribute("FieldStat", "#[derive(serde::Serialize)]")
.type_attribute("ReferencedFieldsForType", "#[derive(serde::Serialize)]")
.type_attribute("StatsContext", "#[derive(Eq, Hash)]")
.build_server(true)
.compile(&proto_files, &["."])?;
std::fs::remove_file(&path)?;
let mut file = std::fs::File::create(&path)?;
file.write_all(content.as_bytes())?;

for file in proto_files {
println!("cargo:rerun-if-changed={}", file);
Expand Down
8 changes: 4 additions & 4 deletions src/compression.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
#[cfg(feature = "compression")]
#[cfg(all(feature = "compression", not(target_arch = "wasm32")))]
use libflate::gzip;

#[cfg(feature = "compression")]
#[cfg(all(feature = "compression", not(target_arch = "wasm32")))]
const TARGET_LOG_COMPRESSION: &str = "apollo-studio-extension-compression";

#[cfg(feature = "compression")]
#[cfg(all(feature = "compression", not(target_arch = "wasm32")))]
pub fn compress(msg: Vec<u8>) -> Result<Vec<u8>, std::io::Error> {
let mut encoder = gzip::Encoder::new(Vec::new()).unwrap();
let mut msg = std::io::Cursor::new(msg);
Expand All @@ -20,7 +20,7 @@ pub fn compress(msg: Vec<u8>) -> Result<Vec<u8>, std::io::Error> {
encoder.finish().into_result()
}

#[cfg(not(feature = "compression"))]
#[cfg(any(not(feature = "compression"), target_arch = "wasm32"))]
pub fn compress(msg: Vec<u8>) -> Result<Vec<u8>, std::io::Error> {
Ok::<Vec<u8>, std::io::Error>(msg)
}
79 changes: 44 additions & 35 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,25 @@
//!
//! * `compression` - To enable GZIP Compression when sending traces to Apollo Studio.
mod compression;
mod packages;
mod proto;
pub mod register;
mod report_aggregator;

mod runtime;
mod packages;

use futures::SinkExt;
use prost_types::Timestamp;
use protobuf::{well_known_types::timestamp::Timestamp, EnumOrUnknown, MessageField};
use report_aggregator::ReportAggregator;
use runtime::spawn;
use packages::serde_json;

#[macro_use]
extern crate tracing;

use futures_locks::RwLock;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;

use async_graphql::QueryPathSegment;
use chrono::{DateTime, Utc};
Expand All @@ -55,13 +57,13 @@ use async_graphql::extensions::{
};
use async_graphql::parser::types::{ExecutableDocument, OperationType, Selection};
use async_graphql::{Response, ServerResult, Value, Variables};
use proto::report::{
use proto::reports::{
trace::{self, node, Node},
Trace,
};
use std::convert::TryInto;

pub use proto::report::trace::http::Method;
pub use proto::reports::trace::http::Method;

/// Apollo Tracing Extension to send traces to Apollo Studio
/// The extension to include to your `async_graphql` instance to connect with Apollo Studio.
Expand Down Expand Up @@ -185,7 +187,7 @@ impl Extension for ApolloTracingExtension {
.any(|(_, operation)| operation.node.selection_set.node.items.iter().any(|selection| matches!(&selection.node, Selection::Field(field) if field.node.name.node == "__schema")));
if !is_schema {
let result: String =
ctx.stringify_execute_doc(&document, &Variables::from_json(serde_json::json!({})));
ctx.stringify_execute_doc(&document, &Variables::from_json(serde_json::from_str("{}").unwrap()));
let name = document
.operations
.iter()
Expand All @@ -194,7 +196,7 @@ impl Extension for ApolloTracingExtension {
.map(|x| x.as_str())
.unwrap_or("no_name");
let query_type = format!("# {name}\n {query}", name = name, query = result);
*self.operation_name.write().await = query_type;
*self.operation_name.write().unwrap() = query_type;
}
Ok(document)
}
Expand Down Expand Up @@ -227,7 +229,9 @@ impl Extension for ApolloTracingExtension {
let client_version = tracing_extension
.client_version
.unwrap_or_else(|| "no client version".to_string());
let method = tracing_extension.method.unwrap_or(Method::Unknown);
let method = tracing_extension
.method
.or(<Method as protobuf::Enum>::from_str("UNKNOWN"));
let status_code = tracing_extension.status_code.unwrap_or(0);

let mut trace: Trace = Trace {
Expand All @@ -245,34 +249,39 @@ impl Extension for ApolloTracingExtension {
.map(|x| x.to_string())
.unwrap_or_else(|| "no operation".to_string()),
..Default::default()
});
})
.into();

trace.http = Some(trace::Http {
method: method.into(),
trace.http = Some(trace::HTTP {
method: EnumOrUnknown::new(method.unwrap()),
status_code,
..Default::default()
});
})
.into();

trace.end_time = Some(Timestamp {
trace.end_time = MessageField::some(Timestamp {
nanos: inner.end_time.timestamp_subsec_nanos().try_into().unwrap(),
seconds: inner.end_time.timestamp(),
special_fields: Default::default(),
});

trace.start_time = Some(Timestamp {
nanos: inner
.start_time
.timestamp_subsec_nanos()
.try_into()
.unwrap(),
seconds: inner.start_time.timestamp(),
});
trace.start_time =
protobuf::MessageField::some(protobuf::well_known_types::timestamp::Timestamp {
nanos: inner
.start_time
.timestamp_subsec_nanos()
.try_into()
.unwrap(),
seconds: inner.start_time.timestamp(),
special_fields: Default::default(),
});

let root_node = self.root_node.read().await;
trace.root = Some(root_node.clone());
let root_node = self.root_node.read().unwrap();
trace.root = Some(root_node.clone()).into();

let mut sender = self.report.sender();

let operation_name = self.operation_name.read().await.clone();
let operation_name = self.operation_name.read().unwrap().clone();

let _handle = spawn(async move {
if let Err(e) = sender.send((operation_name, trace)).await {
Expand All @@ -295,7 +304,7 @@ impl Extension for ApolloTracingExtension {
let path = info.path_node.to_string_vec().join(".");
let field_name = info.path_node.field_name().to_string();
let parent_type = info.parent_type.to_string();
let return_type = info.return_type.to_string();
let _return_type = info.return_type.to_string();
let start_time = Utc::now() - self.inner.lock().await.start_time;
let path_node = info.path_node;

Expand All @@ -320,12 +329,11 @@ impl Extension for ApolloTracingExtension {
},
parent_type: parent_type.to_string(),
original_field_name: field_name,
r#type: return_type,
..Default::default()
};

let node = Arc::new(RwLock::new(node));
self.nodes.write().await.insert(path, node.clone());
self.nodes.write().unwrap().insert(path, node.clone());
let parent_node = path_node.parent.map(|x| x.to_string_vec().join("."));
// Use the path to create a new node
// https://github.com/apollographql/apollo-server/blob/291c17e255122d4733b23177500188d68fac55ce/packages/apollo-server-core/src/plugin/traceTreeBuilder.ts
Expand All @@ -334,7 +342,7 @@ impl Extension for ApolloTracingExtension {
Err(e) => {
let json = match serde_json::to_string(&e) {
Ok(content) => content,
Err(e) => serde_json::json!({ "error": format!("{:?}", e) }).to_string(),
Err(e) => format!("{{ \"error\": \"{e:?}\" }}"),
};
let error = trace::Error {
message: e.message.clone(),
Expand All @@ -345,19 +353,20 @@ impl Extension for ApolloTracingExtension {
.map(|x| trace::Location {
line: x.line as u32,
column: x.column as u32,
special_fields: protobuf::SpecialFields::default(),
})
.collect(),
json,
..Default::default()
};

node.write().await.error = vec![error];
node.write().unwrap().error = vec![error];
Err(e)
}
};
let end_time = Utc::now() - self.inner.lock().await.start_time;

node.write().await.end_time = match end_time
node.write().unwrap().end_time = match end_time
.num_nanoseconds()
.and_then(|x| u64::try_from(x).ok())
{
Expand All @@ -371,19 +380,19 @@ impl Extension for ApolloTracingExtension {

match parent_node {
None => {
let mut root_node = self.root_node.write().await;
let mut root_node = self.root_node.write().unwrap();
let child = &mut root_node.child;
let node = node.read().await;
let node = node.read().unwrap();
// Can't copy or pass a ref to Protobuf
// So we clone
child.push(node.clone());
}
Some(parent) => {
let nodes = self.nodes.read().await;
let nodes = self.nodes.read().unwrap();
let node_read = nodes.get(&parent).unwrap();
let mut parent = node_read.write().await;
let mut parent = node_read.write().unwrap();
let child = &mut parent.child;
let node = node.read().await;
let node = node.read().unwrap();
// Can't copy or pass a ref to Protobuf
// So we clone
child.push(node.clone());
Expand Down
4 changes: 3 additions & 1 deletion src/packages/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
pub mod uname;

pub mod serde_json;
pub mod uname;
5 changes: 5 additions & 0 deletions src/packages/serde_json.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@

#[cfg(not(target_arch = "wasm32"))]
pub use serde_json::*;
#[cfg(target_arch = "wasm32")]
pub use serde_json_wasm::*;
Loading

0 comments on commit c61c0a8

Please sign in to comment.