Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: async API when Response has been processed. #1281

Merged
merged 29 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
4773227
feat: add `RpcModule::register_raw_method`
niklasad1 Jan 30, 2024
ee9b8ea
add proc macro support
niklasad1 Jan 30, 2024
b595e5f
rename API
niklasad1 Jan 30, 2024
304c938
simplify API with MethodResponse::notify_when_sent
niklasad1 Jan 31, 2024
973e62d
improve notify API
niklasad1 Jan 31, 2024
90da71b
fix nits
niklasad1 Jan 31, 2024
53520d8
introduce ResponsePayloadV2
niklasad1 Jan 31, 2024
f71d046
impl ResponsePayloadV2 for T
niklasad1 Feb 1, 2024
dc88a6d
cleanup
niklasad1 Feb 1, 2024
51ce17e
client: proc macro support for custom ret_ty
niklasad1 Feb 1, 2024
ffea2ca
add tests
niklasad1 Feb 2, 2024
f549bf4
address grumbles
niklasad1 Feb 2, 2024
2f1fdd8
remove unused code
niklasad1 Feb 2, 2024
0c98f5c
fix tests
niklasad1 Feb 3, 2024
db7e08c
proc: revert unrelated changes
niklasad1 Feb 5, 2024
925108a
remove panics; move should be enough
niklasad1 Feb 5, 2024
d256661
bring back UI tests
niklasad1 Feb 5, 2024
0eba672
grumbles: remove NotifiedError
niklasad1 Feb 5, 2024
8391579
break stuff for uniform API
niklasad1 Feb 5, 2024
f4b5a92
make more stuff private
niklasad1 Feb 5, 2024
e835a72
remove ResponseErrorUnit type alias
niklasad1 Feb 5, 2024
7f93b20
fix ui tests
niklasad1 Feb 5, 2024
7006a00
Update proc-macros/src/render_server.rs
niklasad1 Feb 5, 2024
487df31
Rename ws_notify_on_method_answered.rs to response_payload_notify_on_…
niklasad1 Feb 5, 2024
f9d7fc3
Merge remote-tracking branch 'origin/master' into low-level-api-v1
niklasad1 Feb 6, 2024
da41c62
remove unit_error APIs
niklasad1 Feb 6, 2024
a83c766
Merge remote-tracking branch 'origin/low-level-api-v1' into low-level…
niklasad1 Feb 6, 2024
802f77f
replace notify_on_x with notify_on_completion
niklasad1 Feb 6, 2024
2836906
Update server/src/transport/ws.rs
niklasad1 Feb 6, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
269 changes: 241 additions & 28 deletions core/src/server/helpers.rs

Large diffs are not rendered by default.

45 changes: 28 additions & 17 deletions core/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ mod rpc_module;
mod subscription;

pub use error::*;
pub use helpers::{BatchResponseBuilder, BoundedWriter, MethodResponse, MethodSink};
pub use helpers::*;
pub use rpc_module::*;
pub use subscription::*;

Expand All @@ -53,7 +53,7 @@ pub trait IntoResponse {
type Output: serde::Serialize + Clone;

/// Something that can be converted into a JSON-RPC method call response.
fn into_response(self) -> ResponsePayload<'static, Self::Output>;
fn into_response(self) -> ResponsePayloadV2<'static, Self::Output>;
}

impl<T, E: Into<ErrorObjectOwned>> IntoResponse for Result<T, E>
Expand All @@ -62,10 +62,10 @@ where
{
type Output = T;

fn into_response(self) -> ResponsePayload<'static, Self::Output> {
fn into_response(self) -> ResponsePayloadV2<'static, Self::Output> {
match self {
Ok(val) => ResponsePayload::result(val),
Err(e) => ResponsePayload::Error(e.into()),
Ok(val) => ResponsePayloadV2::result(val),
Err(e) => ResponsePayloadV2::new(ResponsePayload::Error(e.into())),
}
}
}
Expand All @@ -76,8 +76,8 @@ where
{
type Output = Option<T>;

fn into_response(self) -> ResponsePayload<'static, Self::Output> {
ResponsePayload::result(self)
fn into_response(self) -> ResponsePayloadV2<'static, Self::Output> {
ResponsePayloadV2::result(self)
}
}

