Skip to content

Commit

Permalink
refactor(deflate): implement tests with futures_ringbuf
Browse files Browse the repository at this point in the history
Previously, we used a TCP transport to test the deflate compression. Really, all we need is an in-memory ringbuffer that we can plug in at both ends. This PR uses `futures_ringbuf` for that.

Related #3748.

Pull-Request: #3771.
  • Loading branch information
thomaseizinger authored Apr 26, 2023
1 parent ed392af commit 7fa8bef
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 64 deletions.
96 changes: 88 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions transports/deflate/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ libp2p-core = { version = "0.39.0", path = "../../core" }
flate2 = "1.0"

[dev-dependencies]
async-std = "1.6.2"
libp2p-tcp = { path = "../tcp", features = ["async-io"] }
quickcheck = { package = "quickcheck-ext", path = "../../misc/quickcheck-ext" }
rand = "0.8"
futures_ringbuf = "0.3.1"

# Passing arguments to the docsrs builder in order to properly document cfg's.
# More information: https://docs.rs/about/builds#cross-compiling
Expand Down
80 changes: 26 additions & 54 deletions transports/deflate/tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use futures::{future, prelude::*};
use libp2p_core::{transport::Transport, upgrade};
use futures::prelude::*;
use libp2p_core::OutboundUpgrade;
use libp2p_deflate::DeflateConfig;
use libp2p_tcp as tcp;
use quickcheck::{QuickCheck, TestResult};
use rand::RngCore;

Expand All @@ -31,7 +30,7 @@ fn deflate() {
if message.is_empty() {
return TestResult::discard();
}
async_std::task::block_on(run(message));
futures::executor::block_on(run(message));
TestResult::passed()
}
QuickCheck::new().quickcheck(prop as fn(Vec<u8>) -> TestResult)
Expand All @@ -41,68 +40,41 @@ fn deflate() {
fn lot_of_data() {
let mut v = vec![0; 2 * 1024 * 1024];
rand::thread_rng().fill_bytes(&mut v);
async_std::task::block_on(run(v))
futures::executor::block_on(run(v));
}

async fn run(message1: Vec<u8>) {
let new_transport = || {
tcp::async_io::Transport::default()
.and_then(|conn, endpoint| {
upgrade::apply(
conn,
DeflateConfig::default(),
endpoint,
upgrade::Version::V1,
)
})
.boxed()
};
let mut listener_transport = new_transport();
listener_transport
.listen_on("/ip4/0.0.0.0/tcp/0".parse().expect("multiaddr"))
.expect("listener");

let listen_addr = listener_transport
.next()
.await
.expect("some event")
.into_new_address()
.expect("new address");
let (server, client) = futures_ringbuf::Endpoint::pair(100, 100);

let message2 = message1.clone();

let listener_task = async_std::task::spawn(async move {
let mut conn = listener_transport
.filter(|e| future::ready(e.is_upgrade()))
.next()
let client_task = async move {
let mut client = DeflateConfig::default()
.upgrade_outbound(client, b"")
.await
.expect("some event")
.into_incoming()
.expect("upgrade")
.0
.await
.expect("connection");
.unwrap();

let mut buf = vec![0; message2.len()];
conn.read_exact(&mut buf).await.expect("read_exact");
client.read_exact(&mut buf).await.expect("read_exact");
assert_eq!(&buf[..], &message2[..]);

conn.write_all(&message2).await.expect("write_all");
conn.close().await.expect("close")
});
client.write_all(&message2).await.expect("write_all");
client.close().await.expect("close")
};

let mut dialer_transport = new_transport();
let mut conn = dialer_transport
.dial(listen_addr)
.expect("dialer")
.await
.expect("connection");
conn.write_all(&message1).await.expect("write_all");
conn.close().await.expect("close");
let server_task = async move {
let mut server = DeflateConfig::default()
.upgrade_outbound(server, b"")
.await
.unwrap();

server.write_all(&message1).await.expect("write_all");
server.close().await.expect("close");

let mut buf = Vec::new();
conn.read_to_end(&mut buf).await.expect("read_to_end");
assert_eq!(&buf[..], &message1[..]);
let mut buf = Vec::new();
server.read_to_end(&mut buf).await.expect("read_to_end");
assert_eq!(&buf[..], &message1[..]);
};

listener_task.await
futures::future::join(server_task, client_task).await;
}

0 comments on commit 7fa8bef

Please sign in to comment.