Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(rumqttc): Replace Vec with FixedBitSet for QoS 2 packet trac… #869

Merged
merged 4 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rumqttc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ url = { version = "2", default-features = false, optional = true }
# proxy
async-http-proxy = { version = "1.2.5", features = ["runtime-tokio", "basic-auth"], optional = true }
tokio-stream = "0.1.15"
fixedbitset = "0.5.7"

[dev-dependencies]
bincode = "1.3.3"
Expand Down
56 changes: 21 additions & 35 deletions rumqttc/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::{Event, Incoming, Outgoing, Request};

use crate::mqttbytes::v4::*;
use crate::mqttbytes::{self, *};
use fixedbitset::FixedBitSet;
use std::collections::VecDeque;
use std::{io, time::Instant};

Expand Down Expand Up @@ -62,9 +63,9 @@ pub struct MqttState {
/// Outgoing QoS 1, 2 publishes which aren't acked yet
pub(crate) outgoing_pub: Vec<Option<Publish>>,
/// Packet ids of released QoS 2 publishes
pub(crate) outgoing_rel: Vec<Option<u16>>,
pub(crate) outgoing_rel: FixedBitSet,
/// Packet ids on incoming QoS 2 publishes
pub(crate) incoming_pub: Vec<Option<u16>>,
pub(crate) incoming_pub: FixedBitSet,
/// Last collision due to broker not acking in order
pub collision: Option<Publish>,
/// Buffered incoming packets
Expand All @@ -89,8 +90,8 @@ impl MqttState {
max_inflight,
// index 0 is wasted as 0 is not a valid packet id
outgoing_pub: vec![None; max_inflight as usize + 1],
outgoing_rel: vec![None; max_inflight as usize + 1],
incoming_pub: vec![None; u16::MAX as usize + 1],
outgoing_rel: FixedBitSet::with_capacity(max_inflight as usize + 1),
incoming_pub: FixedBitSet::with_capacity(u16::MAX as usize + 1),
collision: None,
// TODO: Optimize these sizes later
events: VecDeque::with_capacity(100),
Expand All @@ -113,17 +114,14 @@ impl MqttState {
}

// remove and collect pending releases
for rel in self.outgoing_rel.iter_mut() {
if let Some(pkid) = rel.take() {
let request = Request::PubRel(PubRel::new(pkid));
pending.push(request);
}
for pkid in self.outgoing_rel.ones() {
let request = Request::PubRel(PubRel::new(pkid as u16));
pending.push(request);
}
self.outgoing_rel.clear();

// remove packed ids of incoming qos2 publishes
for id in self.incoming_pub.iter_mut() {
id.take();
}
// remove packet ids of incoming qos2 publishes
self.incoming_pub.clear();

self.await_pingresp = false;
self.collision_ping_count = 0;
Expand Down Expand Up @@ -210,7 +208,7 @@ impl MqttState {
}
QoS::ExactlyOnce => {
let pkid = publish.pkid;
self.incoming_pub[pkid as usize] = Some(pkid);
self.incoming_pub.insert(pkid as usize);

if !self.manual_acks {
let pubrec = PubRec::new(pkid);
Expand Down Expand Up @@ -261,7 +259,7 @@ impl MqttState {
}

// NOTE: Inflight - 1 for qos2 in comp
self.outgoing_rel[pubrec.pkid as usize] = Some(pubrec.pkid);
self.outgoing_rel.insert(pubrec.pkid as usize);
let pubrel = PubRel { pkid: pubrec.pkid };
let event = Event::Outgoing(Outgoing::PubRel(pubrec.pkid));
self.events.push_back(event);
Expand All @@ -270,16 +268,12 @@ impl MqttState {
}

fn handle_incoming_pubrel(&mut self, pubrel: &PubRel) -> Result<Option<Packet>, StateError> {
let publish = self
.incoming_pub
.get_mut(pubrel.pkid as usize)
.ok_or(StateError::Unsolicited(pubrel.pkid))?;

if publish.take().is_none() {
if !self.incoming_pub.contains(pubrel.pkid as usize) {
error!("Unsolicited pubrel packet: {:?}", pubrel.pkid);
return Err(StateError::Unsolicited(pubrel.pkid));
}

self.incoming_pub.set(pubrel.pkid as usize, false);
let event = Event::Outgoing(Outgoing::PubComp(pubrel.pkid));
let pubcomp = PubComp { pkid: pubrel.pkid };
self.events.push_back(event);
Expand All @@ -288,17 +282,12 @@ impl MqttState {
}

fn handle_incoming_pubcomp(&mut self, pubcomp: &PubComp) -> Result<Option<Packet>, StateError> {
if self
.outgoing_rel
.get_mut(pubcomp.pkid as usize)
.ok_or(StateError::Unsolicited(pubcomp.pkid))?
.take()
.is_none()
{
if !self.outgoing_rel.contains(pubcomp.pkid as usize) {
error!("Unsolicited pubcomp packet: {:?}", pubcomp.pkid);
return Err(StateError::Unsolicited(pubcomp.pkid));
}

self.outgoing_rel.set(pubcomp.pkid as usize, false);
self.inflight -= 1;
let packet = self.check_collision(pubcomp.pkid).map(|publish| {
let event = Event::Outgoing(Outgoing::Publish(publish.pkid));
Expand Down Expand Up @@ -486,7 +475,7 @@ impl MqttState {
_ => pubrel,
};

self.outgoing_rel[pubrel.pkid as usize] = Some(pubrel.pkid);
self.outgoing_rel.insert(pubrel.pkid as usize);
self.inflight += 1;
Ok(pubrel)
}
Expand Down Expand Up @@ -610,10 +599,8 @@ mod test {
mqtt.handle_incoming_publish(&publish2).unwrap();
mqtt.handle_incoming_publish(&publish3).unwrap();

let pkid = mqtt.incoming_pub[3].unwrap();

// only qos2 publish should be add to queue
assert_eq!(pkid, 3);
assert!(mqtt.incoming_pub.contains(3));
}

#[test]
Expand Down Expand Up @@ -656,8 +643,7 @@ mod test {
mqtt.handle_incoming_publish(&publish2).unwrap();
mqtt.handle_incoming_publish(&publish3).unwrap();

let pkid = mqtt.incoming_pub[3].unwrap();
assert_eq!(pkid, 3);
assert!(mqtt.incoming_pub.contains(3));

assert!(mqtt.events.is_empty());
}
Expand Down Expand Up @@ -725,7 +711,7 @@ mod test {
assert_eq!(backup.unwrap().pkid, 1);

// check if the qos2 element's release pkid is 2
assert_eq!(mqtt.outgoing_rel[2].unwrap(), 2);
assert!(mqtt.outgoing_rel.contains(2));
}

#[test]
Expand Down
53 changes: 20 additions & 33 deletions rumqttc/src/v5/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use super::mqttbytes::{self, Error as MqttError, QoS};
use super::{Event, Incoming, Outgoing, Request};

use bytes::Bytes;
use fixedbitset::FixedBitSet;
use std::collections::{HashMap, VecDeque};
use std::{io, time::Instant};

Expand Down Expand Up @@ -104,9 +105,9 @@ pub struct MqttState {
/// Outgoing QoS 1, 2 publishes which aren't acked yet
pub(crate) outgoing_pub: Vec<Option<Publish>>,
/// Packet ids of released QoS 2 publishes
pub(crate) outgoing_rel: Vec<Option<u16>>,
pub(crate) outgoing_rel: FixedBitSet,
/// Packet ids on incoming QoS 2 publishes
pub(crate) incoming_pub: Vec<Option<u16>>,
pub(crate) incoming_pub: FixedBitSet,
/// Last collision due to broker not acking in order
pub collision: Option<Publish>,
/// Buffered incoming packets
Expand Down Expand Up @@ -137,8 +138,8 @@ impl MqttState {
inflight: 0,
// index 0 is wasted as 0 is not a valid packet id
outgoing_pub: vec![None; max_inflight as usize + 1],
outgoing_rel: vec![None; max_inflight as usize + 1],
incoming_pub: vec![None; u16::MAX as usize + 1],
outgoing_rel: FixedBitSet::with_capacity(max_inflight as usize + 1),
incoming_pub: FixedBitSet::with_capacity(u16::MAX as usize + 1),
collision: None,
// TODO: Optimize these sizes later
events: VecDeque::with_capacity(100),
Expand All @@ -163,17 +164,14 @@ impl MqttState {
}

// remove and collect pending releases
for rel in self.outgoing_rel.iter_mut() {
if let Some(pkid) = rel.take() {
let request = Request::PubRel(PubRel::new(pkid, None));
pending.push(request);
}
for pkid in self.outgoing_rel.ones() {
let request = Request::PubRel(PubRel::new(pkid as u16, None));
pending.push(request);
}
self.outgoing_rel.clear();

// remove packed ids of incoming qos2 publishes
for id in self.incoming_pub.iter_mut() {
id.take();
}
self.incoming_pub.clear();

self.await_pingresp = false;
self.collision_ping_count = 0;
Expand Down Expand Up @@ -349,7 +347,7 @@ impl MqttState {
}
QoS::ExactlyOnce => {
let pkid = publish.pkid;
self.incoming_pub[pkid as usize] = Some(pkid);
self.incoming_pub.insert(pkid as usize);

if !self.manual_acks {
let pubrec = PubRec::new(pkid, None);
Expand Down Expand Up @@ -416,23 +414,19 @@ impl MqttState {
}

// NOTE: Inflight - 1 for qos2 in comp
self.outgoing_rel[pubrec.pkid as usize] = Some(pubrec.pkid);
self.outgoing_rel.insert(pubrec.pkid as usize);
let event = Event::Outgoing(Outgoing::PubRel(pubrec.pkid));
self.events.push_back(event);

Ok(Some(Packet::PubRel(PubRel::new(pubrec.pkid, None))))
}

fn handle_incoming_pubrel(&mut self, pubrel: &PubRel) -> Result<Option<Packet>, StateError> {
let publish = self
.incoming_pub
.get_mut(pubrel.pkid as usize)
.ok_or(StateError::Unsolicited(pubrel.pkid))?;

if publish.take().is_none() {
if !self.incoming_pub.contains(pubrel.pkid as usize) {
error!("Unsolicited pubrel packet: {:?}", pubrel.pkid);
return Err(StateError::Unsolicited(pubrel.pkid));
}
self.incoming_pub.set(pubrel.pkid as usize, false);

if pubrel.reason != PubRelReason::Success {
return Err(StateError::PubRelFail {
Expand All @@ -456,14 +450,11 @@ impl MqttState {
Packet::Publish(publish)
});

let pubrel = self
.outgoing_rel
.get_mut(pubcomp.pkid as usize)
.ok_or(StateError::Unsolicited(pubcomp.pkid))?;
if pubrel.take().is_none() {
if !self.outgoing_rel.contains(pubcomp.pkid as usize) {
error!("Unsolicited pubcomp packet: {:?}", pubcomp.pkid);
return Err(StateError::Unsolicited(pubcomp.pkid));
}
self.outgoing_rel.set(pubcomp.pkid as usize, false);

if pubcomp.reason != PubCompReason::Success {
return Err(StateError::PubCompFail {
Expand Down Expand Up @@ -668,7 +659,7 @@ impl MqttState {
_ => pubrel,
};

self.outgoing_rel[pubrel.pkid as usize] = Some(pubrel.pkid);
self.outgoing_rel.insert(pubrel.pkid as usize);
self.inflight += 1;
Ok(pubrel)
}
Expand Down Expand Up @@ -824,10 +815,8 @@ mod test {
mqtt.handle_incoming_publish(&mut publish2).unwrap();
mqtt.handle_incoming_publish(&mut publish3).unwrap();

let pkid = mqtt.incoming_pub[3].unwrap();

// only qos2 publish should be add to queue
assert_eq!(pkid, 3);
assert!(mqtt.incoming_pub.contains(3));
}

#[test]
Expand Down Expand Up @@ -870,9 +859,7 @@ mod test {
mqtt.handle_incoming_publish(&mut publish2).unwrap();
mqtt.handle_incoming_publish(&mut publish3).unwrap();

let pkid = mqtt.incoming_pub[3].unwrap();
assert_eq!(pkid, 3);

assert!(mqtt.incoming_pub.contains(3));
assert!(mqtt.events.is_empty());
}

Expand Down Expand Up @@ -940,7 +927,7 @@ mod test {
assert_eq!(backup.unwrap().pkid, 1);

// check if the qos2 element's release pkid is 2
assert_eq!(mqtt.outgoing_rel[2].unwrap(), 2);
assert!(mqtt.outgoing_rel.contains(2));
}

#[test]
Expand Down