Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Implement bitswap in the network behaviour using libp2p_bitswap. #6795

Closed
wants to merge 34 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
2b67bbb
WIP, good start
expenses Jul 30, 2020
d5d3130
Add networking tests
expenses Aug 2, 2020
5bd7cb7
Merge remote-tracking branch 'origin/master' into ashley-bitswap
expenses Aug 2, 2020
6fa276d
Reindent bitswap tests using tabs
expenses Aug 3, 2020
926575f
Merge remote-tracking branch 'origin/master' into ashley-bitswap
expenses Aug 3, 2020
471b171
Use OffchainStorage instead of AuxStore
expenses Aug 3, 2020
83bc9c6
Apply suggestions
expenses Aug 3, 2020
be7baf5
Convert spaces to tabs
expenses Aug 3, 2020
6147c6a
Merge remote-tracking branch 'origin/master' into ashley-bitswap
expenses Aug 3, 2020
f90a39c
Merge remote-tracking branch 'origin/master' into ashley-bitswap
expenses Aug 4, 2020
2496778
Implement IPLD storage traits
expenses Aug 5, 2020
3244af9
Merge remote-tracking branch 'origin/master' into ashley-bitswap
expenses Aug 5, 2020
8d028d4
Documentation
expenses Aug 5, 2020
ae5fd43
Merge remote-tracking branch 'origin/master' into ashley-bitswap
expenses Aug 6, 2020
ccb5910
Fix line widths
expenses Aug 6, 2020
3da54fa
Fix browser build
expenses Aug 6, 2020
6f3d206
Merge remote-tracking branch 'origin/master' into ashley-bitswap
expenses Aug 6, 2020
e7ffe2a
Merge remote-tracking branch 'origin/master' into ashley-bitswap
expenses Aug 10, 2020
8e4c008
Insert bitswap blocks into the dht, rework to put bitswap methods on …
expenses Aug 10, 2020
435fa52
Merge remote-tracking branch 'origin/master' into ashley-bitswap
expenses Aug 10, 2020
d61bddc
Merge remote-tracking branch 'origin/master' into HEAD
expenses Aug 31, 2020
b8804df
Fix wasm build, but via patch
expenses Aug 31, 2020
f80e0ad
Merge remote-tracking branch 'origin/master' into ashley-bitswap
expenses Sep 8, 2020
4b954d2
Merge remote-tracking branch 'origin/master' into ashley-bitswap
expenses Sep 14, 2020
83de229
Remove storage impl and update
expenses Sep 18, 2020
a8a0f8b
Merge remote-tracking branch 'origin/master' into ashley-bitswap
expenses Sep 18, 2020
1f9c171
Fix wasm build
expenses Sep 18, 2020
9923b11
Remove stuff
expenses Sep 18, 2020
30fd353
Remove bitswap network tests
expenses Sep 18, 2020
38f4667
Merge remote-tracking branch 'origin/master' into ashley-bitswap
expenses Sep 21, 2020
8187024
Merge remote-tracking branch 'origin/master' into ashley-bitswap
expenses Sep 21, 2020
5de7605
Merge remote-tracking branch 'origin/master' into ashley-bitswap
expenses Sep 22, 2020
da1c738
Remove changes that slipped in accidentally
expenses Sep 22, 2020
b8f0981
Improve documentation somewhat
expenses Sep 22, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
389 changes: 300 additions & 89 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion client/network-gossip/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ impl<B: BlockT> Future for GossipEngine<B> {

this.forwarding_state = ForwardingState::Busy(to_forward.into());
},
Event::Dht(_) => {}
Event::Dht(_) => {},
Event::Bitswap(_) => {},
}
// The network event stream closed. Do the same for [`GossipValidator`].
Poll::Ready(None) => return Poll::Ready(()),
Expand Down
2 changes: 2 additions & 0 deletions client/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ futures-timer = "3.0.2"
futures_codec = "0.4.0"
hex = "0.4.0"
ip_network = "0.3.4"
libp2p-bitswap = "0.7.0"
linked-hash-map = "0.5.2"
linked_hash_set = "0.1.3"
log = "0.4.8"
Expand All @@ -57,6 +58,7 @@ sp-core = { version = "2.0.0-rc6", path = "../../primitives/core" }
sp-runtime = { version = "2.0.0-rc6", path = "../../primitives/runtime" }
sp-utils = { version = "2.0.0-rc6", path = "../../primitives/utils" }
thiserror = "1"
tiny-cid = "0.2.5"
unsigned-varint = { version = "0.4.0", features = ["futures", "futures-codec"] }
void = "1.0.2"
wasm-timer = "0.2"
Expand Down
70 changes: 70 additions & 0 deletions client/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub struct Behaviour<B: BlockT, H: ExHashT> {
peer_info: peer_info::PeerInfoBehaviour,
/// Discovers nodes of the network.
discovery: DiscoveryBehaviour,
/// Exchanges blocks of data with other nodes.
bitswap: libp2p_bitswap::Bitswap,
/// Generic request-reponse protocols.
request_responses: request_responses::RequestResponsesBehaviour,
/// Block request handling.
Expand Down Expand Up @@ -172,6 +174,23 @@ pub enum BehaviourOut<B: BlockT> {
/// Events generated by a DHT as a response to get_value or put_value requests as well as the
/// request duration.
Dht(DhtEvent, Duration),

/// Event generated by bitswap.
Bitswap(BitswapEvent)
}

/// An event generated by bitswap.
#[derive(Clone, Debug)]
pub enum BitswapEvent {
/// A block has been received.
///
/// There are no guarantees about the recieved block:
/// * The CID and block contents might not match.
/// * We might have receieved a block without having sent out a want request first.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds like an attack vector.
@dvc94ch Can a node just connect and send, say, 2 GiB of data without us having asked for it, and we will simply buffer it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably be combined with the reputation system right?

Copy link
Contributor

@dvc94ch dvc94ch Sep 22, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dvc94ch right, but that 2 GiB of data is still held in memory for a period of time, right?

Copy link
Contributor

@dvc94ch dvc94ch Sep 23, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well that depends on the transport. A message is processed from the queue, and dropped if it wasn't requested. We don't have a sophisticated mechanism for disconnecting from peers, I thought that substrate handles bandwidth limiting, telemetry and reputation, and instead of handling this stuff myself in bitswap/ipfs-embed, I thought we can leverage that. I'm glad to redesign bitswap if there is some input on how it should work

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A message is processed from the queue, and dropped if it wasn't requested. We don't have a sophisticated mechanism for disconnecting from peers, I thought that substrate handles bandwidth limiting, telemetry and reputation, and instead of handling this stuff myself in bitswap/ipfs-embed, I thought we can leverage that. I'm glad to redesign bitswap if there is some input on how it should work

I'm not familiar with the bitswap protocol or the internals of bitswap, so I'm just naively asking.
The problem I have in mind is: someone can just connect, open 1000 substreams (or whatever the maximum is), negotiate bitswap, send a length prefix of N, and then send N - 1 bytes but never the last byte. Do this multiple times simultaneously and you can probably make the node run out of memory.
This is entirely specific to the bitswap protocol, and not something Substrate can handle.

What we do for other protocols is enforce a limit of one substream per protocol per connection, and make sure that the remote can never send a large amount of data without having explicitly requested it first.

Copy link
Contributor

@dvc94ch dvc94ch Sep 28, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What we do for other protocols is enforce a limit of one substream per protocol per connection

ipfs-rust/libp2p-bitswap#7, can you point me to a specific protocol that handles this, so I can look how it's done?

make sure that the remote can never send a large amount of data without having explicitly requested it first.

I think this is already handled. Does this satisfy the requirement? https://github.com/ipfs-rust/libp2p-bitswap/blob/master/src/behaviour.rs#L216 Or is more work needed?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you point me to a specific protocol that handles this, so I can look how it's done?

I'm not familiar with this code unfortunately, but libp2p's request-response should do that

I think this is already handled. Does this satisfy the requirement? https://github.com/ipfs-rust/libp2p-bitswap/blob/master/src/behaviour.rs#L216 Or is more work needed?

As far as I understand, you're still entirely buffering the message, and then only discard it, so it really depends on the maximum size of the message. If it's 512 kiB it's ok, if it's 2 GiB it's not.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these issues are handled much better now ipfs-rust/libp2p-bitswap#8.

@tomaka how concerned are you with interopability with other implementations? In particular now we avoid packing multiple requests responses in the same message, and reuse the substream to send multiple messages. Also even if we add that supporting bitswap 1.1.0 and 1.0.0 is not something I'm particularly interested in working on, and the api is really focused around bitswap 1.2.0 which works completely different from previous versions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how concerned are you with interopability with other implementations?

If this feature is used specifically for large files storage using Substrate, I guess we don't really care.

/// * It is possible to receieve this message multiple times with the same block.
ReceivedBlock(PeerId, tiny_cid::Cid, Box<[u8]>),
/// A peer wants the block with the hash of `cid`, and with a given priority.
ReceivedWant(PeerId, tiny_cid::Cid, libp2p_bitswap::Priority),
}

impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
Expand All @@ -197,6 +216,7 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
finality_proof_requests,
light_client_handler,
events: VecDeque::new(),
bitswap: libp2p_bitswap::Bitswap::new(),
role,
})
}
Expand Down Expand Up @@ -300,6 +320,37 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
pub fn light_client_request(&mut self, r: light_client_handler::Request<B>) -> Result<(), light_client_handler::Error> {
self.light_client_handler.request(r)
}

