diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index e996824209c..25263e37307 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -1,3 +1,9 @@ +# 0.38.0 [unreleased] + +- Remove `prost` and add `protobuf`. See [PR 3050]. + +[PR 3050]: https://github.com/libp2p/rust-libp2p/pull/3050 + # 0.37.0 - Implement `Hash` and `Ord` for `PublicKey`. See [PR 2915]. diff --git a/core/Cargo.toml b/core/Cargo.toml index 027cd6bd824..f3eaff0e6ba 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -28,7 +28,7 @@ multistream-select = { version = "0.12", path = "../misc/multistream-select" } p256 = { version = "0.11.1", default-features = false, features = ["ecdsa"], optional = true } parking_lot = "0.12.0" pin-project = "1.0.0" -prost = "0.11" +protobuf = "3.2" rand = "0.8" rw-stream-sink = { version = "0.3.0", path = "../misc/rw-stream-sink" } sha2 = "0.10.0" @@ -53,7 +53,7 @@ rmp-serde = "1.0" serde_json = "1.0" [build-dependencies] -prost-build = "0.11" +protobuf-codegen = "3.2" [features] secp256k1 = [ "libsecp256k1" ] diff --git a/core/build.rs b/core/build.rs index f0c09f93abf..eedd1cccd75 100644 --- a/core/build.rs +++ b/core/build.rs @@ -19,13 +19,16 @@ // DEALINGS IN THE SOFTWARE. fn main() { - prost_build::compile_protos( - &[ + protobuf_codegen::Codegen::new() + .pure() + .includes(&["src"]) + .inputs(&[ "src/keys.proto", "src/envelope.proto", "src/peer_record.proto", - ], - &["src"], - ) - .unwrap(); + ]) + .customize(protobuf_codegen::Customize::default().lite_runtime(true)) + .cargo_out_dir("protos") + .run() + .unwrap() } diff --git a/core/src/identity.rs b/core/src/identity.rs index af5dceb69ee..d26064ad283 100644 --- a/core/src/identity.rs +++ b/core/src/identity.rs @@ -147,13 +147,15 @@ impl Keypair { /// Encode a private key as protobuf structure. pub fn to_protobuf_encoding(&self) -> Result, DecodingError> { - use prost::Message; + use protobuf::Message; let pk = match self { - Self::Ed25519(data) => keys_proto::PrivateKey { - r#type: keys_proto::KeyType::Ed25519.into(), - data: data.encode().into(), - }, + Self::Ed25519(data) => { + let mut key = keys_proto::PrivateKey::new(); + key.set_Type(keys_proto::KeyType::Ed25519); + key.set_Data(data.encode().into()); + key + } #[cfg(all(feature = "rsa", not(target_arch = "wasm32")))] Self::Rsa(_) => { return Err(DecodingError::new( @@ -174,32 +176,35 @@ impl Keypair { } }; - Ok(pk.encode_to_vec()) + pk.write_to_bytes() + .map_err(|e| DecodingError::new("Failed to decode.").source(e)) } /// Decode a private key from a protobuf structure and parse it as a [`Keypair`]. pub fn from_protobuf_encoding(bytes: &[u8]) -> Result { - use prost::Message; + use protobuf::Enum; + use protobuf::Message; - let mut private_key = keys_proto::PrivateKey::decode(bytes) + let mut private_key = keys_proto::PrivateKey::parse_from_bytes(bytes) .map_err(|e| DecodingError::new("Protobuf").source(e)) .map(zeroize::Zeroizing::new)?; - let key_type = keys_proto::KeyType::from_i32(private_key.r#type).ok_or_else(|| { - DecodingError::new(format!("unknown key type: {}", private_key.r#type)) - })?; + let key_type = + keys_proto::KeyType::from_i32(private_key.Type().value()).ok_or_else(|| { + DecodingError::new(format!("unknown key type: {:?}", private_key.Type())) + })?; match key_type { keys_proto::KeyType::Ed25519 => { - ed25519::Keypair::decode(&mut private_key.data).map(Keypair::Ed25519) + ed25519::Keypair::decode(private_key.mut_Data()).map(Keypair::Ed25519) } - keys_proto::KeyType::Rsa => Err(DecodingError::new( + keys_proto::KeyType::RSA => Err(DecodingError::new( "Decoding RSA key from Protobuf is unsupported.", )), keys_proto::KeyType::Secp256k1 => Err(DecodingError::new( "Decoding Secp256k1 key from Protobuf is unsupported.", )), - keys_proto::KeyType::Ecdsa => Err(DecodingError::new( + keys_proto::KeyType::ECDSA => Err(DecodingError::new( "Decoding ECDSA key from Protobuf is unsupported.", )), } @@ -208,8 +213,8 @@ impl Keypair { impl zeroize::Zeroize for keys_proto::PrivateKey { fn zeroize(&mut self) { - self.r#type.zeroize(); - self.data.zeroize(); + use protobuf::Message; + self.clear() } } @@ -251,23 +256,21 @@ impl PublicKey { /// Encode the public key into a protobuf structure for storage or /// exchange with other nodes. pub fn to_protobuf_encoding(&self) -> Vec { - use prost::Message; + use protobuf::Message; let public_key = keys_proto::PublicKey::from(self); - let mut buf = Vec::with_capacity(public_key.encoded_len()); public_key - .encode(&mut buf) - .expect("Vec provides capacity as needed"); - buf + .write_to_bytes() + .expect("All fields to be initialized.") } /// Decode a public key from a protobuf structure, e.g. read from storage /// or received from another node. pub fn from_protobuf_encoding(bytes: &[u8]) -> Result { - use prost::Message; + use protobuf::Message; - let pubkey = keys_proto::PublicKey::decode(bytes) + let pubkey = keys_proto::PublicKey::parse_from_bytes(bytes) .map_err(|e| DecodingError::new("Protobuf").source(e))?; pubkey.try_into() @@ -282,25 +285,33 @@ impl PublicKey { impl From<&PublicKey> for keys_proto::PublicKey { fn from(key: &PublicKey) -> Self { match key { - PublicKey::Ed25519(key) => keys_proto::PublicKey { - r#type: keys_proto::KeyType::Ed25519 as i32, - data: key.encode().to_vec(), - }, + PublicKey::Ed25519(key) => { + let mut pubkey = keys_proto::PublicKey::new(); + pubkey.set_Type(keys_proto::KeyType::Ed25519); + pubkey.set_Data(key.encode().to_vec()); + pubkey + } #[cfg(all(feature = "rsa", not(target_arch = "wasm32")))] - PublicKey::Rsa(key) => keys_proto::PublicKey { - r#type: keys_proto::KeyType::Rsa as i32, - data: key.encode_x509(), - }, + PublicKey::Rsa(key) => { + let mut pubkey = keys_proto::PublicKey::new(); + pubkey.set_Type(keys_proto::KeyType::RSA); + pubkey.set_Data(key.encode_x509()); + pubkey + } #[cfg(feature = "secp256k1")] - PublicKey::Secp256k1(key) => keys_proto::PublicKey { - r#type: keys_proto::KeyType::Secp256k1 as i32, - data: key.encode().to_vec(), - }, + PublicKey::Secp256k1(key) => { + let mut pubkey = keys_proto::PublicKey::new(); + pubkey.set_Type(keys_proto::KeyType::Secp256k1); + pubkey.set_Data(key.encode().to_vec()); + pubkey + } #[cfg(feature = "ecdsa")] - PublicKey::Ecdsa(key) => keys_proto::PublicKey { - r#type: keys_proto::KeyType::Ecdsa as i32, - data: key.encode_der(), - }, + PublicKey::Ecdsa(key) => { + let mut pubkey = keys_proto::PublicKey::new(); + pubkey.set_Type(keys_proto::KeyType::ECDSA); + pubkey.set_Data(key.encode_der()); + pubkey + } } } } @@ -309,25 +320,22 @@ impl TryFrom for PublicKey { type Error = DecodingError; fn try_from(pubkey: keys_proto::PublicKey) -> Result { - let key_type = keys_proto::KeyType::from_i32(pubkey.r#type) - .ok_or_else(|| DecodingError::new(format!("unknown key type: {}", pubkey.r#type)))?; - - match key_type { + match pubkey.Type() { keys_proto::KeyType::Ed25519 => { - ed25519::PublicKey::decode(&pubkey.data).map(PublicKey::Ed25519) + ed25519::PublicKey::decode(pubkey.Data()).map(PublicKey::Ed25519) } #[cfg(all(feature = "rsa", not(target_arch = "wasm32")))] - keys_proto::KeyType::Rsa => { - rsa::PublicKey::decode_x509(&pubkey.data).map(PublicKey::Rsa) + keys_proto::KeyType::RSA => { + rsa::PublicKey::decode_x509(pubkey.Data()).map(PublicKey::Rsa) } #[cfg(any(not(feature = "rsa"), target_arch = "wasm32"))] - keys_proto::KeyType::Rsa => { + keys_proto::KeyType::RSA => { log::debug!("support for RSA was disabled at compile-time"); Err(DecodingError::new("Unsupported")) } #[cfg(feature = "secp256k1")] keys_proto::KeyType::Secp256k1 => { - secp256k1::PublicKey::decode(&pubkey.data).map(PublicKey::Secp256k1) + secp256k1::PublicKey::decode(pubkey.Data()).map(PublicKey::Secp256k1) } #[cfg(not(feature = "secp256k1"))] keys_proto::KeyType::Secp256k1 => { @@ -335,11 +343,11 @@ impl TryFrom for PublicKey { Err(DecodingError::new("Unsupported")) } #[cfg(feature = "ecdsa")] - keys_proto::KeyType::Ecdsa => { - ecdsa::PublicKey::decode_der(&pubkey.data).map(PublicKey::Ecdsa) + keys_proto::KeyType::ECDSA => { + ecdsa::PublicKey::decode_der(pubkey.Data()).map(PublicKey::Ecdsa) } #[cfg(not(feature = "ecdsa"))] - keys_proto::KeyType::Ecdsa => { + keys_proto::KeyType::ECDSA => { log::debug!("support for ECDSA was disabled at compile-time"); Err(DecodingError::new("Unsupported")) } diff --git a/core/src/lib.rs b/core/src/lib.rs index 88c6cc0df7c..1bf79d3a5f1 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -38,18 +38,13 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] #[allow(clippy::derive_partial_eq_without_eq)] -mod keys_proto { - include!(concat!(env!("OUT_DIR"), "/keys_proto.rs")); +mod protos { + include!(concat!(env!("OUT_DIR"), "/protos/mod.rs")); } -mod envelope_proto { - include!(concat!(env!("OUT_DIR"), "/envelope_proto.rs")); -} - -#[allow(clippy::derive_partial_eq_without_eq)] -mod peer_record_proto { - include!(concat!(env!("OUT_DIR"), "/peer_record_proto.rs")); -} +use protos::envelope as envelope_proto; +use protos::keys as keys_proto; +use protos::peer_record as peer_record_proto; /// Multi-address re-export. pub use multiaddr; diff --git a/core/src/peer_record.rs b/core/src/peer_record.rs index d0d8e21a4b5..a3c2f6ada33 100644 --- a/core/src/peer_record.rs +++ b/core/src/peer_record.rs @@ -30,11 +30,12 @@ impl PeerRecord { /// /// If this function succeeds, the [`SignedEnvelope`] contained a peer record with a valid signature and can hence be considered authenticated. pub fn from_signed_envelope(envelope: SignedEnvelope) -> Result { - use prost::Message; + use protobuf::Message; let (payload, signing_key) = envelope.payload_and_signing_key(String::from(DOMAIN_SEP), PAYLOAD_TYPE.as_bytes())?; - let record = peer_record_proto::PeerRecord::decode(payload)?; + let record = peer_record_proto::PeerRecord::parse_from_bytes(payload) + .map_err(FromEnvelopeError::from)?; let peer_id = PeerId::from_bytes(&record.peer_id)?; @@ -61,7 +62,7 @@ impl PeerRecord { /// /// This is the same key that is used for authenticating every libp2p connection of your application, i.e. what you use when setting up your [`crate::transport::Transport`]. pub fn new(key: &Keypair, addresses: Vec) -> Result { - use prost::Message; + use protobuf::Message; let seq = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) @@ -70,22 +71,21 @@ impl PeerRecord { let peer_id = key.public().to_peer_id(); let payload = { - let record = peer_record_proto::PeerRecord { - peer_id: peer_id.to_bytes(), - seq, - addresses: addresses - .iter() - .map(|m| peer_record_proto::peer_record::AddressInfo { - multiaddr: m.to_vec(), - }) - .collect(), - }; + let mut record = peer_record_proto::PeerRecord::new(); + record.peer_id = peer_id.to_bytes(); + record.seq = seq; + record.addresses = addresses + .iter() + .map(|m| { + let mut addr_info = peer_record_proto::peer_record::AddressInfo::new(); + addr_info.multiaddr = m.to_vec(); + addr_info + }) + .collect(); - let mut buf = Vec::with_capacity(record.encoded_len()); record - .encode(&mut buf) - .expect("Vec provides capacity as needed"); - buf + .write_to_bytes() + .expect("All fields to be initialized.") }; let envelope = SignedEnvelope::new( @@ -129,7 +129,7 @@ pub enum FromEnvelopeError { /// Failed to extract the payload from the envelope. BadPayload(signed_envelope::ReadPayloadError), /// Failed to decode the provided bytes as a [`PeerRecord`]. - InvalidPeerRecord(prost::DecodeError), + InvalidPeerRecord(protobuf::Error), /// Failed to decode the peer ID. InvalidPeerId(multihash::Error), /// The signer of the envelope is different than the peer id in the record. @@ -144,8 +144,8 @@ impl From for FromEnvelopeError { } } -impl From for FromEnvelopeError { - fn from(e: prost::DecodeError) -> Self { +impl From for FromEnvelopeError { + fn from(e: protobuf::Error) -> Self { Self::InvalidPeerRecord(e) } } @@ -213,7 +213,7 @@ mod tests { #[test] fn mismatched_signature() { - use prost::Message; + use protobuf::Message; let addr: Multiaddr = HOME.parse().unwrap(); @@ -227,14 +227,12 @@ mod tests { seq: 0, addresses: vec![peer_record_proto::peer_record::AddressInfo { multiaddr: addr.to_vec(), + ..peer_record_proto::peer_record::AddressInfo::default() }], + ..peer_record_proto::PeerRecord::default() }; - let mut buf = Vec::with_capacity(record.encoded_len()); - record - .encode(&mut buf) - .expect("Vec provides capacity as needed"); - buf + record.write_to_bytes().unwrap() }; SignedEnvelope::new( diff --git a/core/src/signed_envelope.rs b/core/src/signed_envelope.rs index 33bfdf2d4f4..c63ce49b76f 100644 --- a/core/src/signed_envelope.rs +++ b/core/src/signed_envelope.rs @@ -73,32 +73,29 @@ impl SignedEnvelope { /// Encode this [`SignedEnvelope`] using the protobuf encoding specified in the RFC. pub fn into_protobuf_encoding(self) -> Vec { - use prost::Message; + use protobuf::Message; - let envelope = crate::envelope_proto::Envelope { - public_key: Some((&self.key).into()), - payload_type: self.payload_type, - payload: self.payload, - signature: self.signature, - }; + let mut envelope = crate::envelope_proto::Envelope::new(); + envelope.public_key = protobuf::MessageField::some((&self.key).into()); + envelope.payload_type = self.payload_type; + envelope.payload = self.payload; + envelope.signature = self.signature; - let mut buf = Vec::with_capacity(envelope.encoded_len()); envelope - .encode(&mut buf) - .expect("Vec provides capacity as needed"); - - buf + .write_to_bytes() + .expect("All fields to be initialized.") } /// Decode a [`SignedEnvelope`] using the protobuf encoding specified in the RFC. pub fn from_protobuf_encoding(bytes: &[u8]) -> Result { - use prost::Message; + use protobuf::Message; - let envelope = crate::envelope_proto::Envelope::decode(bytes)?; + let envelope = crate::envelope_proto::Envelope::parse_from_bytes(bytes)?; Ok(Self { key: envelope .public_key + .into_option() .ok_or(DecodingError::MissingPublicKey)? .try_into()?, payload_type: envelope.payload_type, @@ -143,15 +140,15 @@ fn signature_payload(domain_separation: String, payload_type: &[u8], payload: &[ #[derive(Debug)] pub enum DecodingError { /// Decoding the provided bytes as a signed envelope failed. - InvalidEnvelope(prost::DecodeError), + InvalidEnvelope(protobuf::Error), /// The public key in the envelope could not be converted to our internal public key type. InvalidPublicKey(identity::error::DecodingError), /// The public key in the envelope could not be converted to our internal public key type. MissingPublicKey, } -impl From for DecodingError { - fn from(e: prost::DecodeError) -> Self { +impl From for DecodingError { + fn from(e: protobuf::Error) -> Self { Self::InvalidEnvelope(e) } } diff --git a/misc/prost-codec/CHANGELOG.md b/misc/prost-codec/CHANGELOG.md index d9380ea34ca..8cb05248b7e 100644 --- a/misc/prost-codec/CHANGELOG.md +++ b/misc/prost-codec/CHANGELOG.md @@ -1,3 +1,9 @@ +# 0.2.1 [unreleased] + +- Remove `prost` and add `protobuf`. See [PR 3050]. + +[PR 3050]: https://github.com/libp2p/rust-libp2p/pull/3050 + # 0.2.0 - Update to prost(-build) `v0.11`. See [PR 2788]. diff --git a/misc/prost-codec/Cargo.toml b/misc/prost-codec/Cargo.toml index 34a5c451cc5..d77340374c2 100644 --- a/misc/prost-codec/Cargo.toml +++ b/misc/prost-codec/Cargo.toml @@ -13,13 +13,10 @@ categories = ["asynchronous"] [dependencies] asynchronous-codec = { version = "0.6" } bytes = { version = "1" } -prost = "0.11" +protobuf = "3.2" thiserror = "1.0" unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] } -[dev-dependencies] -prost-build = "0.11" - # Passing arguments to the docsrs builder in order to properly document cfg's. # More information: https://docs.rs/about/builds#cross-compiling [package.metadata.docs.rs] diff --git a/misc/prost-codec/src/lib.rs b/misc/prost-codec/src/lib.rs index f8efa147a39..e85f4283296 100644 --- a/misc/prost-codec/src/lib.rs +++ b/misc/prost-codec/src/lib.rs @@ -1,15 +1,14 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] use asynchronous_codec::{Decoder, Encoder}; -use bytes::BytesMut; -use prost::Message; -use std::io::Cursor; +use bytes::Bytes; +use protobuf::Message; use std::marker::PhantomData; use thiserror::Error; use unsigned_varint::codec::UviBytes; /// [`Codec`] implements [`Encoder`] and [`Decoder`], uses [`unsigned_varint`] -/// to prefix messages with their length and uses [`prost`] and a provided +/// to prefix messages with their length and uses [`protobuf`] and a provided /// `struct` implementing [`Message`] to do the encoding. pub struct Codec { uvi: UviBytes, @@ -41,11 +40,9 @@ impl Encoder for Codec { item: Self::Item, dst: &mut asynchronous_codec::BytesMut, ) -> Result<(), Self::Error> { - let mut encoded_msg = BytesMut::new(); - item.encode(&mut encoded_msg) - .expect("BytesMut to have sufficient capacity."); + let bytes = item.write_to_bytes().expect("Failed to write to bytes."); self.uvi - .encode(encoded_msg.freeze(), dst) + .encode(Bytes::from(bytes), dst) .map_err(|e| e.into()) } } @@ -61,7 +58,7 @@ impl Decoder for Codec { Ok(self .uvi .decode(src)? - .map(|msg| Message::decode(Cursor::new(msg))) + .map(|msg| Message::parse_from_bytes(msg.as_ref())) .transpose()?) } } @@ -72,7 +69,7 @@ pub enum Error { Decode( #[from] #[source] - prost::DecodeError, + protobuf::Error, ), #[error("Io error {0}")] Io( diff --git a/protocols/autonat/CHANGELOG.md b/protocols/autonat/CHANGELOG.md index 6f55afa4c06..c24ce7eae34 100644 --- a/protocols/autonat/CHANGELOG.md +++ b/protocols/autonat/CHANGELOG.md @@ -1,3 +1,9 @@ +# 0.8.1 [unreleased] + +- Remove `prost` and add `protobuf`. See [PR 3050]. + +[PR 3050]: https://github.com/libp2p/rust-libp2p/pull/3050 + # 0.8.0 - Update to `libp2p-core` `v0.37.0`. diff --git a/protocols/autonat/Cargo.toml b/protocols/autonat/Cargo.toml index 68ff289a424..1f5df0ba8e4 100644 --- a/protocols/autonat/Cargo.toml +++ b/protocols/autonat/Cargo.toml @@ -11,7 +11,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [build-dependencies] -prost-build = "0.11" +protobuf-codegen = "3.2" [dependencies] async-trait = "0.1" @@ -23,7 +23,7 @@ libp2p-swarm = { version = "0.40.0", path = "../../swarm" } libp2p-request-response = { version = "0.22.0", path = "../request-response" } log = "0.4" rand = "0.8" -prost = "0.11" +protobuf = "3.2" [dev-dependencies] async-std = { version = "1.10", features = ["attributes"] } diff --git a/protocols/autonat/build.rs b/protocols/autonat/build.rs index d3714fdec14..0a73bca1b05 100644 --- a/protocols/autonat/build.rs +++ b/protocols/autonat/build.rs @@ -19,5 +19,12 @@ // DEALINGS IN THE SOFTWARE. fn main() { - prost_build::compile_protos(&["src/structs.proto"], &["src"]).unwrap(); + protobuf_codegen::Codegen::new() + .pure() + .includes(&["src"]) + .input("src/structs.proto") + .customize(protobuf_codegen::Customize::default().lite_runtime(true)) + .cargo_out_dir("protos") + .run() + .unwrap() } diff --git a/protocols/autonat/src/lib.rs b/protocols/autonat/src/lib.rs index d0fd5c04346..986436604d9 100644 --- a/protocols/autonat/src/lib.rs +++ b/protocols/autonat/src/lib.rs @@ -35,6 +35,8 @@ pub use self::{ pub use libp2p_request_response::{InboundFailure, OutboundFailure}; #[allow(clippy::derive_partial_eq_without_eq)] -mod structs_proto { - include!(concat!(env!("OUT_DIR"), "/structs.rs")); +mod protos { + include!(concat!(env!("OUT_DIR"), "/protos/mod.rs")); } + +use protos::structs as structs_proto; diff --git a/protocols/autonat/src/protocol.rs b/protocols/autonat/src/protocol.rs index f39c22aa25f..3ec2242b00e 100644 --- a/protocols/autonat/src/protocol.rs +++ b/protocols/autonat/src/protocol.rs @@ -23,7 +23,7 @@ use async_trait::async_trait; use futures::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use libp2p_core::{upgrade, Multiaddr, PeerId}; use libp2p_request_response::{ProtocolName, RequestResponseCodec}; -use prost::Message; +use protobuf::{Enum, Message, MessageField}; use std::{convert::TryFrom, io}; #[derive(Clone, Debug)] @@ -108,18 +108,19 @@ pub struct DialRequest { impl DialRequest { pub fn from_bytes(bytes: &[u8]) -> Result { - let msg = structs_proto::Message::decode(bytes) + let msg = structs_proto::Message::parse_from_bytes(bytes) .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))?; - if msg.r#type != Some(structs_proto::message::MessageType::Dial as _) { + if msg.type_() != structs_proto::message::MessageType::DIAL as _ { return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid type")); } - let (peer_id, addrs) = if let Some(structs_proto::message::Dial { - peer: - Some(structs_proto::message::PeerInfo { - id: Some(peer_id), - addrs, - }), - }) = msg.dial + let (peer_id, addrs) = if let Some(structs_proto::message::PeerInfo { + id: Some(peer_id), + addrs, + .. + }) = msg + .dial + .into_option() + .and_then(|dial| dial.peer.into_option()) { (peer_id, addrs) } else { @@ -157,21 +158,19 @@ impl DialRequest { .map(|addr| addr.to_vec()) .collect(); - let msg = structs_proto::Message { - r#type: Some(structs_proto::message::MessageType::Dial as _), - dial: Some(structs_proto::message::Dial { - peer: Some(structs_proto::message::PeerInfo { - id: Some(peer_id), - addrs, - }), - }), - dial_response: None, - }; + let mut peer_info = structs_proto::message::PeerInfo::new(); + peer_info.set_id(peer_id); + peer_info.addrs = addrs; + + let mut dial = structs_proto::message::Dial::new(); + dial.peer = MessageField::some(peer_info); - let mut bytes = Vec::with_capacity(msg.encoded_len()); - msg.encode(&mut bytes) - .expect("Vec provides capacity as needed"); - bytes + let mut msg = structs_proto::Message::new(); + msg.set_type(structs_proto::message::MessageType::DIAL as _); + msg.dial = MessageField::some(dial); + msg.dialResponse = MessageField::none(); + + msg.write_to_bytes().expect("All fields to be initialized.") } } @@ -199,13 +198,15 @@ impl TryFrom for ResponseError { fn try_from(value: structs_proto::message::ResponseStatus) -> Result { match value { - structs_proto::message::ResponseStatus::EDialError => Ok(ResponseError::DialError), - structs_proto::message::ResponseStatus::EDialRefused => Ok(ResponseError::DialRefused), - structs_proto::message::ResponseStatus::EBadRequest => Ok(ResponseError::BadRequest), - structs_proto::message::ResponseStatus::EInternalError => { + structs_proto::message::ResponseStatus::E_DIAL_ERROR => Ok(ResponseError::DialError), + structs_proto::message::ResponseStatus::E_DIAL_REFUSED => { + Ok(ResponseError::DialRefused) + } + structs_proto::message::ResponseStatus::E_BAD_REQUEST => Ok(ResponseError::BadRequest), + structs_proto::message::ResponseStatus::E_INTERNAL_ERROR => { Ok(ResponseError::InternalError) } - structs_proto::message::ResponseStatus::Ok => { + structs_proto::message::ResponseStatus::OK => { log::debug!("Received response with status code OK but expected error."); Err(io::Error::new( io::ErrorKind::InvalidData, @@ -224,37 +225,43 @@ pub struct DialResponse { impl DialResponse { pub fn from_bytes(bytes: &[u8]) -> Result { - let msg = structs_proto::Message::decode(bytes) + let msg = structs_proto::Message::parse_from_bytes(bytes) .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))?; - if msg.r#type != Some(structs_proto::message::MessageType::DialResponse as _) { + if msg.type_() != structs_proto::message::MessageType::DIAL_RESPONSE as _ { return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid type")); } - - Ok(match msg.dial_response { + Ok(match msg.dialResponse.into_option() { Some(structs_proto::message::DialResponse { status: Some(status), - status_text, + statusText, addr: Some(addr), - }) if structs_proto::message::ResponseStatus::from_i32(status) - == Some(structs_proto::message::ResponseStatus::Ok) => + .. + }) if structs_proto::message::ResponseStatus::from_i32(status.value()) + == Some(structs_proto::message::ResponseStatus::OK) => { let addr = Multiaddr::try_from(addr) .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))?; Self { - status_text, + status_text: statusText, result: Ok(addr), } } Some(structs_proto::message::DialResponse { status: Some(status), - status_text, + statusText, addr: None, + .. }) => Self { - status_text, + status_text: statusText, result: Err(ResponseError::try_from( - structs_proto::message::ResponseStatus::from_i32(status).ok_or_else(|| { - io::Error::new(io::ErrorKind::InvalidData, "invalid response status code") - })?, + structs_proto::message::ResponseStatus::from_i32(status.value()).ok_or_else( + || { + io::Error::new( + io::ErrorKind::InvalidData, + "invalid response status code", + ) + }, + )?, )?), }, _ => { @@ -269,28 +276,33 @@ impl DialResponse { pub fn into_bytes(self) -> Vec { let dial_response = match self.result { - Ok(addr) => structs_proto::message::DialResponse { - status: Some(0), - status_text: self.status_text, - addr: Some(addr.to_vec()), - }, - Err(error) => structs_proto::message::DialResponse { - status: Some(error.into()), - status_text: self.status_text, - addr: None, - }, - }; + Ok(addr) => { + let mut res = structs_proto::message::DialResponse::new(); + res.set_status(structs_proto::message::ResponseStatus::OK); + res.set_addr(addr.to_vec()); + res.statusText = self.status_text; + + res + } + Err(error) => { + let mut res = structs_proto::message::DialResponse::new(); + res.clear_addr(); + res.set_status( + structs_proto::message::ResponseStatus::from_i32(error.into()) + .expect("Valid ResponseError."), + ); + res.statusText = self.status_text; - let msg = structs_proto::Message { - r#type: Some(structs_proto::message::MessageType::DialResponse as _), - dial: None, - dial_response: Some(dial_response), + res + } }; - let mut bytes = Vec::with_capacity(msg.encoded_len()); - msg.encode(&mut bytes) - .expect("Vec provides capacity as needed"); - bytes + let mut msg = structs_proto::Message::new(); + msg.set_type(structs_proto::message::MessageType::DIAL_RESPONSE as _); + msg.dial = MessageField::none(); + msg.dialResponse = MessageField::some(dial_response); + + msg.write_to_bytes().expect("All fields to be initialized.") } } diff --git a/protocols/dcutr/CHANGELOG.md b/protocols/dcutr/CHANGELOG.md index 0d49f90d008..15c788ae214 100644 --- a/protocols/dcutr/CHANGELOG.md +++ b/protocols/dcutr/CHANGELOG.md @@ -1,3 +1,9 @@ +# 0.7.1 [unreleased] + +- Remove `prost` and add `protobuf`. See [PR 3050]. + +[PR 3050]: https://github.com/libp2p/rust-libp2p/pull/3050 + # 0.7.0 - Update to `libp2p-core` `v0.37.0`. diff --git a/protocols/dcutr/Cargo.toml b/protocols/dcutr/Cargo.toml index c90af2f7bf7..f1196ba2ebe 100644 --- a/protocols/dcutr/Cargo.toml +++ b/protocols/dcutr/Cargo.toml @@ -21,12 +21,12 @@ libp2p-core = { version = "0.37.0", path = "../../core" } libp2p-swarm = { version = "0.40.0", path = "../../swarm" } log = "0.4" prost-codec = { version = "0.2", path = "../../misc/prost-codec" } -prost = "0.11" +protobuf = "3.2" thiserror = "1.0" void = "1" [build-dependencies] -prost-build = "0.11" +protobuf-codegen = "3.2" [dev-dependencies] env_logger = "0.9.0" diff --git a/protocols/dcutr/build.rs b/protocols/dcutr/build.rs index b159bb4c817..357dca2f12b 100644 --- a/protocols/dcutr/build.rs +++ b/protocols/dcutr/build.rs @@ -19,5 +19,12 @@ // DEALINGS IN THE SOFTWARE. fn main() { - prost_build::compile_protos(&["src/message.proto"], &["src"]).unwrap(); + protobuf_codegen::Codegen::new() + .pure() + .includes(&["src"]) + .input("src/message.proto") + .customize(protobuf_codegen::Customize::default().lite_runtime(true)) + .cargo_out_dir("protos") + .run() + .unwrap() } diff --git a/protocols/dcutr/src/lib.rs b/protocols/dcutr/src/lib.rs index 738af5300b1..65df2b5cda6 100644 --- a/protocols/dcutr/src/lib.rs +++ b/protocols/dcutr/src/lib.rs @@ -33,6 +33,8 @@ pub use protocol::{ }; #[allow(clippy::derive_partial_eq_without_eq)] -mod message_proto { - include!(concat!(env!("OUT_DIR"), "/holepunch.pb.rs")); +mod protos { + include!(concat!(env!("OUT_DIR"), "/protos/mod.rs")); } + +use protos::message as message_proto; diff --git a/protocols/dcutr/src/protocol/inbound.rs b/protocols/dcutr/src/protocol/inbound.rs index cfb28ca57cb..075f1fca7fb 100644 --- a/protocols/dcutr/src/protocol/inbound.rs +++ b/protocols/dcutr/src/protocol/inbound.rs @@ -50,31 +50,30 @@ impl upgrade::InboundUpgrade for Upgrade { ); async move { - let HolePunch { r#type, obs_addrs } = + let hole_punch: HolePunch = substream.next().await.ok_or(UpgradeError::StreamClosed)??; - let obs_addrs = if obs_addrs.is_empty() { + match hole_punch.type_() { + hole_punch::Type::CONNECT => {} + hole_punch::Type::SYNC => return Err(UpgradeError::UnexpectedTypeSync), + } + + if hole_punch.ObsAddrs.is_empty() { return Err(UpgradeError::NoAddresses); - } else { - obs_addrs - .into_iter() - .map(Multiaddr::try_from) - // Filter out relayed addresses. - .filter(|a| match a { - Ok(a) => !a.iter().any(|p| p == Protocol::P2pCircuit), - Err(_) => true, - }) - .collect::, _>>() - .map_err(|_| UpgradeError::InvalidAddrs)? - }; - - let r#type = hole_punch::Type::from_i32(r#type).ok_or(UpgradeError::ParseTypeField)?; - - match r#type { - hole_punch::Type::Connect => {} - hole_punch::Type::Sync => return Err(UpgradeError::UnexpectedTypeSync), } + let obs_addrs = hole_punch + .ObsAddrs + .into_iter() + .map(Multiaddr::try_from) + // Filter out relayed addresses. + .filter(|a| match a { + Ok(a) => !a.iter().any(|p| p == Protocol::P2pCircuit), + Err(_) => true, + }) + .collect::, _>>() + .map_err(|_| UpgradeError::InvalidAddrs)?; + Ok(PendingConnect { substream, remote_obs_addrs: obs_addrs, @@ -94,22 +93,21 @@ impl PendingConnect { mut self, local_obs_addrs: Vec, ) -> Result, UpgradeError> { - let msg = HolePunch { - r#type: hole_punch::Type::Connect.into(), - obs_addrs: local_obs_addrs.into_iter().map(|a| a.to_vec()).collect(), - }; + let mut msg = HolePunch::new(); + msg.ObsAddrs = local_obs_addrs.into_iter().map(|a| a.to_vec()).collect(); + msg.set_type(hole_punch::Type::CONNECT); self.substream.send(msg).await?; - let HolePunch { r#type, .. } = self + let r#type = self .substream .next() .await - .ok_or(UpgradeError::StreamClosed)??; + .ok_or(UpgradeError::StreamClosed)?? + .type_(); - let r#type = hole_punch::Type::from_i32(r#type).ok_or(UpgradeError::ParseTypeField)?; match r#type { - hole_punch::Type::Connect => return Err(UpgradeError::UnexpectedTypeConnect), - hole_punch::Type::Sync => {} + hole_punch::Type::CONNECT => return Err(UpgradeError::UnexpectedTypeConnect), + hole_punch::Type::SYNC => {} } Ok(self.remote_obs_addrs) diff --git a/protocols/dcutr/src/protocol/outbound.rs b/protocols/dcutr/src/protocol/outbound.rs index a0ea8449fe4..612503a131b 100644 --- a/protocols/dcutr/src/protocol/outbound.rs +++ b/protocols/dcutr/src/protocol/outbound.rs @@ -59,46 +59,43 @@ impl upgrade::OutboundUpgrade for Upgrade { prost_codec::Codec::new(super::MAX_MESSAGE_SIZE_BYTES), ); - let msg = HolePunch { - r#type: hole_punch::Type::Connect.into(), - obs_addrs: self.obs_addrs.into_iter().map(|a| a.to_vec()).collect(), - }; + let mut msg = HolePunch::new(); + msg.ObsAddrs = self.obs_addrs.into_iter().map(|a| a.to_vec()).collect(); + msg.set_type(hole_punch::Type::CONNECT); async move { substream.send(msg).await?; let sent_time = Instant::now(); - let HolePunch { r#type, obs_addrs } = + let hole_punch: HolePunch = substream.next().await.ok_or(UpgradeError::StreamClosed)??; let rtt = sent_time.elapsed(); - let r#type = hole_punch::Type::from_i32(r#type).ok_or(UpgradeError::ParseTypeField)?; - match r#type { - hole_punch::Type::Connect => {} - hole_punch::Type::Sync => return Err(UpgradeError::UnexpectedTypeSync), + match hole_punch.type_() { + hole_punch::Type::CONNECT => {} + hole_punch::Type::SYNC => return Err(UpgradeError::UnexpectedTypeSync), } - let obs_addrs = if obs_addrs.is_empty() { + if hole_punch.ObsAddrs.is_empty() { return Err(UpgradeError::NoAddresses); - } else { - obs_addrs - .into_iter() - .map(Multiaddr::try_from) - // Filter out relayed addresses. - .filter(|a| match a { - Ok(a) => !a.iter().any(|p| p == Protocol::P2pCircuit), - Err(_) => true, - }) - .collect::, _>>() - .map_err(|_| UpgradeError::InvalidAddrs)? - }; - - let msg = HolePunch { - r#type: hole_punch::Type::Sync.into(), - obs_addrs: vec![], - }; + } + + let obs_addrs = hole_punch + .ObsAddrs + .into_iter() + .map(Multiaddr::try_from) + // Filter out relayed addresses. + .filter(|a| match a { + Ok(a) => !a.iter().any(|p| p == Protocol::P2pCircuit), + Err(_) => true, + }) + .collect::, _>>() + .map_err(|_| UpgradeError::InvalidAddrs)?; + + let mut msg = HolePunch::new(); + msg.set_type(hole_punch::Type::SYNC); substream.send(msg).await?; diff --git a/protocols/floodsub/Cargo.toml b/protocols/floodsub/Cargo.toml index cfbf03ebd46..de7df133543 100644 --- a/protocols/floodsub/Cargo.toml +++ b/protocols/floodsub/Cargo.toml @@ -17,12 +17,12 @@ futures = "0.3.1" libp2p-core = { version = "0.37.0", path = "../../core" } libp2p-swarm = { version = "0.40.0", path = "../../swarm" } log = "0.4" -prost = "0.11" +protobuf = "3.2" rand = "0.8" smallvec = "1.6.1" [build-dependencies] -prost-build = "0.11" +protobuf-codegen = "3.2" # Passing arguments to the docsrs builder in order to properly document cfg's. # More information: https://docs.rs/about/builds#cross-compiling diff --git a/protocols/floodsub/build.rs b/protocols/floodsub/build.rs index a3de99880dc..fd4236aeb80 100644 --- a/protocols/floodsub/build.rs +++ b/protocols/floodsub/build.rs @@ -19,5 +19,12 @@ // DEALINGS IN THE SOFTWARE. fn main() { - prost_build::compile_protos(&["src/rpc.proto"], &["src"]).unwrap(); + protobuf_codegen::Codegen::new() + .pure() + .includes(&["src"]) + .input("src/rpc.proto") + .customize(protobuf_codegen::Customize::default().lite_runtime(true)) + .cargo_out_dir("protos") + .run() + .unwrap() } diff --git a/protocols/floodsub/src/lib.rs b/protocols/floodsub/src/lib.rs index 6eb0af276e2..1f0d65d8ef8 100644 --- a/protocols/floodsub/src/lib.rs +++ b/protocols/floodsub/src/lib.rs @@ -25,16 +25,18 @@ use libp2p_core::PeerId; +#[allow(clippy::derive_partial_eq_without_eq)] +mod protos { + include!(concat!(env!("OUT_DIR"), "/protos/mod.rs")); +} + +use protos::rpc as rpc_proto; + pub mod protocol; mod layer; mod topic; -#[allow(clippy::derive_partial_eq_without_eq)] -mod rpc_proto { - include!(concat!(env!("OUT_DIR"), "/floodsub.pb.rs")); -} - pub use self::layer::{Floodsub, FloodsubEvent}; pub use self::protocol::{FloodsubMessage, FloodsubRpc}; pub use self::topic::Topic; diff --git a/protocols/floodsub/src/protocol.rs b/protocols/floodsub/src/protocol.rs index df694b2e06d..35b3ce70b21 100644 --- a/protocols/floodsub/src/protocol.rs +++ b/protocols/floodsub/src/protocol.rs @@ -25,7 +25,7 @@ use futures::{ AsyncWriteExt, Future, }; use libp2p_core::{upgrade, InboundUpgrade, OutboundUpgrade, PeerId, UpgradeInfo}; -use prost::Message; +use protobuf::Message; use std::{error, fmt, io, iter, pin::Pin}; /// Implementation of `ConnectionUpgrade` for the floodsub protocol. @@ -59,7 +59,7 @@ where fn upgrade_inbound(self, mut socket: TSocket, _: Self::Info) -> Self::Future { Box::pin(async move { let packet = upgrade::read_length_prefixed(&mut socket, 2048).await?; - let rpc = rpc_proto::Rpc::decode(&packet[..])?; + let rpc = rpc_proto::RPC::parse_from_bytes(&packet[..])?; let mut messages = Vec::with_capacity(rpc.publish.len()); for publish in rpc.publish.into_iter() { @@ -97,7 +97,7 @@ pub enum FloodsubDecodeError { /// Error when reading the packet from the socket. ReadError(io::Error), /// Error when decoding the raw buffer into a protobuf. - ProtobufError(prost::DecodeError), + ProtobufError(protobuf::Error), /// Error when parsing the `PeerId` in the message. InvalidPeerId, } @@ -108,8 +108,8 @@ impl From for FloodsubDecodeError { } } -impl From for FloodsubDecodeError { - fn from(err: prost::DecodeError) -> Self { +impl From for FloodsubDecodeError { + fn from(err: protobuf::Error) -> Self { FloodsubDecodeError::ProtobufError(err) } } @@ -181,32 +181,31 @@ where impl FloodsubRpc { /// Turns this `FloodsubRpc` into a message that can be sent to a substream. fn into_bytes(self) -> Vec { - let rpc = rpc_proto::Rpc { - publish: self - .messages - .into_iter() - .map(|msg| rpc_proto::Message { - from: Some(msg.source.to_bytes()), - data: Some(msg.data), - seqno: Some(msg.sequence_number), - topic_ids: msg.topics.into_iter().map(|topic| topic.into()).collect(), - }) - .collect(), - - subscriptions: self - .subscriptions - .into_iter() - .map(|topic| rpc_proto::rpc::SubOpts { - subscribe: Some(topic.action == FloodsubSubscriptionAction::Subscribe), - topic_id: Some(topic.topic.into()), - }) - .collect(), - }; - - let mut buf = Vec::with_capacity(rpc.encoded_len()); - rpc.encode(&mut buf) - .expect("Vec provides capacity as needed"); - buf + let mut rpc = rpc_proto::RPC::new(); + rpc.publish = self + .messages + .into_iter() + .map(|msg| { + let mut m = rpc_proto::Message::new(); + m.set_from(msg.source.to_bytes()); + m.set_data(msg.data); + m.set_seqno(msg.sequence_number); + m.topic_ids = msg.topics.into_iter().map(|topic| topic.into()).collect(); + m + }) + .collect(); + rpc.subscriptions = self + .subscriptions + .into_iter() + .map(|topic| { + let mut sub_opts = rpc_proto::rpc::SubOpts::new(); + sub_opts.set_subscribe(topic.action == FloodsubSubscriptionAction::Subscribe); + sub_opts.set_topic_id(topic.topic.into()); + sub_opts + }) + .collect(); + + rpc.write_to_bytes().expect("All fields to be initialized.") } } diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index cc337869ccf..a8e25219e7c 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -24,7 +24,7 @@ log = "0.4.11" sha2 = "0.10.0" base64 = "0.13.0" smallvec = "1.6.1" -prost = "0.11" +protobuf = "3.2" hex_fmt = "0.3.0" regex = "1.5.5" serde = { version = "1", optional = true, features = ["derive"] } @@ -42,7 +42,7 @@ hex = "0.4.2" derive_builder = "0.11.1" [build-dependencies] -prost-build = "0.11" +protobuf-codegen = "3.2" # Passing arguments to the docsrs builder in order to properly document cfg's. # More information: https://docs.rs/about/builds#cross-compiling diff --git a/protocols/gossipsub/build.rs b/protocols/gossipsub/build.rs index a0c81052bdc..7b145088750 100644 --- a/protocols/gossipsub/build.rs +++ b/protocols/gossipsub/build.rs @@ -19,5 +19,12 @@ // DEALINGS IN THE SOFTWARE. fn main() { - prost_build::compile_protos(&["src/rpc.proto", "src/compat.proto"], &["src"]).unwrap(); + protobuf_codegen::Codegen::new() + .pure() + .includes(&["src"]) + .inputs(&["src/rpc.proto", "src/compat.proto"]) + .customize(protobuf_codegen::Customize::default().lite_runtime(true)) + .cargo_out_dir("protos") + .run() + .unwrap() } diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 43f2f79466d..598a7c8b2f3 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -33,7 +33,7 @@ use std::{ use futures::StreamExt; use log::{debug, error, trace, warn}; use prometheus_client::registry::Registry; -use prost::Message; +use protobuf::Message; use rand::{seq::SliceRandom, thread_rng}; use libp2p_core::{ @@ -618,8 +618,12 @@ where .into_protobuf(); // check that the size doesn't exceed the max transmission size - if event.encoded_len() > self.config.max_transmit_size() { - return Err(PublishError::MessageTooLarge); + match usize::try_from(event.compute_size()) { + Ok(v) if v > self.config.max_transmit_size() => { + return Err(PublishError::MessageTooLarge) + } + Err(_) => return Err(PublishError::MessageTooLarge), + _ => {} } // Check the if the message has been published before @@ -729,13 +733,16 @@ where } // Send to peers we know are subscribed to the topic. - let msg_bytes = event.encoded_len(); + let msg_bytes = event.compute_size(); for peer_id in recipient_peers.iter() { trace!("Sending message to peer: {:?}", peer_id); self.send_message(*peer_id, event.clone())?; if let Some(m) = self.metrics.as_mut() { - m.msg_sent(&topic_hash, msg_bytes); + m.msg_sent( + &topic_hash, + usize::try_from(msg_bytes).expect("Size of sent messages fit in usize."), + ); } } @@ -1348,14 +1355,17 @@ where } .into_protobuf(); - let msg_bytes = message.encoded_len(); + let msg_bytes = message.compute_size(); if self.send_message(*peer_id, message).is_err() { error!("Failed to send cached messages. Messages too large"); } else if let Some(m) = self.metrics.as_mut() { // Sending of messages succeeded, register them on the internal metrics. for topic in topics.iter() { - m.msg_sent(topic, msg_bytes); + m.msg_sent( + topic, + usize::try_from(msg_bytes).expect("Size of sent messages fit in usize."), + ); } } } @@ -2744,12 +2754,15 @@ where } .into_protobuf(); - let msg_bytes = event.encoded_len(); + let msg_bytes = event.compute_size(); for peer in recipient_peers.iter() { debug!("Sending message: {:?} to peer {:?}", msg_id, peer); self.send_message(*peer, event.clone())?; if let Some(m) = self.metrics.as_mut() { - m.msg_sent(&message.topic, msg_bytes); + m.msg_sent( + &message.topic, + usize::try_from(msg_bytes).expect("Size of sent messages fit in usize."), + ) } } debug!("Completed forwarding message"); @@ -2775,23 +2788,19 @@ where let sequence_number: u64 = rand::random(); let signature = { - let message = rpc_proto::Message { - from: Some(author.clone().to_bytes()), - data: Some(data.clone()), - seqno: Some(sequence_number.to_be_bytes().to_vec()), - topic: topic.clone().into_string(), - signature: None, - key: None, - }; - - let mut buf = Vec::with_capacity(message.encoded_len()); - message - .encode(&mut buf) - .expect("Buffer has sufficient capacity"); + let mut message = rpc_proto::Message::new(); + message.set_from(author.clone().to_bytes()); + message.set_data(data.clone()); + message.set_seqno(sequence_number.to_be_bytes().to_vec()); + message.set_topic(topic.clone().into_string()); // the signature is over the bytes "libp2p-pubsub:" let mut signature_bytes = SIGNING_PREFIX.to_vec(); - signature_bytes.extend_from_slice(&buf); + signature_bytes.extend_from_slice( + &message + .write_to_bytes() + .expect("All required fields to be initialized."), + ); Some(keypair.sign(&signature_bytes)?) }; @@ -2889,7 +2898,7 @@ where fn send_message( &mut self, peer_id: PeerId, - message: rpc_proto::Rpc, + message: rpc_proto::RPC, ) -> Result<(), PublishError> { // If the message is oversized, try and fragment it. If it cannot be fragmented, log an // error and drop the message (all individual messages should be small enough to fit in the @@ -2910,16 +2919,17 @@ where // If a message is too large to be sent as-is, this attempts to fragment it into smaller RPC // messages to be sent. - fn fragment_message(&self, rpc: rpc_proto::Rpc) -> Result, PublishError> { - if rpc.encoded_len() < self.config.max_transmit_size() { - return Ok(vec![rpc]); + fn fragment_message(&self, rpc: rpc_proto::RPC) -> Result, PublishError> { + match usize::try_from(rpc.compute_size()) { + Ok(size) => { + if size < self.config.max_transmit_size() { + return Ok(vec![rpc]); + } + } + Err(_) => return Err(PublishError::MessageTooLarge), } - let new_rpc = rpc_proto::Rpc { - subscriptions: Vec::new(), - publish: Vec::new(), - control: None, - }; + let new_rpc = rpc_proto::RPC::new(); let mut rpc_list = vec![new_rpc.clone()]; @@ -2931,7 +2941,9 @@ where // create a new RPC if the new object plus 5% of its size (for length prefix // buffers) exceeds the max transmit size. - if rpc_list[list_index].encoded_len() + (($object_size as f64) * 1.05) as usize + let msg_size = usize::try_from(rpc_list[list_index].compute_size()) + .expect("Message size has already been validated."); + if msg_size + (($object_size as f64) * 1.05) as usize > self.config.max_transmit_size() && rpc_list[list_index] != new_rpc { @@ -2943,7 +2955,8 @@ where macro_rules! add_item { ($object: ident, $type: ident ) => { - let object_size = $object.encoded_len(); + let object_size = usize::try_from($object.compute_size()) + .expect("Message size has already been validated."); if object_size + 2 > self.config.max_transmit_size() { // This should not be possible. All received and published messages have already @@ -2973,56 +2986,68 @@ where // fragmenting, otherwise, fragment the control messages let empty_control = rpc_proto::ControlMessage::default(); if let Some(control) = rpc.control.as_ref() { - if control.encoded_len() + 2 > self.config.max_transmit_size() { + let control_size = usize::try_from(control.compute_size()) + .expect("Message size has already been validated."); + if control_size + 2 > self.config.max_transmit_size() { // fragment the RPC for ihave in &control.ihave { - let len = ihave.encoded_len(); + let len = usize::try_from(ihave.compute_size()) + .expect("Message size has already been validated."); create_or_add_rpc!(len); - rpc_list - .last_mut() - .expect("Always an element") - .control - .get_or_insert_with(|| empty_control.clone()) - .ihave - .push(ihave.clone()); + let mut last_rpc = rpc_list.last_mut().expect("Always an element"); + if last_rpc.control.as_ref().is_none() { + last_rpc.control = protobuf::MessageField::some(empty_control.clone()); + } + + if let Some(c) = last_rpc.control.as_mut() { + c.ihave.push(ihave.clone()) + } } for iwant in &control.iwant { - let len = iwant.encoded_len(); + let len = usize::try_from(iwant.compute_size()) + .expect("Message size has already been validated."); create_or_add_rpc!(len); - rpc_list - .last_mut() - .expect("Always an element") - .control - .get_or_insert_with(|| empty_control.clone()) - .iwant - .push(iwant.clone()); + let mut last_rpc = rpc_list.last_mut().expect("Always an element"); + if last_rpc.control.as_ref().is_none() { + last_rpc.control = protobuf::MessageField::some(empty_control.clone()); + } + + if let Some(c) = last_rpc.control.as_mut() { + c.iwant.push(iwant.clone()) + } } for graft in &control.graft { - let len = graft.encoded_len(); + let len = usize::try_from(graft.compute_size()) + .expect("Message size has already been validated."); create_or_add_rpc!(len); - rpc_list - .last_mut() - .expect("Always an element") - .control - .get_or_insert_with(|| empty_control.clone()) - .graft - .push(graft.clone()); + let mut last_rpc = rpc_list.last_mut().expect("Always an element"); + if last_rpc.control.as_ref().is_none() { + last_rpc.control = protobuf::MessageField::some(empty_control.clone()); + } + + if let Some(c) = last_rpc.control.as_mut() { + c.graft.push(graft.clone()) + } } for prune in &control.prune { - let len = prune.encoded_len(); + let len = usize::try_from(prune.compute_size()) + .expect("Message size has already been validated."); create_or_add_rpc!(len); - rpc_list - .last_mut() - .expect("Always an element") - .control - .get_or_insert_with(|| empty_control.clone()) - .prune - .push(prune.clone()); + let mut last_rpc = rpc_list.last_mut().expect("Always an element"); + if last_rpc.control.as_ref().is_none() { + last_rpc.control = protobuf::MessageField::some(empty_control.clone()); + } + + if let Some(c) = last_rpc.control.as_mut() { + c.prune.push(prune.clone()) + } } } else { - let len = control.encoded_len(); + let len = usize::try_from(control.compute_size()) + .expect("Message size has already been validated."); create_or_add_rpc!(len); - rpc_list.last_mut().expect("Always an element").control = Some(control.clone()); + rpc_list.last_mut().expect("Always an element").control = + protobuf::MessageField::some(control.clone()); } } @@ -3665,6 +3690,12 @@ mod local_test { use asynchronous_codec::Encoder; use quickcheck::*; + macro_rules! to_usize { + ($object_size: expr ) => {{ + usize::try_from($object_size).unwrap() + }}; + } + fn empty_rpc() -> GossipsubRpc { GossipsubRpc { subscriptions: Vec::new(), @@ -3741,7 +3772,7 @@ mod local_test { // Messages over the limit should be split - while rpc_proto.encoded_len() < max_transmit_size { + while to_usize!(rpc_proto.compute_size()) < max_transmit_size { rpc.messages.push(test_message()); rpc_proto = rpc.clone().into_protobuf(); } @@ -3758,7 +3789,7 @@ mod local_test { // all fragmented messages should be under the limit for message in fragmented_messages { assert!( - message.encoded_len() < max_transmit_size, + to_usize!(message.compute_size()) < max_transmit_size, "all messages should be less than the transmission size" ); } @@ -3785,7 +3816,7 @@ mod local_test { .fragment_message(rpc_proto.clone()) .expect("Messages must be valid"); - if rpc_proto.encoded_len() < max_transmit_size { + if to_usize!(rpc_proto.compute_size()) < max_transmit_size { assert_eq!( fragmented_messages.len(), 1, @@ -3801,12 +3832,12 @@ mod local_test { // all fragmented messages should be under the limit for message in fragmented_messages { assert!( - message.encoded_len() < max_transmit_size, - "all messages should be less than the transmission size: list size {} max size{}", message.encoded_len(), max_transmit_size + to_usize!(message.compute_size()) < max_transmit_size, + "all messages should be less than the transmission size: list size {} max size{}", to_usize!(message.compute_size()), max_transmit_size ); // ensure they can all be encoded - let mut buf = bytes::BytesMut::with_capacity(message.encoded_len()); + let mut buf = bytes::BytesMut::with_capacity(to_usize!(message.compute_size())); codec.encode(message, &mut buf).unwrap() } } diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 71f4aae9b50..82de8dda73f 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -247,7 +247,7 @@ where } // Converts a protobuf message into a gossipsub message for reading the Gossipsub event queue. -fn proto_to_message(rpc: &crate::rpc_proto::Rpc) -> GossipsubRpc { +fn proto_to_message(rpc: &crate::rpc_proto::RPC) -> GossipsubRpc { // Store valid messages. let mut messages = Vec::with_capacity(rpc.publish.len()); let rpc = rpc.clone(); @@ -256,14 +256,14 @@ fn proto_to_message(rpc: &crate::rpc_proto::Rpc) -> GossipsubRpc { source: message.from.map(|x| PeerId::from_bytes(&x).unwrap()), data: message.data.unwrap_or_default(), sequence_number: message.seqno.map(|x| BigEndian::read_u64(&x)), // don't inform the application - topic: TopicHash::from_raw(message.topic), + topic: TopicHash::from_raw(message.topic.unwrap()), signature: message.signature, // don't inform the application key: None, validated: false, }); } let mut control_msgs = Vec::new(); - if let Some(rpc_control) = rpc.control { + if let Some(rpc_control) = rpc.control.into_option() { // Collect the gossipsub control messages let ihave_msgs: Vec = rpc_control .ihave diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 9ea2e6ea49f..e14aed7a2b3 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -66,7 +66,7 @@ pub enum HandlerEvent { #[derive(Debug, Clone)] pub enum GossipsubHandlerIn { /// A gossipsub message to send. - Message(crate::rpc_proto::Rpc), + Message(crate::rpc_proto::RPC), /// The peer has joined the mesh. JoinedMesh, /// The peer has left the mesh. @@ -92,7 +92,7 @@ pub struct GossipsubHandler { inbound_substream: Option, /// Queue of values that we want to send to the remote. - send_queue: SmallVec<[crate::rpc_proto::Rpc; 16]>, + send_queue: SmallVec<[crate::rpc_proto::RPC; 16]>, /// Flag indicating that an outbound substream is being established to prevent duplicate /// requests. @@ -150,7 +150,7 @@ enum OutboundSubstreamState { /// Waiting to send a message to the remote. PendingSend( Framed, - crate::rpc_proto::Rpc, + crate::rpc_proto::RPC, ), /// Waiting to flush the substream so that the data arrives to the remote. PendingFlush(Framed), @@ -188,7 +188,7 @@ impl ConnectionHandler for GossipsubHandler { type Error = GossipsubHandlerError; type InboundOpenInfo = (); type InboundProtocol = ProtocolConfig; - type OutboundOpenInfo = crate::rpc_proto::Rpc; + type OutboundOpenInfo = crate::rpc_proto::RPC; type OutboundProtocol = ProtocolConfig; fn listen_protocol(&self) -> SubstreamProtocol { diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs index c6aa2bdd56b..8c10c008534 100644 --- a/protocols/gossipsub/src/protocol.rs +++ b/protocols/gossipsub/src/protocol.rs @@ -37,7 +37,7 @@ use libp2p_core::{ identity::PublicKey, InboundUpgrade, OutboundUpgrade, PeerId, ProtocolName, UpgradeInfo, }; use log::{debug, warn}; -use prost::Message as ProtobufMessage; +use protobuf::Message as ProtobufMessage; use std::{borrow::Cow, pin::Pin}; use unsigned_varint::codec; @@ -253,29 +253,31 @@ impl GossipsubCodec { let mut message_sig = message.clone(); message_sig.signature = None; message_sig.key = None; - let mut buf = Vec::with_capacity(message_sig.encoded_len()); - message_sig - .encode(&mut buf) - .expect("Buffer has sufficient capacity"); + let mut signature_bytes = SIGNING_PREFIX.to_vec(); - signature_bytes.extend_from_slice(&buf); + signature_bytes.extend_from_slice( + &message_sig + .write_to_bytes() + .expect("All fields to be initialized."), + ); public_key.verify(&signature_bytes, signature) } } impl Encoder for GossipsubCodec { - type Item = rpc_proto::Rpc; + type Item = rpc_proto::RPC; type Error = GossipsubHandlerError; fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { - let mut buf = Vec::with_capacity(item.encoded_len()); - - item.encode(&mut buf) - .expect("Buffer has sufficient capacity"); - // length prefix the protobuf message, ensuring the max limit is not hit self.length_codec - .encode(Bytes::from(buf), dst) + .encode( + Bytes::from( + item.write_to_bytes() + .expect("All fields to be initialized."), + ), + dst, + ) .map_err(|_| GossipsubHandlerError::MaxTransmissionSize) } } @@ -296,7 +298,7 @@ impl Decoder for GossipsubCodec { None => return Ok(None), }; - let rpc = rpc_proto::Rpc::decode(&packet[..]).map_err(std::io::Error::from)?; + let rpc = rpc_proto::RPC::parse_from_bytes(&packet[..]).map_err(std::io::Error::from)?; // Store valid messages. let mut messages = Vec::with_capacity(rpc.publish.len()); @@ -351,7 +353,7 @@ impl Decoder for GossipsubCodec { source: None, // don't bother inform the application data: message.data.unwrap_or_default(), sequence_number: None, // don't inform the application - topic: TopicHash::from_raw(message.topic), + topic: TopicHash::from_raw(message.topic.unwrap_or_default()), signature: None, // don't inform the application key: message.key, validated: false, @@ -371,7 +373,7 @@ impl Decoder for GossipsubCodec { source: None, // don't bother inform the application data: message.data.unwrap_or_default(), sequence_number: None, // don't inform the application - topic: TopicHash::from_raw(message.topic), + topic: TopicHash::from_raw(message.topic.unwrap_or_default()), signature: None, // don't inform the application key: message.key, validated: false, @@ -396,7 +398,7 @@ impl Decoder for GossipsubCodec { source: None, // don't bother inform the application data: message.data.unwrap_or_default(), sequence_number: None, // don't inform the application - topic: TopicHash::from_raw(message.topic), + topic: TopicHash::from_raw(message.topic.unwrap_or_default()), signature: message.signature, // don't inform the application key: message.key, validated: false, @@ -415,7 +417,7 @@ impl Decoder for GossipsubCodec { source: None, // don't bother inform the application data: message.data.unwrap_or_default(), sequence_number: None, // don't inform the application - topic: TopicHash::from_raw(message.topic), + topic: TopicHash::from_raw(message.topic.unwrap_or_default()), signature: message.signature, // don't inform the application key: message.key, validated: false, @@ -441,7 +443,7 @@ impl Decoder for GossipsubCodec { source: None, // don't bother inform the application data: message.data.unwrap_or_default(), sequence_number, - topic: TopicHash::from_raw(message.topic), + topic: TopicHash::from_raw(message.topic.unwrap_or_default()), signature: message.signature, // don't inform the application key: message.key, validated: false, @@ -465,7 +467,7 @@ impl Decoder for GossipsubCodec { source, data: message.data.unwrap_or_default(), sequence_number, - topic: TopicHash::from_raw(message.topic), + topic: TopicHash::from_raw(message.topic.unwrap_or_default()), signature: message.signature, key: message.key, validated: false, @@ -474,7 +476,7 @@ impl Decoder for GossipsubCodec { let mut control_msgs = Vec::new(); - if let Some(rpc_control) = rpc.control { + if let Some(rpc_control) = rpc.control.into_option() { // Collect the gossipsub control messages let ihave_msgs: Vec = rpc_control .ihave diff --git a/protocols/gossipsub/src/rpc_proto.rs b/protocols/gossipsub/src/rpc_proto.rs index 7b952ef0926..c859dd50c60 100644 --- a/protocols/gossipsub/src/rpc_proto.rs +++ b/protocols/gossipsub/src/rpc_proto.rs @@ -18,19 +18,23 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. #![allow(clippy::derive_partial_eq_without_eq)] +mod protos { + include!(concat!(env!("OUT_DIR"), "/protos/mod.rs")); +} -include!(concat!(env!("OUT_DIR"), "/gossipsub.pb.rs")); +pub use protos::rpc::*; #[cfg(test)] mod test { use crate::IdentTopic as Topic; use libp2p_core::PeerId; - use prost::Message; + use protobuf::Message; use rand::Rng; - mod compat_proto { - include!(concat!(env!("OUT_DIR"), "/compat.pb.rs")); + mod protos { + include!(concat!(env!("OUT_DIR"), "/protos/mod.rs")); } + use protos::compat as compat_proto; #[test] fn test_multi_topic_message_compatibility() { @@ -41,9 +45,10 @@ mod test { from: Some(PeerId::random().to_bytes()), data: Some(rand::thread_rng().gen::<[u8; 32]>().to_vec()), seqno: Some(rand::thread_rng().gen::<[u8; 8]>().to_vec()), - topic: topic1.clone().into_string(), + topic: Some(topic1.clone().into_string()), signature: Some(rand::thread_rng().gen::<[u8; 32]>().to_vec()), key: Some(rand::thread_rng().gen::<[u8; 32]>().to_vec()), + ..super::Message::default() }; let old_message1 = compat_proto::Message { from: Some(PeerId::random().to_bytes()), @@ -52,6 +57,7 @@ mod test { topic_ids: vec![topic1.clone().into_string()], signature: Some(rand::thread_rng().gen::<[u8; 32]>().to_vec()), key: Some(rand::thread_rng().gen::<[u8; 32]>().to_vec()), + ..compat_proto::Message::default() }; let old_message2 = compat_proto::Message { from: Some(PeerId::random().to_bytes()), @@ -60,24 +66,22 @@ mod test { topic_ids: vec![topic1.clone().into_string(), topic2.clone().into_string()], signature: Some(rand::thread_rng().gen::<[u8; 32]>().to_vec()), key: Some(rand::thread_rng().gen::<[u8; 32]>().to_vec()), + ..compat_proto::Message::default() }; - let mut new_message1b = Vec::with_capacity(new_message1.encoded_len()); - new_message1.encode(&mut new_message1b).unwrap(); + let new_message1b = new_message1.write_to_bytes().unwrap(); - let mut old_message1b = Vec::with_capacity(old_message1.encoded_len()); - old_message1.encode(&mut old_message1b).unwrap(); + let old_message1b = old_message1.write_to_bytes().unwrap(); - let mut old_message2b = Vec::with_capacity(old_message2.encoded_len()); - old_message2.encode(&mut old_message2b).unwrap(); + let old_message2b = old_message2.write_to_bytes().unwrap(); - let new_message = super::Message::decode(&old_message1b[..]).unwrap(); - assert_eq!(new_message.topic, topic1.clone().into_string()); + let new_message = super::Message::parse_from_bytes(&old_message1b[..]).unwrap(); + assert_eq!(new_message.topic.unwrap(), topic1.clone().into_string()); - let new_message = super::Message::decode(&old_message2b[..]).unwrap(); - assert_eq!(new_message.topic, topic2.into_string()); + let new_message = super::Message::parse_from_bytes(&old_message2b[..]).unwrap(); + assert_eq!(new_message.topic.unwrap(), topic2.into_string()); - let old_message = compat_proto::Message::decode(&new_message1b[..]).unwrap(); + let old_message = compat_proto::Message::parse_from_bytes(&new_message1b[..]).unwrap(); assert_eq!(old_message.topic_ids, vec![topic1.into_string()]); } } diff --git a/protocols/gossipsub/src/topic.rs b/protocols/gossipsub/src/topic.rs index 1de587a7632..ee8b20b672d 100644 --- a/protocols/gossipsub/src/topic.rs +++ b/protocols/gossipsub/src/topic.rs @@ -21,7 +21,7 @@ use crate::rpc_proto; use base64::encode; use prometheus_client::encoding::text::Encode; -use prost::Message; +use protobuf::Message; use sha2::{Digest, Sha256}; use std::fmt; @@ -47,15 +47,12 @@ impl Hasher for Sha256Hash { /// Creates a [`TopicHash`] by SHA256 hashing the topic then base64 encoding the /// hash. fn hash(topic_string: String) -> TopicHash { - let topic_descripter = rpc_proto::TopicDescriptor { - name: Some(topic_string), - auth: None, - enc: None, - }; - let mut bytes = Vec::with_capacity(topic_descripter.encoded_len()); - topic_descripter - .encode(&mut bytes) - .expect("buffer is large enough"); + let mut topic_descripter = rpc_proto::TopicDescriptor::new(); + topic_descripter.set_name(topic_string); + + let bytes = topic_descripter + .write_to_bytes() + .expect("All required fields to be initialized."); let hash = encode(Sha256::digest(&bytes).as_slice()); TopicHash { hash } } diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index 29c72d1f044..658bdd1a94a 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -23,7 +23,7 @@ use crate::rpc_proto; use crate::TopicHash; use libp2p_core::{connection::ConnectionId, PeerId}; use prometheus_client::encoding::text::Encode; -use prost::Message; +use protobuf::Message; use std::fmt; use std::fmt::Debug; @@ -135,15 +135,15 @@ pub struct RawGossipsubMessage { impl RawGossipsubMessage { /// Calculates the encoded length of this message (used for calculating metrics). pub fn raw_protobuf_len(&self) -> usize { - let message = rpc_proto::Message { - from: self.source.map(|m| m.to_bytes()), - data: Some(self.data.clone()), - seqno: self.sequence_number.map(|s| s.to_be_bytes().to_vec()), - topic: TopicHash::into_string(self.topic.clone()), - signature: self.signature.clone(), - key: self.key.clone(), - }; - message.encoded_len() + let mut message = rpc_proto::Message::new(); + message.from = self.source.map(|m| m.to_bytes()); + message.seqno = self.sequence_number.map(|s| s.to_be_bytes().to_vec()); + message.signature = self.signature.clone(); + message.key = self.key.clone(); + message.set_data(self.data.clone()); + message.set_topic(TopicHash::into_string(self.topic.clone())); + + usize::try_from(message.compute_size()).expect("Size of message fits in usize.") } } @@ -249,47 +249,43 @@ pub struct GossipsubRpc { impl GossipsubRpc { /// Converts the GossipsubRPC into its protobuf format. // A convenience function to avoid explicitly specifying types. - pub fn into_protobuf(self) -> rpc_proto::Rpc { + pub fn into_protobuf(self) -> rpc_proto::RPC { self.into() } } -impl From for rpc_proto::Rpc { +impl From for rpc_proto::RPC { /// Converts the RPC into protobuf format. fn from(rpc: GossipsubRpc) -> Self { // Messages let mut publish = Vec::new(); for message in rpc.messages.into_iter() { - let message = rpc_proto::Message { - from: message.source.map(|m| m.to_bytes()), - data: Some(message.data), - seqno: message.sequence_number.map(|s| s.to_be_bytes().to_vec()), - topic: TopicHash::into_string(message.topic), - signature: message.signature, - key: message.key, - }; - - publish.push(message); + let mut msg = rpc_proto::Message::new(); + msg.from = message.source.map(|m| m.to_bytes()); + msg.seqno = message.sequence_number.map(|s| s.to_be_bytes().to_vec()); + msg.signature = message.signature; + msg.key = message.key; + msg.set_data(message.data); + msg.set_topic(TopicHash::into_string(message.topic)); + + publish.push(msg); } // subscriptions let subscriptions = rpc .subscriptions .into_iter() - .map(|sub| rpc_proto::rpc::SubOpts { - subscribe: Some(sub.action == GossipsubSubscriptionAction::Subscribe), - topic_id: Some(sub.topic_hash.into_string()), + .map(|sub| { + let mut sub_opts = rpc_proto::rpc::SubOpts::new(); + sub_opts.set_subscribe(sub.action == GossipsubSubscriptionAction::Subscribe); + sub_opts.set_topic_id(sub.topic_hash.into_string()); + sub_opts }) .collect::>(); // control messages - let mut control = rpc_proto::ControlMessage { - ihave: Vec::new(), - iwant: Vec::new(), - graft: Vec::new(), - prune: Vec::new(), - }; + let mut control = rpc_proto::ControlMessage::new(); let empty_control_msg = rpc.control_msgs.is_empty(); @@ -300,22 +296,24 @@ impl From for rpc_proto::Rpc { topic_hash, message_ids, } => { - let rpc_ihave = rpc_proto::ControlIHave { - topic_id: Some(topic_hash.into_string()), - message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(), - }; + let mut rpc_ihave = rpc_proto::ControlIHave::new(); + rpc_ihave.set_topic_id(topic_hash.into_string()); + rpc_ihave.message_ids = + message_ids.into_iter().map(|msg_id| msg_id.0).collect(); + control.ihave.push(rpc_ihave); } GossipsubControlAction::IWant { message_ids } => { - let rpc_iwant = rpc_proto::ControlIWant { - message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(), - }; + let mut rpc_iwant = rpc_proto::ControlIWant::new(); + rpc_iwant.message_ids = + message_ids.into_iter().map(|msg_id| msg_id.0).collect(); + control.iwant.push(rpc_iwant); } GossipsubControlAction::Graft { topic_hash } => { - let rpc_graft = rpc_proto::ControlGraft { - topic_id: Some(topic_hash.into_string()), - }; + let mut rpc_graft = rpc_proto::ControlGraft::new(); + rpc_graft.set_topic_id(topic_hash.into_string()); + control.graft.push(rpc_graft); } GossipsubControlAction::Prune { @@ -323,32 +321,35 @@ impl From for rpc_proto::Rpc { peers, backoff, } => { - let rpc_prune = rpc_proto::ControlPrune { - topic_id: Some(topic_hash.into_string()), - peers: peers - .into_iter() - .map(|info| rpc_proto::PeerInfo { - peer_id: info.peer_id.map(|id| id.to_bytes()), - /// TODO, see https://github.com/libp2p/specs/pull/217 - signed_peer_record: None, - }) - .collect(), - backoff, - }; + let mut rpc_prune = rpc_proto::ControlPrune::new(); + + rpc_prune.set_topic_id(topic_hash.into_string()); + rpc_prune.backoff = backoff; + rpc_prune.peers = peers + .into_iter() + .map(|info| { + let mut peer = rpc_proto::PeerInfo::new(); + peer.peer_id = info.peer_id.map(|id| id.to_bytes()); + // TODO, see https://github.com/libp2p/specs/pull/217 + peer.signed_peer_record = None; + peer + }) + .collect(); + control.prune.push(rpc_prune); } } } - rpc_proto::Rpc { - subscriptions, - publish, - control: if empty_control_msg { - None - } else { - Some(control) - }, - } + let mut rpc = rpc_proto::RPC::new(); + rpc.subscriptions = subscriptions; + rpc.publish = publish; + rpc.control = if empty_control_msg { + protobuf::MessageField::none() + } else { + protobuf::MessageField::some(control) + }; + rpc } } diff --git a/protocols/identify/Cargo.toml b/protocols/identify/Cargo.toml index d52155bae1c..254c1129066 100644 --- a/protocols/identify/Cargo.toml +++ b/protocols/identify/Cargo.toml @@ -19,7 +19,7 @@ libp2p-swarm = { version = "0.40.0", path = "../../swarm" } log = "0.4.1" lru = "0.8.0" prost-codec = { version = "0.2", path = "../../misc/prost-codec" } -prost = "0.11" +protobuf = "3.2" smallvec = "1.6.1" thiserror = "1.0" void = "1.0" @@ -30,7 +30,7 @@ env_logger = "0.9" libp2p = { path = "../..", features = ["full"] } [build-dependencies] -prost-build = "0.11" +protobuf-codegen = "3.2" # Passing arguments to the docsrs builder in order to properly document cfg's. # More information: https://docs.rs/about/builds#cross-compiling diff --git a/protocols/identify/build.rs b/protocols/identify/build.rs index 56c7b20121a..e00accae79b 100644 --- a/protocols/identify/build.rs +++ b/protocols/identify/build.rs @@ -19,5 +19,12 @@ // DEALINGS IN THE SOFTWARE. fn main() { - prost_build::compile_protos(&["src/structs.proto"], &["src"]).unwrap(); + protobuf_codegen::Codegen::new() + .pure() + .includes(&["src"]) + .input("src/structs.proto") + .customize(protobuf_codegen::Customize::default().lite_runtime(true)) + .cargo_out_dir("protos") + .run() + .unwrap() } diff --git a/protocols/identify/src/lib.rs b/protocols/identify/src/lib.rs index e4748eba07c..6cd63466371 100644 --- a/protocols/identify/src/lib.rs +++ b/protocols/identify/src/lib.rs @@ -70,6 +70,8 @@ mod handler; mod protocol; #[allow(clippy::derive_partial_eq_without_eq)] -mod structs_proto { - include!(concat!(env!("OUT_DIR"), "/structs.rs")); +mod protos { + include!(concat!(env!("OUT_DIR"), "/protos/mod.rs")); } + +use protos::structs as structs_proto; diff --git a/protocols/identify/src/protocol.rs b/protocols/identify/src/protocol.rs index 470e926d1b8..442b3314a59 100644 --- a/protocols/identify/src/protocol.rs +++ b/protocols/identify/src/protocol.rs @@ -185,14 +185,13 @@ where let pubkey_bytes = info.public_key.to_protobuf_encoding(); - let message = structs_proto::Identify { - agent_version: Some(info.agent_version), - protocol_version: Some(info.protocol_version), - public_key: Some(pubkey_bytes), - listen_addrs, - observed_addr: Some(info.observed_addr.to_vec()), - protocols: info.protocols, - }; + let mut message = structs_proto::Identify::new(); + message.set_protocolVersion(info.protocol_version); + message.set_agentVersion(info.agent_version); + message.set_publicKey(pubkey_bytes); + message.set_observedAddr(info.observed_addr.to_vec()); + message.protocols = info.protocols; + message.listenAddrs = listen_addrs; let mut framed_io = FramedWrite::new( io, @@ -235,19 +234,19 @@ impl TryFrom for Info { let listen_addrs = { let mut addrs = Vec::new(); - for addr in msg.listen_addrs.into_iter() { + for addr in msg.listenAddrs.into_iter() { addrs.push(parse_multiaddr(addr)?); } addrs }; - let public_key = PublicKey::from_protobuf_encoding(&msg.public_key.unwrap_or_default())?; + let public_key = PublicKey::from_protobuf_encoding(&msg.publicKey.unwrap_or_default())?; - let observed_addr = parse_multiaddr(msg.observed_addr.unwrap_or_default())?; + let observed_addr = parse_multiaddr(msg.observedAddr.unwrap_or_default())?; let info = Info { public_key, - protocol_version: msg.protocol_version.unwrap_or_default(), - agent_version: msg.agent_version.unwrap_or_default(), + protocol_version: msg.protocolVersion.unwrap_or_default(), + agent_version: msg.agentVersion.unwrap_or_default(), listen_addrs, protocols: msg.protocols, observed_addr, diff --git a/protocols/kad/Cargo.toml b/protocols/kad/Cargo.toml index 89354c74aa8..fab1350de22 100644 --- a/protocols/kad/Cargo.toml +++ b/protocols/kad/Cargo.toml @@ -20,7 +20,7 @@ futures = "0.3.1" log = "0.4" libp2p-core = { version = "0.37.0", path = "../../core" } libp2p-swarm = { version = "0.40.0", path = "../../swarm" } -prost = "0.11" +protobuf = "3.2" rand = "0.8" sha2 = "0.10.0" smallvec = "1.6.1" @@ -39,7 +39,7 @@ libp2p = { path = "../..", features = ["full"] } quickcheck = { package = "quickcheck-ext", path = "../../misc/quickcheck-ext" } [build-dependencies] -prost-build = "0.11" +protobuf-codegen = "3.2" [features] serde = ["dep:serde", "bytes/serde"] diff --git a/protocols/kad/build.rs b/protocols/kad/build.rs index f05e9e03190..ef38f16421a 100644 --- a/protocols/kad/build.rs +++ b/protocols/kad/build.rs @@ -19,5 +19,12 @@ // DEALINGS IN THE SOFTWARE. fn main() { - prost_build::compile_protos(&["src/dht.proto"], &["src"]).unwrap(); + protobuf_codegen::Codegen::new() + .pure() + .includes(&["src"]) + .input("src/dht.proto") + .customize(protobuf_codegen::Customize::default().lite_runtime(true)) + .cargo_out_dir("protos") + .run() + .unwrap() } diff --git a/protocols/kad/src/lib.rs b/protocols/kad/src/lib.rs index 67cee19818d..0bffdd35307 100644 --- a/protocols/kad/src/lib.rs +++ b/protocols/kad/src/lib.rs @@ -51,10 +51,12 @@ mod jobs; mod query; #[allow(clippy::derive_partial_eq_without_eq)] -mod dht_proto { - include!(concat!(env!("OUT_DIR"), "/dht.pb.rs")); +mod protos { + include!(concat!(env!("OUT_DIR"), "/protos/mod.rs")); } +use protos::dht as dht_proto; + pub use addresses::Addresses; pub use behaviour::{ AddProviderContext, AddProviderError, AddProviderOk, AddProviderPhase, AddProviderResult, diff --git a/protocols/kad/src/protocol.rs b/protocols/kad/src/protocol.rs index 707edd8fe02..0c5404f92c3 100644 --- a/protocols/kad/src/protocol.rs +++ b/protocols/kad/src/protocol.rs @@ -35,7 +35,7 @@ use futures::prelude::*; use instant::Instant; use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use libp2p_core::{Multiaddr, PeerId}; -use prost::Message; +use protobuf::Message; use std::{borrow::Cow, convert::TryFrom, time::Duration}; use std::{io, iter}; use unsigned_varint::codec; @@ -63,10 +63,10 @@ impl From for KadConnectionType { fn from(raw: proto::message::ConnectionType) -> KadConnectionType { use proto::message::ConnectionType::*; match raw { - NotConnected => KadConnectionType::NotConnected, - Connected => KadConnectionType::Connected, - CanConnect => KadConnectionType::CanConnect, - CannotConnect => KadConnectionType::CannotConnect, + NOT_CONNECTED => KadConnectionType::NotConnected, + CONNECTED => KadConnectionType::Connected, + CAN_CONNECT => KadConnectionType::CanConnect, + CANNOT_CONNECT => KadConnectionType::CannotConnect, } } } @@ -75,10 +75,10 @@ impl From for proto::message::ConnectionType { fn from(val: KadConnectionType) -> Self { use proto::message::ConnectionType::*; match val { - KadConnectionType::NotConnected => NotConnected, - KadConnectionType::Connected => Connected, - KadConnectionType::CanConnect => CanConnect, - KadConnectionType::CannotConnect => CannotConnect, + KadConnectionType::NotConnected => NOT_CONNECTED, + KadConnectionType::Connected => CONNECTED, + KadConnectionType::CanConnect => CAN_CONNECT, + KadConnectionType::CannotConnect => CANNOT_CONNECT, } } } @@ -110,8 +110,10 @@ impl TryFrom for KadPeer { } debug_assert_eq!(addrs.len(), addrs.capacity()); - let connection_ty = proto::message::ConnectionType::from_i32(peer.connection) - .ok_or_else(|| invalid_data("unknown connection type"))? + let connection_ty = peer + .connection + .enum_value() + .map_err(|_| invalid_data("unknown connection type"))? .into(); Ok(KadPeer { @@ -124,14 +126,11 @@ impl TryFrom for KadPeer { impl From for proto::message::Peer { fn from(peer: KadPeer) -> Self { - proto::message::Peer { - id: peer.node_id.to_bytes(), - addrs: peer.multiaddrs.into_iter().map(|a| a.to_vec()).collect(), - connection: { - let ct: proto::message::ConnectionType = peer.connection_ty.into(); - ct as i32 - }, - } + let mut p = proto::message::Peer::new(); + p.id = peer.node_id.to_bytes(); + p.addrs = peer.multiaddrs.into_iter().map(|a| a.to_vec()).collect(); + p.connection = protobuf::EnumOrUnknown::new(peer.connection_ty.into()); + p } } @@ -200,14 +199,13 @@ where .err_into() .with::<_, _, fn(_) -> _, _>(|response| { let proto_struct = resp_msg_to_proto(response); - let mut buf = Vec::with_capacity(proto_struct.encoded_len()); - proto_struct - .encode(&mut buf) - .expect("Vec provides capacity as needed"); - future::ready(Ok(io::Cursor::new(buf))) + let bytes = proto_struct + .write_to_bytes() + .expect("All required fields are initialized."); + future::ready(Ok(io::Cursor::new(bytes))) }) .and_then::<_, fn(_) -> _>(|bytes| { - let request = match proto::Message::decode(bytes) { + let request = match proto::Message::parse_from_bytes(bytes.as_ref()) { Ok(r) => r, Err(err) => return future::ready(Err(err.into())), }; @@ -234,14 +232,13 @@ where .err_into() .with::<_, _, fn(_) -> _, _>(|request| { let proto_struct = req_msg_to_proto(request); - let mut buf = Vec::with_capacity(proto_struct.encoded_len()); - proto_struct - .encode(&mut buf) - .expect("Vec provides capacity as needed"); - future::ready(Ok(io::Cursor::new(buf))) + let bytes = proto_struct + .write_to_bytes() + .expect("All required fields to be initialized."); + future::ready(Ok(io::Cursor::new(bytes))) }) .and_then::<_, fn(_) -> _>(|bytes| { - let response = match proto::Message::decode(bytes) { + let response = match proto::Message::parse_from_bytes(bytes.as_ref()) { Ok(r) => r, Err(err) => return future::ready(Err(err.into())), }; @@ -348,38 +345,38 @@ pub enum KadResponseMsg { fn req_msg_to_proto(kad_msg: KadRequestMsg) -> proto::Message { match kad_msg { KadRequestMsg::Ping => proto::Message { - r#type: proto::message::MessageType::Ping as i32, + type_: protobuf::EnumOrUnknown::new(proto::message::MessageType::PING), ..proto::Message::default() }, KadRequestMsg::FindNode { key } => proto::Message { - r#type: proto::message::MessageType::FindNode as i32, + type_: protobuf::EnumOrUnknown::new(proto::message::MessageType::FIND_NODE), key, - cluster_level_raw: 10, + clusterLevelRaw: 10, ..proto::Message::default() }, KadRequestMsg::GetProviders { key } => proto::Message { - r#type: proto::message::MessageType::GetProviders as i32, + type_: protobuf::EnumOrUnknown::new(proto::message::MessageType::GET_PROVIDERS), key: key.to_vec(), - cluster_level_raw: 10, + clusterLevelRaw: 10, ..proto::Message::default() }, KadRequestMsg::AddProvider { key, provider } => proto::Message { - r#type: proto::message::MessageType::AddProvider as i32, - cluster_level_raw: 10, + type_: protobuf::EnumOrUnknown::new(proto::message::MessageType::ADD_PROVIDER), + clusterLevelRaw: 10, key: key.to_vec(), - provider_peers: vec![provider.into()], + providerPeers: vec![provider.into()], ..proto::Message::default() }, KadRequestMsg::GetValue { key } => proto::Message { - r#type: proto::message::MessageType::GetValue as i32, - cluster_level_raw: 10, + type_: protobuf::EnumOrUnknown::new(proto::message::MessageType::GET_VALUE), + clusterLevelRaw: 10, key: key.to_vec(), ..proto::Message::default() }, KadRequestMsg::PutValue { record } => proto::Message { - r#type: proto::message::MessageType::PutValue as i32, + type_: protobuf::EnumOrUnknown::new(proto::message::MessageType::PUT_VALUE), key: record.key.to_vec(), - record: Some(record_to_proto(record)), + record: protobuf::MessageField::some(record_to_proto(record)), ..proto::Message::default() }, } @@ -389,39 +386,39 @@ fn req_msg_to_proto(kad_msg: KadRequestMsg) -> proto::Message { fn resp_msg_to_proto(kad_msg: KadResponseMsg) -> proto::Message { match kad_msg { KadResponseMsg::Pong => proto::Message { - r#type: proto::message::MessageType::Ping as i32, + type_: protobuf::EnumOrUnknown::new(proto::message::MessageType::PING), ..proto::Message::default() }, KadResponseMsg::FindNode { closer_peers } => proto::Message { - r#type: proto::message::MessageType::FindNode as i32, - cluster_level_raw: 9, - closer_peers: closer_peers.into_iter().map(KadPeer::into).collect(), + type_: protobuf::EnumOrUnknown::new(proto::message::MessageType::FIND_NODE), + clusterLevelRaw: 9, + closerPeers: closer_peers.into_iter().map(KadPeer::into).collect(), ..proto::Message::default() }, KadResponseMsg::GetProviders { closer_peers, provider_peers, } => proto::Message { - r#type: proto::message::MessageType::GetProviders as i32, - cluster_level_raw: 9, - closer_peers: closer_peers.into_iter().map(KadPeer::into).collect(), - provider_peers: provider_peers.into_iter().map(KadPeer::into).collect(), + type_: protobuf::EnumOrUnknown::new(proto::message::MessageType::GET_PROVIDERS), + clusterLevelRaw: 9, + closerPeers: closer_peers.into_iter().map(KadPeer::into).collect(), + providerPeers: provider_peers.into_iter().map(KadPeer::into).collect(), ..proto::Message::default() }, KadResponseMsg::GetValue { record, closer_peers, } => proto::Message { - r#type: proto::message::MessageType::GetValue as i32, - cluster_level_raw: 9, - closer_peers: closer_peers.into_iter().map(KadPeer::into).collect(), - record: record.map(record_to_proto), + type_: protobuf::EnumOrUnknown::new(proto::message::MessageType::GET_VALUE), + clusterLevelRaw: 9, + closerPeers: closer_peers.into_iter().map(KadPeer::into).collect(), + record: protobuf::MessageField::from_option(record.map(record_to_proto)), ..proto::Message::default() }, KadResponseMsg::PutValue { key, value } => proto::Message { - r#type: proto::message::MessageType::PutValue as i32, + type_: protobuf::EnumOrUnknown::new(proto::message::MessageType::PUT_VALUE), key: key.to_vec(), - record: Some(proto::Record { + record: protobuf::MessageField::some(proto::Record { key: key.to_vec(), value, ..proto::Record::default() @@ -435,28 +432,30 @@ fn resp_msg_to_proto(kad_msg: KadResponseMsg) -> proto::Message { /// /// Fails if the protobuf message is not a valid and supported Kademlia request message. fn proto_to_req_msg(message: proto::Message) -> Result { - let msg_type = proto::message::MessageType::from_i32(message.r#type) - .ok_or_else(|| invalid_data(format!("unknown message type: {}", message.r#type)))?; + let msg_type = message + .type_ + .enum_value() + .map_err(|ty| invalid_data(format!("unknown message type: {}", ty)))?; match msg_type { - proto::message::MessageType::Ping => Ok(KadRequestMsg::Ping), - proto::message::MessageType::PutValue => { + proto::message::MessageType::PING => Ok(KadRequestMsg::Ping), + proto::message::MessageType::PUT_VALUE => { let record = record_from_proto(message.record.unwrap_or_default())?; Ok(KadRequestMsg::PutValue { record }) } - proto::message::MessageType::GetValue => Ok(KadRequestMsg::GetValue { + proto::message::MessageType::GET_VALUE => Ok(KadRequestMsg::GetValue { key: record::Key::from(message.key), }), - proto::message::MessageType::FindNode => Ok(KadRequestMsg::FindNode { key: message.key }), - proto::message::MessageType::GetProviders => Ok(KadRequestMsg::GetProviders { + proto::message::MessageType::FIND_NODE => Ok(KadRequestMsg::FindNode { key: message.key }), + proto::message::MessageType::GET_PROVIDERS => Ok(KadRequestMsg::GetProviders { key: record::Key::from(message.key), }), - proto::message::MessageType::AddProvider => { + proto::message::MessageType::ADD_PROVIDER => { // TODO: for now we don't parse the peer properly, so it is possible that we get // parsing errors for peers even when they are valid; we ignore these // errors for now, but ultimately we should just error altogether let provider = message - .provider_peers + .providerPeers .into_iter() .find_map(|peer| KadPeer::try_from(peer).ok()); @@ -474,20 +473,22 @@ fn proto_to_req_msg(message: proto::Message) -> Result /// /// Fails if the protobuf message is not a valid and supported Kademlia response message. fn proto_to_resp_msg(message: proto::Message) -> Result { - let msg_type = proto::message::MessageType::from_i32(message.r#type) - .ok_or_else(|| invalid_data(format!("unknown message type: {}", message.r#type)))?; + let msg_type = message + .type_ + .enum_value() + .map_err(|ty| invalid_data(format!("unknown message type: {}", ty)))?; match msg_type { - proto::message::MessageType::Ping => Ok(KadResponseMsg::Pong), - proto::message::MessageType::GetValue => { - let record = if let Some(r) = message.record { + proto::message::MessageType::PING => Ok(KadResponseMsg::Pong), + proto::message::MessageType::GET_VALUE => { + let record = if let Some(r) = message.record.into_option() { Some(record_from_proto(r)?) } else { None }; let closer_peers = message - .closer_peers + .closerPeers .into_iter() .filter_map(|peer| KadPeer::try_from(peer).ok()) .collect(); @@ -498,9 +499,9 @@ fn proto_to_resp_msg(message: proto::Message) -> Result { + proto::message::MessageType::FIND_NODE => { let closer_peers = message - .closer_peers + .closerPeers .into_iter() .filter_map(|peer| KadPeer::try_from(peer).ok()) .collect(); @@ -508,15 +509,15 @@ fn proto_to_resp_msg(message: proto::Message) -> Result { + proto::message::MessageType::GET_PROVIDERS => { let closer_peers = message - .closer_peers + .closerPeers .into_iter() .filter_map(|peer| KadPeer::try_from(peer).ok()) .collect(); let provider_peers = message - .provider_peers + .providerPeers .into_iter() .filter_map(|peer| KadPeer::try_from(peer).ok()) .collect(); @@ -527,10 +528,11 @@ fn proto_to_resp_msg(message: proto::Message) -> Result { + proto::message::MessageType::PUT_VALUE => { let key = record::Key::from(message.key); let rec = message .record + .into_option() .ok_or_else(|| invalid_data("received PutValue message with no record"))?; Ok(KadResponseMsg::PutValue { @@ -539,7 +541,7 @@ fn proto_to_resp_msg(message: proto::Message) -> Result { + proto::message::MessageType::ADD_PROVIDER => { Err(invalid_data("received an unexpected AddProvider message")) } } @@ -587,7 +589,8 @@ fn record_to_proto(record: Record) -> proto::Record { } }) .unwrap_or(0), - time_received: String::new(), + timeReceived: String::new(), + ..proto::Record::default() } } diff --git a/protocols/relay/Cargo.toml b/protocols/relay/Cargo.toml index f4b12ccb991..971b89d9b2a 100644 --- a/protocols/relay/Cargo.toml +++ b/protocols/relay/Cargo.toml @@ -22,7 +22,7 @@ libp2p-swarm = { version = "0.40.0", path = "../../swarm" } log = "0.4" pin-project = "1" prost-codec = { version = "0.2", path = "../../misc/prost-codec" } -prost = "0.11" +protobuf = "3.2" rand = "0.8.4" smallvec = "1.6.1" static_assertions = "1" @@ -30,7 +30,7 @@ thiserror = "1.0" void = "1" [build-dependencies] -prost-build = "0.11" +protobuf-codegen = "3.2" [dev-dependencies] env_logger = "0.9.0" diff --git a/protocols/relay/build.rs b/protocols/relay/build.rs index 7859f6171f7..7a8be5bba36 100644 --- a/protocols/relay/build.rs +++ b/protocols/relay/build.rs @@ -19,5 +19,12 @@ // DEALINGS IN THE SOFTWARE. fn main() { - prost_build::compile_protos(&["src/v2/message.proto"], &["src/v2"]).unwrap(); + protobuf_codegen::Codegen::new() + .pure() + .includes(&["src/v2"]) + .input("src/v2/message.proto") + .customize(protobuf_codegen::Customize::default().lite_runtime(true)) + .cargo_out_dir("protos") + .run() + .unwrap() } diff --git a/protocols/relay/src/v2.rs b/protocols/relay/src/v2.rs index dcfcdf609fb..ff655069d91 100644 --- a/protocols/relay/src/v2.rs +++ b/protocols/relay/src/v2.rs @@ -22,10 +22,12 @@ //! specification](https://github.com/libp2p/specs/issues/314). #[allow(clippy::derive_partial_eq_without_eq)] -mod message_proto { - include!(concat!(env!("OUT_DIR"), "/message_v2.pb.rs")); +mod protos { + include!(concat!(env!("OUT_DIR"), "/protos/mod.rs")); } +use protos::message as message_proto; + pub mod client; mod copy_future; mod protocol; diff --git a/protocols/relay/src/v2/client/handler.rs b/protocols/relay/src/v2/client/handler.rs index a346a4eaeb3..ecb90870405 100644 --- a/protocols/relay/src/v2/client/handler.rs +++ b/protocols/relay/src/v2/client/handler.rs @@ -267,7 +267,7 @@ impl ConnectionHandler for Handler { .circuit_deny_futs .insert( src_peer_id, - inbound_circuit.deny(Status::NoReservation).boxed(), + inbound_circuit.deny(Status::NO_RESERVATION).boxed(), ) .is_some() { diff --git a/protocols/relay/src/v2/protocol/inbound_hop.rs b/protocols/relay/src/v2/protocol/inbound_hop.rs index 2a6cf01c565..3d8efa58d25 100644 --- a/protocols/relay/src/v2/protocol/inbound_hop.rs +++ b/protocols/relay/src/v2/protocol/inbound_hop.rs @@ -55,31 +55,41 @@ impl upgrade::InboundUpgrade for Upgrade { async move { let HopMessage { - r#type, + type_, peer, reservation: _, limit: _, status: _, + .. } = substream .next() .await .ok_or(FatalUpgradeError::StreamClosed)??; - let r#type = - hop_message::Type::from_i32(r#type).ok_or(FatalUpgradeError::ParseTypeField)?; - let req = match r#type { - hop_message::Type::Reserve => Req::Reserve(ReservationReq { + let ty = type_ + .ok_or(FatalUpgradeError::ParseTypeField)? + .enum_value() + .or(Err(FatalUpgradeError::ParseTypeField))?; + + let req = match ty { + hop_message::Type::RESERVE => Req::Reserve(ReservationReq { substream, reservation_duration: self.reservation_duration, max_circuit_duration: self.max_circuit_duration, max_circuit_bytes: self.max_circuit_bytes, }), - hop_message::Type::Connect => { - let dst = PeerId::from_bytes(&peer.ok_or(FatalUpgradeError::MissingPeer)?.id) - .map_err(|_| FatalUpgradeError::ParsePeerId)?; + hop_message::Type::CONNECT => { + let dst = PeerId::from_bytes( + &peer + .into_option() + .ok_or(FatalUpgradeError::MissingPeer)? + .id + .ok_or(FatalUpgradeError::ParsePeerId)?, + ) + .map_err(|_| FatalUpgradeError::ParsePeerId)?; Req::Connect(CircuitReq { dst, substream }) } - hop_message::Type::Status => { + hop_message::Type::STATUS => { return Err(FatalUpgradeError::UnexpectedTypeStatus.into()) } }; @@ -136,40 +146,37 @@ pub struct ReservationReq { impl ReservationReq { pub async fn accept(self, addrs: Vec) -> Result<(), UpgradeError> { - let msg = HopMessage { - r#type: hop_message::Type::Status.into(), - peer: None, - reservation: Some(Reservation { - addrs: addrs.into_iter().map(|a| a.to_vec()).collect(), - expire: (SystemTime::now() + self.reservation_duration) + let mut msg = HopMessage::new(); + msg.set_type(hop_message::Type::STATUS); + msg.reservation = protobuf::MessageField::some(Reservation { + addrs: addrs.into_iter().map(|a| a.to_vec()).collect(), + expire: Some( + (SystemTime::now() + self.reservation_duration) .duration_since(SystemTime::UNIX_EPOCH) .unwrap() .as_secs(), - voucher: None, - }), - limit: Some(Limit { - duration: Some( - self.max_circuit_duration - .as_secs() - .try_into() - .expect("`max_circuit_duration` not to exceed `u32::MAX`."), - ), - data: Some(self.max_circuit_bytes), - }), - status: Some(Status::Ok.into()), - }; + ), + ..Reservation::default() + }); + msg.limit = protobuf::MessageField::some(Limit { + duration: Some( + self.max_circuit_duration + .as_secs() + .try_into() + .expect("`max_circuit_duration` not to exceed `u32::MAX`."), + ), + data: Some(self.max_circuit_bytes), + ..Limit::default() + }); + msg.set_status(Status::OK); self.send(msg).await } pub async fn deny(self, status: Status) -> Result<(), UpgradeError> { - let msg = HopMessage { - r#type: hop_message::Type::Status.into(), - peer: None, - reservation: None, - limit: None, - status: Some(status.into()), - }; + let mut msg = HopMessage::new(); + msg.set_type(hop_message::Type::STATUS); + msg.set_status(status); self.send(msg).await } @@ -194,13 +201,9 @@ impl CircuitReq { } pub async fn accept(mut self) -> Result<(NegotiatedSubstream, Bytes), UpgradeError> { - let msg = HopMessage { - r#type: hop_message::Type::Status.into(), - peer: None, - reservation: None, - limit: None, - status: Some(Status::Ok.into()), - }; + let mut msg = HopMessage::new(); + msg.set_type(hop_message::Type::STATUS); + msg.set_status(Status::OK); self.send(msg).await?; @@ -219,13 +222,10 @@ impl CircuitReq { } pub async fn deny(mut self, status: Status) -> Result<(), UpgradeError> { - let msg = HopMessage { - r#type: hop_message::Type::Status.into(), - peer: None, - reservation: None, - limit: None, - status: Some(status.into()), - }; + let mut msg = HopMessage::new(); + msg.set_type(hop_message::Type::STATUS); + msg.set_status(status); + self.send(msg).await?; self.substream.close().await.map_err(Into::into) } diff --git a/protocols/relay/src/v2/protocol/inbound_stop.rs b/protocols/relay/src/v2/protocol/inbound_stop.rs index 8a23f34e7fa..fb19f3a4dae 100644 --- a/protocols/relay/src/v2/protocol/inbound_stop.rs +++ b/protocols/relay/src/v2/protocol/inbound_stop.rs @@ -49,29 +49,37 @@ impl upgrade::InboundUpgrade for Upgrade { async move { let StopMessage { - r#type, + type_, peer, limit, status: _, + .. } = substream .next() .await .ok_or(FatalUpgradeError::StreamClosed)??; - let r#type = - stop_message::Type::from_i32(r#type).ok_or(FatalUpgradeError::ParseTypeField)?; - match r#type { - stop_message::Type::Connect => { - let src_peer_id = - PeerId::from_bytes(&peer.ok_or(FatalUpgradeError::MissingPeer)?.id) - .map_err(|_| FatalUpgradeError::ParsePeerId)?; + let ty = type_ + .ok_or(FatalUpgradeError::ParseTypeField)? + .enum_value() + .or(Err(FatalUpgradeError::ParseTypeField))?; + match ty { + stop_message::Type::CONNECT => { + let src_peer_id = PeerId::from_bytes( + &peer + .into_option() + .ok_or(FatalUpgradeError::MissingPeer)? + .id + .ok_or(FatalUpgradeError::ParsePeerId)?, + ) + .map_err(|_| FatalUpgradeError::ParsePeerId)?; Ok(Circuit { substream, src_peer_id, - limit: limit.map(Into::into), + limit: limit.into_option().map(Into::into), }) } - stop_message::Type::Status => Err(FatalUpgradeError::UnexpectedTypeStatus.into()), + stop_message::Type::STATUS => Err(FatalUpgradeError::UnexpectedTypeStatus.into()), } } .boxed() @@ -126,12 +134,9 @@ impl Circuit { } pub async fn accept(mut self) -> Result<(NegotiatedSubstream, Bytes), UpgradeError> { - let msg = StopMessage { - r#type: stop_message::Type::Status.into(), - peer: None, - limit: None, - status: Some(Status::Ok.into()), - }; + let mut msg = StopMessage::new(); + msg.set_type(stop_message::Type::STATUS); + msg.set_status(Status::OK); self.send(msg).await?; @@ -150,12 +155,9 @@ impl Circuit { } pub async fn deny(mut self, status: Status) -> Result<(), UpgradeError> { - let msg = StopMessage { - r#type: stop_message::Type::Status.into(), - peer: None, - limit: None, - status: Some(status.into()), - }; + let mut msg = StopMessage::new(); + msg.set_type(stop_message::Type::STATUS); + msg.set_status(status); self.send(msg).await.map_err(Into::into) } diff --git a/protocols/relay/src/v2/protocol/outbound_hop.rs b/protocols/relay/src/v2/protocol/outbound_hop.rs index c8381df31a5..6a004be8d52 100644 --- a/protocols/relay/src/v2/protocol/outbound_hop.rs +++ b/protocols/relay/src/v2/protocol/outbound_hop.rs @@ -52,23 +52,20 @@ impl upgrade::OutboundUpgrade for Upgrade { fn upgrade_outbound(self, substream: NegotiatedSubstream, _: Self::Info) -> Self::Future { let msg = match self { - Upgrade::Reserve => HopMessage { - r#type: hop_message::Type::Reserve.into(), - peer: None, - reservation: None, - limit: None, - status: None, - }, - Upgrade::Connect { dst_peer_id } => HopMessage { - r#type: hop_message::Type::Connect.into(), - peer: Some(Peer { - id: dst_peer_id.to_bytes(), - addrs: vec![], - }), - reservation: None, - limit: None, - status: None, - }, + Upgrade::Reserve => { + let mut msg = HopMessage::new(); + msg.set_type(hop_message::Type::RESERVE); + msg + } + Upgrade::Connect { dst_peer_id } => { + let mut msg = HopMessage::new(); + msg.set_type(hop_message::Type::CONNECT); + msg.peer = protobuf::MessageField::some(Peer { + id: Some(dst_peer_id.to_bytes()), + ..Peer::default() + }); + msg + } }; let mut substream = Framed::new(substream, prost_codec::Codec::new(MAX_MESSAGE_SIZE)); @@ -76,62 +73,62 @@ impl upgrade::OutboundUpgrade for Upgrade { async move { substream.send(msg).await?; let HopMessage { - r#type, + type_, peer: _, reservation, limit, status, + .. } = substream .next() .await .ok_or(FatalUpgradeError::StreamClosed)??; - let r#type = - hop_message::Type::from_i32(r#type).ok_or(FatalUpgradeError::ParseTypeField)?; - match r#type { - hop_message::Type::Connect => { + let ty = type_ + .ok_or(FatalUpgradeError::ParseTypeField)? + .enum_value() + .or(Err(FatalUpgradeError::ParseTypeField))?; + + match ty { + hop_message::Type::CONNECT => { return Err(FatalUpgradeError::UnexpectedTypeConnect.into()) } - hop_message::Type::Reserve => { + hop_message::Type::RESERVE => { return Err(FatalUpgradeError::UnexpectedTypeReserve.into()) } - hop_message::Type::Status => {} + hop_message::Type::STATUS => {} } - let status = Status::from_i32(status.ok_or(FatalUpgradeError::MissingStatusField)?) - .ok_or(FatalUpgradeError::ParseStatusField)?; + let status = status + .ok_or(FatalUpgradeError::MissingStatusField)? + .enum_value() + .or(Err(FatalUpgradeError::ParseStatusField))?; - let limit = limit.map(Into::into); + let limit = limit.into_option().map(Into::into); let output = match self { Upgrade::Reserve => { match status { - Status::Ok => {} - Status::ReservationRefused => { + Status::OK => {} + Status::RESERVATION_REFUSED => { return Err(ReservationFailedReason::Refused.into()) } - Status::ResourceLimitExceeded => { + Status::RESOURCE_LIMIT_EXCEEDED => { return Err(ReservationFailedReason::ResourceLimitExceeded.into()) } s => return Err(FatalUpgradeError::UnexpectedStatus(s).into()), } - let reservation = - reservation.ok_or(FatalUpgradeError::MissingReservationField)?; + let reservation = reservation + .into_option() + .ok_or(FatalUpgradeError::MissingReservationField)?; if reservation.addrs.is_empty() { return Err(FatalUpgradeError::NoAddressesInReservation.into()); } - let addrs = reservation - .addrs - .into_iter() - .map(TryFrom::try_from) - .collect::, _>>() - .map_err(|_| FatalUpgradeError::InvalidReservationAddrs)?; - let renewal_timeout = reservation - .expire + .expire() .checked_sub( SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) @@ -144,6 +141,13 @@ impl upgrade::OutboundUpgrade for Upgrade { .map(Delay::new) .ok_or(FatalUpgradeError::InvalidReservationExpiration)?; + let addrs = reservation + .addrs + .into_iter() + .map(TryFrom::try_from) + .collect::, _>>() + .map_err(|_| FatalUpgradeError::InvalidReservationAddrs)?; + substream.close().await?; Output::Reservation { @@ -154,17 +158,17 @@ impl upgrade::OutboundUpgrade for Upgrade { } Upgrade::Connect { .. } => { match status { - Status::Ok => {} - Status::ResourceLimitExceeded => { + Status::OK => {} + Status::RESOURCE_LIMIT_EXCEEDED => { return Err(CircuitFailedReason::ResourceLimitExceeded.into()) } - Status::ConnectionFailed => { + Status::CONNECTION_FAILED => { return Err(CircuitFailedReason::ConnectionFailed.into()) } - Status::NoReservation => { + Status::NO_RESERVATION => { return Err(CircuitFailedReason::NoReservation.into()) } - Status::PermissionDenied => { + Status::PERMISSION_DENIED => { return Err(CircuitFailedReason::PermissionDenied.into()) } s => return Err(FatalUpgradeError::UnexpectedStatus(s).into()), diff --git a/protocols/relay/src/v2/protocol/outbound_stop.rs b/protocols/relay/src/v2/protocol/outbound_stop.rs index 26b51407ec0..81b21d69c78 100644 --- a/protocols/relay/src/v2/protocol/outbound_stop.rs +++ b/protocols/relay/src/v2/protocol/outbound_stop.rs @@ -51,55 +51,62 @@ impl upgrade::OutboundUpgrade for Upgrade { type Future = BoxFuture<'static, Result>; fn upgrade_outbound(self, substream: NegotiatedSubstream, _: Self::Info) -> Self::Future { - let msg = StopMessage { - r#type: stop_message::Type::Connect.into(), - peer: Some(Peer { - id: self.relay_peer_id.to_bytes(), - addrs: vec![], - }), - limit: Some(Limit { - duration: Some( - self.max_circuit_duration - .as_secs() - .try_into() - .expect("`max_circuit_duration` not to exceed `u32::MAX`."), - ), - data: Some(self.max_circuit_bytes), - }), - status: None, - }; + let mut msg = StopMessage::new(); + msg.set_type(stop_message::Type::CONNECT); + msg.peer = protobuf::MessageField::some(Peer { + id: Some(self.relay_peer_id.to_bytes()), + addrs: vec![], + ..Peer::default() + }); + msg.limit = protobuf::MessageField::some(Limit { + duration: Some( + self.max_circuit_duration + .as_secs() + .try_into() + .expect("`max_circuit_duration` not to exceed `u32::MAX`."), + ), + data: Some(self.max_circuit_bytes), + ..Limit::default() + }); let mut substream = Framed::new(substream, prost_codec::Codec::new(MAX_MESSAGE_SIZE)); async move { substream.send(msg).await?; let StopMessage { - r#type, + type_, peer: _, limit: _, status, + .. } = substream .next() .await .ok_or(FatalUpgradeError::StreamClosed)??; - let r#type = - stop_message::Type::from_i32(r#type).ok_or(FatalUpgradeError::ParseTypeField)?; - match r#type { - stop_message::Type::Connect => { + let ty = type_ + .ok_or(FatalUpgradeError::ParseTypeField)? + .enum_value() + .or(Err(FatalUpgradeError::ParseTypeField))?; + + match ty { + stop_message::Type::CONNECT => { return Err(FatalUpgradeError::UnexpectedTypeConnect.into()) } - stop_message::Type::Status => {} + stop_message::Type::STATUS => {} } - let status = Status::from_i32(status.ok_or(FatalUpgradeError::MissingStatusField)?) - .ok_or(FatalUpgradeError::ParseStatusField)?; + let status = status + .ok_or(FatalUpgradeError::MissingStatusField)? + .enum_value() + .or(Err(FatalUpgradeError::ParseStatusField))?; + match status { - Status::Ok => {} - Status::ResourceLimitExceeded => { + Status::OK => {} + Status::RESOURCE_LIMIT_EXCEEDED => { return Err(CircuitFailedReason::ResourceLimitExceeded.into()) } - Status::PermissionDenied => { + Status::PERMISSION_DENIED => { return Err(CircuitFailedReason::PermissionDenied.into()) } s => return Err(FatalUpgradeError::UnexpectedStatus(s).into()), diff --git a/protocols/relay/src/v2/relay.rs b/protocols/relay/src/v2/relay.rs index 5b1eb810f60..654ab5a3ee8 100644 --- a/protocols/relay/src/v2/relay.rs +++ b/protocols/relay/src/v2/relay.rs @@ -315,7 +315,7 @@ impl NetworkBehaviour for Relay { peer_id: event_source, event: Either::Left(handler::In::DenyReservationReq { inbound_reservation_req, - status: message_proto::Status::ResourceLimitExceeded, + status: message_proto::Status::RESOURCE_LIMIT_EXCEEDED, }), } .into() @@ -430,7 +430,7 @@ impl NetworkBehaviour for Relay { event: Either::Left(handler::In::DenyCircuitReq { circuit_id: None, inbound_circuit_req, - status: message_proto::Status::ResourceLimitExceeded, + status: message_proto::Status::RESOURCE_LIMIT_EXCEEDED, }), } } else if let Some(dst_conn) = self @@ -466,7 +466,7 @@ impl NetworkBehaviour for Relay { event: Either::Left(handler::In::DenyCircuitReq { circuit_id: None, inbound_circuit_req, - status: message_proto::Status::NoReservation, + status: message_proto::Status::NO_RESERVATION, }), } }; diff --git a/protocols/relay/src/v2/relay/handler.rs b/protocols/relay/src/v2/relay/handler.rs index 1c6987692fa..f12a3ade9aa 100644 --- a/protocols/relay/src/v2/relay/handler.rs +++ b/protocols/relay/src/v2/relay/handler.rs @@ -652,10 +652,10 @@ impl ConnectionHandler for Handler { ) { let (non_fatal_error, status) = match error { ConnectionHandlerUpgrErr::Timeout => { - (ConnectionHandlerUpgrErr::Timeout, Status::ConnectionFailed) + (ConnectionHandlerUpgrErr::Timeout, Status::CONNECTION_FAILED) } ConnectionHandlerUpgrErr::Timer => { - (ConnectionHandlerUpgrErr::Timer, Status::ConnectionFailed) + (ConnectionHandlerUpgrErr::Timer, Status::CONNECTION_FAILED) } ConnectionHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( upgrade::NegotiationError::Failed, @@ -685,10 +685,10 @@ impl ConnectionHandler for Handler { outbound_stop::UpgradeError::CircuitFailed(error) => { let status = match error { outbound_stop::CircuitFailedReason::ResourceLimitExceeded => { - Status::ResourceLimitExceeded + Status::RESOURCE_LIMIT_EXCEEDED } outbound_stop::CircuitFailedReason::PermissionDenied => { - Status::PermissionDenied + Status::PERMISSION_DENIED } }; ( diff --git a/protocols/rendezvous/Cargo.toml b/protocols/rendezvous/Cargo.toml index 378ec567636..7cf1522fa9c 100644 --- a/protocols/rendezvous/Cargo.toml +++ b/protocols/rendezvous/Cargo.toml @@ -14,7 +14,7 @@ categories = ["network-programming", "asynchronous"] asynchronous-codec = "0.6" libp2p-core = { version = "0.37.0", path = "../../core" } libp2p-swarm = { version = "0.40.0", path = "../../swarm" } -prost = "0.11" +protobuf = "3.2" void = "1" log = "0.4" futures = { version = "0.3", default-features = false, features = ["std"] } @@ -34,7 +34,7 @@ rand = "0.8" tokio = { version = "1.15", features = [ "rt-multi-thread", "time", "macros", "sync", "process", "fs", "net" ] } [build-dependencies] -prost-build = "0.11" +protobuf-codegen = "3.2" # Passing arguments to the docsrs builder in order to properly document cfg's. # More information: https://docs.rs/about/builds#cross-compiling diff --git a/protocols/rendezvous/build.rs b/protocols/rendezvous/build.rs index fa982fa3d90..adc33c01589 100644 --- a/protocols/rendezvous/build.rs +++ b/protocols/rendezvous/build.rs @@ -1,3 +1,10 @@ fn main() { - prost_build::compile_protos(&["src/rpc.proto"], &["src"]).unwrap(); + protobuf_codegen::Codegen::new() + .pure() + .includes(&["src"]) + .input("src/rpc.proto") + .customize(protobuf_codegen::Customize::default().lite_runtime(true)) + .cargo_out_dir("protos") + .run() + .unwrap() } diff --git a/protocols/rendezvous/src/codec.rs b/protocols/rendezvous/src/codec.rs index 375ebd6c228..abecece1474 100644 --- a/protocols/rendezvous/src/codec.rs +++ b/protocols/rendezvous/src/codec.rs @@ -219,18 +219,16 @@ impl Encoder for RendezvousCodec { type Error = Error; fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { - use prost::Message; + use protobuf::Message; let message = wire::Message::from(item); - let mut buf = Vec::with_capacity(message.encoded_len()); - - message - .encode(&mut buf) - .expect("Buffer has sufficient capacity"); + let bytes = message + .write_to_bytes() + .expect("All required fields to be initialized."); // Length prefix the protobuf message, ensuring the max limit is not hit - self.length_codec.encode(Bytes::from(buf), dst)?; + self.length_codec.encode(Bytes::from(bytes), dst)?; Ok(()) } @@ -241,14 +239,14 @@ impl Decoder for RendezvousCodec { type Error = Error; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - use prost::Message; + use protobuf::Message; let message = match self.length_codec.decode(src)? { Some(p) => p, None => return Ok(None), }; - let message = wire::Message::decode(message)?; + let message = wire::Message::parse_from_bytes(message.as_ref()).map_err(Error::Decode)?; Ok(Some(message.try_into()?)) } @@ -257,9 +255,9 @@ impl Decoder for RendezvousCodec { #[derive(Debug, thiserror::Error)] pub enum Error { #[error("Failed to encode message as bytes")] - Encode(#[from] prost::EncodeError), + Encode(protobuf::Error), #[error("Failed to decode message from bytes")] - Decode(#[from] prost::DecodeError), + Decode(protobuf::Error), #[error("Failed to read/write")] Io(#[from] std::io::Error), #[error("Failed to convert wire message to internal data model")] @@ -275,106 +273,93 @@ impl From for wire::Message { namespace, record, ttl, - }) => wire::Message { - r#type: Some(MessageType::Register.into()), - register: Some(Register { + }) => { + let mut msg = wire::Message::new(); + msg.set_type(MessageType::REGISTER); + msg.register = protobuf::MessageField::some(Register { ns: Some(namespace.into()), ttl, - signed_peer_record: Some( - record.into_signed_envelope().into_protobuf_encoding(), - ), - }), - register_response: None, - unregister: None, - discover: None, - discover_response: None, - }, - Message::RegisterResponse(Ok(ttl)) => wire::Message { - r#type: Some(MessageType::RegisterResponse.into()), - register_response: Some(RegisterResponse { - status: Some(ResponseStatus::Ok.into()), - status_text: None, - ttl: Some(ttl), - }), - register: None, - discover: None, - unregister: None, - discover_response: None, - }, - Message::RegisterResponse(Err(error)) => wire::Message { - r#type: Some(MessageType::RegisterResponse.into()), - register_response: Some(RegisterResponse { - status: Some(ResponseStatus::from(error).into()), - status_text: None, - ttl: None, - }), - register: None, - discover: None, - unregister: None, - discover_response: None, - }, - Message::Unregister(namespace) => wire::Message { - r#type: Some(MessageType::Unregister.into()), - unregister: Some(Unregister { + signedPeerRecord: Some(record.into_signed_envelope().into_protobuf_encoding()), + ..Register::default() + }); + msg + } + Message::RegisterResponse(Ok(ttl)) => { + let mut msg = wire::Message::new(); + msg.set_type(MessageType::REGISTER_RESPONSE); + msg.registerResponse = protobuf::MessageField::some({ + let mut res = RegisterResponse::new(); + res.set_status(ResponseStatus::OK); + res.set_ttl(ttl); + res + }); + msg + } + Message::RegisterResponse(Err(error)) => { + let mut msg = wire::Message::new(); + msg.set_type(MessageType::REGISTER_RESPONSE); + msg.registerResponse = protobuf::MessageField::some({ + let mut res = RegisterResponse::new(); + res.set_status(ResponseStatus::from(error)); + res + }); + msg + } + Message::Unregister(namespace) => { + let mut msg = wire::Message::new(); + msg.set_type(MessageType::UNREGISTER); + msg.unregister = protobuf::MessageField::some(Unregister { ns: Some(namespace.into()), id: None, - }), - register: None, - register_response: None, - discover: None, - discover_response: None, - }, + ..Unregister::default() + }); + msg + } Message::Discover { namespace, cookie, limit, - } => wire::Message { - r#type: Some(MessageType::Discover.into()), - discover: Some(Discover { + } => { + let mut msg = wire::Message::new(); + msg.set_type(MessageType::DISCOVER); + msg.discover = protobuf::MessageField::some(Discover { ns: namespace.map(|ns| ns.into()), cookie: cookie.map(|cookie| cookie.into_wire_encoding()), limit, - }), - register: None, - register_response: None, - unregister: None, - discover_response: None, - }, - Message::DiscoverResponse(Ok((registrations, cookie))) => wire::Message { - r#type: Some(MessageType::DiscoverResponse.into()), - discover_response: Some(DiscoverResponse { + ..Discover::default() + }); + msg + } + Message::DiscoverResponse(Ok((registrations, cookie))) => { + let mut msg = wire::Message::new(); + msg.set_type(MessageType::DISCOVER_RESPONSE); + msg.discoverResponse = protobuf::MessageField::some(DiscoverResponse { registrations: registrations .into_iter() .map(|reggo| Register { ns: Some(reggo.namespace.into()), ttl: Some(reggo.ttl), - signed_peer_record: Some( + signedPeerRecord: Some( reggo.record.into_signed_envelope().into_protobuf_encoding(), ), + ..Register::default() }) .collect(), - status: Some(ResponseStatus::Ok.into()), - status_text: None, + status: Some(ResponseStatus::OK.into()), cookie: Some(cookie.into_wire_encoding()), - }), - register: None, - discover: None, - unregister: None, - register_response: None, - }, - Message::DiscoverResponse(Err(error)) => wire::Message { - r#type: Some(MessageType::DiscoverResponse.into()), - discover_response: Some(DiscoverResponse { - registrations: Vec::new(), + ..DiscoverResponse::default() + }); + msg + } + Message::DiscoverResponse(Err(error)) => { + let mut msg = wire::Message::new(); + msg.set_type(MessageType::DISCOVER_RESPONSE); + msg.discoverResponse = protobuf::MessageField::some(DiscoverResponse { status: Some(ResponseStatus::from(error).into()), - status_text: None, - cookie: None, - }), - register: None, - discover: None, - unregister: None, - register_response: None, - }, + ..DiscoverResponse::default() + }); + msg + } } } } @@ -385,118 +370,125 @@ impl TryFrom for Message { fn try_from(message: wire::Message) -> Result { use wire::message::*; - let message = match message { - wire::Message { - r#type: Some(0), - register: - Some(Register { - ns, - ttl, - signed_peer_record: Some(signed_peer_record), - }), - .. - } => Message::Register(NewRegistration { - namespace: ns - .map(Namespace::new) - .transpose()? - .ok_or(ConversionError::MissingNamespace)?, - ttl, - record: PeerRecord::from_signed_envelope(SignedEnvelope::from_protobuf_encoding( - &signed_peer_record, - )?)?, - }), - wire::Message { - r#type: Some(1), - register_response: - Some(RegisterResponse { - status: Some(0), - ttl, - .. - }), - .. - } => Message::RegisterResponse(Ok(ttl.ok_or(ConversionError::MissingTtl)?)), - wire::Message { - r#type: Some(3), - discover: Some(Discover { ns, limit, cookie }), - .. - } => Message::Discover { - namespace: ns.map(Namespace::new).transpose()?, - cookie: cookie.map(Cookie::from_wire_encoding).transpose()?, - limit, - }, - wire::Message { - r#type: Some(4), - discover_response: - Some(DiscoverResponse { - registrations, - status: Some(0), - cookie: Some(cookie), - .. - }), - .. - } => { - let registrations = registrations - .into_iter() - .map(|reggo| { - Ok(Registration { - namespace: reggo - .ns - .map(Namespace::new) - .transpose()? - .ok_or(ConversionError::MissingNamespace)?, - record: PeerRecord::from_signed_envelope( - SignedEnvelope::from_protobuf_encoding( - ®go - .signed_peer_record - .ok_or(ConversionError::MissingSignedPeerRecord)?, - )?, - )?, - ttl: reggo.ttl.ok_or(ConversionError::MissingTtl)?, - }) - }) - .collect::, ConversionError>>()?; - let cookie = Cookie::from_wire_encoding(cookie)?; - - Message::DiscoverResponse(Ok((registrations, cookie))) + let msg_ty = message + .type_ + .ok_or(ConversionError::InconsistentWireMessage)? + .enum_value() + .or(Err(ConversionError::InconsistentWireMessage))?; + + let message = match msg_ty { + MessageType::REGISTER => { + let register = message + .register + .into_option() + .ok_or(ConversionError::InconsistentWireMessage)?; + let signed_peer_record = register + .signedPeerRecord + .ok_or(ConversionError::InconsistentWireMessage)?; + Message::Register(NewRegistration { + namespace: register + .ns + .map(Namespace::new) + .transpose()? + .ok_or(ConversionError::MissingNamespace)?, + ttl: register.ttl, + record: PeerRecord::from_signed_envelope( + SignedEnvelope::from_protobuf_encoding(&signed_peer_record)?, + )?, + }) } - wire::Message { - r#type: Some(1), - register_response: - Some(RegisterResponse { - status: Some(error_code), - .. - }), - .. - } => { - let error_code = wire::message::ResponseStatus::from_i32(error_code) - .ok_or(ConversionError::BadStatusCode)? - .try_into()?; - Message::RegisterResponse(Err(error_code)) + MessageType::REGISTER_RESPONSE => { + let register_response = message + .registerResponse + .into_option() + .ok_or(ConversionError::InconsistentWireMessage)?; + let status = register_response + .status + .ok_or(ConversionError::InconsistentWireMessage)? + .enum_value() + .or(Err(ConversionError::BadStatusCode))?; + + match status { + ResponseStatus::OK => Message::RegisterResponse(Ok(register_response + .ttl + .ok_or(ConversionError::MissingTtl)?)), + error_code => Message::RegisterResponse(Err(error_code.try_into()?)), + } } - wire::Message { - r#type: Some(2), - unregister: Some(Unregister { ns, .. }), - .. - } => Message::Unregister( - ns.map(Namespace::new) - .transpose()? - .ok_or(ConversionError::MissingNamespace)?, - ), - wire::Message { - r#type: Some(4), - discover_response: - Some(DiscoverResponse { - status: Some(error_code), - .. - }), - .. - } => { - let error = wire::message::ResponseStatus::from_i32(error_code) - .ok_or(ConversionError::BadStatusCode)? - .try_into()?; - Message::DiscoverResponse(Err(error)) + MessageType::UNREGISTER => { + let unregister = message + .unregister + .into_option() + .ok_or(ConversionError::InconsistentWireMessage)?; + + Message::Unregister( + unregister + .ns + .map(Namespace::new) + .transpose()? + .ok_or(ConversionError::MissingNamespace)?, + ) + } + MessageType::DISCOVER => { + let discover = message + .discover + .into_option() + .ok_or(ConversionError::InconsistentWireMessage)?; + + Message::Discover { + namespace: discover.ns.map(Namespace::new).transpose()?, + cookie: discover + .cookie + .map(Cookie::from_wire_encoding) + .transpose()?, + limit: discover.limit, + } + } + MessageType::DISCOVER_RESPONSE => { + let discover_response = message + .discoverResponse + .into_option() + .ok_or(ConversionError::InconsistentWireMessage)?; + let status = discover_response + .status + .ok_or(ConversionError::InconsistentWireMessage)? + .enum_value() + .or(Err(ConversionError::BadStatusCode))?; + + match status { + ResponseStatus::OK => { + let registrations = discover_response + .registrations + .into_iter() + .map(|reggo| { + Ok(Registration { + namespace: reggo + .ns + .map(Namespace::new) + .transpose()? + .ok_or(ConversionError::MissingNamespace)?, + record: PeerRecord::from_signed_envelope( + SignedEnvelope::from_protobuf_encoding( + ®go + .signedPeerRecord + .ok_or(ConversionError::MissingSignedPeerRecord)?, + )?, + )?, + ttl: reggo.ttl.ok_or(ConversionError::MissingTtl)?, + }) + }) + .collect::, ConversionError>>()?; + let cookie = Cookie::from_wire_encoding( + discover_response + .cookie + .ok_or(ConversionError::InconsistentWireMessage)?, + )?; + + Message::DiscoverResponse(Ok((registrations, cookie))) + } + error_code => Message::DiscoverResponse(Err(error_code.try_into()?)), + } } - _ => return Err(ConversionError::InconsistentWireMessage), }; Ok(message) @@ -554,14 +546,14 @@ impl TryFrom for ErrorCode { use wire::message::ResponseStatus::*; let code = match value { - Ok => return Err(UnmappableStatusCode(value)), - EInvalidNamespace => ErrorCode::InvalidNamespace, - EInvalidSignedPeerRecord => ErrorCode::InvalidSignedPeerRecord, - EInvalidTtl => ErrorCode::InvalidTtl, - EInvalidCookie => ErrorCode::InvalidCookie, - ENotAuthorized => ErrorCode::NotAuthorized, - EInternalError => ErrorCode::InternalError, - EUnavailable => ErrorCode::Unavailable, + OK => return Err(UnmappableStatusCode(value)), + E_INVALID_NAMESPACE => ErrorCode::InvalidNamespace, + E_INVALID_SIGNED_PEER_RECORD => ErrorCode::InvalidSignedPeerRecord, + E_INVALID_TTL => ErrorCode::InvalidTtl, + E_INVALID_COOKIE => ErrorCode::InvalidCookie, + E_NOT_AUTHORIZED => ErrorCode::NotAuthorized, + E_INTERNAL_ERROR => ErrorCode::InternalError, + E_UNAVAILABLE => ErrorCode::Unavailable, }; Result::Ok(code) @@ -573,13 +565,13 @@ impl From for wire::message::ResponseStatus { use wire::message::ResponseStatus::*; match error_code { - ErrorCode::InvalidNamespace => EInvalidNamespace, - ErrorCode::InvalidSignedPeerRecord => EInvalidSignedPeerRecord, - ErrorCode::InvalidTtl => EInvalidTtl, - ErrorCode::InvalidCookie => EInvalidCookie, - ErrorCode::NotAuthorized => ENotAuthorized, - ErrorCode::InternalError => EInternalError, - ErrorCode::Unavailable => EUnavailable, + ErrorCode::InvalidNamespace => E_INVALID_NAMESPACE, + ErrorCode::InvalidSignedPeerRecord => E_INVALID_SIGNED_PEER_RECORD, + ErrorCode::InvalidTtl => E_INVALID_TTL, + ErrorCode::InvalidCookie => E_INVALID_COOKIE, + ErrorCode::NotAuthorized => E_NOT_AUTHORIZED, + ErrorCode::InternalError => E_INTERNAL_ERROR, + ErrorCode::Unavailable => E_UNAVAILABLE, } } } @@ -595,10 +587,12 @@ impl From for ConversionError { pub struct UnmappableStatusCode(wire::message::ResponseStatus); #[allow(clippy::derive_partial_eq_without_eq)] -mod wire { - include!(concat!(env!("OUT_DIR"), "/rendezvous.pb.rs")); +mod protos { + include!(concat!(env!("OUT_DIR"), "/protos/mod.rs")); } +use protos::rpc as wire; + #[cfg(test)] mod tests { use super::*; diff --git a/transports/noise/Cargo.toml b/transports/noise/Cargo.toml index 3dd07ee82de..5c03023b5eb 100644 --- a/transports/noise/Cargo.toml +++ b/transports/noise/Cargo.toml @@ -15,7 +15,7 @@ futures = "0.3.1" lazy_static = "1.2" libp2p-core = { version = "0.37.0", path = "../../core" } log = "0.4" -prost = "0.11" +protobuf = "3.2" rand = "0.8.3" sha2 = "0.10.0" static_assertions = "1" @@ -37,7 +37,7 @@ libsodium-sys-stable = { version = "1.19.22", features = ["fetch-latest"] } ed25519-compact = "2.0.2" [build-dependencies] -prost-build = "0.11" +protobuf-codegen = "3.2" # Passing arguments to the docsrs builder in order to properly document cfg's. # More information: https://docs.rs/about/builds#cross-compiling diff --git a/transports/noise/build.rs b/transports/noise/build.rs index c9cf60412cd..4856029b436 100644 --- a/transports/noise/build.rs +++ b/transports/noise/build.rs @@ -19,5 +19,12 @@ // DEALINGS IN THE SOFTWARE. fn main() { - prost_build::compile_protos(&["src/io/handshake/payload.proto"], &["src"]).unwrap(); + protobuf_codegen::Codegen::new() + .pure() + .includes(&["src"]) + .input("src/io/handshake/payload.proto") + .customize(protobuf_codegen::Customize::default().lite_runtime(true)) + .cargo_out_dir("protos") + .run() + .unwrap() } diff --git a/transports/noise/src/error.rs b/transports/noise/src/error.rs index 4e1d240fe74..d95a993f3a8 100644 --- a/transports/noise/src/error.rs +++ b/transports/noise/src/error.rs @@ -36,7 +36,7 @@ pub enum NoiseError { /// upgrade failed. AuthenticationFailed, /// A handshake payload is invalid. - InvalidPayload(prost::DecodeError), + InvalidPayload(protobuf::Error), /// A signature was required and could not be created. SigningError(identity::error::SigningError), } @@ -79,8 +79,8 @@ impl From for NoiseError { } } -impl From for NoiseError { - fn from(e: prost::DecodeError) -> Self { +impl From for NoiseError { + fn from(e: protobuf::Error) -> Self { NoiseError::InvalidPayload(e) } } diff --git a/transports/noise/src/io/handshake.rs b/transports/noise/src/io/handshake.rs index c32eef9690f..0797a1c869f 100644 --- a/transports/noise/src/io/handshake.rs +++ b/transports/noise/src/io/handshake.rs @@ -21,10 +21,12 @@ //! Noise protocol handshake I/O. #[allow(clippy::derive_partial_eq_without_eq)] -mod payload_proto { - include!(concat!(env!("OUT_DIR"), "/payload.proto.rs")); +mod protos { + include!(concat!(env!("OUT_DIR"), "/protos/mod.rs")); } +use protos::payload as payload_proto; + use crate::error::NoiseError; use crate::io::{framed::NoiseFramed, NoiseOutput}; use crate::protocol::{KeypairIdentity, Protocol, PublicKey}; @@ -32,7 +34,7 @@ use crate::LegacyConfig; use bytes::Bytes; use futures::prelude::*; use libp2p_core::identity; -use prost::Message; +use protobuf::Message; use std::io; /// The identity of the remote established during a handshake. @@ -175,7 +177,7 @@ where { let msg = recv(state).await?; - let mut pb_result = payload_proto::NoiseHandshakePayload::decode(&msg[..]); + let mut pb_result = payload_proto::NoiseHandshakePayload::parse_from_bytes(&msg[..]); if pb_result.is_err() && state.legacy.recv_legacy_handshake { // NOTE: This is support for legacy handshake payloads. As long as @@ -196,7 +198,7 @@ where // frame length, because each length is encoded as a `u16`. if usize::from(u16::from_be_bytes(buf)) + 2 == msg.len() { log::debug!("Attempting fallback legacy protobuf decoding."); - payload_proto::NoiseHandshakePayload::decode(&msg[2..]) + payload_proto::NoiseHandshakePayload::parse_from_bytes(&msg[2..]) } else { Err(e) } @@ -239,16 +241,21 @@ where pb.identity_sig = sig.clone() } + let size = usize::try_from(pb.compute_size()) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?; + let mut msg = if state.legacy.send_legacy_handshake { - let mut msg = Vec::with_capacity(2 + pb.encoded_len()); - msg.extend_from_slice(&(pb.encoded_len() as u16).to_be_bytes()); + let mut msg = Vec::with_capacity(2 + size); + msg.extend_from_slice(&(size as u16).to_be_bytes()); msg } else { - Vec::with_capacity(pb.encoded_len()) + Vec::with_capacity(size) }; - pb.encode(&mut msg) - .expect("Vec provides capacity as needed"); + msg.extend( + pb.write_to_bytes() + .expect("All required fields to be initialized."), + ); state.io.send(&msg).await?; Ok(()) @@ -265,16 +272,21 @@ where pb.identity_sig = sig.clone() } + let size = usize::try_from(pb.compute_size()) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?; + let mut msg = if state.legacy.send_legacy_handshake { - let mut msg = Vec::with_capacity(2 + pb.encoded_len()); - msg.extend_from_slice(&(pb.encoded_len() as u16).to_be_bytes()); + let mut msg = Vec::with_capacity(2 + size); + msg.extend_from_slice(&(size as u16).to_be_bytes()); msg } else { - Vec::with_capacity(pb.encoded_len()) + Vec::with_capacity(size) }; - pb.encode(&mut msg) - .expect("Vec provides capacity as needed"); + msg.extend( + pb.write_to_bytes() + .expect("All required fields to be initialized."), + ); state.io.send(&msg).await?; Ok(()) diff --git a/transports/plaintext/CHANGELOG.md b/transports/plaintext/CHANGELOG.md index 93162dfa26d..7ba5b11256c 100644 --- a/transports/plaintext/CHANGELOG.md +++ b/transports/plaintext/CHANGELOG.md @@ -1,3 +1,9 @@ +# 0.37.1 [unreleased] + +- Remove `prost` and add `protobuf`. See [PR 3050]. + +[PR 3050]: https://github.com/libp2p/rust-libp2p/pull/3050 + # 0.37.0 - Update to `libp2p-core` `v0.37.0`. diff --git a/transports/plaintext/Cargo.toml b/transports/plaintext/Cargo.toml index 7c2051d2364..ccd6d1df4c1 100644 --- a/transports/plaintext/Cargo.toml +++ b/transports/plaintext/Cargo.toml @@ -16,7 +16,7 @@ futures = "0.3.1" asynchronous-codec = "0.6" libp2p-core = { version = "0.37.0", path = "../../core" } log = "0.4.8" -prost = "0.11" +protobuf = "3.2" unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] } void = "1.0.2" @@ -26,7 +26,7 @@ quickcheck = { package = "quickcheck-ext", path = "../../misc/quickcheck-ext" } rand = "0.8" [build-dependencies] -prost-build = "0.11" +protobuf-codegen = "3.2" # Passing arguments to the docsrs builder in order to properly document cfg's. # More information: https://docs.rs/about/builds#cross-compiling diff --git a/transports/plaintext/build.rs b/transports/plaintext/build.rs index 56c7b20121a..e00accae79b 100644 --- a/transports/plaintext/build.rs +++ b/transports/plaintext/build.rs @@ -19,5 +19,12 @@ // DEALINGS IN THE SOFTWARE. fn main() { - prost_build::compile_protos(&["src/structs.proto"], &["src"]).unwrap(); + protobuf_codegen::Codegen::new() + .pure() + .includes(&["src"]) + .input("src/structs.proto") + .customize(protobuf_codegen::Customize::default().lite_runtime(true)) + .cargo_out_dir("protos") + .run() + .unwrap() } diff --git a/transports/plaintext/src/error.rs b/transports/plaintext/src/error.rs index 9f512c4f58e..ebb4fe42a99 100644 --- a/transports/plaintext/src/error.rs +++ b/transports/plaintext/src/error.rs @@ -28,7 +28,7 @@ pub enum PlainTextError { IoError(IoError), /// Failed to parse the handshake protobuf message. - InvalidPayload(Option), + InvalidPayload(Option), /// The peer id of the exchange isn't consistent with the remote public key. InvalidPeerId, @@ -65,8 +65,8 @@ impl From for PlainTextError { } } -impl From for PlainTextError { - fn from(err: prost::DecodeError) -> PlainTextError { +impl From for PlainTextError { + fn from(err: protobuf::Error) -> PlainTextError { PlainTextError::InvalidPayload(Some(err)) } } diff --git a/transports/plaintext/src/handshake.rs b/transports/plaintext/src/handshake.rs index 6534c6d7abd..ebd075ee695 100644 --- a/transports/plaintext/src/handshake.rs +++ b/transports/plaintext/src/handshake.rs @@ -27,7 +27,7 @@ use bytes::{Bytes, BytesMut}; use futures::prelude::*; use libp2p_core::{PeerId, PublicKey}; use log::{debug, trace}; -use prost::Message; +use protobuf::Message; use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use unsigned_varint::codec::UviBytes; @@ -52,19 +52,16 @@ pub struct Remote { impl HandshakeContext { fn new(config: PlainText2Config) -> Self { - let exchange = Exchange { - id: Some(config.local_public_key.to_peer_id().to_bytes()), - pubkey: Some(config.local_public_key.to_protobuf_encoding()), - }; - let mut buf = Vec::with_capacity(exchange.encoded_len()); - exchange - .encode(&mut buf) - .expect("Vec provides capacity as needed"); + let mut exchange = Exchange::new(); + exchange.set_id(config.local_public_key.to_peer_id().to_bytes()); + exchange.set_pubkey(config.local_public_key.to_protobuf_encoding()); Self { config, state: Local { - exchange_bytes: buf, + exchange_bytes: exchange + .write_to_bytes() + .expect("All fields to be initialized."), }, } } @@ -73,7 +70,7 @@ impl HandshakeContext { self, exchange_bytes: BytesMut, ) -> Result, PlainTextError> { - let prop = match Exchange::decode(exchange_bytes) { + let prop = match Exchange::parse_from_bytes(exchange_bytes.as_ref()) { Ok(prop) => prop, Err(e) => { debug!("failed to parse remote's exchange protobuf message"); diff --git a/transports/plaintext/src/lib.rs b/transports/plaintext/src/lib.rs index da5674b6497..0183e760b55 100644 --- a/transports/plaintext/src/lib.rs +++ b/transports/plaintext/src/lib.rs @@ -35,13 +35,16 @@ use std::{ }; use void::Void; -mod error; -mod handshake; #[allow(clippy::derive_partial_eq_without_eq)] -mod structs_proto { - include!(concat!(env!("OUT_DIR"), "/structs.rs")); +mod protos { + include!(concat!(env!("OUT_DIR"), "/protos/mod.rs")); } +use protos::structs as structs_proto; + +mod error; +mod handshake; + /// `PlainText1Config` is an insecure connection handshake for testing purposes only. /// /// > **Note**: Given that `PlainText1Config` has no notion of exchanging peer identity information it is not compatible