Skip to content

Commit

Permalink
Offchain-worker: Make it possible to disable http support (paritytech…
Browse files Browse the repository at this point in the history
…#10087)

* Offchain-worker: Make it possible to disable http support

If a chain doesn't require http support in its offchain workers, this pr enables them to disable the
http support.

* Switch to bitflags

* Use Capabilities

* Update client/offchain/src/lib.rs

Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

* Fix test

* Update client/offchain/src/lib.rs

Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>
  • Loading branch information
2 people authored and grishasobol committed Mar 28, 2022
1 parent c1f4cd7 commit 82e9f15
Show file tree
Hide file tree
Showing 9 changed files with 144 additions and 114 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions client/api/src/execution_extensions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,13 @@ impl<Block: traits::Block> ExecutionExtensions<Block> {

let mut extensions = self.extensions_factory.read().extensions_for(capabilities);

if capabilities.has(offchain::Capability::Keystore) {
if capabilities.contains(offchain::Capabilities::KEYSTORE) {
if let Some(ref keystore) = self.keystore {
extensions.register(KeystoreExt(keystore.clone()));
}
}

if capabilities.has(offchain::Capability::TransactionPool) {
if capabilities.contains(offchain::Capabilities::TRANSACTION_POOL) {
if let Some(pool) = self.transaction_pool.read().as_ref().and_then(|x| x.upgrade()) {
extensions
.register(TransactionPoolExt(
Expand All @@ -176,8 +176,8 @@ impl<Block: traits::Block> ExecutionExtensions<Block> {
}
}

if capabilities.has(offchain::Capability::OffchainDbRead) ||
capabilities.has(offchain::Capability::OffchainDbWrite)
if capabilities.contains(offchain::Capabilities::OFFCHAIN_DB_READ) ||
capabilities.contains(offchain::Capabilities::OFFCHAIN_DB_WRITE)
{
if let Some(offchain_db) = self.offchain_db.as_ref() {
extensions.register(OffchainDbExt::new(offchain::LimitedExternalities::new(
Expand Down Expand Up @@ -210,7 +210,7 @@ impl<Block: traits::Block> ExecutionExtensions<Block> {
ExecutionContext::BlockConstruction => self.strategies.block_construction.get_manager(),
ExecutionContext::Syncing => self.strategies.syncing.get_manager(),
ExecutionContext::Importing => self.strategies.importing.get_manager(),
ExecutionContext::OffchainCall(Some((_, capabilities))) if capabilities.has_all() =>
ExecutionContext::OffchainCall(Some((_, capabilities))) if capabilities.is_all() =>
self.strategies.offchain_worker.get_manager(),
ExecutionContext::OffchainCall(_) => self.strategies.other.get_manager(),
};
Expand Down
1 change: 1 addition & 0 deletions client/offchain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ sc-utils = { version = "4.0.0-dev", path = "../utils" }
threadpool = "1.7"
hyper = "0.14.11"
hyper-rustls = "0.22.1"
once_cell = "1.8"

[dev-dependencies]
sc-client-db = { version = "0.10.0-dev", default-features = true, path = "../db" }
Expand Down
12 changes: 5 additions & 7 deletions client/offchain/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,9 +298,9 @@ impl AsyncApi {
pub fn new(
network_provider: Arc<dyn NetworkProvider + Send + Sync>,
is_validator: bool,
shared_client: SharedClient,
shared_http_client: SharedClient,
) -> (Api, Self) {
let (http_api, http_worker) = http::http(shared_client);
let (http_api, http_worker) = http::http(shared_http_client);

let api = Api { network_provider, is_validator, http: http_api };

Expand All @@ -310,10 +310,8 @@ impl AsyncApi {
}

/// Run a processing task for the API
pub fn process(mut self) -> impl Future<Output = ()> {
let http = self.http.take().expect("Take invoked only once.");

http
pub fn process(self) -> impl Future<Output = ()> {
self.http.expect("`process` is only called once; qed")
}
}

Expand All @@ -328,7 +326,7 @@ mod tests {
time::SystemTime,
};

struct TestNetwork();
pub(super) struct TestNetwork();

impl NetworkProvider for TestNetwork {
fn set_authorized_peers(&self, _peers: HashSet<PeerId>) {
Expand Down
49 changes: 44 additions & 5 deletions client/offchain/src/api/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use futures::{channel::mpsc, future, prelude::*};
use hyper::{client, Body, Client as HyperClient};
use hyper_rustls::HttpsConnector;
use log::error;
use once_cell::sync::Lazy;
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_core::offchain::{HttpError, HttpRequestId, HttpRequestStatus, Timestamp};
use std::{
Expand All @@ -47,11 +48,13 @@ use std::{

/// Wrapper struct used for keeping the hyper_rustls client running.
#[derive(Clone)]
pub struct SharedClient(Arc<HyperClient<HttpsConnector<client::HttpConnector>, Body>>);
pub struct SharedClient(Arc<Lazy<HyperClient<HttpsConnector<client::HttpConnector>, Body>>>);

impl SharedClient {
pub fn new() -> Self {
Self(Arc::new(HyperClient::builder().build(HttpsConnector::with_native_roots())))
Self(Arc::new(Lazy::new(|| {
HyperClient::builder().build(HttpsConnector::with_native_roots())
})))
}
}

Expand Down Expand Up @@ -567,7 +570,7 @@ pub struct HttpWorker {
/// Used to receive messages from the `HttpApi`.
from_api: TracingUnboundedReceiver<ApiToWorker>,
/// The engine that runs HTTP requests.
http_client: Arc<HyperClient<HttpsConnector<client::HttpConnector>, Body>>,
http_client: Arc<Lazy<HyperClient<HttpsConnector<client::HttpConnector>, Body>>>,
/// HTTP requests that are being worked on by the engine.
requests: Vec<(HttpRequestId, HttpWorkerRequest)>,
}
Expand Down Expand Up @@ -697,12 +700,15 @@ impl fmt::Debug for HttpWorkerRequest {

#[cfg(test)]
mod tests {
use super::{http, SharedClient};
use super::{
super::{tests::TestNetwork, AsyncApi},
*,
};
use crate::api::timestamp;
use core::convert::Infallible;
use futures::{future, StreamExt};
use lazy_static::lazy_static;
use sp_core::offchain::{Duration, HttpError, HttpRequestId, HttpRequestStatus};
use sp_core::offchain::{Duration, Externalities, HttpError, HttpRequestId, HttpRequestStatus};

// Using lazy_static to avoid spawning lots of different SharedClients,
// as spawning a SharedClient is CPU-intensive and opens lots of fds.
Expand Down Expand Up @@ -1006,4 +1012,37 @@ mod tests {
}
}
}

#[test]
fn shared_http_client_is_only_initialized_on_access() {
let shared_client = SharedClient::new();

{
let mock = Arc::new(TestNetwork());
let (mut api, async_api) = AsyncApi::new(mock, false, shared_client.clone());
api.timestamp();

futures::executor::block_on(async move {
assert!(futures::poll!(async_api.process()).is_pending());
});
}

// Check that the http client wasn't initialized, because it wasn't used.
assert!(Lazy::into_value(Arc::try_unwrap(shared_client.0).unwrap()).is_err());

let shared_client = SharedClient::new();

{
let mock = Arc::new(TestNetwork());
let (mut api, async_api) = AsyncApi::new(mock, false, shared_client.clone());
let id = api.http_request_start("lol", "nope", &[]).unwrap();
api.http_request_write_body(id, &[], None).unwrap();
futures::executor::block_on(async move {
assert!(futures::poll!(async_api.process()).is_pending());
});
}

// Check that the http client initialized, because it was used.
assert!(Lazy::into_value(Arc::try_unwrap(shared_client.0).unwrap()).is_ok());
}
}
43 changes: 32 additions & 11 deletions client/offchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,26 +81,40 @@ where
}
}

/// Options for [`OffchainWorkers`]
pub struct OffchainWorkerOptions {
/// Enable http requests from offchain workers?
///
/// If not enabled, any http request will panic.
pub enable_http_requests: bool,
}

/// An offchain workers manager.
pub struct OffchainWorkers<Client, Block: traits::Block> {
client: Arc<Client>,
_block: PhantomData<Block>,
thread_pool: Mutex<ThreadPool>,
shared_client: api::SharedClient,
shared_http_client: api::SharedClient,
enable_http: bool,
}

impl<Client, Block: traits::Block> OffchainWorkers<Client, Block> {
/// Creates new `OffchainWorkers`.
/// Creates new [`OffchainWorkers`].
pub fn new(client: Arc<Client>) -> Self {
let shared_client = api::SharedClient::new();
Self::new_with_options(client, OffchainWorkerOptions { enable_http_requests: true })
}

/// Creates new [`OffchainWorkers`] using the given `options`.
pub fn new_with_options(client: Arc<Client>, options: OffchainWorkerOptions) -> Self {
Self {
client,
_block: PhantomData,
thread_pool: Mutex::new(ThreadPool::with_name(
"offchain-worker".into(),
num_cpus::get(),
)),
shared_client,
shared_http_client: api::SharedClient::new(),
enable_http: options.enable_http_requests,
}
}
}
Expand Down Expand Up @@ -140,18 +154,22 @@ where
},
};
debug!("Checking offchain workers at {:?}: version:{}", at, version);
if version > 0 {
let process = (version > 0).then(|| {
let (api, runner) =
api::AsyncApi::new(network_provider, is_validator, self.shared_client.clone());
api::AsyncApi::new(network_provider, is_validator, self.shared_http_client.clone());
debug!("Spawning offchain workers at {:?}", at);
let header = header.clone();
let client = self.client.clone();

let mut capabilities = offchain::Capabilities::all();

capabilities.set(offchain::Capabilities::HTTP, self.enable_http);
self.spawn_worker(move || {
let runtime = client.runtime_api();
let api = Box::new(api);
debug!("Running offchain workers at {:?}", at);
let context =
ExecutionContext::OffchainCall(Some((api, offchain::Capabilities::all())));

let context = ExecutionContext::OffchainCall(Some((api, capabilities)));
let run = if version == 2 {
runtime.offchain_worker_with_context(&at, context, &header)
} else {
Expand All @@ -166,9 +184,12 @@ where
log::error!("Error running offchain workers at {:?}: {:?}", at, e);
}
});
futures::future::Either::Left(runner.process())
} else {
futures::future::Either::Right(futures::future::ready(()))

runner.process()
});

async move {
futures::future::OptionFuture::from(process).await;
}
}

Expand Down
1 change: 1 addition & 0 deletions primitives/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ parity-util-mem = { version = "0.10.0", default-features = false, features = [
futures = { version = "0.3.1", optional = true }
dyn-clonable = { version = "0.9.0", optional = true }
thiserror = { version = "1.0.21", optional = true }
bitflags = "1.3"

# full crypto
ed25519-dalek = { version = "1.0.1", default-features = false, features = [
Expand Down
12 changes: 5 additions & 7 deletions primitives/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,15 +118,13 @@ impl ExecutionContext {
use ExecutionContext::*;

match self {
Importing | Syncing | BlockConstruction => offchain::Capabilities::none(),
Importing | Syncing | BlockConstruction => offchain::Capabilities::empty(),
// Enable keystore, transaction pool and Offchain DB reads by default for offchain
// calls.
OffchainCall(None) => [
offchain::Capability::Keystore,
offchain::Capability::OffchainDbRead,
offchain::Capability::TransactionPool,
][..]
.into(),
OffchainCall(None) =>
offchain::Capabilities::KEYSTORE |
offchain::Capabilities::OFFCHAIN_DB_READ |
offchain::Capabilities::TRANSACTION_POOL,
OffchainCall(Some((_, capabilities))) => *capabilities,
}
}
Expand Down
Loading

0 comments on commit 82e9f15

Please sign in to comment.