Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(rumqttc): Replace Vec with FixedBitSet for QoS 2 packet trac… #869

Merged
merged 4 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rumqttc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ url = { version = "2", default-features = false, optional = true }
# proxy
async-http-proxy = { version = "1.2.5", features = ["runtime-tokio", "basic-auth"], optional = true }
tokio-stream = "0.1.15"
fixedbitset = "0.5.7"

[dev-dependencies]
bincode = "1.3.3"
Expand Down
60 changes: 23 additions & 37 deletions rumqttc/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::{Event, Incoming, Outgoing, Request};

use crate::mqttbytes::v4::*;
use crate::mqttbytes::{self, *};
use crate::mqttbytes::*;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change seems to be unrelated to the core of this PR

use fixedbitset::FixedBitSet;
use std::collections::VecDeque;
use std::{io, time::Instant};

Expand All @@ -28,7 +29,7 @@ pub enum StateError {
#[error("A Subscribe packet must contain atleast one filter")]
EmptySubscription,
#[error("Mqtt serialization/deserialization error: {0}")]
Deserialization(#[from] mqttbytes::Error),
Deserialization(#[from] Error),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have placed things this way to make it clear where that particular error is trickling down from.

#[error("Connection closed by peer abruptly")]
ConnectionAborted,
}
Expand Down Expand Up @@ -62,9 +63,9 @@ pub struct MqttState {
/// Outgoing QoS 1, 2 publishes which aren't acked yet
pub(crate) outgoing_pub: Vec<Option<Publish>>,
/// Packet ids of released QoS 2 publishes
pub(crate) outgoing_rel: Vec<Option<u16>>,
pub(crate) outgoing_rel: FixedBitSet,
/// Packet ids on incoming QoS 2 publishes
pub(crate) incoming_pub: Vec<Option<u16>>,
pub(crate) incoming_pub: FixedBitSet,
/// Last collision due to broker not acking in order
pub collision: Option<Publish>,
/// Buffered incoming packets
Expand All @@ -89,8 +90,8 @@ impl MqttState {
max_inflight,
// index 0 is wasted as 0 is not a valid packet id
outgoing_pub: vec![None; max_inflight as usize + 1],
outgoing_rel: vec![None; max_inflight as usize + 1],
incoming_pub: vec![None; u16::MAX as usize + 1],
outgoing_rel: FixedBitSet::with_capacity(max_inflight as usize + 1),
incoming_pub: FixedBitSet::with_capacity(u16::MAX as usize + 1),
collision: None,
// TODO: Optimize these sizes later
events: VecDeque::with_capacity(100),
Expand All @@ -113,17 +114,14 @@ impl MqttState {
}

// remove and collect pending releases
for rel in self.outgoing_rel.iter_mut() {
if let Some(pkid) = rel.take() {
let request = Request::PubRel(PubRel::new(pkid));
pending.push(request);
}
for pkid in self.outgoing_rel.ones() {
let request = Request::PubRel(PubRel::new(pkid as u16));
pending.push(request);
}
self.outgoing_rel.clear();

// remove packed ids of incoming qos2 publishes
for id in self.incoming_pub.iter_mut() {
id.take();
}
// remove packet ids of incoming qos2 publishes
self.incoming_pub.clear();

self.await_pingresp = false;
self.collision_ping_count = 0;
Expand Down Expand Up @@ -210,7 +208,7 @@ impl MqttState {
}
QoS::ExactlyOnce => {
let pkid = publish.pkid;
self.incoming_pub[pkid as usize] = Some(pkid);
self.incoming_pub.insert(pkid as usize);

if !self.manual_acks {
let pubrec = PubRec::new(pkid);
Expand Down Expand Up @@ -261,7 +259,7 @@ impl MqttState {
}

// NOTE: Inflight - 1 for qos2 in comp
self.outgoing_rel[pubrec.pkid as usize] = Some(pubrec.pkid);
self.outgoing_rel.insert(pubrec.pkid as usize);
let pubrel = PubRel { pkid: pubrec.pkid };
let event = Event::Outgoing(Outgoing::PubRel(pubrec.pkid));
self.events.push_back(event);
Expand All @@ -270,16 +268,12 @@ impl MqttState {
}

fn handle_incoming_pubrel(&mut self, pubrel: &PubRel) -> Result<Option<Packet>, StateError> {
let publish = self
.incoming_pub
.get_mut(pubrel.pkid as usize)
.ok_or(StateError::Unsolicited(pubrel.pkid))?;

if publish.take().is_none() {
if !self.incoming_pub.contains(pubrel.pkid as usize) {
error!("Unsolicited pubrel packet: {:?}", pubrel.pkid);
return Err(StateError::Unsolicited(pubrel.pkid));
}

self.incoming_pub.set(pubrel.pkid as usize, false);
let event = Event::Outgoing(Outgoing::PubComp(pubrel.pkid));
let pubcomp = PubComp { pkid: pubrel.pkid };
self.events.push_back(event);
Expand All @@ -288,17 +282,12 @@ impl MqttState {
}

fn handle_incoming_pubcomp(&mut self, pubcomp: &PubComp) -> Result<Option<Packet>, StateError> {
if self
.outgoing_rel
.get_mut(pubcomp.pkid as usize)
.ok_or(StateError::Unsolicited(pubcomp.pkid))?
.take()
.is_none()
{
if !self.outgoing_rel.contains(pubcomp.pkid as usize) {
error!("Unsolicited pubcomp packet: {:?}", pubcomp.pkid);
return Err(StateError::Unsolicited(pubcomp.pkid));
}

self.outgoing_rel.set(pubcomp.pkid as usize, false);
self.inflight -= 1;
let packet = self.check_collision(pubcomp.pkid).map(|publish| {
let event = Event::Outgoing(Outgoing::Publish(publish.pkid));
Expand Down Expand Up @@ -486,7 +475,7 @@ impl MqttState {
_ => pubrel,
};

self.outgoing_rel[pubrel.pkid as usize] = Some(pubrel.pkid);
self.outgoing_rel.insert(pubrel.pkid as usize);
self.inflight += 1;
Ok(pubrel)
}
Expand Down Expand Up @@ -610,10 +599,8 @@ mod test {
mqtt.handle_incoming_publish(&publish2).unwrap();
mqtt.handle_incoming_publish(&publish3).unwrap();

let pkid = mqtt.incoming_pub[3].unwrap();

// only qos2 publish should be add to queue
assert_eq!(pkid, 3);
assert!(mqtt.incoming_pub.contains(3));
}

#[test]
Expand Down Expand Up @@ -656,8 +643,7 @@ mod test {
mqtt.handle_incoming_publish(&publish2).unwrap();
mqtt.handle_incoming_publish(&publish3).unwrap();

let pkid = mqtt.incoming_pub[3].unwrap();
assert_eq!(pkid, 3);
assert!(mqtt.incoming_pub.contains(3));

assert!(mqtt.events.is_empty());
}
Expand Down Expand Up @@ -725,7 +711,7 @@ mod test {
assert_eq!(backup.unwrap().pkid, 1);

// check if the qos2 element's release pkid is 2
assert_eq!(mqtt.outgoing_rel[2].unwrap(), 2);
assert!(mqtt.outgoing_rel.contains(2));
}

#[test]
Expand Down