
collator-protocol: short-term fixes for connectivity #4640

Merged: 9 commits, Jan 4, 2022
114 changes: 68 additions & 46 deletions node/network/collator-protocol/src/collator_side/mod.rs
@@ -321,15 +321,14 @@ impl State {

/// Distribute a collation.
///
/// Figure out the core our para is assigned to and the relevant validators.
/// Issue a connection request to these validators.
/// If the para is not scheduled or next up on any core, at the relay-parent,
/// or the relay-parent isn't in the active-leaves set, we ignore the message
/// as it must be invalid in that case - although this indicates a logic error
/// elsewhere in the node.
/// If the para is not scheduled on any core, at the relay parent,
/// or the relay parent isn't in our view or we already collated on the relay parent,
/// we ignore the message as it must be invalid in that case -
/// although this indicates a logic error elsewhere in the node.
///
/// Otherwise, start advertising the collation to interested peers.
async fn distribute_collation<Context>(
ctx: &mut Context,
runtime: &mut RuntimeInfo,
state: &mut State,
id: ParaId,
receipt: CandidateReceipt,
@@ -358,32 +357,8 @@ where
return Ok(())
}

// Determine which core the para collated-on is assigned to.
// If it is not scheduled then ignore the message.
let (our_core, num_cores) = match determine_core(ctx, id, relay_parent).await? {
Some(core) => core,
None => {
tracing::warn!(
target: LOG_TARGET,
para_id = %id,
?relay_parent,
"looks like no core is assigned to {} at {}", id, relay_parent,
);

return Ok(())
},
};

// Determine the group on that core.
let current_validators =
determine_our_validators(ctx, runtime, our_core, num_cores, relay_parent).await?;

if current_validators.validators.is_empty() {
tracing::warn!(
target: LOG_TARGET,
core = ?our_core,
"there are no validators assigned to core",
);
if !state.our_validators_groups.contains_key(&relay_parent) {
tracing::warn!(target: LOG_TARGET, "There are no validators assigned to the core.",);

return Ok(())
}
@@ -394,16 +369,9 @@ where
relay_parent = %relay_parent,
candidate_hash = ?receipt.hash(),
pov_hash = ?pov.hash(),
core = ?our_core,
?current_validators,
"Accepted collation, connecting to validators."
"Accepted collation",
);

// Issue a discovery request for the validators of the current group:
connect_to_validators(ctx, current_validators.validators.into_iter().collect()).await;

state.our_validators_groups.insert(relay_parent, ValidatorGroup::new());

if let Some(result_sender) = result_sender {
state.collation_result_senders.insert(receipt.hash(), result_sender);
}
@@ -522,7 +490,7 @@ where
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
{
// ignore address resolution failure
// will reissue a new request on new collation
// will reissue a new request on new relay parent
let (failed, _) = oneshot::channel();
ctx.send_message(NetworkBridgeMessage::ConnectToValidators {
validator_ids,
@@ -633,8 +601,7 @@ where
);
},
Some(id) => {
distribute_collation(ctx, runtime, state, id, receipt, pov, result_sender)
.await?;
distribute_collation(ctx, state, id, receipt, pov, result_sender).await?;
},
None => {
tracing::warn!(
@@ -919,7 +886,7 @@ where
},
OurViewChange(view) => {
tracing::trace!(target: LOG_TARGET, ?view, "Own view change");
handle_our_view_change(state, view).await?;
handle_our_view_change(ctx, runtime, state, view).await?;
},
PeerMessage(remote, msg) => {
handle_incoming_peer_message(ctx, runtime, state, remote, msg).await?;
@@ -933,7 +900,16 @@
}

/// Handles our view changes.
async fn handle_our_view_change(state: &mut State, view: OurView) -> Result<()> {
async fn handle_our_view_change<Context>(
ctx: &mut Context,
runtime: &mut RuntimeInfo,
state: &mut State,
view: OurView,
) -> Result<()>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>,
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
{
for removed in state.view.difference(&view) {
tracing::debug!(target: LOG_TARGET, relay_parent = ?removed, "Removing relay parent because our view changed.");

@@ -966,8 +942,54 @@ async fn handle_our_view_change(state: &mut State, view: OurView) -> Result<()>
state.waiting_collation_fetches.remove(removed);
}

let new_leaves: Vec<_> = view.difference(&state.view).cloned().collect();
state.view = view;

let id = match state.collating_on {
Some(id) => id,
None => return Ok(()),
};

for relay_parent in new_leaves {
tracing::debug!(
target: LOG_TARGET,
?relay_parent,
para_id = ?id,
"Processing new relay parent.",
);

// Determine our assigned core.
// If it is not scheduled then ignore the relay parent.
let (our_core, num_cores) = match determine_core(ctx, id, relay_parent).await? {
Some(core) => core,
None => continue,
};

// Determine the group on that core.
let current_validators =
determine_our_validators(ctx, runtime, our_core, num_cores, relay_parent).await?;

let validators = current_validators.validators;
let no_one_is_assigned = validators.is_empty();

if no_one_is_assigned {
continue
}

tracing::debug!(
target: LOG_TARGET,
?relay_parent,
?validators,
para_id = ?id,
"Connecting to validators.",
);

// Add the current validator group to the reserved peers
connect_to_validators(ctx, validators).await;
Member:
That might cause issues. new_leaves is an Option at the moment, but that does not really fix the issue: we are not only collating on the very latest leaf. If we get a new-leaves update for forks, we will only be able to collate with validators on the fork which had the most recent leaf update (assuming other forks have different block numbers and thus might have other validators assigned).

We could collect the validators of all currently active leaves, which in turn means we would stay connected to validators on stale forks until their block height gets finalized - so it is not perfect either.

What can happen with this implementation is that collators will sometimes not be able to collate on the "right" fork at rotation boundaries. Considering that rotation boundaries are tough anyway as of now, this should actually be fine for the interim solution this is. ... so disregard.
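
(For illustration, a minimal sketch of the "collect validators of all currently active leaves" alternative discussed above. Leaf, AuthorityId, and validators_for_leaf are placeholder names standing in for the real subsystem types and the determine_core/determine_our_validators calls; this is not part of the PR.)

```rust
use std::collections::HashSet;

type Leaf = u64; // placeholder for a relay-parent hash
type AuthorityId = String; // placeholder for the real authority-discovery id

// Hypothetical stand-in for running `determine_core` +
// `determine_our_validators` at one leaf.
fn validators_for_leaf(leaf: &Leaf) -> Vec<AuthorityId> {
    vec![format!("validator-at-{}", leaf)]
}

// Union the assigned validators over *every* active leaf, so collators stay
// connected to the groups of all forks, not just the most recently updated one.
fn validators_for_all_leaves(active_leaves: &[Leaf]) -> Vec<AuthorityId> {
    let mut all = HashSet::new();
    for leaf in active_leaves {
        all.extend(validators_for_leaf(leaf));
    }
    all.into_iter().collect()
}
```

The trade-off named above is visible in the union: connections to validators of stale forks stay alive until those leaves drop out of the active set.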

Member Author (@ordian):

This is a valid concern which I have also considered, but as you pointed out, this is an interim solution.

Member Author (@ordian):
> new_leaves is an Option at the moment, but that does not really fix the issue: We are not only collating on the very latest leaf

note that we're using OurView instead of ActiveLeavesUpdate, so new_leaves can be a few leaves

state.our_validators_groups.insert(relay_parent, ValidatorGroup::new());
}

Ok(())
}

110 changes: 54 additions & 56 deletions node/network/collator-protocol/src/collator_side/tests.rs
@@ -29,7 +29,7 @@ use sp_core::crypto::Pair;
use sp_keyring::Sr25519Keyring;
use sp_runtime::traits::AppVerify;

use polkadot_node_network_protocol::{our_view, request_response::IncomingRequest, view};
use polkadot_node_network_protocol::{our_view, request_response::IncomingRequest, view, OurView};
use polkadot_node_primitives::BlockData;
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_primitives::{
@@ -172,13 +172,7 @@ impl TestState {
our_view![self.relay_parent]
};

overseer_send(
virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::OurViewChange(
our_view,
)),
)
.await;
set_our_view(virtual_overseer, &self, our_view).await;
}
}

@@ -278,44 +272,19 @@ async fn setup_system(virtual_overseer: &mut VirtualOverseer, test_state: &TestS
)
.await;

overseer_send(
virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::OurViewChange(
our_view![test_state.relay_parent],
)),
)
.await;
set_our_view(virtual_overseer, test_state, our_view![test_state.relay_parent]).await;
}

/// Result of [`distribute_collation`]
struct DistributeCollation {
candidate: CandidateReceipt,
pov_block: PoV,
}

/// Create some PoV and distribute it.
async fn distribute_collation(
/// Check our view change triggers the right messages
/// assuming our view contains `test_state.relay_parent` as the only new relay parent.
async fn set_our_view(
virtual_overseer: &mut VirtualOverseer,
test_state: &TestState,
// whether or not we expect a connection request or not.
should_connect: bool,
) -> DistributeCollation {
// Now we want to distribute a `PoVBlock`
let pov_block = PoV { block_data: BlockData(vec![42, 43, 44]) };

let pov_hash = pov_block.hash();

let candidate = TestCandidateBuilder {
para_id: test_state.para_id,
relay_parent: test_state.relay_parent,
pov_hash,
..Default::default()
}
.build();

our_view: OurView,
) {
overseer_send(
virtual_overseer,
CollatorProtocolMessage::DistributeCollation(candidate.clone(), pov_block.clone(), None),
CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::OurViewChange(our_view)),
)
.await;

@@ -369,16 +338,45 @@ async fn distribute_collation(
}
}

if should_connect {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ConnectToValidators {
..
}
) => {}
);
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ConnectToValidators {
..
}
) => {}
);
}

/// Result of [`distribute_collation`]
struct DistributeCollation {
candidate: CandidateReceipt,
pov_block: PoV,
}

/// Create some PoV and distribute it.
async fn distribute_collation(
virtual_overseer: &mut VirtualOverseer,
test_state: &TestState,
) -> DistributeCollation {
// Now we want to distribute a `PoVBlock`
let pov_block = PoV { block_data: BlockData(vec![42, 43, 44]) };

let pov_hash = pov_block.hash();

let candidate = TestCandidateBuilder {
para_id: test_state.para_id,
relay_parent: test_state.relay_parent,
pov_hash,
..Default::default()
}
.build();

overseer_send(
virtual_overseer,
CollatorProtocolMessage::DistributeCollation(candidate.clone(), pov_block.clone(), None),
)
.await;

DistributeCollation { candidate, pov_block }
}
@@ -508,7 +506,7 @@ fn advertise_and_send_collation() {
setup_system(&mut virtual_overseer, &test_state).await;

let DistributeCollation { candidate, pov_block } =
distribute_collation(&mut virtual_overseer, &test_state, true).await;
distribute_collation(&mut virtual_overseer, &test_state).await;

for (val, peer) in test_state
.current_group_validator_authority_ids()
@@ -625,7 +623,7 @@ fn advertise_and_send_collation() {

assert!(overseer_recv_with_timeout(&mut virtual_overseer, TIMEOUT).await.is_none());

distribute_collation(&mut virtual_overseer, &test_state, true).await;
distribute_collation(&mut virtual_overseer, &test_state).await;

// Send info about peer's view.
overseer_send(
@@ -713,7 +711,7 @@ fn collations_are_only_advertised_to_validators_with_correct_view() {
// And let it tell us that it is has the same view.
send_peer_view_change(virtual_overseer, &peer2, vec![test_state.relay_parent]).await;

distribute_collation(virtual_overseer, &test_state, true).await;
distribute_collation(virtual_overseer, &test_state).await;

expect_advertise_collation_msg(virtual_overseer, &peer2, test_state.relay_parent).await;

@@ -752,14 +750,14 @@ fn collate_on_two_different_relay_chain_blocks() {
expect_declare_msg(virtual_overseer, &test_state, &peer).await;
expect_declare_msg(virtual_overseer, &test_state, &peer2).await;

distribute_collation(virtual_overseer, &test_state, true).await;
distribute_collation(virtual_overseer, &test_state).await;

let old_relay_parent = test_state.relay_parent;

// Advance to a new round, while informing the subsystem that the old and the new relay parent are active.
test_state.advance_to_new_round(virtual_overseer, true).await;

distribute_collation(virtual_overseer, &test_state, true).await;
distribute_collation(virtual_overseer, &test_state).await;

send_peer_view_change(virtual_overseer, &peer, vec![old_relay_parent]).await;
expect_advertise_collation_msg(virtual_overseer, &peer, old_relay_parent).await;
@@ -789,7 +787,7 @@ fn validator_reconnect_does_not_advertise_a_second_time() {
connect_peer(virtual_overseer, peer.clone(), Some(validator_id.clone())).await;
expect_declare_msg(virtual_overseer, &test_state, &peer).await;

distribute_collation(virtual_overseer, &test_state, true).await;
distribute_collation(virtual_overseer, &test_state).await;

send_peer_view_change(virtual_overseer, &peer, vec![test_state.relay_parent]).await;
expect_advertise_collation_msg(virtual_overseer, &peer, test_state.relay_parent).await;
@@ -874,7 +872,7 @@ where
setup_system(virtual_overseer, &test_state).await;

let DistributeCollation { candidate, pov_block } =
distribute_collation(virtual_overseer, &test_state, true).await;
distribute_collation(virtual_overseer, &test_state).await;

for (val, peer) in test_state
.current_group_validator_authority_ids()
2 changes: 1 addition & 1 deletion node/network/collator-protocol/src/lib.rs
@@ -58,7 +58,7 @@ pub struct CollatorEvictionPolicy {
impl Default for CollatorEvictionPolicy {
fn default() -> Self {
CollatorEvictionPolicy {
inactive_collator: Duration::from_secs(24),
inactive_collator: Duration::from_secs(5),
undeclared: Duration::from_secs(1),
}
}
2 changes: 1 addition & 1 deletion node/network/protocol/src/peer_set.rs
@@ -75,7 +75,7 @@ impl PeerSet {
max_notification_size,
set_config: SetConfig {
// Non-authority nodes don't need to accept incoming connections on this peer set:
in_peers: if is_authority == IsAuthority::Yes { 25 } else { 0 },
in_peers: if is_authority == IsAuthority::Yes { 100 } else { 0 },
out_peers: 0,
reserved_nodes: Vec::new(),
non_reserved_mode: if is_authority == IsAuthority::Yes {