/// Count the number of connected peers who want the bitswap block with the hash of `cid`.
pub fn bitswap_num_peers_want(&self, cid: &tiny_cid::Cid) -> usize {
self.bitswap.peers_want(cid).count()
}

/// Determine whether a specific peer wants a bitswap block with the hash of `cid`.
pub fn bitswap_peer_wants_cid(&self, peer_id: &PeerId, cid: &tiny_cid::Cid) -> bool {
self.bitswap.peers_want(cid).find(|id| **id == *peer_id).is_some()
}

/// Send a bitswap block to a specific peer, regardless of whether they want it or not.
pub fn bitswap_send_block(&mut self, peer_id: &PeerId, cid: tiny_cid::Cid, block: Box<[u8]>) {
self.bitswap.send_block(peer_id, cid, block)
}

/// Send a bitswap block to all connected peers who want it.
pub fn bitswap_send_block_all(&mut self, cid: &tiny_cid::Cid, block: &[u8]) {
self.bitswap.send_block_all(cid, block)
}

/// Send out a bitswap message telling connected peers that we want the block with the hash of
/// `cid`. We can also say how this block should be prioritised.
pub fn bitswap_want_block(&mut self, cid: tiny_cid::Cid, priority: libp2p_bitswap::Priority) {
self.bitswap.want_block(cid, priority)
}

