Skip to content

Commit

Permalink
rumqttc: resume session only if CONNACK with session present 1 (#864)
Browse files Browse the repository at this point in the history
* Check if session present to restore pending publishes.

* Modify changelog.

* remove changes that don't seem to be related

* refactor: improve readability

* feat: apply changes to v4

* Remove session_expiry_interval related code.

* test: set clean session

* test: broker saved session

* test: fix resume reconnect

---------

Co-authored-by: Devdutt Shenoi <devdutt@outlook.in>
Co-authored-by: Devdutt Shenoi <devdutt@bytebeam.io>
  • Loading branch information
3 people authored May 21, 2024
1 parent a63379e commit 83d8f77
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 41 deletions.
1 change: 1 addition & 0 deletions rumqttc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* Validate filters while creating subscription requests.
* Make v4::Connect::write return correct value
* Ordering of `State.events` related to `QoS > 0` publishes
* Resume session only if broker sends `CONNACK` with `session_present == 1`.

### Security

Expand Down
20 changes: 12 additions & 8 deletions rumqttc/src/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,18 +149,24 @@ impl EventLoop {
Ok(inner) => inner?,
Err(_) => return Err(ConnectionError::NetworkTimeout),
};
// Last session might contain packets which aren't acked. If it's a new session, clear the pending packets.
if !connack.session_present {
self.pending.clear();
}
self.network = Some(network);

if self.keepalive_timeout.is_none() && !self.mqtt_options.keep_alive.is_zero() {
self.keepalive_timeout = Some(Box::pin(time::sleep(self.mqtt_options.keep_alive)));
}

return Ok(Event::Incoming(connack));
return Ok(Event::Incoming(Packet::ConnAck(connack)));
}

