Skip to content

Commit

Permalink
Relay v2 (#8)
Browse files Browse the repository at this point in the history
* protocols/relay: Implement circuit relay v2 protocol

This commit adds an implementation for the circuit relay v2 protocol to
be used as a relay server, i.e. it supports incoming HOP requests and
outgoing STOP requests. Future commits will add support for clients,
i.e. outgoing HOP requests and incoming STOP requests.

The existing circuit relay v1 protocol implementation is moved to
protocols/relay/src/v1.

* misc/multistream-select: Ignore simultaneous open 'iamclient'

* protocols/relay: Ensure connections of HOP connect are kept alive

* protocols/relay: Improve documentation

* protocols/relay: Implement v2 client logic

* protocols/relay: Handle dial failure

* protocols/relay: Reuse connection

* protocols/relay: Rename Connection to RelayedConnection

* protocols/relay: Update transport doc examples

* protocols/relay: Pass relay addr to transport

* protocols/relay: Implement inbound stop denial

* protocols/relay: Renew reservations

* protocols/relay: Handle invalid expiration in the past

* protocols/relay: Handle in and outbound failure

* protocols/relay: Implement client handler keep alive

* protocols/relay: Handle handler listener closed channel

* protocols/relay: Handle handler to listener failure

* protocols/relay: Return all new listener addresses

* protocols/relay/v2: Update to latest protobuf definition

* Revert "misc/multistream-select: Ignore simultaneous open 'iamclient'"

This reverts commit 125e3c3.

* protocols/relay/v2: Report back to transport

* protocols/relay/v2: Disconnect when stop protocol not supported

* protocols/relay/v2: Document max_duration not exceed u32::MAX

* protocols/relay/v2: Don't append p2p-circuit as relay

* protocols/relay/v2: Implement rate limiter

* protocols/relay/v2: Document caveats on rate limiter with high volume

* protocols/relay: Prevent possible false positive in quickcheck

* protocols/relay: Reword Prost error message

* protocols/relay: Allow users to specify generic rate limiters

* protocols/relay: Move rate limiting logic into module

* protocols/relay: Prevent reservation and connection over relayed conn

* protocols/relay: Add circuit src rate limiting

* protocols/relay/v2: Simplify example

* protocols/relay: Add myself to authors

* protocols/relay: Use thiserror

* protocols/relay/v2: Set rate limits

* protocols/relay: Use wasm_timer::Instant

* protocols/relay/v2: Apply clippy suggestions

* protocols/relay: Fix intra doc link

* protocols/relay: Fix clippy warnings

* misc/metrics: Add basic instrumentation for libp2p-relay

* protocols/relay: Return NetworkBehaviourAction::NotifyHandler right away

* protocols/relay: Run rust fmt

* protocols/relay/src/v2/relay: Accept mutable config

* protocols/relay/examples: Structure command line args

- Allow deterministic peer identity.
- Choose between ipv6 and ipv4.

* .github/workflow: Use ubuntu-18.04 fixing missing protoc binary  (libp2p#2368)

`prost-build` is failing due to a missing `protoc` binary. Neither the OS
supplies one, nor can the bundled binaries be used. This commit downgrades the
OS used. The older OS is compatible with the bundled `protoc` binaries.

* *: Fix clippy errors from upgrade to Rust 1.57 (libp2p#2365)

* core: Mark "unused" field with "_"

We need to keep this marker type to ensure that the type continues
to be required to be pinned.

* tranports/noise: Derive `Default` for `Config`

`false` is the default for `bool`, we can derive this.

* protocols/request-response: Remove unused fields

These are already included the `RequestResponseMessage::Request`
variant.

* *: Allow clippy's large-enum-variant lint

Tackling these suggestions would require performance measurement
which we don't want to do at this stage.

Co-authored-by: Max Inden <mail@max-inden.de>

* protocols/mdns/: Generate peer expiry and fix IPv6 support (libp2p#2359)

Co-authored-by: Victor Ermolaev <victorermolaev@gmail.com>
Co-authored-by: Max Inden <mail@max-inden.de>

* protocols/floodsub: Propagate messages only to target peers (libp2p#2360)

Propagate messages only to the target peers and not all connected peers.

Co-authored-by: Victor Ermolaev <victorermolaev@gmail.com>
Co-authored-by: Max Inden <mail@max-inden.de>

* examples/*: Migrate to async await (libp2p#2356)

* Adapt examples to async style loop
* Adapt async style loop for chat.rs
* Adapt async style loop for distributed-key-value-store.rs
* Adapt async style loop for gossibsub-chat.rs
* Adapt async style loop for ipfs-private.rs
* Adapt ping to use async
* Update tutorial crate to reflect new changes

Co-authored-by: Max Inden <mail@max-inden.de>

* protocols/relay/src/v2/client: Return NetworkBehaviourAction on ListenReq

* Refactor suggestion

* protocols/relay/src/v2/client/handler: Check status of lend out substreams

* protocols/relay/src/v2/client: Use void::Void for drop_notifer

* protocols/relay/src/v2/client/handler: Log dropped oneshot Sender to transport

* protocols/relay/src/v2/client/handler: Await send call to transport listener

* protocols/relay/src/v2/client/handler: Remove unnecessary boxing

Co-authored-by: Max Inden <mail@max-inden.de>
Co-authored-by: ronzigelman <ronzigelman@gmail.com>
Co-authored-by: Thomas Eizinger <thomas@eizinger.io>
Co-authored-by: Victor Ermolaev <16148931+vnermolaev@users.noreply.github.com>
Co-authored-by: Victor Ermolaev <victorermolaev@gmail.com>
Co-authored-by: Gerardo Enrique Arriaga Rendon <53304516+JerryHue@users.noreply.github.com>
Co-authored-by: Marco Munizaga <git@marcopolo.io>
  • Loading branch information
8 people authored Dec 7, 2021
1 parent 245b056 commit 6e07e29
Show file tree
Hide file tree
Showing 60 changed files with 5,714 additions and 492 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ on:
jobs:
test-desktop:
name: Build and test
runs-on: ubuntu-latest
runs-on: ubuntu-18.04
strategy:
matrix:
args: [
Expand All @@ -34,7 +34,7 @@ jobs:

test-wasm:
name: Build on WASM
runs-on: ubuntu-latest
runs-on: ubuntu-18.04
strategy:
matrix:
toolchain: [
Expand Down Expand Up @@ -83,7 +83,7 @@ jobs:

check-rustdoc-links:
name: Check rustdoc intra-doc links
runs-on: ubuntu-latest
runs-on: ubuntu-18.04
container:
image: rust
steps:
Expand All @@ -101,7 +101,7 @@ jobs:
run: RUSTDOCFLAGS="--deny broken_intra_doc_links" cargo doc --verbose --workspace --no-deps --document-private-items

check-clippy:
runs-on: ubuntu-latest
runs-on: ubuntu-18.04
steps:

- name: Cancel Previous Runs
Expand All @@ -127,7 +127,7 @@ jobs:

integration-test:
name: Integration tests
runs-on: ubuntu-latest
runs-on: ubuntu-18.04
container:
image: rust
steps:
Expand All @@ -145,7 +145,7 @@ jobs:
run: RUST_LOG=libp2p_swarm=debug,libp2p_kad=trace,libp2p_tcp=debug cargo run --example ipfs-kad

rustfmt:
runs-on: ubuntu-latest
runs-on: ubuntu-18.04
steps:

- name: Cancel Previous Runs
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ noise = ["libp2p-noise"]
ping = ["libp2p-ping", "libp2p-metrics/ping"]
plaintext = ["libp2p-plaintext"]
pnet = ["libp2p-pnet"]
relay = ["libp2p-relay"]
relay = ["libp2p-relay", "libp2p-metrics/relay"]
request-response = ["libp2p-request-response"]
rendezvous = ["libp2p-rendezvous"]
tcp-async-io = ["libp2p-tcp", "libp2p-tcp/async-io"]
Expand Down
6 changes: 3 additions & 3 deletions core/src/transport/and_then.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ where
let future = AndThenFuture {
inner: Either::Left(Box::pin(dialed_fut)),
args: Some((self.fun, ConnectedPoint::Dialer { address: addr })),
marker: PhantomPinned,
_marker: PhantomPinned,
};
Ok(future)
}
Expand Down Expand Up @@ -132,7 +132,7 @@ where
upgrade: AndThenFuture {
inner: Either::Left(Box::pin(upgrade)),
args: Some((this.fun.clone(), point)),
marker: PhantomPinned,
_marker: PhantomPinned,
},
local_addr,
remote_addr,
Expand All @@ -159,7 +159,7 @@ where
pub struct AndThenFuture<TFut, TMap, TMapOut> {
inner: Either<Pin<Box<TFut>>, Pin<Box<TMapOut>>>,
args: Option<(TMap, ConnectedPoint)>,
marker: PhantomPinned,
_marker: PhantomPinned,
}

impl<TFut, TMap, TMapOut> Future for AndThenFuture<TFut, TMap, TMapOut>
Expand Down
61 changes: 26 additions & 35 deletions examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,18 @@
//! The two nodes then connect.

use async_std::{io, task};
use futures::{future, prelude::*};
use futures::{
prelude::{stream::StreamExt, *},
select,
};
use libp2p::{
floodsub::{self, Floodsub, FloodsubEvent},
identity,
mdns::{Mdns, MdnsConfig, MdnsEvent},
swarm::SwarmEvent,
Multiaddr, NetworkBehaviour, PeerId, Swarm,
};
use std::{
error::Error,
task::{Context, Poll},
};
use std::error::Error;

#[async_std::main]
async fn main() -> Result<(), Box<dyn Error>> {
Expand Down Expand Up @@ -133,50 +133,44 @@ async fn main() -> Result<(), Box<dyn Error>> {
}

// Read full lines from stdin
let mut stdin = io::BufReader::new(io::stdin()).lines();
let mut stdin = io::BufReader::new(io::stdin()).lines().fuse();

// Listen on all interfaces and whatever port the OS assigns
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;

// Kick it off
task::block_on(future::poll_fn(move |cx: &mut Context<'_>| {
loop {
match stdin.try_poll_next_unpin(cx)? {
Poll::Ready(Some(line)) => swarm
.behaviour_mut()
.floodsub
.publish(floodsub_topic.clone(), line.as_bytes()),
Poll::Ready(None) => panic!("Stdin closed"),
Poll::Pending => break,
}
}
loop {
match swarm.poll_next_unpin(cx) {
Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => {
loop {
select! {
line = stdin.select_next_some() => swarm
.behaviour_mut()
.floodsub
.publish(floodsub_topic.clone(), line.expect("Stdin not to close").as_bytes()),
event = swarm.select_next_some() => match event {
SwarmEvent::NewListenAddr { address, .. } => {
println!("Listening on {:?}", address);
}
Poll::Ready(Some(SwarmEvent::Behaviour(OutEvent::Floodsub(
FloodsubEvent::Message(message),
)))) => {
SwarmEvent::Behaviour(OutEvent::Floodsub(
FloodsubEvent::Message(message)
)) => {
println!(
"Received: '{:?}' from {:?}",
String::from_utf8_lossy(&message.data),
message.source
);
}
Poll::Ready(Some(SwarmEvent::Behaviour(OutEvent::Mdns(
MdnsEvent::Discovered(list),
)))) => {
SwarmEvent::Behaviour(OutEvent::Mdns(
MdnsEvent::Discovered(list)
)) => {
for (peer, _) in list {
swarm
.behaviour_mut()
.floodsub
.add_node_to_partial_view(peer);
}
}
Poll::Ready(Some(SwarmEvent::Behaviour(OutEvent::Mdns(MdnsEvent::Expired(
list,
))))) => {
SwarmEvent::Behaviour(OutEvent::Mdns(MdnsEvent::Expired(
list
))) => {
for (peer, _) in list {
if !swarm.behaviour_mut().mdns.has_node(&peer) {
swarm
Expand All @@ -185,12 +179,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
.remove_node_from_partial_view(&peer);
}
}
}
Poll::Ready(Some(_)) => {}
Poll::Ready(None) => return Poll::Ready(Ok(())),
Poll::Pending => break,
},
_ => {}
}
}
Poll::Pending
}))
}
}
39 changes: 12 additions & 27 deletions examples/distributed-key-value-store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
//! 4. Close with Ctrl-c.

use async_std::{io, task};
use futures::prelude::*;
use futures::{prelude::*, select};
use libp2p::kad::record::store::MemoryStore;
use libp2p::kad::{
record::Key, AddProviderOk, Kademlia, KademliaEvent, PeerRecord, PutRecordOk, QueryResult,
Expand All @@ -53,10 +53,7 @@ use libp2p::{
swarm::{NetworkBehaviourEventProcess, SwarmEvent},
NetworkBehaviour, PeerId, Swarm,
};
use std::{
error::Error,
task::{Context, Poll},
};
use std::error::Error;

#[async_std::main]
async fn main() -> Result<(), Box<dyn Error>> {
Expand Down Expand Up @@ -157,35 +154,23 @@ async fn main() -> Result<(), Box<dyn Error>> {
};

// Read full lines from stdin
let mut stdin = io::BufReader::new(io::stdin()).lines();
let mut stdin = io::BufReader::new(io::stdin()).lines().fuse();

// Listen on all interfaces and whatever port the OS assigns.
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;

// Kick it off.
task::block_on(future::poll_fn(move |cx: &mut Context<'_>| {
loop {
match stdin.try_poll_next_unpin(cx)? {
Poll::Ready(Some(line)) => {
handle_input_line(&mut swarm.behaviour_mut().kademlia, line)
}
Poll::Ready(None) => panic!("Stdin closed"),
Poll::Pending => break,
}
}
loop {
match swarm.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => {
if let SwarmEvent::NewListenAddr { address, .. } = event {
println!("Listening on {:?}", address);
}
}
Poll::Ready(None) => return Poll::Ready(Ok(())),
Poll::Pending => break,
loop {
select! {
line = stdin.select_next_some() => handle_input_line(&mut swarm.behaviour_mut().kademlia, line.expect("Stdin not to close")),
event = swarm.select_next_some() => match event {
SwarmEvent::NewListenAddr { address, .. } => {
println!("Listening in {:?}", address);
},
_ => {}
}
}
Poll::Pending
}))
}
}

fn handle_input_line(kademlia: &mut Kademlia<MemoryStore>, line: String) {
Expand Down
70 changes: 29 additions & 41 deletions examples/gossipsub-chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,21 +46,18 @@
//!
//! The two nodes should then connect.

use async_std::{io, task};
use async_std::io;
use env_logger::{Builder, Env};
use futures::prelude::*;
use futures::{prelude::*, select};
use libp2p::gossipsub::MessageId;
use libp2p::gossipsub::{
GossipsubEvent, GossipsubMessage, IdentTopic as Topic, MessageAuthenticity, ValidationMode,
};
use libp2p::{gossipsub, identity, swarm::SwarmEvent, Multiaddr, PeerId};
use std::collections::hash_map::DefaultHasher;
use std::error::Error;
use std::hash::{Hash, Hasher};
use std::time::Duration;
use std::{
error::Error,
task::{Context, Poll},
};

#[async_std::main]
async fn main() -> Result<(), Box<dyn Error>> {
Expand Down Expand Up @@ -130,44 +127,35 @@ async fn main() -> Result<(), Box<dyn Error>> {
}

// Read full lines from stdin
let mut stdin = io::BufReader::new(io::stdin()).lines();
let mut stdin = io::BufReader::new(io::stdin()).lines().fuse();

// Kick it off
task::block_on(future::poll_fn(move |cx: &mut Context<'_>| {
loop {
if let Err(e) = match stdin.try_poll_next_unpin(cx)? {
Poll::Ready(Some(line)) => swarm
loop {
select! {
line = stdin.select_next_some() => {
if let Err(e) = swarm
.behaviour_mut()
.publish(topic.clone(), line.as_bytes()),
Poll::Ready(None) => panic!("Stdin closed"),
Poll::Pending => break,
} {
println!("Publish error: {:?}", e);
}
}

loop {
match swarm.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => match event {
SwarmEvent::Behaviour(GossipsubEvent::Message {
propagation_source: peer_id,
message_id: id,
message,
}) => println!(
"Got message: {} with id: {} from peer: {:?}",
String::from_utf8_lossy(&message.data),
id,
peer_id
),
SwarmEvent::NewListenAddr { address, .. } => {
println!("Listening on {:?}", address);
}
_ => {}
},
Poll::Ready(None) | Poll::Pending => break,
.publish(topic.clone(), line.expect("Stdin not to close").as_bytes())
{
println!("Publish error: {:?}", e);
}
},
event = swarm.select_next_some() => match event {
SwarmEvent::Behaviour(GossipsubEvent::Message {
propagation_source: peer_id,
message_id: id,
message,
}) => println!(
"Got message: {} with id: {} from peer: {:?}",
String::from_utf8_lossy(&message.data),
id,
peer_id
),
SwarmEvent::NewListenAddr { address, .. } => {
println!("Listening on {:?}", address);
}
_ => {}
}
}

Poll::Pending
}))
}
}
Loading

0 comments on commit 6e07e29

Please sign in to comment.