From 05e10e4bb28249c34e789fd081da584ef3c5f191 Mon Sep 17 00:00:00 2001 From: Fedor Smirnov Date: Sun, 19 May 2024 10:06:28 +0200 Subject: [PATCH 1/3] feat(mqtt-client): add option to set nodelay flag for TCP connection This extension allows users to specify whether the `nodelay` flag is set for the TCP connection from the MQTT client to the broker. This flag can help reduce latency by disabling Nagle's algorithm, which is beneficial in scenarios requiring minimal delay. Signed off: FedorSmirnov89 Issue: https://github.com/bytebeamio/rumqtt/issues/871 --- rumqttc/CHANGELOG.md | 3 ++ rumqttc/examples/asyncpubsub_nodelay.rs | 57 +++++++++++++++++++++++++ rumqttc/src/eventloop.rs | 4 ++ rumqttc/src/lib.rs | 6 +++ 4 files changed, 70 insertions(+) create mode 100644 rumqttc/examples/asyncpubsub_nodelay.rs diff --git a/rumqttc/CHANGELOG.md b/rumqttc/CHANGELOG.md index 7e7b4585..2d80a39d 100644 --- a/rumqttc/CHANGELOG.md +++ b/rumqttc/CHANGELOG.md @@ -14,12 +14,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * `ConnectionAborted` variant on `StateError` type to denote abrupt end to a connection * `set_session_expiry_interval` and `session_expiry_interval` methods on `MqttOptions`. * `Auth` packet as per MQTT5 standards +* `tcp_nodelay` field on `NetworkOptions` type to enable configuring `tcp_nodelay` for the client +* `set_tcp_nodelay` method on `Networkoptions` ### Changed * rename `N` as `AsyncReadWrite` to describe usage. * use `Framed` to encode/decode MQTT packets. * use `Login` to store credentials +* check `tcp_nodelay` field of `NetworkOptions` when setting up the connection socket to set tcp_nodelay if the flag is set ### Deprecated diff --git a/rumqttc/examples/asyncpubsub_nodelay.rs b/rumqttc/examples/asyncpubsub_nodelay.rs new file mode 100644 index 00000000..2671420a --- /dev/null +++ b/rumqttc/examples/asyncpubsub_nodelay.rs @@ -0,0 +1,57 @@ +//! Same as asyncpubsub.rs but extended by configuring the client to not batch the messages +//! sent to the broker over TCP + +use tokio::{task, time}; + +use rumqttc::{AsyncClient, MqttOptions, QoS}; +use std::error::Error; +use std::time::Duration; + +#[tokio::main(flavor = "current_thread")] +async fn main() -> Result<(), Box> { + pretty_env_logger::init(); + // color_backtrace::install(); + + let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883); + mqttoptions.set_keep_alive(Duration::from_secs(5)); + + let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); + // Configuring the client to not batch the messages sent to the broker over TCP + eventloop.network_options.set_tcp_nodelay(true); + + task::spawn(async move { + requests(client).await; + time::sleep(Duration::from_secs(3)).await; + }); + + loop { + let event = eventloop.poll().await; + match &event { + Ok(v) => { + println!("Event = {v:?}"); + } + Err(e) => { + println!("Error = {e:?}"); + return Ok(()); + } + } + } +} + +async fn requests(client: AsyncClient) { + client + .subscribe("hello/world", QoS::AtMostOnce) + .await + .unwrap(); + + for i in 1..=10 { + client + .publish("hello/world", QoS::ExactlyOnce, false, vec![1; i]) + .await + .unwrap(); + + time::sleep(Duration::from_secs(1)).await; + } + + time::sleep(Duration::from_secs(120)).await; +} diff --git a/rumqttc/src/eventloop.rs b/rumqttc/src/eventloop.rs index a9b1ce8c..6c3d3f0c 100644 --- a/rumqttc/src/eventloop.rs +++ b/rumqttc/src/eventloop.rs @@ -317,6 +317,10 @@ pub(crate) async fn socket_connect( SocketAddr::V6(_) => TcpSocket::new_v6()?, }; + if let Some(nodelay) = network_options.tcp_nodelay { + socket.set_nodelay(nodelay)?; + } + if let Some(send_buff_size) = network_options.tcp_send_buffer_size { socket.set_send_buffer_size(send_buff_size).unwrap(); } diff --git a/rumqttc/src/lib.rs b/rumqttc/src/lib.rs index 29cad1a3..cd1f2b4e 100644 --- a/rumqttc/src/lib.rs +++ b/rumqttc/src/lib.rs @@ -369,6 +369,7 @@ impl From for TlsConfiguration { pub struct NetworkOptions { tcp_send_buffer_size: Option, tcp_recv_buffer_size: Option, + tcp_nodelay: Option, conn_timeout: u64, #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))] bind_device: Option, @@ -379,12 +380,17 @@ impl NetworkOptions { NetworkOptions { tcp_send_buffer_size: None, tcp_recv_buffer_size: None, + tcp_nodelay: None, conn_timeout: 5, #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))] bind_device: None, } } + pub fn set_tcp_nodelay(&mut self, nodelay: bool) { + self.tcp_nodelay = Some(nodelay); + } + pub fn set_tcp_send_buffer_size(&mut self, size: u32) { self.tcp_send_buffer_size = Some(size); } From 84821f27f3f8c3a886f2c67c2bb052bbc8217f61 Mon Sep 17 00:00:00 2001 From: Fedor Smirnov Date: Mon, 27 May 2024 19:35:52 +0200 Subject: [PATCH 2/3] Apply suggestions from code review Co-authored-by: Devdutt Shenoi --- rumqttc/CHANGELOG.md | 4 +--- rumqttc/src/eventloop.rs | 4 +--- rumqttc/src/lib.rs | 6 +++--- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/rumqttc/CHANGELOG.md b/rumqttc/CHANGELOG.md index 2d80a39d..91ab78dc 100644 --- a/rumqttc/CHANGELOG.md +++ b/rumqttc/CHANGELOG.md @@ -14,15 +14,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * `ConnectionAborted` variant on `StateError` type to denote abrupt end to a connection * `set_session_expiry_interval` and `session_expiry_interval` methods on `MqttOptions`. * `Auth` packet as per MQTT5 standards -* `tcp_nodelay` field on `NetworkOptions` type to enable configuring `tcp_nodelay` for the client -* `set_tcp_nodelay` method on `Networkoptions` +* Allow configuring the `nodelay` property of underlying TCP client with the `tcp_nodelay` field in `NetworkOptions` ### Changed * rename `N` as `AsyncReadWrite` to describe usage. * use `Framed` to encode/decode MQTT packets. * use `Login` to store credentials -* check `tcp_nodelay` field of `NetworkOptions` when setting up the connection socket to set tcp_nodelay if the flag is set ### Deprecated diff --git a/rumqttc/src/eventloop.rs b/rumqttc/src/eventloop.rs index 6c3d3f0c..548cd237 100644 --- a/rumqttc/src/eventloop.rs +++ b/rumqttc/src/eventloop.rs @@ -317,9 +317,7 @@ pub(crate) async fn socket_connect( SocketAddr::V6(_) => TcpSocket::new_v6()?, }; - if let Some(nodelay) = network_options.tcp_nodelay { - socket.set_nodelay(nodelay)?; - } + socket.set_nodelay(network_options.tcp_nodelay)?; if let Some(send_buff_size) = network_options.tcp_send_buffer_size { socket.set_send_buffer_size(send_buff_size).unwrap(); diff --git a/rumqttc/src/lib.rs b/rumqttc/src/lib.rs index cd1f2b4e..93887bb1 100644 --- a/rumqttc/src/lib.rs +++ b/rumqttc/src/lib.rs @@ -369,7 +369,7 @@ impl From for TlsConfiguration { pub struct NetworkOptions { tcp_send_buffer_size: Option, tcp_recv_buffer_size: Option, - tcp_nodelay: Option, + tcp_nodelay: bool, conn_timeout: u64, #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))] bind_device: Option, @@ -380,7 +380,7 @@ impl NetworkOptions { NetworkOptions { tcp_send_buffer_size: None, tcp_recv_buffer_size: None, - tcp_nodelay: None, + tcp_nodelay: false, conn_timeout: 5, #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))] bind_device: None, @@ -388,7 +388,7 @@ impl NetworkOptions { } pub fn set_tcp_nodelay(&mut self, nodelay: bool) { - self.tcp_nodelay = Some(nodelay); + self.tcp_nodelay = nodelay; } pub fn set_tcp_send_buffer_size(&mut self, size: u32) { From e29c6849d0bd65a2aad21a8b78e0e39de01612bb Mon Sep 17 00:00:00 2001 From: Fedor Smirnov Date: Mon, 27 May 2024 19:38:05 +0200 Subject: [PATCH 3/3] Removed the nodelay example following review suggestion --- rumqttc/examples/asyncpubsub_nodelay.rs | 57 ------------------------- 1 file changed, 57 deletions(-) delete mode 100644 rumqttc/examples/asyncpubsub_nodelay.rs diff --git a/rumqttc/examples/asyncpubsub_nodelay.rs b/rumqttc/examples/asyncpubsub_nodelay.rs deleted file mode 100644 index 2671420a..00000000 --- a/rumqttc/examples/asyncpubsub_nodelay.rs +++ /dev/null @@ -1,57 +0,0 @@ -//! Same as asyncpubsub.rs but extended by configuring the client to not batch the messages -//! sent to the broker over TCP - -use tokio::{task, time}; - -use rumqttc::{AsyncClient, MqttOptions, QoS}; -use std::error::Error; -use std::time::Duration; - -#[tokio::main(flavor = "current_thread")] -async fn main() -> Result<(), Box> { - pretty_env_logger::init(); - // color_backtrace::install(); - - let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883); - mqttoptions.set_keep_alive(Duration::from_secs(5)); - - let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); - // Configuring the client to not batch the messages sent to the broker over TCP - eventloop.network_options.set_tcp_nodelay(true); - - task::spawn(async move { - requests(client).await; - time::sleep(Duration::from_secs(3)).await; - }); - - loop { - let event = eventloop.poll().await; - match &event { - Ok(v) => { - println!("Event = {v:?}"); - } - Err(e) => { - println!("Error = {e:?}"); - return Ok(()); - } - } - } -} - -async fn requests(client: AsyncClient) { - client - .subscribe("hello/world", QoS::AtMostOnce) - .await - .unwrap(); - - for i in 1..=10 { - client - .publish("hello/world", QoS::ExactlyOnce, false, vec![1; i]) - .await - .unwrap(); - - time::sleep(Duration::from_secs(1)).await; - } - - time::sleep(Duration::from_secs(120)).await; -}