From 5efa948f1179b8969dbf85f03b3a22e5d96f1213 Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Sat, 24 Feb 2024 09:49:37 -0500 Subject: [PATCH] add rpc_methods endpoint (#76) --- Cargo.lock | 3 ++ lib-xps/Cargo.toml | 2 +- lib-xps/src/lib.rs | 107 ++++++++++++++++++++++++++++++++++++++------ lib-xps/src/util.rs | 10 +---- xps/Cargo.toml | 3 ++ xps/src/main.rs | 16 ++++++- 6 files changed, 117 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0ec1e39..2beca09 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4575,8 +4575,11 @@ version = "0.1.0" dependencies = [ "anyhow", "clap", + "ethers", "lib-xps", "tokio", + "tracing", + "tracing-subscriber", ] [[package]] diff --git a/lib-xps/Cargo.toml b/lib-xps/Cargo.toml index 667d3b7..99ce550 100644 --- a/lib-xps/Cargo.toml +++ b/lib-xps/Cargo.toml @@ -11,6 +11,7 @@ log.workspace = true tracing.workspace = true tracing-subscriber.workspace = true serde.workspace = true +serde_json.workspace = true tokio.workspace = true async-trait.workspace = true jsonrpsee.workspace = true @@ -30,4 +31,3 @@ messaging = { path = "../messaging" } jsonrpsee = { workspace = true, features = ["macros", "server", "client"] } tokio = { workspace = true, features = ["macros", "rt", "time"] } futures = "0.3" -serde_json.workspace = true diff --git a/lib-xps/src/lib.rs b/lib-xps/src/lib.rs index 1cde25c..3a29730 100644 --- a/lib-xps/src/lib.rs +++ b/lib-xps/src/lib.rs @@ -1,13 +1,11 @@ pub mod rpc; pub mod types; +#[cfg(test)] mod util; use anyhow::Result; -use ethers::{ - abi::Address, - providers::{Provider, Ws}, -}; -use jsonrpsee::server::Server; +use ethers::{abi::Address, providers::Middleware}; +use jsonrpsee::{server::Server, RpcModule}; use std::str::FromStr; use xps_types::{CONVERSATION, DID_ETH_REGISTRY}; @@ -15,24 +13,107 @@ pub use crate::rpc::{XpsClient, XpsMethods, XpsServer}; use crate::types::GatewayContext; /// Entrypoint for the xps Gateway -pub async fn run>(host: String, port: u16, provider: P) -> Result<()> { - crate::util::init_logging(); - +pub async fn run

(host: String, port: u16, provider: P) -> Result<()> +where + P: Middleware + 'static, +{ let server_addr = format!("{}:{}", host, port); - - // a port of 0 allows the OS to choose an open port let server = Server::builder().build(server_addr).await?; let addr = server.local_addr()?; let registry_contract = Address::from_str(DID_ETH_REGISTRY)?; let conversation_contract = Address::from_str(CONVERSATION)?; - let provider = Provider::::connect(provider.as_ref()).await.unwrap(); let context = GatewayContext::new(registry_contract, conversation_contract, provider).await?; - let xps_methods = rpc::XpsMethods::new(&context); - let handle = server.start(xps_methods.into_rpc()); + let mut methods = RpcModule::new(()); + methods.merge(rpc::XpsMethods::new(&context).into_rpc())?; + let methods = build_rpc_api(methods); + + let handle = server.start(methods); log::info!("Server Started at {addr}"); handle.stopped().await; Ok(()) } + +// create an endpoint that lists all the methods available on the server, at the +// endpoint `/rpc_methods` +fn build_rpc_api(mut rpc_api: RpcModule) -> RpcModule { + let mut available_methods = rpc_api.method_names().collect::>(); + // The "rpc_methods" is defined below and we want it to be part of the reported methods. + available_methods.push("rpc_methods"); + available_methods.sort(); + + rpc_api + .register_method("rpc_methods", move |_, _| { + serde_json::json!({ + "methods": available_methods, + }) + }) + .expect("infallible, all other methods have their own address space"); + + rpc_api +} + +#[cfg(test)] +mod tests { + use super::*; + use ethers::{prelude::Provider, types::U64}; + use jsonrpsee::{core::client::ClientT, ws_client::WsClientBuilder}; + + #[tokio::test] + async fn test_run() -> Result<()> { + let (provider, mock) = Provider::mocked(); + // chainID + mock.push(U64::from(0x1)).unwrap(); + let port = 43594; + let handle = tokio::spawn(async move { + match run("127.0.0.1".to_string(), 43594, provider).await { + Err(e) => log::error!("Error running server: {e}"), + Ok(_) => log::info!("Server Stopped"), + } + }); + + // give the server some time to start + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + + let client = WsClientBuilder::default() + .build(&format!("ws://127.0.0.1:{port}")) + .await?; + + #[derive(Debug, serde::Deserialize)] + struct Methods { + methods: Vec, + } + + let methods = client + .request::>("rpc_methods", vec![]) + .await?; + + assert_eq!( + methods.methods, + vec![ + "rpc_methods", + "xps_balance", + "xps_fetchKeyPackages", + "xps_grantInstallation", + "xps_nonce", + "xps_revokeInstallation", + "xps_sendMessage", + "xps_status", + "xps_walletAddress", + ] + ); + + handle.abort(); + Ok(()) + } + + #[test] + fn test_build_api() { + let methods = RpcModule::new(()); + let methods = build_rpc_api(methods); + let methods: Vec = methods.method_names().map(String::from).collect(); + assert_eq!(methods, vec!["rpc_methods",]); + } +} diff --git a/lib-xps/src/util.rs b/lib-xps/src/util.rs index 4fdbe75..f40a442 100644 --- a/lib-xps/src/util.rs +++ b/lib-xps/src/util.rs @@ -1,17 +1,9 @@ //! Internal Utility functions for use in crate -#[cfg(test)] use std::sync::Once; use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Registry}; -#[cfg(test)] static INIT: Once = Once::new(); -pub(crate) fn init_logging() { - let fmt = fmt::layer().compact(); - Registry::default().with(env()).with(fmt).init() -} - -#[cfg(test)] #[ctor::ctor] fn __init_test_logging() { INIT.call_once(|| { @@ -22,6 +14,6 @@ fn __init_test_logging() { /// Try to get the logging environment from the `RUST_LOG` environment variable. /// If it is not set, use the default of `info`. -fn env() -> EnvFilter { +pub fn env() -> EnvFilter { EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")) } diff --git a/xps/Cargo.toml b/xps/Cargo.toml index 5d3a1d9..03650b7 100644 --- a/xps/Cargo.toml +++ b/xps/Cargo.toml @@ -8,5 +8,8 @@ edition = "2021" [dependencies] anyhow.workspace = true tokio.workspace = true +ethers = { workspace = true, features = ["ws"] } +tracing.workspace = true +tracing-subscriber.workspace = true lib-xps = { path = "../lib-xps" } clap = { version = "4.4.18", features = ["derive"] } diff --git a/xps/src/main.rs b/xps/src/main.rs index ebbe998..f52fc18 100644 --- a/xps/src/main.rs +++ b/xps/src/main.rs @@ -1,6 +1,10 @@ use anyhow::Result; use clap::Parser; +use ethers::providers::{Provider, Ws}; use lib_xps::run; +use tracing_subscriber::{ + filter::LevelFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Registry, +}; #[derive(Parser, Debug)] #[command(name = "xps", version = "0.1.0", about = "XMTP Postal Service")] @@ -19,11 +23,21 @@ struct Args { #[tokio::main] async fn main() -> Result<()> { + init_logging(); let args = Args::parse(); - crate::run(args.host, args.port, args.endpoint).await?; + let provider = Provider::::connect(&args.endpoint).await?; + crate::run(args.host, args.port, provider).await?; Ok(()) } +fn init_logging() { + let fmt = fmt::layer().compact(); + let env = EnvFilter::builder() + .with_default_directive(LevelFilter::INFO.into()) + .from_env_lossy(); + Registry::default().with(env).with(fmt).init() +} + #[cfg(test)] mod tests { use super::*;