/// Send out a bitswap message cancelling a previously wanted block with the hash of `cid`.
pub fn bitswap_cancel_block(&mut self, cid: &tiny_cid::Cid) {
self.bitswap.cancel_block(cid)
}
}

fn reported_roles_to_observed_role(local_role: &Role, remote: &PeerId, roles: Roles) -> ObservedRole {
Expand Down Expand Up @@ -537,6 +588,25 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<DiscoveryOut>
}
}

impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<libp2p_bitswap::BitswapEvent>
for Behaviour<B, H> {
fn inject_event(&mut self, event: libp2p_bitswap::BitswapEvent) {
match event {
libp2p_bitswap::BitswapEvent::ReceivedBlock(peer_id, cid, data) => {
self.events.push_back(
BehaviourOut::Bitswap(BitswapEvent::ReceivedBlock(peer_id, cid, data))
);
},
libp2p_bitswap::BitswapEvent::ReceivedWant(peer_id, cid, priority) => {
self.events.push_back(
BehaviourOut::Bitswap(BitswapEvent::ReceivedWant(peer_id, cid, priority))
);
}
libp2p_bitswap::BitswapEvent::ReceivedCancel(..) => {},
}
}
}

impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
fn poll<TEv>(&mut self, _: &mut Context, _: &mut impl PollParameters) -> Poll<NetworkBehaviourAction<TEv, BehaviourOut<B>>> {
if let Some(event) = self.events.pop_front() {
Expand Down
1 change: 1 addition & 0 deletions client/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ pub mod network_state;

#[doc(inline)]
pub use libp2p::{multiaddr, Multiaddr, PeerId};
pub use behaviour::BitswapEvent;
pub use protocol::{event::{DhtEvent, Event, ObservedRole}, sync::SyncState, PeerInfo};
pub use service::{
NetworkService, NetworkWorker, RequestFailure, OutboundFailure, NotificationSender,
Expand Down
4 changes: 4 additions & 0 deletions client/network/src/protocol/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use libp2p::core::PeerId;
use libp2p::kad::record::Key;
use sp_runtime::ConsensusEngineId;

pub use crate::behaviour::BitswapEvent;

/// Events generated by DHT as a response to get_value and put_value requests.
#[derive(Debug, Clone)]
#[must_use]
Expand All @@ -45,6 +47,8 @@ pub enum DhtEvent {
pub enum Event {
/// Event generated by a DHT.
Dht(DhtEvent),
/// Event generated by Bitswap.
Bitswap(BitswapEvent),

/// Opened a substream with the given node with the given notifications protocol.
///
Expand Down
66 changes: 60 additions & 6 deletions client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,16 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
pub fn add_reserved_peer(&self, peer: String) -> Result<(), String> {
self.service.add_reserved_peer(peer)
}

/// Count the number of connected peers who want the bitswap block with the hash of `cid`.
pub fn bitswap_num_peers_want(&self, cid: &tiny_cid::Cid) -> usize {
self.network_service.bitswap_num_peers_want(cid)
}

/// Determine whether a specific peer wants a bitswap block with the hash of `cid`.
pub fn bitswap_peer_wants_cid(&self, peer_id: &PeerId, cid: &tiny_cid::Cid) -> bool {
self.network_service.bitswap_peer_wants_cid(peer_id, cid)
}
}

impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
Expand Down Expand Up @@ -1027,6 +1037,35 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
.to_worker
.unbounded_send(ServiceToWorkerMsg::OwnBlockImported(hash, number));
}

/// Send a bitswap block to a specific peer, regardless of whether they want it or not.
pub fn bitswap_send_block(&self, peer_id: PeerId, cid: tiny_cid::Cid, data: Box<[u8]>) {
let _ = self
.to_worker
.unbounded_send(ServiceToWorkerMsg::BitswapSendBlock(peer_id, cid, data));
}

/// Send a bitswap block to all connected peers who want it.
pub fn bitswap_send_block_all(&self, cid: tiny_cid::Cid, data: Box<[u8]>) {
let _ = self
.to_worker
.unbounded_send(ServiceToWorkerMsg::BitswapSendBlockAll(cid, data));
}

/// Send out a bitswap message telling connected peers that we want the block with the hash of
/// `cid`. We can also say how this block should be prioritised.
pub fn bitswap_want_block(&self, cid: tiny_cid::Cid, priority: libp2p_bitswap::Priority) {
let _ = self
.to_worker
.unbounded_send(ServiceToWorkerMsg::BitswapWantBlock(cid, priority));
}

/// Send out a bitswap message cancelling a previously wanted block with the hash of `cid`.
pub fn bitswap_cancel_block(&self, cid: tiny_cid::Cid) {
let _ = self
.to_worker
.unbounded_send(ServiceToWorkerMsg::BitswapCancelBlock(cid));
}
}

impl<B: BlockT + 'static, H: ExHashT> sp_consensus::SyncOracle
Expand Down Expand Up @@ -1159,6 +1198,10 @@ enum ServiceToWorkerMsg<B: BlockT, H: ExHashT> {
DisconnectPeer(PeerId),
UpdateChain,
OwnBlockImported(B::Hash, NumberFor<B>),
BitswapSendBlock(PeerId, tiny_cid::Cid, Box<[u8]>),
BitswapSendBlockAll(tiny_cid::Cid, Box<[u8]>),
BitswapWantBlock(tiny_cid::Cid, libp2p_bitswap::Priority),
BitswapCancelBlock(tiny_cid::Cid),
}

/// Main network worker. Must be polled in order for the network to advance.
Expand Down Expand Up @@ -1301,6 +1344,14 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
this.network_service.user_protocol_mut().update_chain(),
ServiceToWorkerMsg::OwnBlockImported(hash, number) =>
this.network_service.user_protocol_mut().own_block_imported(hash, number),
ServiceToWorkerMsg::BitswapSendBlock(peer_id, cid, block) =>
this.network_service.bitswap_send_block(&peer_id, cid, block),
ServiceToWorkerMsg::BitswapSendBlockAll(cid, block) =>
this.network_service.bitswap_send_block_all(&cid, &block),
ServiceToWorkerMsg::BitswapWantBlock(cid, priority) =>
this.network_service.bitswap_want_block(cid, priority),
ServiceToWorkerMsg::BitswapCancelBlock(cid) =>
this.network_service.bitswap_cancel_block(&cid),
}
}

