Skip to content

Commit

Permalink
Fix limit handling on rpc configuration (graphprotocol#4353)
Browse files Browse the repository at this point in the history
- Set 0 for unlimited
- Unset hosts can make no calls
- Limit != 0 does exactly what you'd think it does
  • Loading branch information
mangas authored Feb 15, 2023
1 parent bd90946 commit 00a40b4
Show file tree
Hide file tree
Showing 5 changed files with 291 additions and 49 deletions.
213 changes: 202 additions & 11 deletions chain/ethereum/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::{anyhow, bail, Context};
use graph::cheap_clone::CheapClone;
use graph::firehose::SubgraphLimit;
use graph::prelude::rand::{self, seq::IteratorRandom};
use std::cmp::Ordering;
use std::collections::HashMap;
Expand All @@ -20,7 +21,7 @@ pub struct EthereumNetworkAdapter {
/// strong_count on `adapter` to determine whether the adapter is above
/// that limit. That's a somewhat imprecise but convenient way to
/// determine the number of connections
limit: usize,
limit: SubgraphLimit,
}

impl EthereumNetworkAdapter {
Expand Down Expand Up @@ -56,7 +57,11 @@ impl EthereumNetworkAdapters {
self.adapters
.iter()
.filter(move |adapter| Some(&adapter.capabilities) == cheapest_sufficient_capability)
.filter(|adapter| Arc::strong_count(&adapter.adapter) < adapter.limit)
.filter(|adapter| {
adapter
.limit
.has_capacity(Arc::strong_count(&adapter.adapter))
})
.map(|adapter| adapter.adapter.cheap_clone())
}

Expand Down Expand Up @@ -92,9 +97,12 @@ impl EthereumNetworkAdapters {
&self,
capabilities: Option<&NodeCapabilities>,
) -> anyhow::Result<Arc<EthereumAdapter>> {
match self.call_only_adapter()? {
Some(adapter) => Ok(adapter),
None => self.cheapest_with(capabilities.unwrap_or(&NodeCapabilities {
// call_only_adapter can fail if we're out of capcity, this is fine since
// we would want to fallback onto a full adapter
// so we will ignore this error and return whatever comes out of `cheapest_with`
match self.call_only_adapter() {
Ok(Some(adapter)) => Ok(adapter),
_ => self.cheapest_with(capabilities.unwrap_or(&NodeCapabilities {
// Archive is required for call_only
archive: true,
traces: false,
Expand All @@ -115,7 +123,10 @@ impl EthereumNetworkAdapters {

// TODO: This will probably blow up a lot sooner than [limit] amount of
// subgraphs, since we probably use a few instances.
if Arc::strong_count(&adapters.adapter) >= adapters.limit {
if !adapters
.limit
.has_capacity(Arc::strong_count(&adapters.adapter))
{
bail!("call only adapter has reached the concurrency limit");
}

Expand All @@ -142,7 +153,7 @@ impl EthereumNetworks {
name: String,
capabilities: NodeCapabilities,
adapter: Arc<EthereumAdapter>,
limit: usize,
limit: SubgraphLimit,
) {
let network_adapters = self
.networks
Expand Down Expand Up @@ -213,7 +224,7 @@ impl EthereumNetworks {
mod tests {
use std::sync::Arc;

use graph::{prelude::MetricsRegistry, tokio, url::Url};
use graph::{firehose::SubgraphLimit, prelude::MetricsRegistry, tokio, url::Url};
use graph_mock::MockMetricsRegistry;
use http::HeaderMap;

Expand Down Expand Up @@ -320,7 +331,7 @@ mod tests {
traces: false,
},
eth_call_adapter.clone(),
3,
SubgraphLimit::Limit(3),
);
ethereum_networks.insert(
chain.clone(),
Expand All @@ -329,7 +340,7 @@ mod tests {
traces: false,
},
eth_adapter.clone(),
3,
SubgraphLimit::Limit(3),
);
ethereum_networks.networks.get(&chain).unwrap().clone()
};
Expand Down Expand Up @@ -360,7 +371,10 @@ mod tests {
{
let adapter = adapters.call_or_cheapest(None).unwrap();
assert!(adapter.is_call_only());
assert!(adapters.call_or_cheapest(None).is_err());
assert_eq!(
adapters.call_or_cheapest(None).unwrap().is_call_only(),
false
);
}

// Check empty falls back to call only
Expand All @@ -375,4 +389,181 @@ mod tests {
assert_eq!(adapter.is_call_only(), false);
}
}

#[tokio::test]
async fn adapter_selector_unlimited() {
let chain = "mainnet".to_string();
let logger = graph::log::logger(true);
let mock_registry: Arc<dyn MetricsRegistry> = Arc::new(MockMetricsRegistry::new());
let transport =
Transport::new_rpc(Url::parse("http://127.0.0.1").unwrap(), HeaderMap::new());
let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone()));

let eth_call_adapter = Arc::new(
EthereumAdapter::new(
logger.clone(),
String::new(),
"http://127.0.0.1",
transport.clone(),
provider_metrics.clone(),
true,
true,
)
.await,
);

let eth_adapter = Arc::new(
EthereumAdapter::new(
logger.clone(),
String::new(),
"http://127.0.0.1",
transport.clone(),
provider_metrics.clone(),
true,
false,
)
.await,
);

let adapters = {
let mut ethereum_networks = EthereumNetworks::new();
ethereum_networks.insert(
chain.clone(),
NodeCapabilities {
archive: true,
traces: false,
},
eth_call_adapter.clone(),
SubgraphLimit::Unlimited,
);
ethereum_networks.insert(
chain.clone(),
NodeCapabilities {
archive: true,
traces: false,
},
eth_adapter.clone(),
SubgraphLimit::Limit(3),
);
ethereum_networks.networks.get(&chain).unwrap().clone()
};
// one reference above and one inside adapters struct
assert_eq!(Arc::strong_count(&eth_call_adapter), 2);
assert_eq!(Arc::strong_count(&eth_adapter), 2);

let keep: Vec<Arc<EthereumAdapter>> = vec![0; 10]
.iter()
.map(|_| adapters.call_or_cheapest(None).unwrap())
.collect();
assert_eq!(keep.iter().any(|a| !a.is_call_only()), false);
}

#[tokio::test]
async fn adapter_selector_disable_call_only_fallback() {
let chain = "mainnet".to_string();
let logger = graph::log::logger(true);
let mock_registry: Arc<dyn MetricsRegistry> = Arc::new(MockMetricsRegistry::new());
let transport =
Transport::new_rpc(Url::parse("http://127.0.0.1").unwrap(), HeaderMap::new());
let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone()));

let eth_call_adapter = Arc::new(
EthereumAdapter::new(
logger.clone(),
String::new(),
"http://127.0.0.1",
transport.clone(),
provider_metrics.clone(),
true,
true,
)
.await,
);

let eth_adapter = Arc::new(
EthereumAdapter::new(
logger.clone(),
String::new(),
"http://127.0.0.1",
transport.clone(),
provider_metrics.clone(),
true,
false,
)
.await,
);

let adapters = {
let mut ethereum_networks = EthereumNetworks::new();
ethereum_networks.insert(
chain.clone(),
NodeCapabilities {
archive: true,
traces: false,
},
eth_call_adapter.clone(),
SubgraphLimit::Disabled,
);
ethereum_networks.insert(
chain.clone(),
NodeCapabilities {
archive: true,
traces: false,
},
eth_adapter.clone(),
SubgraphLimit::Limit(3),
);
ethereum_networks.networks.get(&chain).unwrap().clone()
};
// one reference above and one inside adapters struct
assert_eq!(Arc::strong_count(&eth_call_adapter), 2);
assert_eq!(Arc::strong_count(&eth_adapter), 2);
assert_eq!(
adapters.call_or_cheapest(None).unwrap().is_call_only(),
false
);
}

#[tokio::test]
async fn adapter_selector_no_call_only_fallback() {
let chain = "mainnet".to_string();
let logger = graph::log::logger(true);
let mock_registry: Arc<dyn MetricsRegistry> = Arc::new(MockMetricsRegistry::new());
let transport =
Transport::new_rpc(Url::parse("http://127.0.0.1").unwrap(), HeaderMap::new());
let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone()));

let eth_adapter = Arc::new(
EthereumAdapter::new(
logger.clone(),
String::new(),
"http://127.0.0.1",
transport.clone(),
provider_metrics.clone(),
true,
false,
)
.await,
);

let adapters = {
let mut ethereum_networks = EthereumNetworks::new();
ethereum_networks.insert(
chain.clone(),
NodeCapabilities {
archive: true,
traces: false,
},
eth_adapter.clone(),
SubgraphLimit::Limit(3),
);
ethereum_networks.networks.get(&chain).unwrap().clone()
};
// one reference above and one inside adapters struct
assert_eq!(Arc::strong_count(&eth_adapter), 2);
assert_eq!(
adapters.call_or_cheapest(None).unwrap().is_call_only(),
false
);
}
}
14 changes: 9 additions & 5 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,14 @@ approximate and can differ from the true number by a small amount
(generally less than 10)

The limit is set through rules that match on the node name. If a node's
name does not match any rule, the corresponding provider can be used for an
unlimited number of subgraphs. It is recommended that at least one provider
is generally unlimited. The limit is set in the following way:
name does not match any rule, the corresponding provider will be disabled
for that node.

If the match property is omitted then the provider will be unlimited on every
node.

It is recommended that at least one provider is generally unlimited.
The limit is set in the following way:

```toml
[chains.mainnet]
Expand All @@ -169,8 +174,7 @@ provider = [
Nodes named `some_node_.*` will use `mainnet-1` for at most 10 subgraphs,
and `mainnet-0` for everything else, nodes named `other_node_.*` will never
use `mainnet-1` and always `mainnet-0`. Any node whose name does not match
one of these patterns will use `mainnet-0` and `mainnet-1` for an unlimited
number of subgraphs.
one of these patterns will not be able to use and `mainnet-1`.

## Controlling Deployment

Expand Down
33 changes: 22 additions & 11 deletions graph/src/firehose/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,26 @@ pub struct FirehoseEndpoint {
pub token: Option<String>,
pub filters_enabled: bool,
pub compression_enabled: bool,
pub subgraph_limit: usize,
pub subgraph_limit: SubgraphLimit,
channel: Channel,
}

#[derive(Clone, Debug)]
// TODO: Find a new home for this type.
#[derive(Clone, Debug, PartialEq, Ord, Eq, PartialOrd)]
pub enum SubgraphLimit {
Unlimited,
Disabled,
Limit(usize),
NoTraffic,
Unlimited,
}

impl SubgraphLimit {
pub fn has_capacity(&self, current: usize) -> bool {
match self {
SubgraphLimit::Unlimited => true,
SubgraphLimit::Limit(limit) => limit > &current,
SubgraphLimit::Disabled => false,
}
}
}

impl Display for FirehoseEndpoint {
Expand Down Expand Up @@ -93,10 +104,10 @@ impl FirehoseEndpoint {

let subgraph_limit = match subgraph_limit {
// See the comment on the constant
SubgraphLimit::Unlimited => SUBGRAPHS_PER_CONN,
SubgraphLimit::Unlimited => SubgraphLimit::Limit(SUBGRAPHS_PER_CONN),
// This is checked when parsing from config but doesn't hurt to be defensive.
SubgraphLimit::Limit(limit) => limit.min(SUBGRAPHS_PER_CONN),
SubgraphLimit::NoTraffic => 0,
SubgraphLimit::Limit(limit) => SubgraphLimit::Limit(limit.min(SUBGRAPHS_PER_CONN)),
l => l,
};

FirehoseEndpoint {
Expand All @@ -109,11 +120,11 @@ impl FirehoseEndpoint {
}
}

// The SUBGRAPHS_PER_CONN upper bound was already limited so we leave it the same
// we need to use inclusive limits (<=) because there will always be a reference
// we need to -1 because there will always be a reference
// inside FirehoseEndpoints that is not used (is always cloned).
pub fn has_subgraph_capacity(self: &Arc<Self>) -> bool {
Arc::strong_count(&self) <= self.subgraph_limit
self.subgraph_limit
.has_capacity(Arc::strong_count(&self).checked_sub(1).unwrap_or(0))
}

pub async fn get_block<M>(
Expand Down Expand Up @@ -501,7 +512,7 @@ mod test {
None,
false,
false,
SubgraphLimit::NoTraffic,
SubgraphLimit::Disabled,
))];

let mut endpoints = FirehoseEndpoints::from(endpoint);
Expand Down
Loading

0 comments on commit 00a40b4

Please sign in to comment.