Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Implement bitswap in the network behaviour using libp2p_bitswap. #6795

Closed
wants to merge 34 commits into from
Closed
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
2b67bbb
WIP, good start
expenses Jul 30, 2020
d5d3130
Add networking tests
expenses Aug 2, 2020
5bd7cb7
Merge remote-tracking branch 'origin/master' into ashley-bitswap
expenses Aug 2, 2020
6fa276d
Reindent bitswap tests using tabs
expenses Aug 3, 2020
926575f
Merge remote-tracking branch 'origin/master' into ashley-bitswap
expenses Aug 3, 2020
471b171
Use OffchainStorage instead of AuxStore
expenses Aug 3, 2020
83bc9c6
Apply suggestions
expenses Aug 3, 2020
be7baf5
Convert spaces to tabs
expenses Aug 3, 2020
6147c6a
Merge remote-tracking branch 'origin/master' into ashley-bitswap
expenses Aug 3, 2020
f90a39c
Merge remote-tracking branch 'origin/master' into ashley-bitswap
expenses Aug 4, 2020
2496778
Implement IPLD storage traits
expenses Aug 5, 2020
3244af9
Merge remote-tracking branch 'origin/master' into ashley-bitswap
expenses Aug 5, 2020
8d028d4
Documentation
expenses Aug 5, 2020
ae5fd43
Merge remote-tracking branch 'origin/master' into ashley-bitswap
expenses Aug 6, 2020
ccb5910
Fix line widths
expenses Aug 6, 2020
3da54fa
Fix browser build
expenses Aug 6, 2020
6f3d206
Merge remote-tracking branch 'origin/master' into ashley-bitswap
expenses Aug 6, 2020
e7ffe2a
Merge remote-tracking branch 'origin/master' into ashley-bitswap
expenses Aug 10, 2020
8e4c008
Insert bitswap blocks into the dht, rework to put bitswap methods on …
expenses Aug 10, 2020
435fa52
Merge remote-tracking branch 'origin/master' into ashley-bitswap
expenses Aug 10, 2020
d61bddc
Merge remote-tracking branch 'origin/master' into HEAD
expenses Aug 31, 2020
b8804df
Fix wasm build, but via patch
expenses Aug 31, 2020
f80e0ad
Merge remote-tracking branch 'origin/master' into ashley-bitswap
expenses Sep 8, 2020
4b954d2
Merge remote-tracking branch 'origin/master' into ashley-bitswap
expenses Sep 14, 2020
83de229
Remove storage impl and update
expenses Sep 18, 2020
a8a0f8b
Merge remote-tracking branch 'origin/master' into ashley-bitswap
expenses Sep 18, 2020
1f9c171
Fix wasm build
expenses Sep 18, 2020
9923b11
Remove stuff
expenses Sep 18, 2020
30fd353
Remove bitswap network tests
expenses Sep 18, 2020
38f4667
Merge remote-tracking branch 'origin/master' into ashley-bitswap
expenses Sep 21, 2020
8187024
Merge remote-tracking branch 'origin/master' into ashley-bitswap
expenses Sep 21, 2020
5de7605
Merge remote-tracking branch 'origin/master' into ashley-bitswap
expenses Sep 22, 2020
da1c738
Remove changes that slipped in accidentally
expenses Sep 22, 2020
b8f0981
Improve documentation somewhat
expenses Sep 22, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
385 changes: 302 additions & 83 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion client/network-gossip/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ impl<B: BlockT> Future for GossipEngine<B> {

this.forwarding_state = ForwardingState::Busy(to_forward.into());
},
Event::Dht(_) => {}
Event::Dht(_) => {},
Event::Bitswap(_) => {},
}
// The network event stream closed. Do the same for [`GossipValidator`].
Poll::Ready(None) => return Poll::Ready(()),
Expand Down
2 changes: 2 additions & 0 deletions client/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ unsigned-varint = { version = "0.4.0", features = ["futures", "futures-codec"] }
void = "1.0.2"
wasm-timer = "0.2"
zeroize = "1.0.0"
libp2p-bitswap = "0.7.0"
tiny-cid = "0.2.5"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alphabetical ordering, please!


