Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Will messages and will delay interval #686

Merged
merged 9 commits into from
Sep 2, 2023
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ for more information look at rumqttc's [README](https://github.com/bytebeamio/ru
- [x] QoS 0 and 1
- [x] Connection via TLS
- [x] Retransmission after reconnect
- [ ] Last will
- [x] Last will
- [x] Retained messages
- [x] QoS 2
- [ ] MQTT 5
Expand Down
2 changes: 2 additions & 0 deletions rumqttd/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

### Added
- Will delay interval for MQTTv5 (#686)

### Changed

Expand All @@ -17,6 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Link and its implementation which were deprecated.

### Fixed
- Will Messages
- Retained Messages

### Security
Expand Down
15 changes: 14 additions & 1 deletion rumqttd/src/link/local.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::protocol::{Filter, LastWill, Packet, Publish, QoS, RetainForwardRule, Subscribe};
use crate::protocol::{
Filter, LastWill, LastWillProperties, Packet, Publish, QoS, RetainForwardRule, Subscribe,
};
use crate::router::Ack;
use crate::router::{
iobufs::{Incoming, Outgoing},
Expand Down Expand Up @@ -41,6 +43,7 @@ pub struct LinkBuilder<'a> {
// true by default
clean_session: bool,
last_will: Option<LastWill>,
last_will_properties: Option<LastWillProperties>,
// false by default
dynamic_filters: bool,
// default to 0, indicating to not use topic alias
Expand All @@ -55,6 +58,7 @@ impl<'a> LinkBuilder<'a> {
tenant_id: None,
clean_session: true,
last_will: None,
last_will_properties: None,
dynamic_filters: false,
topic_alias_max: 0,
}
Expand All @@ -70,6 +74,14 @@ impl<'a> LinkBuilder<'a> {
self
}

pub fn last_will_properties(
mut self,
last_will_properties: Option<LastWillProperties>,
) -> Self {
self.last_will_properties = last_will_properties;
self
}

pub fn topic_alias_max(mut self, max: u16) -> Self {
self.topic_alias_max = max;
self
Expand All @@ -93,6 +105,7 @@ impl<'a> LinkBuilder<'a> {
self.client_id.to_owned(),
self.clean_session,
self.last_will,
self.last_will_properties,
self.dynamic_filters,
self.topic_alias_max,
);
Expand Down
144 changes: 89 additions & 55 deletions rumqttd/src/link/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::router::{Event, Notification};
use crate::{ConnectionId, ConnectionSettings};

use flume::{RecvError, SendError, Sender, TrySendError};
use std::cmp::min;
use std::collections::VecDeque;
use std::io;
use std::sync::Arc;
Expand All @@ -31,6 +32,8 @@ pub enum Error {
Send(#[from] SendError<(ConnectionId, Event)>),
#[error("Channel recv error")]
Recv(#[from] RecvError),
#[error("Got new session, disconnecting old one")]
SessionEnd,
#[error("Persistent session requires valid client id")]
InvalidClientId,
#[error("Unexpected router message")]
Expand All @@ -48,84 +51,51 @@ pub enum Error {
/// Orchestrates between Router and Network.
pub struct RemoteLink<P> {
connect: Connect,
pub(crate) client_id: String,
pub(crate) connection_id: ConnectionId,
network: Network<P>,
link_tx: LinkTx,
link_rx: LinkRx,
notifications: VecDeque<Notification>,
pub(crate) will_delay_interval: u32,
}

impl<P: Protocol> RemoteLink<P> {
pub async fn new(
config: Arc<ConnectionSettings>,
router_tx: Sender<(ConnectionId, Event)>,
tenant_id: Option<String>,
mut network: Network<P>,
connect_packet: Packet,
dynamic_filters: bool,
) -> Result<RemoteLink<P>, Error> {
// Wait for MQTT connect packet and error out if it's not received in time to prevent
// DOS attacks by filling total connections that the server can handle with idle open
// connections which results in server rejecting new connections
let connection_timeout_ms = config.connection_timeout_ms.into();
let dynamic_filters = config.dynamic_filters;
let packet = time::timeout(Duration::from_millis(connection_timeout_ms), async {
let packet = network.read().await?;
Ok::<_, network::Error>(packet)
})
.await??;

let (connect, props, lastwill, login) = match packet {
Packet::Connect(connect, props, _lastwill, _, login) => {
Span::current().record("client_id", &connect.client_id);

// Ignore last will
(connect, props, None, login)
}
packet => return Err(Error::NotConnectPacket(packet)),
let Packet::Connect(connect, props, lastwill, lastwill_props, _) = connect_packet else {
return Err(Error::NotConnectPacket(connect_packet));
};

// If authentication is configured in config file check for username and password
if let Some(auths) = &config.auth {
// if authentication is configured and connect packet doesn't have login details return
// an error
if let Some(login) = login {
let is_authenticated = auths
.iter()
.any(|(user, pass)| (user, pass) == (&login.username, &login.password));

if !is_authenticated {
return Err(Error::InvalidAuth);
}
} else {
return Err(Error::InvalidAuth);
}
}

// When keep_alive feature is disabled client can live forever, which is not good in
// distributed broker context so currenlty we don't allow it.
if connect.keep_alive == 0 {
return Err(Error::ZeroKeepAlive);
}

// Register this connection with the router. Router replys with ack which if ok will
// start the link. Router can sometimes reject the connection (ex max connection limit)
let client_id = connect.client_id.clone();
let client_id = &connect.client_id;
let clean_session = connect.clean_session;

if cfg!(feature = "allow-duplicate-clientid") {
if !clean_session && client_id.is_empty() {
return Err(Error::InvalidClientId);
}
} else if client_id.is_empty() {
return Err(Error::InvalidClientId);
}
let topic_alias_max = props.as_ref().and_then(|p| p.topic_alias_max);
let session_expiry = props
.as_ref()
.and_then(|p| p.session_expiry_interval)
.unwrap_or(0);

let delay_interval = lastwill_props
.as_ref()
.and_then(|f| f.delay_interval)
.unwrap_or(0);

let topic_alias_max = props.and_then(|p| p.topic_alias_max);
// The Server delays publishing the Client’s Will Message until
// the Will Delay Interval has passed or the Session ends, whichever happens first
let will_delay_interval = min(session_expiry, delay_interval);

let (link_tx, link_rx, notification) = LinkBuilder::new(&client_id, router_tx)
let (link_tx, link_rx, notification) = LinkBuilder::new(client_id, router_tx)
.tenant_id(tenant_id)
.clean_session(clean_session)
.last_will(lastwill)
.last_will_properties(lastwill_props)
.dynamic_filters(dynamic_filters)
.topic_alias_max(topic_alias_max.unwrap_or(0))
.build()?;
Expand All @@ -139,12 +109,12 @@ impl<P: Protocol> RemoteLink<P> {

Ok(RemoteLink {
connect,
client_id,
connection_id: id,
network,
link_tx,
link_rx,
notifications: VecDeque::with_capacity(100),
will_delay_interval,
})
}

Expand Down Expand Up @@ -191,3 +161,67 @@ impl<P: Protocol> RemoteLink<P> {
}
}
}

/// Read MQTT connect packet from network and verify it.
/// authentication and checks are done here.
pub async fn mqtt_connect<P>(
config: Arc<ConnectionSettings>,
network: &mut Network<P>,
) -> Result<Packet, Error>
where
P: Protocol,
{
// Wait for MQTT connect packet and error out if it's not received in time to prevent
// DOS attacks by filling total connections that the server can handle with idle open
// connections which results in server rejecting new connections
let connection_timeout_ms = config.connection_timeout_ms.into();
let packet = time::timeout(Duration::from_millis(connection_timeout_ms), async {
let packet = network.read().await?;
Ok::<_, network::Error>(packet)
})
.await??;

let (connect, _props, login) = match packet {
Packet::Connect(ref connect, ref props, _, _, ref login) => (connect, props, login),
packet => return Err(Error::NotConnectPacket(packet)),
};

// If authentication is configured in config file check for username and password
if let Some(auths) = &config.auth {
// if authentication is configured and connect packet doesn't have login details return
// an error
if let Some(login) = login {
let is_authenticated = auths
.iter()
.any(|(user, pass)| (user, pass) == (&login.username, &login.password));

if !is_authenticated {
return Err(Error::InvalidAuth);
}
} else {
return Err(Error::InvalidAuth);
}
}

// When keep_alive feature is disabled client can live forever, which is not good in
// distributed broker context so currenlty we don't allow it.
if connect.keep_alive == 0 {
return Err(Error::ZeroKeepAlive);
}

// Register this connection with the router. Router replys with ack which if ok will
// start the link. Router can sometimes reject the connection (ex max connection limit)
let empty_client_id = connect.client_id.is_empty();
let clean_session = connect.clean_session;

if cfg!(feature = "allow-duplicate-clientid") {
if !clean_session && empty_client_id {
return Err(Error::InvalidClientId);
}
} else if empty_client_id {
return Err(Error::InvalidClientId);
}

// Ok((connect, props, lastwill, lastwill_props))
Ok(packet)
}
5 changes: 5 additions & 0 deletions rumqttd/src/router/connection.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use slab::Slab;

use crate::protocol::LastWillProperties;
use crate::Filter;
use crate::{protocol::LastWill, Topic};
use std::collections::{HashMap, HashSet};
Expand All @@ -22,6 +23,8 @@ pub struct Connection {
pub subscriptions: HashSet<Filter>,
/// Last will of this connection
pub last_will: Option<LastWill>,
/// Properties of Last will
pub last_will_properties: Option<LastWillProperties>,
/// Connection events
pub events: ConnectionEvents,
/// Topic aliases set by clients
Expand All @@ -39,6 +42,7 @@ impl Connection {
client_id: String,
clean: bool,
last_will: Option<LastWill>,
last_will_properties: Option<LastWillProperties>,
dynamic_filters: bool,
topic_alias_max: u16,
) -> Connection {
Expand Down Expand Up @@ -67,6 +71,7 @@ impl Connection {
clean,
subscriptions: HashSet::default(),
last_will,
last_will_properties,
events: ConnectionEvents::default(),
topic_aliases: HashMap::new(),
broker_topic_aliases,
Expand Down
4 changes: 3 additions & 1 deletion rumqttd/src/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub enum Event {
/// Data for native commitlog
DeviceData,
/// Disconnection request
Disconnect(Disconnection),
Disconnect,
/// Shadow
Shadow(ShadowRequest),
/// Collect and send alerts to all alerts links
Expand All @@ -63,6 +63,8 @@ pub enum Event {
SendMeters,
/// Get metrics of a connection or all connections
PrintStatus(Print),
/// Publish Will message
PublishWill((String, Option<String>)),
}

/// Notification from router to connection
Expand Down
Loading