Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main'
Browse files Browse the repository at this point in the history
  • Loading branch information
wangwen-4220 committed May 22, 2024
2 parents 8ebcd37 + 83d8f77 commit 1809ef0
Show file tree
Hide file tree
Showing 9 changed files with 292 additions and 42 deletions.
4 changes: 4 additions & 0 deletions rumqttc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* `size()` method on `Packet` calculates size once serialized.
* `read()` and `write()` methods on `Packet`.
* `ConnectionAborted` variant on `StateError` type to denote abrupt end to a connection
* `set_session_expiry_interval` and `session_expiry_interval` methods on `MqttOptions`.
* `Auth` packet as per MQTT5 standards

### Changed

* rename `N` as `AsyncReadWrite` to describe usage.
* use `Framed` to encode/decode MQTT packets.
* use `Login` to store credentials
* Made `DisconnectProperties` struct public.

### Deprecated

Expand All @@ -29,6 +32,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* Make v4::Connect::write return correct value
* Ordering of `State.events` related to `QoS > 0` publishes
* Filter PUBACK in pending save requests to fix unexpected PUBACK sent to reconnected broker.
* Resume session only if broker sends `CONNACK` with `session_present == 1`.

### Security

Expand Down
1 change: 1 addition & 0 deletions rumqttc/examples/async_manual_acks_v5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ fn create_conn() -> (AsyncClient, EventLoop) {
let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1884);
mqttoptions
.set_keep_alive(Duration::from_secs(5))
.set_session_expiry_interval(u32::MAX.into())
.set_manual_acks(true)
.set_clean_start(false);

Expand Down
20 changes: 12 additions & 8 deletions rumqttc/src/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,18 +157,24 @@ impl EventLoop {
Ok(inner) => inner?,
Err(_) => return Err(ConnectionError::NetworkTimeout),
};
// Last session might contain packets which aren't acked. If it's a new session, clear the pending packets.
if !connack.session_present {
self.pending.clear();
}
self.network = Some(network);

if self.keepalive_timeout.is_none() && !self.mqtt_options.keep_alive.is_zero() {
self.keepalive_timeout = Some(Box::pin(time::sleep(self.mqtt_options.keep_alive)));
}

return Ok(Event::Incoming(connack));
return Ok(Event::Incoming(Packet::ConnAck(connack)));
}

