diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index dd8b5e6c28ee4..d344321e68dd0 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -131,6 +131,8 @@ mod rep { pub const BAD_PROTOCOL: Rep = Rep::new_fatal("Unsupported protocol"); /// Peer role does not match (e.g. light peer connecting to another light peer). pub const BAD_ROLE: Rep = Rep::new_fatal("Unsupported role"); + /// Peer response data does not have requested bits. + pub const BAD_RESPONSE: Rep = Rep::new(-(1 << 12), "Incomplete response"); } // Lock must always be taken in order declared here. @@ -701,12 +703,14 @@ impl Protocol { peer: PeerId, request: message::BlockRequest ) { - trace!(target: "sync", "BlockRequest {} from {}: from {:?} to {:?} max {:?}", + trace!(target: "sync", "BlockRequest {} from {}: from {:?} to {:?} max {:?} for {:?}", request.id, peer, request.from, request.to, - request.max); + request.max, + request.fields, + ); // sending block requests to the node that is unable to serve it is considered a bad behavior if !self.config.roles.is_full() { @@ -754,6 +758,11 @@ impl Protocol { message_queue: None, justification, }; + // Stop if we don't have requested block body + if get_body && block_data.body.is_none() { + trace!(target: "sync", "Missing data for block request."); + break; + } blocks.push(block_data); match request.direction { message::Direction::Ascending => id = BlockId::Number(number + One::one()), @@ -784,7 +793,7 @@ impl Protocol { request: message::BlockRequest, response: message::BlockResponse, ) -> CustomMessageOutcome { - let blocks_range = match ( + let blocks_range = || match ( response.blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())), response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())), ) { @@ -796,7 +805,7 @@ impl Protocol { response.id, peer, response.blocks.len(), - blocks_range + blocks_range(), ); if request.fields == message::BlockAttributes::JUSTIFICATION { @@ -811,6 +820,20 @@ impl Protocol { } } } else { + // Validate fields against the request. + if request.fields.contains(message::BlockAttributes::HEADER) && response.blocks.iter().any(|b| b.header.is_none()) { + self.behaviour.disconnect_peer(&peer); + self.peerset_handle.report_peer(peer, rep::BAD_RESPONSE); + trace!(target: "sync", "Missing header for a block"); + return CustomMessageOutcome::None + } + if request.fields.contains(message::BlockAttributes::BODY) && response.blocks.iter().any(|b| b.body.is_none()) { + self.behaviour.disconnect_peer(&peer); + self.peerset_handle.report_peer(peer, rep::BAD_RESPONSE); + trace!(target: "sync", "Missing body for a block"); + return CustomMessageOutcome::None + } + match self.sync.on_block_data(peer, Some(request), response) { Ok(sync::OnBlockData::Import(origin, blocks)) => CustomMessageOutcome::BlockImport(origin, blocks), diff --git a/client/network/src/protocol/sync.rs b/client/network/src/protocol/sync.rs index 0c5355ea21f90..b1cd89155effe 100644 --- a/client/network/src/protocol/sync.rs +++ b/client/network/src/protocol/sync.rs @@ -751,7 +751,7 @@ impl ChainSync { | PeerSyncState::DownloadingFinalityProof(..) => Vec::new() } } else { - // When request.is_none() just accept blocks + // When request.is_none() this is a block announcement. Just accept blocks. blocks.into_iter().map(|b| { IncomingBlock { hash: b.hash, diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index 982e2ff512396..d09897e853ec2 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -237,7 +237,7 @@ impl Peer { where F: FnMut(BlockBuilder) -> Block { let best_hash = self.client.info().best_hash; - self.generate_blocks_at(BlockId::Hash(best_hash), count, origin, edit_block) + self.generate_blocks_at(BlockId::Hash(best_hash), count, origin, edit_block, false) } /// Add blocks to the peer -- edit the block before adding. The chain will @@ -247,7 +247,8 @@ impl Peer { at: BlockId, count: usize, origin: BlockOrigin, - mut edit_block: F + mut edit_block: F, + headers_only: bool, ) -> H256 where F: FnMut(BlockBuilder) -> Block { let full_client = self.client.as_full() .expect("blocks could only be generated by full clients"); @@ -272,7 +273,7 @@ impl Peer { origin, header.clone(), None, - Some(block.extrinsics) + if headers_only { None } else { Some(block.extrinsics) }, ).unwrap(); let cache = if let Some(cache) = cache { cache.into_iter().collect() @@ -294,28 +295,46 @@ impl Peer { self.push_blocks_at(BlockId::Hash(best_hash), count, with_tx) } + /// Push blocks to the peer (simplified: with or without a TX) + pub fn push_headers(&mut self, count: usize) -> H256 { + let best_hash = self.client.info().best_hash; + self.generate_tx_blocks_at(BlockId::Hash(best_hash), count, false, true) + } + /// Push blocks to the peer (simplified: with or without a TX) starting from /// given hash. pub fn push_blocks_at(&mut self, at: BlockId, count: usize, with_tx: bool) -> H256 { + self.generate_tx_blocks_at(at, count, with_tx, false) + } + + /// Push blocks/headers to the peer (simplified: with or without a TX) starting from + /// given hash. + fn generate_tx_blocks_at(&mut self, at: BlockId, count: usize, with_tx: bool, headers_only:bool) -> H256 { let mut nonce = 0; if with_tx { - self.generate_blocks_at(at, count, BlockOrigin::File, |mut builder| { - let transfer = Transfer { - from: AccountKeyring::Alice.into(), - to: AccountKeyring::Alice.into(), - amount: 1, - nonce, - }; - builder.push(transfer.into_signed_tx()).unwrap(); - nonce = nonce + 1; - builder.build().unwrap().block - }) + self.generate_blocks_at( + at, + count, + BlockOrigin::File, |mut builder| { + let transfer = Transfer { + from: AccountKeyring::Alice.into(), + to: AccountKeyring::Alice.into(), + amount: 1, + nonce, + }; + builder.push(transfer.into_signed_tx()).unwrap(); + nonce = nonce + 1; + builder.build().unwrap().block + }, + headers_only + ) } else { self.generate_blocks_at( at, count, BlockOrigin::File, |builder| builder.build().unwrap().block, + headers_only, ) } } @@ -748,6 +767,23 @@ pub trait TestNetFactory: Sized { Async::Ready(()) } + /// Polls the testnet until theres' no activiy of any kind. + /// + /// Must be executed in a task context. + fn poll_until_idle(&mut self) -> Async<()> { + self.poll(); + + for peer in self.peers().iter() { + if peer.is_major_syncing() || peer.network.num_queued_blocks() != 0 { + return Async::NotReady + } + if peer.network.num_sync_requests() != 0 { + return Async::NotReady + } + } + Async::Ready(()) + } + /// Blocks the current thread until we are sync'ed. /// /// Calls `poll_until_sync` repeatedly with the runtime passed as parameter. @@ -755,6 +791,13 @@ pub trait TestNetFactory: Sized { runtime.block_on(futures::future::poll_fn::<(), (), _>(|| Ok(self.poll_until_sync()))).unwrap(); } + /// Blocks the current thread until there are no pending packets. + /// + /// Calls `poll_until_idle` repeatedly with the runtime passed as parameter. + fn block_until_idle(&mut self, runtime: &mut tokio::runtime::current_thread::Runtime) { + runtime.block_on(futures::future::poll_fn::<(), (), _>(|| Ok(self.poll_until_idle()))).unwrap(); + } + /// Polls the testnet. Processes all the pending actions and returns `NotReady`. fn poll(&mut self) { self.mut_peers(|peers| { diff --git a/client/network/test/src/sync.rs b/client/network/test/src/sync.rs index 210a4fb38bb68..2094ddae60c60 100644 --- a/client/network/test/src/sync.rs +++ b/client/network/test/src/sync.rs @@ -660,3 +660,24 @@ fn does_not_sync_announced_old_best_block() { })).unwrap(); assert!(!net.peer(1).is_major_syncing()); } + +#[test] +fn full_sync_requires_block_body() { + // Check that we don't sync headers-only in full mode. + let _ = ::env_logger::try_init(); + let mut runtime = current_thread::Runtime::new().unwrap(); + let mut net = TestNet::new(2); + + net.peer(0).push_headers(1); + // Wait for nodes to connect + runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> { + net.poll(); + if net.peer(0).num_peers() == 0 || net.peer(1).num_peers() == 0 { + Ok(Async::NotReady) + } else { + Ok(Async::Ready(())) + } + })).unwrap(); + net.block_until_idle(&mut runtime); + assert_eq!(net.peer(1).client.info().best_number, 0); +} diff --git a/client/state-db/src/lib.rs b/client/state-db/src/lib.rs index 0eab640de84e8..f670e4f35f3ba 100644 --- a/client/state-db/src/lib.rs +++ b/client/state-db/src/lib.rs @@ -340,7 +340,7 @@ impl StateDbSync { { let refs = self.pinned.entry(hash.clone()).or_default(); if *refs == 0 { - trace!(target: "state-db", "Pinned block: {:?}", hash); + trace!(target: "state-db-pin", "Pinned block: {:?}", hash); self.non_canonical.pin(hash); } *refs += 1; @@ -357,11 +357,11 @@ impl StateDbSync { Entry::Occupied(mut entry) => { *entry.get_mut() -= 1; if *entry.get() == 0 { - trace!(target: "state-db", "Unpinned block: {:?}", hash); + trace!(target: "state-db-pin", "Unpinned block: {:?}", hash); entry.remove(); self.non_canonical.unpin(hash); } else { - trace!(target: "state-db", "Releasing reference for {:?}", hash); + trace!(target: "state-db-pin", "Releasing reference for {:?}", hash); } }, Entry::Vacant(_) => {}, diff --git a/client/state-db/src/noncanonical.rs b/client/state-db/src/noncanonical.rs index db2f58fa8981d..de7294d770a43 100644 --- a/client/state-db/src/noncanonical.rs +++ b/client/state-db/src/noncanonical.rs @@ -436,7 +436,7 @@ impl NonCanonicalOverlay { while let Some(hash) = parent { let refs = self.pinned.entry(hash.clone()).or_default(); if *refs == 0 { - trace!(target: "state-db", "Pinned non-canon block: {:?}", hash); + trace!(target: "state-db-pin", "Pinned non-canon block: {:?}", hash); } *refs += 1; parent = self.parents.get(hash); @@ -455,7 +455,7 @@ impl NonCanonicalOverlay { if *entry.get() == 0 { entry.remove(); if let Some(inserted) = self.pinned_insertions.remove(&hash) { - trace!(target: "state-db", "Discarding unpinned non-canon block: {:?}", hash); + trace!(target: "state-db-pin", "Discarding unpinned non-canon block: {:?}", hash); discard_values(&mut self.values, inserted); self.parents.remove(&hash); }