Expand Down Expand Up @@ -1506,6 +1557,9 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {

this.event_streams.send(Event::Dht(event));
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::Bitswap(ev))) => {
this.event_streams.send(Event::Bitswap(ev));
},
Poll::Ready(SwarmEvent::ConnectionEstablished { peer_id, endpoint, num_established }) => {
trace!(target: "sub-libp2p", "Libp2p => Connected({:?})", peer_id);

Expand All @@ -1531,14 +1585,14 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
let reason = match cause {
Some(ConnectionError::IO(_)) => "transport-error",
Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A(
EitherError::A(EitherError::A(EitherError::A(EitherError::B(
EitherError::A(PingFailure::Timeout)))))))))) => "ping-timeout",
EitherError::A(EitherError::A(EitherError::A(EitherError::A(EitherError::B(
EitherError::A(PingFailure::Timeout))))))))))) => "ping-timeout",
Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A(
EitherError::A(EitherError::A(EitherError::A(EitherError::A(
NotifsHandlerError::Legacy(LegacyConnectionKillError)))))))))) => "force-closed",
EitherError::A(EitherError::A(EitherError::A(EitherError::A(EitherError::A(
NotifsHandlerError::Legacy(LegacyConnectionKillError))))))))))) => "force-closed",
Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A(
EitherError::A(EitherError::A(EitherError::A(EitherError::A(
NotifsHandlerError::SyncNotificationsClogged))))))))) => "sync-notifications-clogged",
EitherError::A(EitherError::A(EitherError::A(EitherError::A(EitherError::A(
NotifsHandlerError::SyncNotificationsClogged)))))))))) => "sync-notifications-clogged",
Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(_))) => "protocol-error",
Some(ConnectionError::Handler(NodeHandlerWrapperError::KeepAliveTimeout)) => "keep-alive-timeout",
None => "actively-closed",
Expand Down
10 changes: 10 additions & 0 deletions client/network/src/service/out_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,11 @@ impl Metrics {
self.events_total
.with_label_values(&["dht", "sent", name])
.inc_by(num);
},
Event::Bitswap(_) => {
self.events_total
.with_label_values(&["bitswap", "sent", name])
.inc_by(num);
}
Event::NotificationStreamOpened { engine_id, .. } => {
self.events_total
Expand Down Expand Up @@ -258,6 +263,11 @@ impl Metrics {
.with_label_values(&["dht", "received", name])
.inc();
}
Event::Bitswap(_) => {
self.events_total
.with_label_values(&["bitswap", "received", name])
.inc();
}
Event::NotificationStreamOpened { engine_id, .. } => {
self.events_total
.with_label_values(&[&format!("notif-open-{:?}", engine_id), "received", name])
Expand Down
2 changes: 2 additions & 0 deletions client/network/src/service/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,8 @@ fn notifications_state_consistent() {
// Add new events here.
future::Either::Left(Event::Dht(_)) => {}
future::Either::Right(Event::Dht(_)) => {}
future::Either::Left(Event::Bitswap(_)) => {}
future::Either::Right(Event::Bitswap(_)) => {}
};
}
});
Expand Down