Expand All @@ -87,8 +87,8 @@ where
{
type Output = Vec<T>;

fn into_response(self) -> ResponsePayload<'static, Self::Output> {
ResponsePayload::result(self)
fn into_response(self) -> ResponsePayloadV2<'static, Self::Output> {
ResponsePayloadV2::result(self)
}
}

Expand All @@ -98,8 +98,8 @@ where
{
type Output = [T; N];

fn into_response(self) -> ResponsePayload<'static, Self::Output> {
ResponsePayload::result(self)
fn into_response(self) -> ResponsePayloadV2<'static, Self::Output> {
ResponsePayloadV2::result(self)
}
}

Expand All @@ -109,16 +109,27 @@ where
{
type Output = T;

fn into_response(self) -> ResponsePayload<'static, Self::Output> {
fn into_response(self) -> ResponsePayloadV2<'static, Self::Output> {
ResponsePayloadV2::new(self)
}
}

impl<T> IntoResponse for ResponsePayloadV2<'static, T>
where
T: serde::Serialize + Clone,
{
type Output = T;

fn into_response(self) -> ResponsePayloadV2<'static, Self::Output> {
self
}
}

impl IntoResponse for ErrorObjectOwned {
type Output = ErrorObjectOwned;
type Output = ();

fn into_response(self) -> ResponsePayload<'static, Self::Output> {
ResponsePayload::Error(self)
fn into_response(self) -> ResponsePayloadV2<'static, Self::Output> {
ResponsePayloadV2::error(self)
}
}

Expand All @@ -128,8 +139,8 @@ macro_rules! impl_into_response {
impl IntoResponse for $n {
type Output = $n;

fn into_response(self) -> ResponsePayload<'static, Self::Output> {
ResponsePayload::result(self)
fn into_response(self) -> ResponsePayloadV2<'static, Self::Output> {
ResponsePayloadV2::result(self)
}
}
)+
Expand Down
90 changes: 82 additions & 8 deletions core/src/server/rpc_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,18 @@ use std::sync::Arc;

use crate::error::RegisterMethodError;
use crate::id_providers::RandomIntegerIdProvider;
use crate::server::LOG_TARGET;
use crate::server::helpers::{MethodResponse, MethodSink};
use crate::server::subscription::{
sub_message_to_json, BoundedSubscriptions, IntoSubscriptionCloseResponse, PendingSubscriptionSink,
SubNotifResultOrError, Subscribers, Subscription, SubscriptionCloseResponse, SubscriptionKey, SubscriptionPermit,
SubscriptionState,
};
use crate::server::{ResponsePayloadV2, LOG_TARGET};
use crate::traits::ToRpcParams;
use futures_util::{future::BoxFuture, FutureExt};
use jsonrpsee_types::error::{ErrorCode, ErrorObject};
use jsonrpsee_types::{
Id, Params, Request, Response, ResponsePayload, ResponseSuccess, SubscriptionId as RpcSubscriptionId, ErrorObjectOwned,
ErrorObjectOwned, Id, Params, Request, Response, ResponseSuccess, SubscriptionId as RpcSubscriptionId,
};
use rustc_hash::FxHashMap;
use serde::de::DeserializeOwned;
Expand Down Expand Up @@ -78,7 +78,6 @@ pub type MaxResponseSize = usize;
/// - a [`mpsc::UnboundedReceiver<String>`] to receive future subscription results
pub type RawRpcResponse = (MethodResponse, mpsc::Receiver<String>);


