Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Sync: validate block responses for required data #5052

Merged
merged 6 commits into from
Feb 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 27 additions & 4 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ mod rep {
pub const BAD_PROTOCOL: Rep = Rep::new_fatal("Unsupported protocol");
/// Peer role does not match (e.g. light peer connecting to another light peer).
pub const BAD_ROLE: Rep = Rep::new_fatal("Unsupported role");
/// Peer response data does not have requested bits.
pub const BAD_RESPONSE: Rep = Rep::new(-(1 << 12), "Incomplete response");
}

// Lock must always be taken in order declared here.
Expand Down Expand Up @@ -701,12 +703,14 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
peer: PeerId,
request: message::BlockRequest<B>
) {
trace!(target: "sync", "BlockRequest {} from {}: from {:?} to {:?} max {:?}",
trace!(target: "sync", "BlockRequest {} from {}: from {:?} to {:?} max {:?} for {:?}",
request.id,
peer,
request.from,
request.to,
request.max);
request.max,
request.fields,
);

// sending block requests to the node that is unable to serve it is considered a bad behavior
if !self.config.roles.is_full() {
Expand Down Expand Up @@ -754,6 +758,11 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
message_queue: None,
justification,
};
// Stop if we don't have requested block body
if get_body && block_data.body.is_none() {
trace!(target: "sync", "Missing data for block request.");
break;
}
blocks.push(block_data);
match request.direction {
message::Direction::Ascending => id = BlockId::Number(number + One::one()),
Expand Down Expand Up @@ -784,7 +793,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
request: message::BlockRequest<B>,
response: message::BlockResponse<B>,
) -> CustomMessageOutcome<B> {
let blocks_range = match (
let blocks_range = || match (
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change required?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this is an optimization not to format strings when tracing is disabled.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh, didn't github folded the trace! call. Makes sense :)

response.blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())),
response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
) {
Expand All @@ -796,7 +805,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
response.id,
peer,
response.blocks.len(),
blocks_range
blocks_range(),
);

if request.fields == message::BlockAttributes::JUSTIFICATION {
Expand All @@ -811,6 +820,20 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
}
}
} else {
// Validate fields against the request.
if request.fields.contains(message::BlockAttributes::HEADER) && response.blocks.iter().any(|b| b.header.is_none()) {
self.behaviour.disconnect_peer(&peer);
self.peerset_handle.report_peer(peer, rep::BAD_RESPONSE);
trace!(target: "sync", "Missing header for a block");
return CustomMessageOutcome::None
}
if request.fields.contains(message::BlockAttributes::BODY) && response.blocks.iter().any(|b| b.body.is_none()) {
self.behaviour.disconnect_peer(&peer);
arkpar marked this conversation as resolved.
Show resolved Hide resolved
self.peerset_handle.report_peer(peer, rep::BAD_RESPONSE);
trace!(target: "sync", "Missing body for a block");
return CustomMessageOutcome::None
}

match self.sync.on_block_data(peer, Some(request), response) {
Ok(sync::OnBlockData::Import(origin, blocks)) =>
CustomMessageOutcome::BlockImport(origin, blocks),
Expand Down
2 changes: 1 addition & 1 deletion client/network/src/protocol/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,7 @@ impl<B: BlockT> ChainSync<B> {
| PeerSyncState::DownloadingFinalityProof(..) => Vec::new()
}
} else {
// When request.is_none() just accept blocks
// When request.is_none() this is a block announcement. Just accept blocks.
blocks.into_iter().map(|b| {
IncomingBlock {
hash: b.hash,
Expand Down
71 changes: 57 additions & 14 deletions client/network/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ impl<D> Peer<D> {
where F: FnMut(BlockBuilder<Block, PeersFullClient, substrate_test_runtime_client::Backend>) -> Block
{
let best_hash = self.client.info().best_hash;
self.generate_blocks_at(BlockId::Hash(best_hash), count, origin, edit_block)
self.generate_blocks_at(BlockId::Hash(best_hash), count, origin, edit_block, false)
}

/// Add blocks to the peer -- edit the block before adding. The chain will
Expand All @@ -247,7 +247,8 @@ impl<D> Peer<D> {
at: BlockId<Block>,
count: usize,
origin: BlockOrigin,
mut edit_block: F
mut edit_block: F,
headers_only: bool,
) -> H256 where F: FnMut(BlockBuilder<Block, PeersFullClient, substrate_test_runtime_client::Backend>) -> Block {
let full_client = self.client.as_full()
.expect("blocks could only be generated by full clients");
Expand All @@ -272,7 +273,7 @@ impl<D> Peer<D> {
origin,
header.clone(),
None,
Some(block.extrinsics)
if headers_only { None } else { Some(block.extrinsics) },
).unwrap();
let cache = if let Some(cache) = cache {
cache.into_iter().collect()
Expand All @@ -294,28 +295,46 @@ impl<D> Peer<D> {
self.push_blocks_at(BlockId::Hash(best_hash), count, with_tx)
}

/// Push blocks to the peer (simplified: with or without a TX)
pub fn push_headers(&mut self, count: usize) -> H256 {
let best_hash = self.client.info().best_hash;
self.generate_tx_blocks_at(BlockId::Hash(best_hash), count, false, true)
}

/// Push blocks to the peer (simplified: with or without a TX) starting from
/// given hash.
pub fn push_blocks_at(&mut self, at: BlockId<Block>, count: usize, with_tx: bool) -> H256 {
self.generate_tx_blocks_at(at, count, with_tx, false)
}

/// Push blocks/headers to the peer (simplified: with or without a TX) starting from
/// given hash.
fn generate_tx_blocks_at(&mut self, at: BlockId<Block>, count: usize, with_tx: bool, headers_only:bool) -> H256 {
let mut nonce = 0;
if with_tx {
self.generate_blocks_at(at, count, BlockOrigin::File, |mut builder| {
let transfer = Transfer {
from: AccountKeyring::Alice.into(),
to: AccountKeyring::Alice.into(),
amount: 1,
nonce,
};
builder.push(transfer.into_signed_tx()).unwrap();
nonce = nonce + 1;
builder.build().unwrap().block
})
self.generate_blocks_at(
at,
count,
BlockOrigin::File, |mut builder| {
let transfer = Transfer {
from: AccountKeyring::Alice.into(),
to: AccountKeyring::Alice.into(),
amount: 1,
nonce,
};
builder.push(transfer.into_signed_tx()).unwrap();
nonce = nonce + 1;
builder.build().unwrap().block
},
headers_only
)
} else {
self.generate_blocks_at(
at,
count,
BlockOrigin::File,
|builder| builder.build().unwrap().block,
headers_only,
)
}
}
Expand Down Expand Up @@ -748,13 +767,37 @@ pub trait TestNetFactory: Sized {
Async::Ready(())
}

/// Polls the testnet until theres' no activiy of any kind.
///
/// Must be executed in a task context.
fn poll_until_idle(&mut self) -> Async<()> {
self.poll();

for peer in self.peers().iter() {
if peer.is_major_syncing() || peer.network.num_queued_blocks() != 0 {
return Async::NotReady
}
if peer.network.num_sync_requests() != 0 {
return Async::NotReady
}
}
Async::Ready(())
}

/// Blocks the current thread until we are sync'ed.
///
/// Calls `poll_until_sync` repeatedly with the runtime passed as parameter.
fn block_until_sync(&mut self, runtime: &mut tokio::runtime::current_thread::Runtime) {
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| Ok(self.poll_until_sync()))).unwrap();
}

/// Blocks the current thread until there are no pending packets.
///
/// Calls `poll_until_idle` repeatedly with the runtime passed as parameter.
fn block_until_idle(&mut self, runtime: &mut tokio::runtime::current_thread::Runtime) {
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| Ok(self.poll_until_idle()))).unwrap();
}

/// Polls the testnet. Processes all the pending actions and returns `NotReady`.
fn poll(&mut self) {
self.mut_peers(|peers| {
Expand Down
21 changes: 21 additions & 0 deletions client/network/test/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -660,3 +660,24 @@ fn does_not_sync_announced_old_best_block() {
})).unwrap();
assert!(!net.peer(1).is_major_syncing());
}

#[test]
fn full_sync_requires_block_body() {
// Check that we don't sync headers-only in full mode.
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(2);

net.peer(0).push_headers(1);
// Wait for nodes to connect
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
if net.peer(0).num_peers() == 0 || net.peer(1).num_peers() == 0 {
Ok(Async::NotReady)
} else {
Ok(Async::Ready(()))
}
})).unwrap();
net.block_until_idle(&mut runtime);
assert_eq!(net.peer(1).client.info().best_number, 0);
}
6 changes: 3 additions & 3 deletions client/state-db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ impl<BlockHash: Hash, Key: Hash> StateDbSync<BlockHash, Key> {
{
let refs = self.pinned.entry(hash.clone()).or_default();
if *refs == 0 {
trace!(target: "state-db", "Pinned block: {:?}", hash);
trace!(target: "state-db-pin", "Pinned block: {:?}", hash);
self.non_canonical.pin(hash);
}
*refs += 1;
Expand All @@ -357,11 +357,11 @@ impl<BlockHash: Hash, Key: Hash> StateDbSync<BlockHash, Key> {
Entry::Occupied(mut entry) => {
*entry.get_mut() -= 1;
if *entry.get() == 0 {
trace!(target: "state-db", "Unpinned block: {:?}", hash);
trace!(target: "state-db-pin", "Unpinned block: {:?}", hash);
entry.remove();
self.non_canonical.unpin(hash);
} else {
trace!(target: "state-db", "Releasing reference for {:?}", hash);
trace!(target: "state-db-pin", "Releasing reference for {:?}", hash);
}
},
Entry::Vacant(_) => {},
Expand Down
4 changes: 2 additions & 2 deletions client/state-db/src/noncanonical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
while let Some(hash) = parent {
let refs = self.pinned.entry(hash.clone()).or_default();
if *refs == 0 {
trace!(target: "state-db", "Pinned non-canon block: {:?}", hash);
trace!(target: "state-db-pin", "Pinned non-canon block: {:?}", hash);
}
*refs += 1;
parent = self.parents.get(hash);
Expand All @@ -455,7 +455,7 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
if *entry.get() == 0 {
entry.remove();
if let Some(inserted) = self.pinned_insertions.remove(&hash) {
trace!(target: "state-db", "Discarding unpinned non-canon block: {:?}", hash);
trace!(target: "state-db-pin", "Discarding unpinned non-canon block: {:?}", hash);
discard_values(&mut self.values, inserted);
self.parents.remove(&hash);
}
Expand Down