Skip to content

Commit

Permalink
test: working of sub/unsub promises
Browse files Browse the repository at this point in the history
  • Loading branch information
de-sh committed Oct 8, 2024
1 parent 165d169 commit bfeb44d
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 20 deletions.
52 changes: 47 additions & 5 deletions rumqttc/tests/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,39 @@ use tokio::{task, time};

use bytes::BytesMut;
use flume::{bounded, Receiver, Sender};
use rumqttc::{Event, Incoming, Outgoing, Packet};
use rumqttc::{Incoming, Packet};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

#[derive(Debug, PartialEq)]
pub enum Event {
Incoming(Packet),
Outgoing(Outgoing),
}

#[derive(Debug, PartialEq)]
pub enum Outgoing {
/// Publish packet with packet identifier. 0 implies QoS 0
Publish(u16),
/// SubAck packet with packet identifier
SubAck(u16),
/// UnsubAck packet with packet identifier
UnsubAck(u16),
/// PubAck packet
PubAck(u16),
/// PubRec packet
PubRec(u16),
/// PubRel packet
PubRel(u16),
/// PubComp packet
PubComp(u16),
/// Ping request packet
PingReq,
/// Ping response packet
PingResp,
/// Disconnect packet
Disconnect,
}

pub struct Broker {
pub(crate) framed: Network,
pub(crate) incoming: VecDeque<Packet>,
Expand Down Expand Up @@ -116,8 +146,8 @@ impl Broker {
}
}