/// The error that can occur when [`Methods::call`] or [`Methods::subscribe`] is invoked.
#[derive(thiserror::Error, Debug)]
pub enum MethodsError {
Expand All @@ -97,12 +96,11 @@ pub enum MethodsError {
/// and `Subscribe` calls are handled differently
/// because we want to prevent subscriptions to start
/// before the actual subscription call has been answered.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub enum CallOrSubscription {
/// The subscription callback itself sends back the result
/// so it must not be sent back again.
Subscription(MethodResponse),

/// Treat it as ordinary call.
Call(MethodResponse),
}
Expand Down Expand Up @@ -430,7 +428,8 @@ impl Methods {
let as_success: ResponseSuccess<serde_json::Value> =
serde_json::from_str::<Response<_>>(&resp.result)?.try_into()?;

let sub_id = as_success.result.try_into().map_err(|_| MethodsError::InvalidSubscriptionId(resp.result.clone()))?;
let sub_id =
as_success.result.try_into().map_err(|_| MethodsError::InvalidSubscriptionId(resp.result.clone()))?;

Ok(Subscription { sub_id, rx })
}
Expand Down Expand Up @@ -485,7 +484,71 @@ impl<Context> From<RpcModule<Context>> for Methods {
}

impl<Context: Send + Sync + 'static> RpcModule<Context> {
/// Register a low-level method call where
/// it's possible to get enforce order of things
/// on the connection such as ordering that response
/// is sent before a subscription notifcation or something similar.
///
/// ## Examples
///
/// ```
/// use jsonrpsee_core::server::{RpcModule, MethodResponse, response_channel};
/// use jsonrpsee_types::ResponsePayload;
/// use futures_util::FutureExt;
///
/// let mut module = RpcModule::new(());
/// module.register_raw_method("say_hello", |id, _params, _ctx, max_response_size| {
/// let (tx, rx) = response_channel();
///
/// // This future will be spawned after the method call has been
/// // sent out on the socket message buffer.
/// // This may useful if one needs to order operations in some manner.
/// tokio::spawn(async move {
/// // Wait for response to sent to the internal WebSocket message buffer
/// // and if that fails just quit because it means that the connection
// // was already closed.
/// if rx.is_sent().await.is_err() {
/// return;
/// }
///
/// loop {
/// tokio::time::sleep(std::time::Duration::from_millis(100)).await;
/// println!("after method response");
/// }
/// });
///
/// MethodResponse::response(id, ResponsePayloadV2::result("foo"), max_response_size).notify_on_success(tx)
/// }).unwrap();
///```
pub fn register_raw_method<F>(
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
&mut self,
method_name: &'static str,
callback: F,
) -> Result<&mut MethodCallback, RegisterMethodError>
where
Context: Send + Sync + 'static,
F: Fn(Id, Params, &Context, MaxResponseSize) -> MethodResponse + Send + Sync + 'static,
{
let ctx = self.ctx.clone();
self.methods.verify_and_insert(
method_name,
MethodCallback::Sync(Arc::new(move |id, params, max_response_size| {
callback(id, params, &*ctx, max_response_size)
})),
)
}

/// Register a new synchronous RPC method, which computes the response with the given callback.
///
/// ## Examples
///
/// ```
/// use jsonrpsee_core::server::RpcModule;
///
/// let mut module = RpcModule::new(());
/// module.register_method("say_hello", |_params, _ctx| "lo").unwrap();
/// ```

pub fn register_method<R, F>(
&mut self,
method_name: &'static str,
Expand All @@ -507,6 +570,17 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
}

/// Register a new asynchronous RPC method, which computes the response with the given callback.
///
/// ## Examples
///
/// ```
/// use jsonrpsee_core::server::RpcModule;
///
/// let mut module = RpcModule::new(());
/// module.register_async_method("say_hello", |_params, _ctx| async { "lo" }).unwrap();
///
/// ```
///
pub fn register_async_method<R, Fun, Fut>(
&mut self,
method_name: &'static str,
Expand Down Expand Up @@ -884,7 +958,7 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
id
);

return MethodResponse::response(id, ResponsePayload::result(false), max_response_size);
return MethodResponse::response(id, ResponsePayloadV2::result(false), max_response_size);
}
};