[dependencies.libp2p]
version = "0.28.1"
Expand Down
34 changes: 34 additions & 0 deletions client/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub struct Behaviour<B: BlockT, H: ExHashT> {
peer_info: peer_info::PeerInfoBehaviour,
/// Discovers nodes of the network.
discovery: DiscoveryBehaviour,
/// Exchanges blocks of data with other nodes.
pub(crate) bitswap: libp2p_bitswap::Bitswap,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer if you added methods to Behaviour rather than exposing implementation details.

/// Generic request-reponse protocols.
request_responses: request_responses::RequestResponsesBehaviour,
/// Block request handling.
Expand Down Expand Up @@ -172,6 +174,18 @@ pub enum BehaviourOut<B: BlockT> {
/// Events generated by a DHT as a response to get_value or put_value requests as well as the
/// request duration.
Dht(DhtEvent, Duration),

/// Event generated by bitswap.
Bitswap(BitswapEvent)
}

/// An event generated by bitswap.
#[derive(Clone, Debug)]
pub enum BitswapEvent {
/// A block was received.
ReceivedBlock(PeerId, tiny_cid::Cid, Box<[u8]>),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be called multiple times with the same block? What's the relationship between the Cid and the data? Is it guaranteed to be in response to a WANT request?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be called multiple times with the same block?

no, unless the events weren't polled until the block arrived and a new want request was submitted.

What's the relationship between the Cid and the data

the cid hash matches the data, the cid is the one submitted in the want request

Is it guaranteed to be in response to a WANT request?

it's guaranteed to be in response to a want request.

/// A WANT request was received.
ReceivedWant(PeerId, tiny_cid::Cid, i32),
}

impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
Expand All @@ -197,6 +211,7 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
finality_proof_requests,
light_client_handler,
events: VecDeque::new(),
bitswap: libp2p_bitswap::Bitswap::new(),
role,
})
}
Expand Down Expand Up @@ -537,6 +552,25 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<DiscoveryOut>
}
}

impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<libp2p_bitswap::BitswapEvent>
for Behaviour<B, H> {
fn inject_event(&mut self, event: libp2p_bitswap::BitswapEvent) {
match event {
libp2p_bitswap::BitswapEvent::ReceivedBlock(peer_id, cid, data) => {
self.events.push_back(
BehaviourOut::Bitswap(BitswapEvent::ReceivedBlock(peer_id, cid, data))
);
},
libp2p_bitswap::BitswapEvent::ReceivedWant(peer_id, cid, priority) => {
self.events.push_back(
BehaviourOut::Bitswap(BitswapEvent::ReceivedWant(peer_id, cid, priority))
);
}
libp2p_bitswap::BitswapEvent::ReceivedCancel(..) => {},
}
}
}

impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
fn poll<TEv>(&mut self, _: &mut Context, _: &mut impl PollParameters) -> Poll<NetworkBehaviourAction<TEv, BehaviourOut<B>>> {
if let Some(event) = self.events.pop_front() {
Expand Down
2 changes: 1 addition & 1 deletion client/network/src/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl<M> QueuedSender<M> {
protocol: ConsensusEngineId,
queue_size_limit: usize,
messages_encode: F
) -> (Self, impl Future<Output = ()> + Send + 'static)
) -> (Self, impl Future<Output = ()> + 'static)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change needed for this PR?
It's not a negative change, but please don't introduce changes that are unrelated to the PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where
M: Send + 'static,
B: BlockT + 'static,
Expand Down
1 change: 1 addition & 0 deletions client/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ pub mod network_state;

#[doc(inline)]
pub use libp2p::{multiaddr, Multiaddr, PeerId};
pub use behaviour::BitswapEvent;
pub use protocol::{event::{DhtEvent, Event, ObservedRole}, sync::SyncState, PeerInfo};
pub use service::{
NetworkService, NetworkWorker, RequestFailure, OutboundFailure, NotificationSender,
Expand Down
3 changes: 3 additions & 0 deletions client/network/src/protocol/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use bytes::Bytes;
use libp2p::core::PeerId;
use libp2p::kad::record::Key;
use sp_runtime::ConsensusEngineId;
use crate::behaviour::BitswapEvent;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
use crate::behaviour::BitswapEvent;
pub use crate::behaviour::BitswapEvent;


/// Events generated by DHT as a response to get_value and put_value requests.
#[derive(Debug, Clone)]
Expand All @@ -45,6 +46,8 @@ pub enum DhtEvent {
pub enum Event {
/// Event generated by a DHT.
Dht(DhtEvent),
/// Event generated by Bitswap.
Bitswap(BitswapEvent),

/// Opened a substream with the given node with the given notifications protocol.
///
Expand Down
70 changes: 64 additions & 6 deletions client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,21 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
pub fn add_reserved_peer(&self, peer: String) -> Result<(), String> {
self.service.add_reserved_peer(peer)
}

/// Get the number of bitswap peers we are connected to.
pub fn bitswap_num_peers(&self) -> usize {
self.network_service.bitswap.peers().count()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's a "bitswap peer" and what's a "non-bitswap peer"?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So these are open connections to other peers. It will send want requests to any peer that connects that speaks bitswap. For each open connection it maintains a list of blocks the other peer wants. If in the future the block is available it sends it to the peer. This architecture is because when I first wrote bitswap it seemed like you could have a strategy that gets a block from a friendly peer to get a block from a new peer. Now I think it should probably just ignore the want requests of blocks it doesn't have.


/// Get the number of bitswap peers who want a block.
pub fn bitswap_num_peers_want(&self, cid: &tiny_cid::Cid) -> usize {
self.network_service.bitswap.peers_want(cid).count()
}

/// Get if a specific bitswap peer wants a block.
pub fn bitswap_peer_wants_cid(&self, peer_id: &PeerId, cid: &tiny_cid::Cid) -> bool {
self.network_service.bitswap.peers_want(cid).find(|id| **id == *peer_id).is_some()
}
}

impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
Expand Down Expand Up @@ -1027,6 +1042,34 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
.to_worker
.unbounded_send(ServiceToWorkerMsg::OwnBlockImported(hash, number));
}

/// Send a bitswap block to a peer.
pub fn bitswap_send_block(&self, peer_id: PeerId, cid: tiny_cid::Cid, data: Box<[u8]>) {
let _ = self
.to_worker
.unbounded_send(ServiceToWorkerMsg::BitswapSendBlock(peer_id, cid, data));
}

/// Send a bitswap block to all peers that have the block in their wantlist.
pub fn bitswap_send_block_all(&self, cid: tiny_cid::Cid, data: Box<[u8]>) {
let _ = self
.to_worker
.unbounded_send(ServiceToWorkerMsg::BitswapSendBlockAll(cid, data));
}

/// Send a bitswap WANT request to all peers for a block.
pub fn bitswap_want_block(&self, cid: tiny_cid::Cid, priority: i32) {
let _ = self
.to_worker
.unbounded_send(ServiceToWorkerMsg::BitswapWantBlock(cid, priority));
}

/// Cancel a bitswap WANT request.
pub fn bitswap_cancel_block(&self, cid: tiny_cid::Cid) {
let _ = self
.to_worker
.unbounded_send(ServiceToWorkerMsg::BitswapCancelBlock(cid));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be documented.
I don't know how bitswap works, and that's great because it should in my opinion be possible for someone who doesn't know how bitswap works to understand this API.

What's the general workflow of the thing? What happens if send_block is called on a peer that doesn't want this block?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bitswap_send_block is usually in response to a want event. the block is added to a queue and if a cancel request is received before the message is sent it will be removed from the queue. reading the code, if the cancel request comes in before the want request was processed, the cancel request will be ignored. I guess it could use some work, but it's not "terrible"

}

impl<B: BlockT + 'static, H: ExHashT> sp_consensus::SyncOracle
Expand Down Expand Up @@ -1159,6 +1202,10 @@ enum ServiceToWorkerMsg<B: BlockT, H: ExHashT> {
DisconnectPeer(PeerId),
UpdateChain,
OwnBlockImported(B::Hash, NumberFor<B>),
BitswapSendBlock(PeerId, tiny_cid::Cid, Box<[u8]>),
BitswapSendBlockAll(tiny_cid::Cid, Box<[u8]>),
BitswapWantBlock(tiny_cid::Cid, i32),
BitswapCancelBlock(tiny_cid::Cid),
}

/// Main network worker. Must be polled in order for the network to advance.
Expand Down Expand Up @@ -1301,6 +1348,14 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
this.network_service.user_protocol_mut().update_chain(),
ServiceToWorkerMsg::OwnBlockImported(hash, number) =>
this.network_service.user_protocol_mut().own_block_imported(hash, number),
ServiceToWorkerMsg::BitswapSendBlock(peer_id, cid, block) =>
this.network_service.bitswap.send_block(&peer_id, cid, block),
ServiceToWorkerMsg::BitswapSendBlockAll(cid, block) =>
this.network_service.bitswap.send_block_all(&cid, &block),
ServiceToWorkerMsg::BitswapWantBlock(cid, priority) =>
this.network_service.bitswap.want_block(cid, priority),
ServiceToWorkerMsg::BitswapCancelBlock(cid) =>
this.network_service.bitswap.cancel_block(&cid),
}
}

Expand Down Expand Up @@ -1506,6 +1561,9 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {

this.event_streams.send(Event::Dht(event));
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::Bitswap(ev))) => {
this.event_streams.send(Event::Bitswap(ev));
},
Poll::Ready(SwarmEvent::ConnectionEstablished { peer_id, endpoint, num_established }) => {
trace!(target: "sub-libp2p", "Libp2p => Connected({:?})", peer_id);

Expand All @@ -1531,14 +1589,14 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
let reason = match cause {
Some(ConnectionError::IO(_)) => "transport-error",
Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A(
EitherError::A(EitherError::A(EitherError::A(EitherError::B(
EitherError::A(PingFailure::Timeout)))))))))) => "ping-timeout",
EitherError::A(EitherError::A(EitherError::A(EitherError::A(EitherError::B(
EitherError::A(PingFailure::Timeout))))))))))) => "ping-timeout",
Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A(
EitherError::A(EitherError::A(EitherError::A(EitherError::A(
NotifsHandlerError::Legacy(LegacyConnectionKillError)))))))))) => "force-closed",
EitherError::A(EitherError::A(EitherError::A(EitherError::A(EitherError::A(
NotifsHandlerError::Legacy(LegacyConnectionKillError))))))))))) => "force-closed",
Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A(
EitherError::A(EitherError::A(EitherError::A(EitherError::A(
NotifsHandlerError::SyncNotificationsClogged))))))))) => "sync-notifications-clogged",
EitherError::A(EitherError::A(EitherError::A(EitherError::A(EitherError::A(
NotifsHandlerError::SyncNotificationsClogged)))))))))) => "sync-notifications-clogged",
Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(_))) => "protocol-error",
Some(ConnectionError::Handler(NodeHandlerWrapperError::KeepAliveTimeout)) => "keep-alive-timeout",
None => "actively-closed",
Expand Down
10 changes: 10 additions & 0 deletions client/network/src/service/out_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,11 @@ impl Metrics {
self.events_total
.with_label_values(&["dht", "sent", name])
.inc_by(num);
},
Event::Bitswap(_) => {
self.events_total
.with_label_values(&["bitswap", "sent", name])
.inc_by(num);
}
Event::NotificationStreamOpened { engine_id, .. } => {
self.events_total
Expand Down Expand Up @@ -258,6 +263,11 @@ impl Metrics {
.with_label_values(&["dht", "received", name])
.inc();
}
Event::Bitswap(_) => {
self.events_total
.with_label_values(&["bitswap", "received", name])
.inc();
}
Event::NotificationStreamOpened { engine_id, .. } => {
self.events_total
.with_label_values(&[&format!("notif-open-{:?}", engine_id), "received", name])
Expand Down
2 changes: 2 additions & 0 deletions client/network/src/service/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,8 @@ fn notifications_state_consistent() {
// Add new events here.
future::Either::Left(Event::Dht(_)) => {}
future::Either::Right(Event::Dht(_)) => {}
future::Either::Left(Event::Bitswap(_)) => {}
future::Either::Right(Event::Bitswap(_)) => {}
};
}
});
Expand Down
2 changes: 1 addition & 1 deletion client/rpc-api/src/system/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl From<Error> for rpc::Error {
data: serde_json::to_value(h).ok(),
},
Error::MalformattedPeerArg(ref e) => rpc::Error {
code :rpc::ErrorCode::ServerError(BASE_ERROR + 2),
code: rpc::ErrorCode::ServerError(BASE_ERROR + 2),
message: e.clone(),
data: None,
}
Expand Down
2 changes: 1 addition & 1 deletion client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ pub struct PartialComponents<Client, Backend, SelectChain, ImportQueue, Transact
async fn build_network_future<
B: BlockT,
C: BlockchainEvents<B>,
H: sc_network::ExHashT
H: sc_network::ExHashT,
> (
role: Role,
mut network: sc_network::NetworkWorker<B, H>,
Expand Down
2 changes: 1 addition & 1 deletion primitives/core/src/offchain/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::iter::Iterator;
/// In-memory storage for offchain workers.
#[derive(Debug, Clone, Default)]
pub struct InMemOffchainStorage {
storage: HashMap<Vec<u8>, Vec<u8>>,
pub(crate) storage: HashMap<Vec<u8>, Vec<u8>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't my code, but similarly I don't think it's great software design.

}

impl InMemOffchainStorage {
Expand Down
5 changes: 5 additions & 0 deletions primitives/core/src/offchain/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ impl TestPersistentOffchainDB {
}
}
}

/// Get whether the DB is empty.
pub fn is_empty(&self) -> bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really see how this is relevant for the rest of the PR.

self.persistent.read().storage.is_empty()
}
}

impl OffchainStorage for TestPersistentOffchainDB {
Expand Down