Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rumqttd): assign random client identifier to clients #709

Merged
merged 14 commits into from
Feb 21, 2024
30 changes: 19 additions & 11 deletions rumqttd/src/link/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::link::local::{LinkError, LinkRx, LinkTx};
use crate::link::network;
use crate::link::network::Network;
use crate::local::LinkBuilder;
use crate::protocol::{Connect, Packet, Protocol};
use crate::protocol::{ConnAck, Connect, ConnectReturnCode, Packet, Protocol};
use crate::router::{Event, Notification};
use crate::{ConnectionId, ConnectionSettings};

Expand Down Expand Up @@ -66,14 +66,15 @@ impl<P: Protocol> RemoteLink<P> {
mut network: Network<P>,
connect_packet: Packet,
dynamic_filters: bool,
assigned_client_id: Option<String>,
) -> Result<RemoteLink<P>, Error> {
let Packet::Connect(connect, props, lastwill, lastwill_props, _) = connect_packet else {
return Err(Error::NotConnectPacket(connect_packet));
};

// Register this connection with the router. Router replys with ack which if ok will
// start the link. Router can sometimes reject the connection (ex max connection limit)
let client_id = &connect.client_id;
let client_id = assigned_client_id.as_ref().unwrap_or(&connect.client_id);
let clean_session = connect.clean_session;

let topic_alias_max = props.as_ref().and_then(|p| p.topic_alias_max);
Expand Down Expand Up @@ -103,8 +104,13 @@ impl<P: Protocol> RemoteLink<P> {
let id = link_rx.id();
Span::current().record("connection_id", id);

if let Some(packet) = notification.into() {
network.write(packet).await?;
if let Some(mut packet) = notification.into() {
if let Packet::ConnAck(_ack, props) = &mut packet {
let mut new_props = props.clone().unwrap_or_default();
new_props.assigned_client_identifier = assigned_client_id;
*props = Some(new_props);
network.write(packet).await?;
}
swanandx marked this conversation as resolved.
Show resolved Hide resolved
}

Ok(RemoteLink {
Expand Down Expand Up @@ -209,16 +215,18 @@ where
return Err(Error::ZeroKeepAlive);
}

// Register this connection with the router. Router replys with ack which if ok will
// start the link. Router can sometimes reject the connection (ex max connection limit)
let empty_client_id = connect.client_id.is_empty();
h3nill marked this conversation as resolved.
Show resolved Hide resolved
let clean_session = connect.clean_session;

if cfg!(feature = "allow-duplicate-clientid") {
if !clean_session && empty_client_id {
return Err(Error::InvalidClientId);
}
} else if empty_client_id {
if empty_client_id && !clean_session {
let ack = ConnAck {
session_present: false,
code: ConnectReturnCode::ClientIdentifierNotValid,
};

let packet = Packet::ConnAck(ack, None);
network.write(packet).await?;

return Err(Error::InvalidClientId);
}
swanandx marked this conversation as resolved.
Show resolved Hide resolved

Expand Down
1 change: 0 additions & 1 deletion rumqttd/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ pub struct Login {
pub enum ConnectReturnCode {
Success,
RefusedProtocolVersion,
BadClientId,
ServiceUnavailable,
UnspecifiedError,
MalformedPacket,
Expand Down
4 changes: 2 additions & 2 deletions rumqttd/src/protocol/v4/connack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ fn connect_return(num: u8) -> Result<ConnectReturnCode, Error> {
match num {
0 => Ok(ConnectReturnCode::Success),
1 => Ok(ConnectReturnCode::RefusedProtocolVersion),
2 => Ok(ConnectReturnCode::BadClientId),
2 => Ok(ConnectReturnCode::ClientIdentifierNotValid),
3 => Ok(ConnectReturnCode::ServiceUnavailable),
4 => Ok(ConnectReturnCode::BadUserNamePassword),
5 => Ok(ConnectReturnCode::NotAuthorized),
Expand All @@ -51,7 +51,7 @@ fn connect_code(return_code: ConnectReturnCode) -> u8 {
match return_code {
ConnectReturnCode::Success => 0,
ConnectReturnCode::RefusedProtocolVersion => 1,
ConnectReturnCode::BadClientId => 2,
ConnectReturnCode::ClientIdentifierNotValid => 2,
swanandx marked this conversation as resolved.
Show resolved Hide resolved
ConnectReturnCode::ServiceUnavailable => 3,
ConnectReturnCode::BadUserNamePassword => 4,
ConnectReturnCode::NotAuthorized => 5,
Expand Down
2 changes: 1 addition & 1 deletion rumqttd/src/router/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1092,7 +1092,7 @@ impl Router {
let tenant_prefix = tenant_id.map(|id| format!("/tenants/{id}/"));

let Some((will, will_props)) = self.last_wills.remove(&client_id) else {
return
return;
};

let publish = Publish {
Expand Down
13 changes: 13 additions & 0 deletions rumqttd/src/server/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ use crate::link::console;
use crate::link::local::{self, LinkRx, LinkTx};
use crate::router::{Event, Router};
use crate::{Config, ConnectionId, ServerSettings};
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use tokio::net::{TcpListener, TcpStream};
use tokio::time::error::Elapsed;
use tokio::{task, time};
Expand Down Expand Up @@ -478,6 +480,16 @@ async fn remote<P: Protocol>(
_ => unreachable!(),
};

let mut assigned_client_id = None;
if client_id.is_empty() {
client_id = thread_rng()
.sample_iter(&Alphanumeric)
.take(10)
.map(char::from)
.collect();
assigned_client_id = Some(client_id.clone());
}
swanandx marked this conversation as resolved.
Show resolved Hide resolved

if let Some(tenant_id) = &tenant_id {
// client_id is set to "tenant_id.client_id"
// this is to make sure we are consistent,
Expand Down Expand Up @@ -507,6 +519,7 @@ async fn remote<P: Protocol>(
network,
connect_packet,
dynamic_filters,
assigned_client_id,
)
.await
{
Expand Down