Expand All @@ -900,7 +974,7 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
);
}

MethodResponse::response(id, ResponsePayload::result(result), max_response_size)
MethodResponse::response(id, ResponsePayloadV2::result(result), max_response_size)
})),
);
}
Expand Down
12 changes: 5 additions & 7 deletions core/src/server/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,14 @@

//! Subscription related types and traits for server implementations.

use super::MethodsError;
use super::helpers::{MethodResponse, MethodSink};
use crate::server::LOG_TARGET;
use super::{MethodsError, ResponsePayloadV2};
use crate::server::error::{DisconnectError, PendingSubscriptionAcceptError, SendTimeoutError, TrySendError};
use crate::server::rpc_module::ConnectionId;
use crate::{traits::IdProvider, error::StringError};
use crate::server::LOG_TARGET;
use crate::{error::StringError, traits::IdProvider};
use jsonrpsee_types::SubscriptionPayload;
use jsonrpsee_types::{
response::SubscriptionError, ErrorObjectOwned, Id, ResponsePayload, SubscriptionId, SubscriptionResponse,
};
use jsonrpsee_types::{response::SubscriptionError, ErrorObjectOwned, Id, SubscriptionId, SubscriptionResponse};
use parking_lot::Mutex;
use rustc_hash::FxHashMap;
use serde::{de::DeserializeOwned, Serialize};
Expand Down Expand Up @@ -273,7 +271,7 @@ impl PendingSubscriptionSink {
pub async fn accept(self) -> Result<SubscriptionSink, PendingSubscriptionAcceptError> {
let response = MethodResponse::subscription_response(
self.id,
ResponsePayload::result_borrowed(&self.uniq_sub.sub_id),
ResponsePayloadV2::result_borrowed(&self.uniq_sub.sub_id),
self.inner.max_response_size() as usize,
);
let success = response.is_success();
Expand Down
84 changes: 84 additions & 0 deletions examples/examples/ws_notify_on_method_answered.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Maybe copyright years need to change

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's take of it in another PR

//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use std::net::SocketAddr;

use jsonrpsee::core::client::ClientT;
use jsonrpsee::proc_macros::rpc;
use jsonrpsee::server::Server;
use jsonrpsee::ws_client::WsClientBuilder;
use jsonrpsee::{response_channel, rpc_params, ResponsePayloadV2};

#[rpc(server, namespace = "state")]
pub trait Rpc {
/// Async method call example.
#[method(name = "getKeys")]
fn storage_keys(&self) -> ResponsePayloadV2<'static, String>;
}

pub struct RpcServerImpl;

impl RpcServer for RpcServerImpl {
fn storage_keys(&self) -> ResponsePayloadV2<'static, String> {
let (tx, rx) = response_channel();

tokio::spawn(async move {
rx.is_sent().await.unwrap();
println!("Method response to `state_getKeys` finished");
});

ResponsePayloadV2::result("ehheeheh".to_string()).notify_on_success(tx)
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
}
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::FmtSubscriber::builder()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init()
.expect("setting default subscriber failed");

let server_addr = run_server().await?;
let url = format!("ws://{}", server_addr);

let client = WsClientBuilder::default().build(&url).await?;
assert_eq!("ehheeheh".to_string(), client.request::<String, _>("state_getKeys", rpc_params![]).await.unwrap());

Ok(())
}

async fn run_server() -> anyhow::Result<SocketAddr> {
let server = Server::builder().build("127.0.0.1:0").await?;

let addr = server.local_addr()?;
let handle = server.start(RpcServerImpl.into_rpc());

// In this example we don't care about doing shutdown so let's it run forever.
// You may use the `ServerHandle` to shut it down or manage it yourself.
tokio::spawn(handle.stopped());

Ok(addr)
}
Loading
Loading