match self.select().await {
Ok(v) => Ok(v),
Err(e) => {
// MQTT requires that packets pending acknowledgement should be republished on session resume.
// Move pending messages from state to eventloop.
self.clean();
Err(e)
}
Expand Down Expand Up @@ -302,14 +308,14 @@ impl EventLoop {
async fn connect(
mqtt_options: &MqttOptions,
network_options: NetworkOptions,
) -> Result<(Network, Incoming), ConnectionError> {
) -> Result<(Network, ConnAck), ConnectionError> {
// connect to the broker
let mut network = network_connect(mqtt_options, network_options).await?;

// make MQTT connection request (which internally awaits for ack)
let packet = mqtt_connect(mqtt_options, &mut network).await?;
let connack = mqtt_connect(mqtt_options, &mut network).await?;

Ok((network, packet))
Ok((network, connack))
}

pub(crate) async fn socket_connect(
Expand Down Expand Up @@ -477,7 +483,7 @@ async fn network_connect(
async fn mqtt_connect(
options: &MqttOptions,
network: &mut Network,
) -> Result<Incoming, ConnectionError> {
) -> Result<ConnAck, ConnectionError> {
let keep_alive = options.keep_alive().as_secs() as u16;
let clean_session = options.clean_session();
let last_will = options.last_will();
Expand All @@ -493,9 +499,7 @@ async fn mqtt_connect(

// validate connack
match network.read().await? {
Incoming::ConnAck(connack) if connack.code == ConnectReturnCode::Success => {
Ok(Packet::ConnAck(connack))
}
Incoming::ConnAck(connack) if connack.code == ConnectReturnCode::Success => Ok(connack),
Incoming::ConnAck(connack) => Err(ConnectionError::ConnectionRefused(connack.code)),
packet => Err(ConnectionError::NotConnAck(packet)),
}
Expand Down
32 changes: 19 additions & 13 deletions rumqttc/src/v5/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,18 +149,25 @@ impl EventLoop {
connect(&mut self.options),
)
.await??;
// Last session might contain packets which aren't acked. If it's a new session, clear the pending packets.
if !connack.session_present {
self.pending.clear();
}
self.network = Some(network);

if self.keepalive_timeout.is_none() {
self.keepalive_timeout = Some(Box::pin(time::sleep(self.options.keep_alive)));
}

self.state.handle_incoming_packet(connack)?;
self.state
.handle_incoming_packet(Incoming::ConnAck(connack))?;
}

match self.select().await {
Ok(v) => Ok(v),
Err(e) => {
// MQTT requires that packets pending acknowledgement should be republished on session resume.
// Move pending messages from state to eventloop.
self.clean();
Err(e)
}
Expand Down Expand Up @@ -271,19 +278,14 @@ impl EventLoop {
/// the stream.
/// This function (for convenience) includes internal delays for users to perform internal sleeps
/// between re-connections so that cancel semantics can be used during this sleep
async fn connect(options: &mut MqttOptions) -> Result<(Network, Incoming), ConnectionError> {
async fn connect(options: &mut MqttOptions) -> Result<(Network, ConnAck), ConnectionError> {
// connect to the broker
let mut network = network_connect(options).await?;

// make MQTT connection request (which internally awaits for ack)
let packet = mqtt_connect(options, &mut network).await?;

// Last session might contain packets which aren't acked. MQTT says these packets should be
// republished in the next session
// move pending messages from state to eventloop
// let pending = self.state.clean();
// self.pending = pending.into_iter();
Ok((network, packet))
let connack = mqtt_connect(options, &mut network).await?;

Ok((network, connack))
}

async fn network_connect(options: &MqttOptions) -> Result<Network, ConnectionError> {
Expand Down Expand Up @@ -395,7 +397,7 @@ async fn network_connect(options: &MqttOptions) -> Result<Network, ConnectionErr
async fn mqtt_connect(
options: &mut MqttOptions,
network: &mut Network,
) -> Result<Incoming, ConnectionError> {
) -> Result<ConnAck, ConnectionError> {
let keep_alive = options.keep_alive().as_secs() as u16;
let clean_start = options.clean_start();
let client_id = options.client_id();
Expand All @@ -414,14 +416,18 @@ async fn mqtt_connect(
// validate connack
match network.read().await? {
Incoming::ConnAck(connack) if connack.code == ConnectReturnCode::Success => {
// Override local keep_alive value if set by server.
if let Some(props) = &connack.properties {
if let Some(keep_alive) = props.server_keep_alive {
options.keep_alive = Duration::from_secs(keep_alive as u64);
}
network.set_max_outgoing_size(props.max_packet_size);

// Override local session_expiry_interval value if set by server.
if props.session_expiry_interval.is_some() {
options.set_session_expiry_interval(props.session_expiry_interval);
}
}
Ok(Packet::ConnAck(connack))
Ok(connack)
}
Incoming::ConnAck(connack) => Err(ConnectionError::ConnectionRefused(connack.code)),
packet => Err(ConnectionError::NotConnAck(Box::new(packet))),
Expand Down
21 changes: 21 additions & 0 deletions rumqttc/src/v5/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,27 @@ impl MqttOptions {
self.connect_properties.clone()
}

/// set session expiry interval on connection properties
pub fn set_session_expiry_interval(&mut self, interval: Option<u32>) -> &mut Self {
if let Some(conn_props) = &mut self.connect_properties {
conn_props.session_expiry_interval = interval;
self
} else {
let mut conn_props = ConnectProperties::new();
conn_props.session_expiry_interval = interval;
self.set_connect_properties(conn_props)
}
}

/// get session expiry interval on connection properties
pub fn session_expiry_interval(&self) -> Option<u32> {
if let Some(conn_props) = &self.connect_properties {
conn_props.session_expiry_interval
} else {
None
}
}

/// set receive maximum on connection properties
pub fn set_receive_maximum(&mut self, recv_max: Option<u16>) -> &mut Self {
if let Some(conn_props) = &mut self.connect_properties {
Expand Down
Loading

0 comments on commit 1809ef0

Please sign in to comment.