match self.select().await {
Ok(v) => Ok(v),
Err(e) => {
// MQTT requires that packets pending acknowledgement should be republished on session resume.
// Move pending messages from state to eventloop.
self.clean();
Err(e)
}
Expand Down Expand Up @@ -294,14 +300,14 @@ impl EventLoop {
async fn connect(
mqtt_options: &MqttOptions,
network_options: NetworkOptions,
) -> Result<(Network, Incoming), ConnectionError> {
) -> Result<(Network, ConnAck), ConnectionError> {
// connect to the broker
let mut network = network_connect(mqtt_options, network_options).await?;

// make MQTT connection request (which internally awaits for ack)
let packet = mqtt_connect(mqtt_options, &mut network).await?;
let connack = mqtt_connect(mqtt_options, &mut network).await?;

Ok((network, packet))
Ok((network, connack))
}

pub(crate) async fn socket_connect(
Expand Down Expand Up @@ -469,7 +475,7 @@ async fn network_connect(
async fn mqtt_connect(
options: &MqttOptions,
network: &mut Network,
) -> Result<Incoming, ConnectionError> {
) -> Result<ConnAck, ConnectionError> {
let keep_alive = options.keep_alive().as_secs() as u16;
let clean_session = options.clean_session();
let last_will = options.last_will();
Expand All @@ -485,9 +491,7 @@ async fn mqtt_connect(

// validate connack
match network.read().await? {
Incoming::ConnAck(connack) if connack.code == ConnectReturnCode::Success => {
Ok(Packet::ConnAck(connack))
}
Incoming::ConnAck(connack) if connack.code == ConnectReturnCode::Success => Ok(connack),
Incoming::ConnAck(connack) => Err(ConnectionError::ConnectionRefused(connack.code)),
packet => Err(ConnectionError::NotConnAck(packet)),
}
Expand Down
27 changes: 14 additions & 13 deletions rumqttc/src/v5/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,18 +141,25 @@ impl EventLoop {
connect(&mut self.options),
)
.await??;
// Last session might contain packets which aren't acked. If it's a new session, clear the pending packets.
if !connack.session_present {
self.pending.clear();
}
self.network = Some(network);

if self.keepalive_timeout.is_none() {
self.keepalive_timeout = Some(Box::pin(time::sleep(self.options.keep_alive)));
}

self.state.handle_incoming_packet(connack)?;
self.state
.handle_incoming_packet(Incoming::ConnAck(connack))?;
}

match self.select().await {
Ok(v) => Ok(v),
Err(e) => {
// MQTT requires that packets pending acknowledgement should be republished on session resume.
// Move pending messages from state to eventloop.
self.clean();
Err(e)
}
Expand Down Expand Up @@ -263,19 +270,14 @@ impl EventLoop {
/// the stream.
/// This function (for convenience) includes internal delays for users to perform internal sleeps
/// between re-connections so that cancel semantics can be used during this sleep
async fn connect(options: &mut MqttOptions) -> Result<(Network, Incoming), ConnectionError> {
async fn connect(options: &mut MqttOptions) -> Result<(Network, ConnAck), ConnectionError> {
// connect to the broker
let mut network = network_connect(options).await?;

// make MQTT connection request (which internally awaits for ack)
let packet = mqtt_connect(options, &mut network).await?;

// Last session might contain packets which aren't acked. MQTT says these packets should be
// republished in the next session
// move pending messages from state to eventloop
// let pending = self.state.clean();
// self.pending = pending.into_iter();
Ok((network, packet))
let connack = mqtt_connect(options, &mut network).await?;

Ok((network, connack))
}

async fn network_connect(options: &MqttOptions) -> Result<Network, ConnectionError> {
Expand Down Expand Up @@ -387,7 +389,7 @@ async fn network_connect(options: &MqttOptions) -> Result<Network, ConnectionErr
async fn mqtt_connect(
options: &mut MqttOptions,
network: &mut Network,
) -> Result<Incoming, ConnectionError> {
) -> Result<ConnAck, ConnectionError> {
let keep_alive = options.keep_alive().as_secs() as u16;
let clean_start = options.clean_start();
let client_id = options.client_id();
Expand All @@ -406,7 +408,6 @@ async fn mqtt_connect(
// validate connack
match network.read().await? {
Incoming::ConnAck(connack) if connack.code == ConnectReturnCode::Success => {
// Override local keep_alive value if set by server.
if let Some(props) = &connack.properties {
if let Some(keep_alive) = props.server_keep_alive {
options.keep_alive = Duration::from_secs(keep_alive as u64);
Expand All @@ -418,7 +419,7 @@ async fn mqtt_connect(
options.set_session_expiry_interval(props.session_expiry_interval);
}
}
Ok(Packet::ConnAck(connack))
Ok(connack)
}
Incoming::ConnAck(connack) => Err(ConnectionError::ConnectionRefused(connack.code)),
packet => Err(ConnectionError::NotConnAck(Box::new(packet))),
Expand Down
9 changes: 6 additions & 3 deletions rumqttc/tests/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub struct Broker {

impl Broker {
/// Create a new broker which accepts 1 mqtt connection
pub async fn new(port: u16, connack: u8) -> Broker {
pub async fn new(port: u16, connack: u8, session_saved: bool) -> Broker {
let addr = format!("127.0.0.1:{port}");
let listener = TcpListener::bind(&addr).await.unwrap();

Expand All @@ -32,9 +32,12 @@ impl Broker {
framed.readb(&mut incoming).await.unwrap();

match incoming.pop_front().unwrap() {
Packet::Connect(_) => {
Packet::Connect(connect) => {
let connack = match connack {
0 => ConnAck::new(ConnectReturnCode::Success, false),
0 => ConnAck::new(
ConnectReturnCode::Success,
!connect.clean_session && session_saved,
),
1 => ConnAck::new(ConnectReturnCode::BadUserNamePassword, false),
_ => {
return Broker {
Expand Down
38 changes: 21 additions & 17 deletions rumqttc/tests/reliability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ async fn _tick(
#[tokio::test]
async fn connection_should_timeout_on_time() {
task::spawn(async move {
let _broker = Broker::new(1880, 3).await;
let _broker = Broker::new(1880, 3, false).await;
time::sleep(Duration::from_secs(10)).await;
});

Expand Down Expand Up @@ -125,7 +125,7 @@ async fn idle_connection_triggers_pings_on_time() {
run(&mut eventloop, false).await.unwrap();
});

let mut broker = Broker::new(1885, 0).await;
let mut broker = Broker::new(1885, 0, false).await;
let mut count = 0;
let mut start = Instant::now();

Expand Down Expand Up @@ -169,7 +169,7 @@ async fn some_outgoing_and_no_incoming_should_trigger_pings_on_time() {
run(&mut eventloop, false).await.unwrap();
});

let mut broker = Broker::new(1886, 0).await;
let mut broker = Broker::new(1886, 0, false).await;
let mut count = 0;
let mut start = Instant::now();

Expand Down Expand Up @@ -204,7 +204,7 @@ async fn some_incoming_and_no_outgoing_should_trigger_pings_on_time() {
run(&mut eventloop, false).await.unwrap();
});

let mut broker = Broker::new(2000, 0).await;
let mut broker = Broker::new(2000, 0, false).await;
let mut count = 0;

// Start sending qos 0 publishes to the client. This triggers
Expand Down Expand Up @@ -238,7 +238,7 @@ async fn detects_halfopen_connections_in_the_second_ping_request() {

// A broker which consumes packets but doesn't reply
task::spawn(async move {
let mut broker = Broker::new(2001, 0).await;
let mut broker = Broker::new(2001, 0, false).await;
broker.blackhole().await;
});

Expand Down Expand Up @@ -279,7 +279,7 @@ async fn requests_are_blocked_after_max_inflight_queue_size() {
run(&mut eventloop, false).await.unwrap();
});

let mut broker = Broker::new(1887, 0).await;
let mut broker = Broker::new(1887, 0, false).await;
for i in 1..=10 {
let packet = broker.read_publish().await;

Expand All @@ -306,7 +306,7 @@ async fn requests_are_recovered_after_inflight_queue_size_falls_below_max() {
run(&mut eventloop, true).await.unwrap();
});

let mut broker = Broker::new(1888, 0).await;
let mut broker = Broker::new(1888, 0, false).await;

// packet 1, 2, and 3
assert!(broker.read_publish().await.is_some());
Expand Down Expand Up @@ -341,7 +341,7 @@ async fn packet_id_collisions_are_detected_and_flow_control_is_applied() {
});

task::spawn(async move {
let mut broker = Broker::new(1891, 0).await;
let mut broker = Broker::new(1891, 0, false).await;

// read all incoming packets first
for i in 1..=4 {
Expand Down Expand Up @@ -449,8 +449,8 @@ async fn next_poll_after_connect_failure_reconnects() {
let options = MqttOptions::new("dummy", "127.0.0.1", 3000);

task::spawn(async move {
let _broker = Broker::new(3000, 1).await;
let _broker = Broker::new(3000, 0).await;
let _broker = Broker::new(3000, 1, false).await;
let _broker = Broker::new(3000, 0, false).await;
time::sleep(Duration::from_secs(15)).await;
});

Expand All @@ -474,7 +474,9 @@ async fn next_poll_after_connect_failure_reconnects() {
#[tokio::test]
async fn reconnection_resumes_from_the_previous_state() {
let mut options = MqttOptions::new("dummy", "127.0.0.1", 3001);
options.set_keep_alive(Duration::from_secs(5));
options
.set_keep_alive(Duration::from_secs(5))
.set_clean_session(false);

// start sending qos0 publishes. Makes sure that there is out activity but no in activity
let (client, mut eventloop) = AsyncClient::new(options, 5);
Expand All @@ -489,7 +491,7 @@ async fn reconnection_resumes_from_the_previous_state() {
});

// broker connection 1
let mut broker = Broker::new(3001, 0).await;
let mut broker = Broker::new(3001, 0, false).await;
for i in 1..=2 {
let packet = broker.read_publish().await.unwrap();
assert_eq!(i, packet.payload[0]);
Expand All @@ -503,7 +505,7 @@ async fn reconnection_resumes_from_the_previous_state() {
// a block around broker with {} is closing the connection as expected

// broker connection 2
let mut broker = Broker::new(3001, 0).await;
let mut broker = Broker::new(3001, 0, true).await;
for i in 3..=4 {
let packet = broker.read_publish().await.unwrap();
assert_eq!(i, packet.payload[0]);
Expand All @@ -514,7 +516,9 @@ async fn reconnection_resumes_from_the_previous_state() {
#[tokio::test]
async fn reconnection_resends_unacked_packets_from_the_previous_connection_first() {
let mut options = MqttOptions::new("dummy", "127.0.0.1", 3002);
options.set_keep_alive(Duration::from_secs(5));
options
.set_keep_alive(Duration::from_secs(5))
.set_clean_session(false);

// start sending qos0 publishes. this makes sure that there is
// outgoing activity but no incoming activity
Expand All @@ -530,14 +534,14 @@ async fn reconnection_resends_unacked_packets_from_the_previous_connection_first
});

// broker connection 1. receive but don't ack
let mut broker = Broker::new(3002, 0).await;
let mut broker = Broker::new(3002, 0, false).await;
for i in 1..=2 {
let packet = broker.read_publish().await.unwrap();
assert_eq!(i, packet.payload[0]);
}

// broker connection 2 receives from scratch
let mut broker = Broker::new(3002, 0).await;
let mut broker = Broker::new(3002, 0, true).await;
for i in 1..=6 {
let packet = broker.read_publish().await.unwrap();
assert_eq!(i, packet.payload[0]);
Expand All @@ -559,7 +563,7 @@ async fn state_is_being_cleaned_properly_and_pending_request_calculated_properly
});

task::spawn(async move {
let mut broker = Broker::new(3004, 0).await;
let mut broker = Broker::new(3004, 0, false).await;
while (broker.read_packet().await).is_some() {
time::sleep(Duration::from_secs_f64(0.5)).await;
}
Expand Down

0 comments on commit 83d8f77

Please sign in to comment.