/// Sends an acknowledgement
pub async fn ack(&mut self, pkid: u16) {
/// Sends a publish acknowledgement
pub async fn puback(&mut self, pkid: u16) {
let packet = Packet::PubAck(PubAck::new(pkid));
self.framed.write(packet).await.unwrap();
}
Expand All @@ -134,6 +164,18 @@ impl Broker {
self.framed.write(packet).await.unwrap();
}

/// Sends a subscribe acknowledgement
pub async fn suback(&mut self, pkid: u16, qos: QoS) {
let packet = Packet::SubAck(SubAck::new(pkid, vec![SubscribeReasonCode::Success(qos)]));
self.framed.write(packet).await.unwrap();
}

/// Sends an unsubscribe acknowledgement
pub async fn unsuback(&mut self, pkid: u16) {
let packet = Packet::UnsubAck(UnsubAck::new(pkid));
self.framed.write(packet).await.unwrap();
}

/// Sends an acknowledgement
pub async fn pingresp(&mut self) {
let packet = Packet::PingResp;
Expand Down Expand Up @@ -308,8 +350,8 @@ fn outgoing(packet: &Packet) -> Outgoing {
Packet::PubRec(pubrec) => Outgoing::PubRec(pubrec.pkid),
Packet::PubRel(pubrel) => Outgoing::PubRel(pubrel.pkid),
Packet::PubComp(pubcomp) => Outgoing::PubComp(pubcomp.pkid),
Packet::Subscribe(subscribe) => Outgoing::Subscribe(subscribe.pkid),
Packet::Unsubscribe(unsubscribe) => Outgoing::Unsubscribe(unsubscribe.pkid),
Packet::SubAck(suback) => Outgoing::SubAck(suback.pkid),
Packet::UnsubAck(unsuback) => Outgoing::UnsubAck(unsuback.pkid),
Packet::PingReq => Outgoing::PingReq,
Packet::PingResp => Outgoing::PingResp,
Packet::Disconnect => Outgoing::Disconnect,
Expand Down
147 changes: 132 additions & 15 deletions rumqttc/tests/reliability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ async fn some_outgoing_and_no_incoming_should_trigger_pings_on_time() {
loop {
let event = broker.tick().await;

if event == Event::Incoming(Incoming::PingReq) {
if event == broker::Event::Incoming(Incoming::PingReq) {
// wait for 3 pings
count += 1;
if count == 3 {
Expand Down Expand Up @@ -218,7 +218,7 @@ async fn some_incoming_and_no_outgoing_should_trigger_pings_on_time() {
loop {
let event = broker.tick().await;

if event == Event::Incoming(Incoming::PingReq) {
if event == broker::Event::Incoming(Incoming::PingReq) {
// wait for 3 pings
count += 1;
if count == 3 {
Expand Down Expand Up @@ -320,12 +320,12 @@ async fn requests_are_recovered_after_inflight_queue_size_falls_below_max() {
assert!(broker.read_publish().await.is_none());

// ack packet 1 and client would produce packet 4
broker.ack(1).await;
broker.puback(1).await;
assert!(broker.read_publish().await.is_some());
assert!(broker.read_publish().await.is_none());

// ack packet 2 and client would produce packet 5
broker.ack(2).await;
broker.puback(2).await;
assert!(broker.read_publish().await.is_some());
assert!(broker.read_publish().await.is_none());
}
Expand Down Expand Up @@ -353,18 +353,18 @@ async fn packet_id_collisions_are_detected_and_flow_control_is_applied() {
}

// out of order ack
broker.ack(3).await;
broker.ack(4).await;
broker.puback(3).await;
broker.puback(4).await;
time::sleep(Duration::from_secs(5)).await;
broker.ack(1).await;
broker.ack(2).await;
broker.puback(1).await;
broker.puback(2).await;

// read and ack remaining packets in order
for i in 5..=15 {
let packet = broker.read_publish().await;
let packet = packet.unwrap();
assert_eq!(packet.payload[0], i);
broker.ack(packet.pkid).await;
broker.puback(packet.pkid).await;
}

time::sleep(Duration::from_secs(10)).await;
Expand All @@ -376,7 +376,7 @@ async fn packet_id_collisions_are_detected_and_flow_control_is_applied() {
// Poll until there is collision.
loop {
match eventloop.poll().await.unwrap() {
Event::Outgoing(Outgoing::AwaitAck(1)) => break,
rumqttc::Event::Outgoing(rumqttc::Outgoing::AwaitAck(1)) => break,
v => {
println!("Poll = {v:?}");
continue;
Expand All @@ -390,7 +390,7 @@ async fn packet_id_collisions_are_detected_and_flow_control_is_applied() {
println!("Poll = {event:?}");

match event {
Event::Outgoing(Outgoing::Publish(ack)) => {
rumqttc::Event::Outgoing(rumqttc::Outgoing::Publish(ack)) => {
if ack == 1 {
let elapsed = start.elapsed().as_millis() as i64;
let deviation_millis: i64 = (5000 - elapsed).abs();
Expand Down Expand Up @@ -466,7 +466,7 @@ async fn next_poll_after_connect_failure_reconnects() {
}

match eventloop.poll().await {
Ok(Event::Incoming(Packet::ConnAck(ConnAck {
Ok(rumqttc::Event::Incoming(Packet::ConnAck(ConnAck {
code: ConnectReturnCode::Success,
session_present: false,
}))) => (),
Expand Down Expand Up @@ -498,7 +498,7 @@ async fn reconnection_resumes_from_the_previous_state() {
for i in 1..=2 {
let packet = broker.read_publish().await.unwrap();
assert_eq!(i, packet.payload[0]);
broker.ack(packet.pkid).await;
broker.puback(packet.pkid).await;
}

// NOTE: An interesting thing to notice here is that reassigning a new broker
Expand All @@ -512,7 +512,7 @@ async fn reconnection_resumes_from_the_previous_state() {
for i in 3..=4 {
let packet = broker.read_publish().await.unwrap();
assert_eq!(i, packet.payload[0]);
broker.ack(packet.pkid).await;
broker.puback(packet.pkid).await;
}
}

Expand Down Expand Up @@ -696,7 +696,7 @@ async fn resolve_on_qos1_ack_from_broker() {
.unwrap_err();

// Finally ack the packet
broker.ack(1).await;
broker.puback(1).await;

// Token shouldn't resolve until packet is acked
assert_eq!(
Expand Down Expand Up @@ -780,3 +780,120 @@ async fn resolve_on_qos2_ack_from_broker() {
1
);
}

#[tokio::test]
async fn resolve_on_sub_ack_from_broker() {
let options = MqttOptions::new("dummy", "127.0.0.1", 3006);
let (client, mut eventloop) = AsyncClient::new(options, 5);

task::spawn(async move {
let res = run(&mut eventloop, false).await;
if let Err(e) = res {
match e {
ConnectionError::FlushTimeout => {
assert!(eventloop.network.is_none());
println!("State is being clean properly");
}
_ => {
println!("Couldn't fill the TCP send buffer to run this test properly. Try reducing the size of buffer.");
}
}
}
});

let mut broker = Broker::new(3006, 0, false).await;

let mut token = client
.subscribe("hello/world", QoS::AtLeastOnce)
.await
.unwrap();

// Token shouldn't resolve before reaching broker
timeout(Duration::from_secs(1), &mut token)
.await
.unwrap_err();

let Packet::Subscribe(Subscribe { pkid, filters, .. }) = broker.read_packet().await.unwrap()
else {
unreachable!()
};
assert_eq!(
filters,
[SubscribeFilter {
path: "hello/world".to_owned(),
qos: QoS::AtLeastOnce
}]
);
assert_eq!(pkid, 1);

// Token shouldn't resolve until packet is acked
timeout(Duration::from_secs(1), &mut token)
.await
.unwrap_err();

// Finally ack the packet
broker.suback(1, QoS::AtLeastOnce).await;

// Token shouldn't resolve until packet is acked
assert_eq!(
timeout(Duration::from_secs(1), &mut token)
.await
.unwrap()
.unwrap(),
1
);
}

#[tokio::test]
async fn resolve_on_unsub_ack_from_broker() {
let options = MqttOptions::new("dummy", "127.0.0.1", 3006);
let (client, mut eventloop) = AsyncClient::new(options, 5);

task::spawn(async move {
let res = run(&mut eventloop, false).await;
if let Err(e) = res {
match e {
ConnectionError::FlushTimeout => {
assert!(eventloop.network.is_none());
println!("State is being clean properly");
}
_ => {
println!("Couldn't fill the TCP send buffer to run this test properly. Try reducing the size of buffer.");
}
}
}
});

let mut broker = Broker::new(3006, 0, false).await;

let mut token = client.unsubscribe("hello/world").await.unwrap();

// Token shouldn't resolve before reaching broker
timeout(Duration::from_secs(1), &mut token)
.await
.unwrap_err();

let Packet::Unsubscribe(Unsubscribe { topics, pkid, .. }) = broker.read_packet().await.unwrap()
else {
unreachable!()
};
assert_eq!(topics, vec!["hello/world"]);
assert_eq!(pkid, 1);

// Token shouldn't resolve until packet is acked
timeout(Duration::from_secs(1), &mut token)
.await
.unwrap_err();

// Finally ack the packet
broker.unsuback(1).await;

// Token shouldn't resolve until packet is acked
assert_eq!(
timeout(Duration::from_secs(1), &mut token)
.await
.unwrap()
.unwrap(),
1
);
}

0 comments on commit bfeb44d

Please sign in to comment.