From 8c20d58523bcd5ca74c823a9f20c0f3dc5a04393 Mon Sep 17 00:00:00 2001 From: Ravi Teja Date: Sun, 16 Feb 2020 14:17:22 +0530 Subject: [PATCH] Embedding (#35) * Move deactivation into route * Feature to send bulk messages from the router * Broker struct to access router * Move tokio main to cli * Remove http specifics and expose types to better support embedding * Suback and unsuback in notifications * Handle unexpected eof in codec * Yield 'connected' notification after a connection * Move codec to core * Move codec to a module * move everything to mqtt4 module to laydown mqtt5 * Make broker compatible with the new core * Make the client compatible with the new core * Use codec instead of Async traits in the client --- rumq-broker/Cargo.toml | 1 - rumq-broker/src/connection.rs | 5 +- rumq-broker/src/httppush.rs | 86 -------------- rumq-broker/src/httpserver.rs | 45 ------- rumq-broker/src/lib.rs | 93 +++++++-------- rumq-broker/src/router.rs | 22 ++-- rumq-broker/src/state.rs | 8 +- rumq-cli/Cargo.toml | 1 + rumq-cli/src/main.rs | 8 +- rumq-client/Cargo.toml | 3 +- rumq-client/src/eventloop.rs | 101 +++++++++------- rumq-client/src/lib.rs | 8 +- rumq-client/src/state.rs | 20 +++- rumq-core/Cargo.toml | 2 + rumq-core/control | 25 ---- rumq-core/src/lib.rs | 109 +---------------- rumq-core/src/{ => mqtt4}/asyncdeserialize.rs | 29 +++-- rumq-core/src/{ => mqtt4}/asyncserialize.rs | 18 +-- .../src => rumq-core/src/mqtt4}/codec.rs | 40 ++++--- rumq-core/src/{ => mqtt4}/deserialize.rs | 6 +- rumq-core/src/mqtt4/mod.rs | 112 ++++++++++++++++++ rumq-core/src/{ => mqtt4}/packets.rs | 2 +- rumq-core/src/{ => mqtt4}/serialize.rs | 6 +- rumq-core/src/{ => mqtt4}/topic.rs | 0 rumq-core/src/mqtt5/mod.rs | 0 rumq-core/variable | 25 ---- rumqd.conf | 11 -- 27 files changed, 314 insertions(+), 472 deletions(-) delete mode 100644 rumq-broker/src/httppush.rs delete mode 100644 rumq-broker/src/httpserver.rs delete mode 100644 rumq-core/control rename rumq-core/src/{ => mqtt4}/asyncdeserialize.rs (95%) rename rumq-core/src/{ => mqtt4}/asyncserialize.rs (95%) rename {rumq-broker/src => rumq-core/src/mqtt4}/codec.rs (53%) rename rumq-core/src/{ => mqtt4}/deserialize.rs (98%) create mode 100644 rumq-core/src/mqtt4/mod.rs rename rumq-core/src/{ => mqtt4}/packets.rs (99%) rename rumq-core/src/{ => mqtt4}/serialize.rs (98%) rename rumq-core/src/{ => mqtt4}/topic.rs (100%) create mode 100644 rumq-core/src/mqtt5/mod.rs delete mode 100644 rumq-core/variable diff --git a/rumq-broker/Cargo.toml b/rumq-broker/Cargo.toml index 71754f6f..989a006b 100644 --- a/rumq-broker/Cargo.toml +++ b/rumq-broker/Cargo.toml @@ -14,7 +14,6 @@ tokio = { version = "0.2", features = ["full"] } tokio-util = { version = "0.2", features = ["codec"] } futures-util = { version = "0.3", features = ["sink"] } tokio-rustls = "0.12" -hyper = "0.13" rumq-core = { path = "../rumq-core", version = "0.1.0-alpha.4" } derive_more = "0.99" log = "0.4" diff --git a/rumq-broker/src/connection.rs b/rumq-broker/src/connection.rs index 83e251d4..1f37876c 100644 --- a/rumq-broker/src/connection.rs +++ b/rumq-broker/src/connection.rs @@ -1,5 +1,5 @@ use derive_more::From; -use rumq_core::{connack, Packet, Connect, ConnectReturnCode}; +use rumq_core::mqtt4::{connack, Packet, Connect, ConnectReturnCode}; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::mpsc::error::SendError; use tokio::stream::iter; @@ -124,7 +124,8 @@ impl Connection { // eventloop which processes packets and router messages let mut incoming = &mut self.stream; - let mut incoming = time::throttle(Duration::from_millis(10), &mut incoming); + let mut incoming = time::throttle(Duration::from_millis(1), &mut incoming); + loop { let mut timeout = time::delay_for(keep_alive); let (done, routermessage) = select(&mut incoming, &mut self.this_rx, keep_alive, &mut timeout).await?; diff --git a/rumq-broker/src/httppush.rs b/rumq-broker/src/httppush.rs deleted file mode 100644 index 51505639..00000000 --- a/rumq-broker/src/httppush.rs +++ /dev/null @@ -1,86 +0,0 @@ -use crate::router::{Connection, RouterMessage}; -use crate::Config; -use derive_more::From; - -use rumq_core::{Packet, QoS}; -use tokio::sync::mpsc::{channel, Sender}; -use tokio::sync::mpsc::error::SendError; - -use hyper::body::Bytes; -use hyper::{body, Client, Request}; - -use std::mem; -use std::sync::Arc; - -#[derive(Debug, From)] -pub enum Error { - Mpsc(SendError<(String, RouterMessage)>), -} - -pub async fn start(config: Arc, mut router_tx: Sender<(String, RouterMessage)>) -> Result<(), Error> { - let (this_tx, mut this_rx) = channel(100); - let client = Client::new(); - - // construct connect router message with client id and handle to this connection - let connect = rumq_core::connect("pushclient"); - let routermessage = RouterMessage::Connect(Connection::new(connect, this_tx)); - router_tx.send(("pushclient".to_owned(), routermessage)).await?; - - let mut subscription = rumq_core::empty_subscribe(); - subscription.add(config.httppush.topic.clone(), QoS::AtLeastOnce); - - let packet = Packet::Subscribe(subscription); - let routermessage = RouterMessage::Packet(packet); - router_tx.send(("pushclient".to_owned(), routermessage)).await?; - - loop { - let packet = match this_rx.recv().await.unwrap() { - RouterMessage::Packet(p) => p, - _ => { - error!("Invalid message. Expecting only status publishes"); - continue; - } - }; - - let mut publish = match packet { - Packet::Publish(p) => p, - Packet::Suback(_s) => continue, - _ => unimplemented!(), - }; - - let payload = mem::replace(&mut publish.payload, Vec::new()); - let topic = mem::replace(&mut publish.topic_name, String::new()); - - let url = config.httppush.url.clone() + &topic; - let body = Bytes::from(payload); - - info!("Http push = {}", url); - let request = match Request::post(url).header("Content-type", "application/json").body(body.into()) { - Ok(request) => request, - Err(e) => { - error!("Post create error = {:?}", e); - continue; - } - }; - - let o = match client.request(request).await { - Ok(res) => res, - Err(e) => { - error!("Http request error = {:?}", e); - continue; - } - }; - - info!("Response = {:?}", o); - - let body_bytes = match body::to_bytes(o.into_body()).await { - Ok(bytes) => bytes, - Err(e) => { - error!("Failed creating bytes. Error = {:?}", e); - continue - } - }; - - info!("Body = {:?}", body_bytes); - } -} diff --git a/rumq-broker/src/httpserver.rs b/rumq-broker/src/httpserver.rs deleted file mode 100644 index 8ffa3e96..00000000 --- a/rumq-broker/src/httpserver.rs +++ /dev/null @@ -1,45 +0,0 @@ -use hyper::service::{make_service_fn, service_fn}; -use hyper::{body, Body, Response, Server}; -use tokio::sync::mpsc::Sender; -use rumq_core::Packet; - -use crate::router::RouterMessage; -use crate::Config; - -use std::sync::Arc; -use tokio::sync::Mutex; - -pub async fn start(config: Arc, router_tx: Sender<(String, RouterMessage)>) { - let addr = ([0, 0, 0, 0], config.httpserver.port).into(); - - let router_tx = Arc::new(Mutex::new(router_tx)); - let server = Server::bind(&addr).serve(make_service_fn(move |_| { - let router_tx = router_tx.clone(); - async move { - let server_function = service_fn(move |request| { - let router_tx = router_tx.clone(); - - async move { - info!("Request = {:?}", request); - let path = request.uri().path().to_owned(); - let body_bytes = body::to_bytes(request.into_body()).await?; - info!("Path = {:?}", path); - info!("Body = {:?}", body_bytes); - - let publish = rumq_core::publish(path, rumq_core::QoS::AtMostOnce, body_bytes.to_vec()); - let packet = Packet::Publish(publish); - let mut router_tx = router_tx.lock().await; - if let Err(e) = router_tx.send(("httpserver".to_owned(), RouterMessage::Packet(packet))).await { - error!("Failed sending data to the router. Error = {:?}", e); - } - - Ok::<_, hyper::Error>(Response::new(Body::from("Forwarding action"))) - } - }); - - Ok::<_, hyper::Error>(server_function) - } - })); - - server.await.unwrap(); -} diff --git a/rumq-broker/src/lib.rs b/rumq-broker/src/lib.rs index ca53daf6..60f49b0f 100644 --- a/rumq-broker/src/lib.rs +++ b/rumq-broker/src/lib.rs @@ -1,5 +1,3 @@ -#![recursion_limit="512"] - #[macro_use] extern crate log; @@ -14,6 +12,9 @@ use tokio_rustls::rustls::internal::pemfile::{certs, rsa_private_keys}; use tokio_rustls::rustls::TLSError; use tokio_rustls::rustls::{AllowAnyAuthenticatedClient, NoClientAuth, RootCertStore, ServerConfig}; use tokio_rustls::TlsAcceptor; +use futures_util::sink::Sink; +use futures_util::stream::Stream; +use rumq_core::mqtt4::{codec, Packet}; use serde::Deserialize; @@ -25,11 +26,11 @@ use std::time::Duration; use std::thread; mod connection; -mod httppush; -mod httpserver; -mod router; mod state; -mod codec; +mod router; + +pub use rumq_core as core; +pub use router::{RouterMessage, Connection}; #[derive(From, Debug)] pub enum Error { @@ -49,19 +50,6 @@ pub enum Error { #[derive(Debug, Deserialize, Clone)] pub struct Config { servers: Vec, - httppush: HttpPush, - httpserver: HttpServer, -} - -#[derive(Debug, Deserialize, Clone)] -pub struct HttpPush { - url: String, - topic: String, -} - -#[derive(Debug, Deserialize, Clone)] -pub struct HttpServer { - port: u16, } #[derive(Debug, Deserialize, Clone)] @@ -107,7 +95,7 @@ async fn tls_connection>(ca_path: Option

, cert_path: P, key_pa Ok(acceptor) } -pub async fn accept_loop(config: Arc, router_tx: Sender<(String, router::RouterMessage)>) -> Result<(), Error> { +async fn accept_loop(config: Arc, router_tx: Sender<(String, router::RouterMessage)>) -> Result<(), Error> { let addr = format!("0.0.0.0:{}", config.port); let connection_config = config.clone(); @@ -165,6 +153,10 @@ pub async fn accept_loop(config: Arc, router_tx: Sender<(String, } } + +pub trait Network: Stream> + Sink + Unpin + Send {} +impl Network for T where T: Stream> + Sink + Unpin + Send {} + #[tokio::main(core_threads = 1)] async fn router(rx: Receiver<(String, router::RouterMessage)>) { let mut router = router::Router::new(rx); @@ -173,51 +165,46 @@ async fn router(rx: Receiver<(String, router::RouterMessage)>) { } } -#[tokio::main(core_threads = 4)] -pub async fn start(config: Config) { +pub struct Broker { + config: Config, + router_handle: Sender<(String, router::RouterMessage)>, +} + +pub fn new(config: Config) -> Broker { let (router_tx, router_rx) = channel(100); - // router to route data between connections. creates an extra copy but - // might not be a big deal if we prevent clones/send fat pointers and batch thread::spawn(move || { router(router_rx) }); - let http_router_tx = router_tx.clone(); - // TODO: Remove clone on main config - let httpserver_config = Arc::new(config.clone()); - task::spawn(async move { httpserver::start(httpserver_config, http_router_tx).await }); - - // TODO: Remove clone on main config - let status_router_tx = router_tx.clone(); - let httppush_config = Arc::new(config.clone()); - task::spawn(async move { - let out = httppush::start(httppush_config, status_router_tx).await; - error!("Http routine stopped. Result = {:?}", out); - }); - - let mut servers = Vec::new(); - for server in config.servers.into_iter() { - let config = Arc::new(server); - - let fut = accept_loop(config, router_tx.clone()); - let o = task::spawn(async { - error!("Accept loop returned = {:?}", fut.await); - }); + Broker { + config, + router_handle: router_tx + } +} - servers.push(o); +impl Broker { + pub fn new_router_handle(&self) -> Sender<(String, router::RouterMessage)> { + self.router_handle.clone() } - join_all(servers).await; -} + pub async fn start(&mut self) -> Vec> { + let mut servers = Vec::new(); + let server_configs = self.config.servers.split_off(0); + for server in server_configs.into_iter() { + let config = Arc::new(server); + let fut = accept_loop(config, self.router_handle.clone()); + let o = task::spawn(async { + error!("Accept loop returned = {:?}", fut.await); + }); -use futures_util::sink::Sink; -use futures_util::stream::Stream; -use rumq_core::Packet; + servers.push(o); + } -pub trait Network: Stream> + Sink + Unpin + Send {} -impl Network for T where T: Stream> + Sink + Unpin + Send {} + join_all(servers).await + } +} #[cfg(test)] mod test { diff --git a/rumq-broker/src/router.rs b/rumq-broker/src/router.rs index 35346bb3..0c1eddb9 100644 --- a/rumq-broker/src/router.rs +++ b/rumq-broker/src/router.rs @@ -1,5 +1,5 @@ use derive_more::From; -use rumq_core::{has_wildcards, matches, QoS, Packet, Connect, Publish, Subscribe, Unsubscribe}; +use rumq_core::mqtt4::{has_wildcards, matches, publish, QoS, Packet, Connect, Publish, Subscribe, Unsubscribe}; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::mpsc::error::TrySendError; use tokio::select; @@ -27,9 +27,9 @@ pub enum RouterMessage { /// Client id and connection handle Connect(Connection), /// Packet - Packet(rumq_core::Packet), + Packet(Packet), /// Packets - Packets(VecDeque), + Packets(VecDeque), /// Disconnects a client from active connections list. Will handling Death(String), /// Pending messages of the previous connection @@ -37,12 +37,12 @@ pub enum RouterMessage { } pub struct Connection { - pub connect: rumq_core::Connect, + pub connect: Connect, pub handle: Option> } impl Connection { - pub fn new(connect: rumq_core::Connect, handle: Sender) -> Connection { + pub fn new(connect: Connect, handle: Sender) -> Connection { Connection { connect, handle: Some(handle) @@ -59,7 +59,7 @@ impl fmt::Debug for Connection { #[derive(Debug)] struct ActiveConnection { pub state: MqttState, - pub outgoing: VecDeque, + pub outgoing: VecDeque, tx: Sender } @@ -136,7 +136,8 @@ impl Router { loop { select! { o = self.data_rx.recv() => { - let (id, mut message) = o.unwrap(); + let (id, mut message) = o.unwrap(); + debug!("In router message. Id = {}, {:?}", id, message); match self.reply(id.clone(), &mut message) { Ok(Some(message)) => self.forward(&id, message), Ok(None) => (), @@ -151,7 +152,7 @@ impl Router { error!("Routing error = {:?}", e); } } - o = interval.next() => { + _ = interval.next() => { for (_, connection) in self.active_connections.iter_mut() { let pending = connection.outgoing.split_off(0); if pending.len() > 0 { @@ -173,8 +174,6 @@ impl Router { /// inactive connections /// No routing modifications here fn reply(&mut self, id: String, message: &mut RouterMessage) -> Result, Error> { - debug!("Incoming router message. Id = {}, {:?}", id, message); - match message { RouterMessage::Connect(connection) => { let handle = connection.handle.take().unwrap(); @@ -192,7 +191,6 @@ impl Router { fn route(&mut self, id: String, message: RouterMessage) -> Result<(), Error> { match message { RouterMessage::Packet(packet) => { - debug!("Routing router message. Id = {}, {:?}", id, packet); match packet { Packet::Publish(publish) => self.match_subscriptions(&id, publish), Packet::Subscribe(subscribe) => self.add_to_subscriptions(id, subscribe), @@ -474,7 +472,7 @@ impl Router { let message = mem::replace(&mut will.message, "".to_owned()); let qos = will.qos; - let publish = rumq_core::publish(topic, qos, message); + let publish = publish(topic, qos, message); self.match_subscriptions(&id, publish); } diff --git a/rumq-broker/src/state.rs b/rumq-broker/src/state.rs index 0c13579d..c03d7da2 100644 --- a/rumq-broker/src/state.rs +++ b/rumq-broker/src/state.rs @@ -1,8 +1,10 @@ use std::{collections::VecDeque, result::Result, time::Instant}; -use rumq_core::*; - use crate::router::RouterMessage; +use rumq_core::mqtt4::{ + empty_subscribe, suback, valid_filter, valid_topic, ConnectReturnCode, LastWill, Packet, PacketIdentifier, Publish, QoS, + Subscribe, SubscribeReturnCodes, Unsubscribe, +}; #[derive(Debug)] pub enum Error { @@ -133,8 +135,6 @@ impl MqttState { /// matching packet identifier. Removal is now a O(n) operation. This should be /// usually ok in case of acks due to ack ordering in normal conditions. fn handle_incoming_puback(&mut self, pkid: PacketIdentifier) -> Result, Error> { - let pkids: Vec> = self.outgoing_publishes.iter().map(|p| p.pkid).collect(); - debug!("Pkids = {:?}", pkids); match self.outgoing_publishes.iter().position(|x| x.pkid == Some(pkid)) { Some(index) => { let _publish = self.outgoing_publishes.remove(index).expect("Wrong index"); diff --git a/rumq-cli/Cargo.toml b/rumq-cli/Cargo.toml index 2927ddc2..f289f5e2 100644 --- a/rumq-cli/Cargo.toml +++ b/rumq-cli/Cargo.toml @@ -10,6 +10,7 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +tokio = { version = "0.2", features = ["full"] } structopt = "0.3" pretty_env_logger = "0.3" toml = "0.5" diff --git a/rumq-cli/src/main.rs b/rumq-cli/src/main.rs index b385bf54..149831e3 100644 --- a/rumq-cli/src/main.rs +++ b/rumq-cli/src/main.rs @@ -11,12 +11,16 @@ pub struct CommandLine { config_path: PathBuf, } -fn main() { + +#[tokio::main(core_threads = 1)] +async fn main() { pretty_env_logger::init(); let commandline = CommandLine::from_args(); let config = fs::read_to_string(commandline.config_path).unwrap(); let config = toml::from_str::(&config).unwrap(); - rumq_broker::start(config) + let mut broker = rumq_broker::new(config); + let o = broker.start().await; + println!("Result = {:?}", o); } diff --git a/rumq-client/Cargo.toml b/rumq-client/Cargo.toml index 7f7d6d02..8c989cf5 100644 --- a/rumq-client/Cargo.toml +++ b/rumq-client/Cargo.toml @@ -12,8 +12,9 @@ edition = "2018" [dependencies] derive_more = "0.15" tokio = { version = "0.2", features = ["io-util", "tcp", "dns", "sync", "time"] } +tokio-util = { version = "0.2", features = ["codec"] } +futures-util = { version = "0.3", features = ["sink"] } async-stream = "0.2" -futures-util = "0.3" webpki = "0.21" tokio-rustls = "0.12" rumq-core = { path = "../rumq-core", version = "0.1.0-alpha.4" } diff --git a/rumq-client/src/eventloop.rs b/rumq-client/src/eventloop.rs index ffad79ff..60cf31de 100644 --- a/rumq-client/src/eventloop.rs +++ b/rumq-client/src/eventloop.rs @@ -1,11 +1,12 @@ use crate::{Notification, Request, network}; use derive_more::From; -use rumq_core::{self, Packet, Publish, PacketIdentifier, AsyncMqttRead, AsyncMqttWrite}; +use rumq_core::mqtt4::{connect, Packet, Publish, PacketIdentifier}; use futures_util::{select, pin_mut, ready, FutureExt}; use futures_util::stream::{Stream, StreamExt}; -use tokio::io::{split, AsyncRead, AsyncWrite}; +use futures_util::sink::{Sink, SinkExt}; use tokio::time::{self, Elapsed}; use tokio::stream::iter; +use tokio_util::codec::Framed; use async_stream::stream; use crate::state::{StateError, MqttState}; use crate::MqttOptions; @@ -15,6 +16,7 @@ use std::collections::VecDeque; use std::task::{Poll, Context}; use std::pin::Pin; use std::mem; +use std::io; pub struct MqttEventLoop { // intermediate state of the eventloop. this is set @@ -48,6 +50,8 @@ pub enum EventLoopError { Timeout(Elapsed), Rumq(rumq_core::Error), Network(network::Error), + Io(io::Error), + StreamDone } /// Returns an object which encompasses state of the connection. @@ -84,7 +88,10 @@ impl MqttEventLoop { pub fn stream<'eventloop>(&'eventloop mut self) -> impl Stream + 'eventloop { let stream = stream! { let mut network = match self.connect().await { - Ok(network) => network, + Ok(network) => { + yield Notification::Connected; + network + }, Err(e) => { yield Notification::StreamEnd(e); return @@ -97,9 +104,8 @@ impl MqttEventLoop { let mut pending_rel = iter(self.pending_rel.drain(..)).map(Packet::Pubrec); let mut pending = iter(self.pending_pub.drain(..)).map(Packet::Publish).chain(pending_rel); - let (network_rx, mut network_tx) = split(network); + let (mut network_tx, mut network_rx) = network.split(); let mut network_stream = network_stream(self.options.keep_alive, network_rx); - let mut request_stream = request_stream(self.options.keep_alive, self.options.throttle, &mut pending, &mut self.requests); pin_mut!(network_stream); @@ -134,10 +140,12 @@ impl MqttEventLoop { // write the reply back to the network if let Some(p) = outpacket { - if let Err(e) = network_tx.mqtt_write(&p).await { + if let Err(e) = network_tx.send(p).await { yield Notification::StreamEnd(e.into()); break } + + // network_tx.flush().await?; } // yield the notification to the user @@ -225,8 +233,12 @@ fn network_stream(keep_alive: Duration, mut network: S) -> impl stream! { loop { let timeout_packet = time::timeout(keep_alive, async { - let packet = network.mqtt_read().await; - packet + let packet = match network.next().await { + Some(o) => o?, + None => return Err(EventLoopError::StreamDone) + }; + + Ok::(packet) }).await; let packet = match timeout_packet { @@ -257,11 +269,11 @@ impl MqttEventLoop { let network= time::timeout(Duration::from_secs(5), async { if self.options.ca.is_some() { let o = network::tls_connect(&self.options).await?; - let o = Box::new(o); + let o = Box::new(Framed::new(o, rumq_core::mqtt4::codec::MqttCodec)); Ok::, EventLoopError>(o) } else { let o = network::tcp_connect(&self.options).await?; - let o = Box::new(o); + let o = Box::new(Framed::new(o, rumq_core::mqtt4::codec::MqttCodec)); Ok::, EventLoopError>(o) } }).await??; @@ -275,7 +287,7 @@ impl MqttEventLoop { let keep_alive = self.options.keep_alive().as_secs() as u16; let clean_session = self.options.clean_session(); - let mut connect = rumq_core::connect(id); + let mut connect = connect(id); connect.keep_alive = keep_alive; connect.clean_session = clean_session; @@ -285,14 +297,17 @@ impl MqttEventLoop { // mqtt connection with timeout time::timeout(Duration::from_secs(5), async { - network.mqtt_write(&Packet::Connect(connect)).await?; + network.send(Packet::Connect(connect)).await?; self.state.handle_outgoing_connect()?; Ok::<_, EventLoopError>(()) }).await??; // wait for 'timeout' time to validate connack time::timeout(Duration::from_secs(5), async { - let packet = network.mqtt_read().await?; + let packet = match network.next().await { + Some(o) => o?, + None => return Err(EventLoopError::StreamDone) + }; self.state.handle_incoming_connack(packet)?; Ok::<_, EventLoopError>(()) }).await??; @@ -313,14 +328,14 @@ impl From for Packet { } } -trait Network: AsyncWrite + AsyncRead + Unpin + Send {} -impl Network for T where T: AsyncWrite + AsyncRead + Unpin + Send {} +pub trait Network: Stream> + Sink + Unpin + Send {} +impl Network for T where T: Stream> + Sink + Unpin + Send {} -trait NetworkRead: AsyncRead + Unpin + Send {} -impl NetworkRead for T where T: AsyncRead + Unpin + Send {} +trait NetworkRead: Stream>+ Unpin + Send {} +impl NetworkRead for T where T: Stream> + Unpin + Send {} -trait NetworkWrite: AsyncWrite + Unpin + Send + Sync {} -impl NetworkWrite for T where T: AsyncWrite + Unpin + Send + Sync {} +trait NetworkWrite: Sink + Unpin + Send + Sync {} +impl NetworkWrite for T where T: Sink + Unpin + Send + Sync {} pub trait Requests: Stream + Unpin + Send + Sync {} impl Requests for T where T: Stream + Unpin + Send + Sync {} @@ -331,7 +346,7 @@ impl Packets for T where T: Stream + Unpin + Send + Sync {} #[cfg(test)] mod test { - use rumq_core::*; + use rumq_core::mqtt4::*; use tokio::sync::mpsc::{channel, Sender}; use tokio::net::{TcpListener, TcpStream}; use tokio::{time, task}; @@ -513,7 +528,7 @@ mod test { let mut broker = broker(1887, true).await; for i in 1..=10 { - let packet = broker.mqtt_read().await; + let packet = broker.async_mqtt_read().await; if i > inflight { assert!(packet.is_none()); @@ -551,27 +566,27 @@ mod test { let mut broker = broker(1888, true).await; // packet 1 - let packet = broker.mqtt_read().await; + let packet = broker.async_mqtt_read().await; assert!(packet.is_some()); // packet 2 - let packet = broker.mqtt_read().await; + let packet = broker.async_mqtt_read().await; assert!(packet.is_some()); // packet 3 - let packet = broker.mqtt_read().await; + let packet = broker.async_mqtt_read().await; assert!(packet.is_some()); // packet 4 - let packet = broker.mqtt_read().await; + let packet = broker.async_mqtt_read().await; assert!(packet.is_none()); // ack packet 1 and we should receiver packet 4 broker.ack(PacketIdentifier(1)).await; - let packet = broker.mqtt_read().await; + let packet = broker.async_mqtt_read().await; assert!(packet.is_some()); // packet 5 - let packet = broker.mqtt_read().await; + let packet = broker.async_mqtt_read().await; assert!(packet.is_none()); // ack packet 2 and we should receiver packet 5 broker.ack(PacketIdentifier(2)).await; - let packet = broker.mqtt_read().await; + let packet = broker.async_mqtt_read().await; assert!(packet.is_some()); } @@ -608,7 +623,7 @@ mod test { { let mut broker = broker(1889, true).await; for i in 1..=2 { - let packet = broker.mqtt_read().await; + let packet = broker.async_mqtt_read().await; assert_eq!(PacketIdentifier(i), packet.unwrap()); broker.ack(packet.unwrap()).await; } @@ -618,7 +633,7 @@ mod test { { let mut broker = broker(1889, true).await; for i in 3..=4 { - let packet = broker.mqtt_read().await; + let packet = broker.async_mqtt_read().await; assert_eq!(PacketIdentifier(i), packet.unwrap()); broker.ack(packet.unwrap()).await; } @@ -658,7 +673,7 @@ mod test { { let mut broker = broker(1890, true).await; for i in 1..=2 { - let packet = broker.mqtt_read().await; + let packet = broker.async_mqtt_read().await; assert_eq!(PacketIdentifier(i), packet.unwrap()); } } @@ -667,7 +682,7 @@ mod test { { let mut broker = broker(1890, true).await; for i in 1..=6 { - let packet = broker.mqtt_read().await; + let packet = broker.async_mqtt_read().await; assert_eq!(PacketIdentifier(i), packet.unwrap()); } } @@ -692,16 +707,16 @@ mod test { stream: TcpStream } - async fn broker(port: u16, connack: bool) -> Broker { + async fn broker(port: u16, ack: bool) -> Broker { let addr = format!("127.0.0.1:{}", port); let mut listener = TcpListener::bind(&addr).await.unwrap(); let (mut stream, _) = listener.accept().await.unwrap(); - let packet = stream.mqtt_read().await.unwrap(); + let packet = stream.async_mqtt_read().await.unwrap(); if let Packet::Connect(_) = packet { - if connack { - let connack = rumq_core::connack(ConnectReturnCode::Accepted, false); - stream.mqtt_write(&Packet::Connack(connack)).await.unwrap(); + if ack { + let connack = connack(ConnectReturnCode::Accepted, false); + stream.async_mqtt_write(&Packet::Connack(connack)).await.unwrap(); } } @@ -712,9 +727,9 @@ mod test { impl Broker { // reads a packet from the stream with 2 second timeout - async fn mqtt_read(&mut self) -> Option { + async fn async_mqtt_read(&mut self) -> Option { let mqtt_read = time::timeout(Duration::from_secs(2), async { - self.stream.mqtt_read().await.unwrap() + self.stream.async_mqtt_read().await.unwrap() }); if let Ok(Packet::Publish(publish)) = mqtt_read.await { @@ -726,19 +741,19 @@ mod test { async fn ack(&mut self, pkid: PacketIdentifier) { let packet = Packet::Puback(pkid); - self.stream.mqtt_write(&packet).await.unwrap(); + self.stream.async_mqtt_write(&packet).await.unwrap(); self.stream.flush().await.unwrap(); } fn stream(mut self) -> impl Stream { let stream = stream! { loop { - let packet = self.stream.mqtt_read().await.unwrap(); + let packet = self.stream.async_mqtt_read().await.unwrap(); match packet { Packet::Connect(_) => { - let connack = rumq_core::connack(ConnectReturnCode::Accepted, false); - self.stream.mqtt_write(&Packet::Connack(connack)).await.unwrap(); + let connack = connack(ConnectReturnCode::Accepted, false); + self.stream.async_mqtt_write(&Packet::Connack(connack)).await.unwrap(); } p => yield p } diff --git a/rumq-client/src/lib.rs b/rumq-client/src/lib.rs index 67665c02..5dc3db76 100644 --- a/rumq-client/src/lib.rs +++ b/rumq-client/src/lib.rs @@ -11,20 +11,20 @@ pub(crate) mod state; pub use eventloop::eventloop; pub use eventloop::{EventLoopError, MqttEventLoop}; -pub use rumq_core::*; +pub use rumq_core::mqtt4::*; pub use state::MqttState; /// Incoming notifications from the broker #[derive(Debug)] pub enum Notification { - Reconnection, - Disconnection, + Connected, Publish(Publish), Puback(PacketIdentifier), Pubrec(PacketIdentifier), Pubrel(PacketIdentifier), Pubcomp(PacketIdentifier), - Suback(PacketIdentifier), + Suback(Suback), + Unsuback(PacketIdentifier), StreamEnd(EventLoopError), } diff --git a/rumq-client/src/state.rs b/rumq-client/src/state.rs index 2acaf191..1bfa5776 100644 --- a/rumq-client/src/state.rs +++ b/rumq-client/src/state.rs @@ -2,7 +2,7 @@ use crate::Notification; use std::{collections::VecDeque, result::Result, time::Instant}; -use rumq_core::*; +use rumq_core::mqtt4::*; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum MqttConnectionStatus { @@ -120,8 +120,8 @@ impl MqttState { let out = match packet { Packet::Pingresp => self.handle_incoming_pingresp(), Packet::Publish(publish) => self.handle_incoming_publish(publish.clone()), - Packet::Suback(_pkid) => Ok((None, None)), - Packet::Unsuback(_pkid) => Ok((None, None)), + Packet::Suback(suback) => self.handle_incoming_suback(suback), + Packet::Unsuback(pkid) => self.handle_incoming_unsuback(pkid), Packet::Puback(pkid) => self.handle_incoming_puback(pkid), Packet::Pubrec(pkid) => self.handle_incoming_pubrec(pkid), Packet::Pubrel(pkid) => self.handle_incoming_pubrel(pkid), @@ -174,6 +174,18 @@ impl MqttState { } } + pub fn handle_incoming_suback(&mut self, suback: Suback) -> Result<(Option, Option), StateError> { + let request = None; + let notification = Some(Notification::Suback(suback)); + Ok((notification, request)) + } + + pub fn handle_incoming_unsuback(&mut self, pkid: PacketIdentifier) -> Result<(Option, Option), StateError> { + let request = None; + let notification = Some(Notification::Unsuback(pkid)); + Ok((notification, request)) + } + /// Iterates through the list of stored publishes and removes the publish with the /// matching packet identifier. Removal is now a O(n) operation. This should be /// usually ok in case of acks due to ack ordering in normal conditions. But in cases @@ -360,7 +372,7 @@ impl MqttState { mod test { use super::{MqttConnectionStatus, MqttState, Packet, StateError}; use crate::{MqttOptions, Notification}; - use rumq_core::*; + use rumq_core::mqtt4::*; fn build_outgoing_publish(qos: QoS) -> Publish { let topic = "hello/world".to_owned(); diff --git a/rumq-core/Cargo.toml b/rumq-core/Cargo.toml index 27cf5a1a..916ce33a 100644 --- a/rumq-core/Cargo.toml +++ b/rumq-core/Cargo.toml @@ -8,6 +8,8 @@ authors = ["tekjar"] edition = "2018" [dependencies] +bytes = "0.5" +tokio-util = { version = "0.2", features = ["codec"] } derive_more = "0.15" tokio = { version = "0.2", features = ["io-util"] } async-trait = "0.1" diff --git a/rumq-core/control b/rumq-core/control deleted file mode 100644 index 08e6b652..00000000 --- a/rumq-core/control +++ /dev/null @@ -1,25 +0,0 @@ - -running 13 tests -test deserialize::test::read_packet_connack_works ... ignored -test deserialize::test::read_packet_connect_mqtt_protocol ... ignored -test deserialize::test::read_packet_puback_works ... ignored -test deserialize::test::read_packet_publish_qos0_works ... ignored -test deserialize::test::read_packet_publish_qos1_works ... ignored -test deserialize::test::read_packet_suback_works ... ignored -test deserialize::test::read_packet_subscribe_works ... ignored -test deserialize::test::read_packet_unsubscribe_works ... ignored -test serialize::test::write_packet_connack_works ... ignored -test serialize::test::write_packet_connect_mqtt_protocol_works ... ignored -test serialize::test::write_packet_publish_at_least_once_works ... ignored -test serialize::test::write_packet_publish_at_most_once_works ... ignored -test serialize::test::write_packet_subscribe_works ... ignored - -test result: ok. 0 passed; 0 failed; 13 ignored; 0 measured; 0 filtered out - - -running 2 tests -test publish_deserialize_perf ... bench: 656 ns/iter (+/- 3) -test publish_serialize_perf ... bench: 316 ns/iter (+/- 13) - -test result: ok. 0 passed; 0 failed; 0 ignored; 2 measured - diff --git a/rumq-core/src/lib.rs b/rumq-core/src/lib.rs index a432ff5e..7162cb44 100644 --- a/rumq-core/src/lib.rs +++ b/rumq-core/src/lib.rs @@ -2,114 +2,7 @@ use derive_more::From; use std::io; use std::string::FromUtf8Error; -mod asyncdeserialize; -mod asyncserialize; -mod deserialize; -mod packets; -mod serialize; -mod topic; - -pub use asyncdeserialize::AsyncMqttRead; -pub use asyncserialize::AsyncMqttWrite; -pub use deserialize::MqttRead; -pub use packets::*; -pub use serialize::MqttWrite; -pub use topic::*; - -#[repr(u8)] -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd)] -pub enum QoS { - AtMostOnce = 0, - AtLeastOnce = 1, - ExactlyOnce = 2, -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -#[repr(u8)] -pub enum PacketType { - Connect = 1, - Connack, - Publish, - Puback, - Pubrec, - Pubrel, - Pubcomp, - Subscribe, - Suback, - Unsubscribe, - Unsuback, - Pingreq, - Pingresp, - Disconnect, -} - -#[derive(Debug, Clone, PartialEq)] -pub enum Packet { - Connect(Connect), - Connack(Connack), - Publish(Publish), - Puback(PacketIdentifier), - Pubrec(PacketIdentifier), - Pubrel(PacketIdentifier), - Pubcomp(PacketIdentifier), - Subscribe(Subscribe), - Suback(Suback), - Unsubscribe(Unsubscribe), - Unsuback(PacketIdentifier), - Pingreq, - Pingresp, - Disconnect, -} - -/// 7 3 0 -/// +--------------------------+--------------------------+ -/// byte 1 | MQTT Control Packet Type | Flags for each type | -/// +--------------------------+--------------------------+ -/// | Remaining Bytes Len (1 - 4 bytes) | -/// +-----------------------------------------------------+ -/// -/// http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Figure_2.2_- - -pub fn qos(num: u8) -> Result { - match num { - 0 => Ok(QoS::AtMostOnce), - 1 => Ok(QoS::AtLeastOnce), - 2 => Ok(QoS::ExactlyOnce), - _ => Err(Error::UnsupportedQoS), - } -} - -pub fn packet_type(num: u8) -> Result { - match num { - 1 => Ok(PacketType::Connect), - 2 => Ok(PacketType::Connack), - 3 => Ok(PacketType::Publish), - 4 => Ok(PacketType::Puback), - 5 => Ok(PacketType::Pubrec), - 6 => Ok(PacketType::Pubrel), - 7 => Ok(PacketType::Pubcomp), - 8 => Ok(PacketType::Subscribe), - 9 => Ok(PacketType::Suback), - 10 => Ok(PacketType::Unsubscribe), - 11 => Ok(PacketType::Unsuback), - 12 => Ok(PacketType::Pingreq), - 13 => Ok(PacketType::Pingresp), - 14 => Ok(PacketType::Disconnect), - _ => Err(Error::UnsupportedPacketType(num)), - } -} - -pub fn connect_return(num: u8) -> Result { - match num { - 0 => Ok(ConnectReturnCode::Accepted), - 1 => Ok(ConnectReturnCode::BadUsernamePassword), - 2 => Ok(ConnectReturnCode::NotAuthorized), - 3 => Ok(ConnectReturnCode::RefusedIdentifierRejected), - 4 => Ok(ConnectReturnCode::RefusedProtocolVersion), - 5 => Ok(ConnectReturnCode::ServerUnavailable), - _ => Err(Error::InvalidConnectReturnCode(num)), - } -} +pub mod mqtt4; #[derive(Debug, From)] pub enum Error { diff --git a/rumq-core/src/asyncdeserialize.rs b/rumq-core/src/mqtt4/asyncdeserialize.rs similarity index 95% rename from rumq-core/src/asyncdeserialize.rs rename to rumq-core/src/mqtt4/asyncdeserialize.rs index 033bfb77..9d26a650 100644 --- a/rumq-core/src/asyncdeserialize.rs +++ b/rumq-core/src/mqtt4/asyncdeserialize.rs @@ -1,18 +1,17 @@ -use crate::*; - +use crate::mqtt4::*; use async_trait::async_trait; use tokio::io::AsyncReadExt; #[async_trait] pub trait AsyncMqttRead: AsyncReadExt + Unpin { - async fn mqtt_read(&mut self) -> Result { + async fn async_mqtt_read(&mut self) -> Result { let packet_type = self.read_u8().await?; let remaining_len = self.read_remaining_length().await?; - self.deserialize(packet_type, remaining_len).await + self.async_deserialize(packet_type, remaining_len).await } - async fn deserialize(&mut self, byte1: u8, remaining_len: usize) -> Result { + async fn async_deserialize(&mut self, byte1: u8, remaining_len: usize) -> Result { let kind = packet_type(byte1 >> 4)?; if remaining_len == 0 { @@ -271,8 +270,8 @@ impl AsyncMqttRead for R {} #[cfg(test)] mod test { use super::AsyncMqttRead; - use crate::{Connack, Connect, Packet, Publish, Suback, Subscribe, Unsubscribe}; - use crate::{ + use super::{Connack, Connect, Packet, Publish, Suback, Subscribe, Unsubscribe}; + use super::{ ConnectReturnCode, LastWill, PacketIdentifier, Protocol, QoS, SubscribeReturnCodes, SubscribeTopic, }; @@ -294,7 +293,7 @@ mod test { 0xDE, 0xAD, 0xBE, 0xEF // extra packets in the stream ]); - let packet = stream.mqtt_read().await.unwrap(); + let packet = stream.async_mqtt_read().await.unwrap(); assert_eq!( packet, @@ -322,7 +321,7 @@ mod test { 0x01, 0x00, // variable header. connack flags, connect return code 0xDE, 0xAD, 0xBE, 0xEF // extra packets in the stream ]); - let packet = stream.mqtt_read().await.unwrap(); + let packet = stream.async_mqtt_read().await.unwrap(); assert_eq!( packet, @@ -343,7 +342,7 @@ mod test { 0xDE, 0xAD, 0xBE, 0xEF // extra packets in the stream ]); - let packet = stream.mqtt_read().await.unwrap(); + let packet = stream.async_mqtt_read().await.unwrap(); assert_eq!( packet, @@ -367,7 +366,7 @@ mod test { 0xDE, 0xAD, 0xBE, 0xEF // extra packets in the stream ]); - let packet = stream.mqtt_read().await.unwrap(); + let packet = stream.async_mqtt_read().await.unwrap(); assert_eq!( packet, @@ -389,7 +388,7 @@ mod test { 0x00, 0x0A, // fixed header. packet identifier = 10 0xDE, 0xAD, 0xBE, 0xEF // extra packets in the stream ]); - let packet = stream.mqtt_read().await.unwrap(); + let packet = stream.async_mqtt_read().await.unwrap(); assert_eq!(packet, Packet::Puback(PacketIdentifier(10))); } @@ -408,7 +407,7 @@ mod test { 0xDE, 0xAD, 0xBE, 0xEF // extra packets in the stream ]); - let packet = stream.mqtt_read().await.unwrap(); + let packet = stream.async_mqtt_read().await.unwrap(); assert_eq!( packet, @@ -443,7 +442,7 @@ mod test { 0xDE, 0xAD, 0xBE, 0xEF // extra packets in the stream ]); - let packet = stream.mqtt_read().await.unwrap(); + let packet = stream.async_mqtt_read().await.unwrap(); assert_eq!( packet, @@ -463,7 +462,7 @@ mod test { 0xDE, 0xAD, 0xBE, 0xEF // extra packets in the stream ]); - let packet = stream.mqtt_read().await.unwrap(); + let packet = stream.async_mqtt_read().await.unwrap(); assert_eq!( packet, diff --git a/rumq-core/src/asyncserialize.rs b/rumq-core/src/mqtt4/asyncserialize.rs similarity index 95% rename from rumq-core/src/asyncserialize.rs rename to rumq-core/src/mqtt4/asyncserialize.rs index 6f884dee..0971f9c8 100644 --- a/rumq-core/src/asyncserialize.rs +++ b/rumq-core/src/mqtt4/asyncserialize.rs @@ -1,11 +1,11 @@ -use crate::*; +use crate::mqtt4::*; use async_trait::async_trait; use tokio::io::AsyncWriteExt; #[async_trait] pub trait AsyncMqttWrite: AsyncWriteExt + Unpin { - async fn mqtt_write(&mut self, packet: &Packet) -> Result<(), Error> { + async fn async_mqtt_write(&mut self, packet: &Packet) -> Result<(), Error> { match packet { Packet::Connect(connect) => { self.write_u8(0b00010000).await?; @@ -181,8 +181,8 @@ impl AsyncMqttWrite for W {} #[cfg(test)] mod test { use super::AsyncMqttWrite; - use crate::{Connack, Connect, Packet, Publish, Subscribe}; - use crate::{ConnectReturnCode, LastWill, PacketIdentifier, Protocol, QoS, SubscribeTopic}; + use super::{Connack, Connect, Packet, Publish, Subscribe}; + use super::{ConnectReturnCode, LastWill, PacketIdentifier, Protocol, QoS, SubscribeTopic}; #[tokio::test] async fn write_packet_connect_mqtt_protocol_works() { @@ -202,7 +202,7 @@ mod test { }); let mut stream = Vec::new(); - stream.mqtt_write(&connect).await.unwrap(); + stream.async_mqtt_write(&connect).await.unwrap(); assert_eq!( stream.clone(), @@ -227,7 +227,7 @@ mod test { }); let mut stream = Vec::new(); - stream.mqtt_write(&connack).await.unwrap(); + stream.async_mqtt_write(&connack).await.unwrap(); assert_eq!(stream, vec![0b00100000, 0x02, 0x01, 0x00]); } @@ -244,7 +244,7 @@ mod test { }); let mut stream = Vec::new(); - stream.mqtt_write(&publish).await.unwrap(); + stream.async_mqtt_write(&publish).await.unwrap(); assert_eq!( stream, @@ -264,7 +264,7 @@ mod test { }); let mut stream = Vec::new(); - stream.mqtt_write(&publish).await.unwrap(); + stream.async_mqtt_write(&publish).await.unwrap(); assert_eq!( stream, @@ -293,7 +293,7 @@ mod test { }); let mut stream = Vec::new(); - stream.mqtt_write(&subscribe).await.unwrap(); + stream.async_mqtt_write(&subscribe).await.unwrap(); assert_eq!( stream, diff --git a/rumq-broker/src/codec.rs b/rumq-core/src/mqtt4/codec.rs similarity index 53% rename from rumq-broker/src/codec.rs rename to rumq-core/src/mqtt4/codec.rs index 9b4ffd49..69264396 100644 --- a/rumq-broker/src/codec.rs +++ b/rumq-core/src/mqtt4/codec.rs @@ -1,9 +1,14 @@ use bytes::buf::Buf; use bytes::BytesMut; -use rumq_core::{self, Error, MqttRead, MqttWrite, Packet}; -use std::io::{self, Cursor, ErrorKind::TimedOut, ErrorKind::WouldBlock}; use tokio_util::codec::{Decoder, Encoder}; +use std::io; +use std::io::{Cursor, ErrorKind::TimedOut, ErrorKind::UnexpectedEof, ErrorKind::WouldBlock}; + +use crate::mqtt4::Packet; +use crate::mqtt4::{MqttRead, MqttWrite}; +use crate::Error; + pub struct MqttCodec; impl MqttCodec { @@ -14,24 +19,31 @@ impl MqttCodec { impl Decoder for MqttCodec { type Item = Packet; - type Error = rumq_core::Error; + type Error = Error; - fn decode(&mut self, buf: &mut BytesMut) -> Result, rumq_core::Error> { - // NOTE: `decode` might be called with `buf.len == 0` when prevous - // decode call read all the bytes in the stream. We should return - // Ok(None) in those cases or else the `read` call will return - // Ok(0) => translated to UnexpectedEOF by `byteorder` crate. - // `read` call Ok(0) happens when buffer specified was 0 bytes in len - // https://doc.rust-lang.org/std/io/trait.Read.html#tymethod.read + fn decode(&mut self, buf: &mut BytesMut) -> Result, Error> { + // NOTE: `decode` might be called with `buf.len == 0`. We should return + // Ok(None) in those cases or else the internal `read_exact` call will return UnexpectedEOF if buf.len() < 2 { return Ok(None); } + // TODO: Better implementation by taking packet type and remaining len into account here + // Maybe the next `decode` call can wait for publish size byts to be filled in `buf` before being + // invoked again + // TODO: Find how the underlying implementation invokes this method. Is there an + // implementation with size awareness? let mut buf_ref = buf.as_ref(); - let (packet_type, remaining_len) = match buf_ref.read_packet_type_and_remaining_length() { Ok(len) => len, - Err(Error::Io(e)) if e.kind() == TimedOut || e.kind() == WouldBlock => return Ok(None), + // Not being able to fill `buf_ref` entirely is also UnexpectedEof + // This would be a problem if `buf` len is 2 and if the packet is not ping or other 2 + // byte len, `read_packet_type_and_remaining_length` call tries reading more than 2 bytes + // from `buf` and results in Ok(0) which translates to Eof error when target buffer is + // not completely full + // https://doc.rust-lang.org/stable/std/io/trait.Read.html#tymethod.read + // https://doc.rust-lang.org/stable/src/std/io/mod.rs.html#501-944 + Err(Error::Io(e)) if e.kind() == TimedOut || e.kind() == WouldBlock || e.kind() == UnexpectedEof => return Ok(None), Err(e) => return Err(e.into()), }; @@ -43,7 +55,6 @@ impl Decoder for MqttCodec { // and the next time decode` gets called, there will be more bytes in `buf`, // hopefully enough to frame the packet if buf.len() < len { - buf.reserve(len); return Ok(None); } @@ -61,8 +72,7 @@ impl Encoder for MqttCodec { let mut stream = Cursor::new(Vec::new()); // TODO: Implement `write_packet` for `&mut BytesMut` - if let Err(e) = stream.mqtt_write(&msg) { - error!("Encode error. Error = {:?}", e); + if let Err(_) = stream.mqtt_write(&msg) { return Err(io::Error::new(io::ErrorKind::Other, "Unable to encode!")); } diff --git a/rumq-core/src/deserialize.rs b/rumq-core/src/mqtt4/deserialize.rs similarity index 98% rename from rumq-core/src/deserialize.rs rename to rumq-core/src/mqtt4/deserialize.rs index 92a9ac17..dc84f630 100644 --- a/rumq-core/src/deserialize.rs +++ b/rumq-core/src/mqtt4/deserialize.rs @@ -1,4 +1,4 @@ -use crate::*; +use crate::mqtt4::*; use byteorder::ReadBytesExt; use std::io::Read; @@ -283,8 +283,8 @@ impl MqttRead for R {} #[cfg(test)] mod test { use super::MqttRead; - use crate::{Connack, Connect, Packet, Publish, Suback, Subscribe, Unsubscribe}; - use crate::{ConnectReturnCode, LastWill, PacketIdentifier, Protocol, QoS, SubscribeReturnCodes, SubscribeTopic}; + use crate::mqtt4::{Connack, Connect, Packet, Publish, Suback, Subscribe, Unsubscribe}; + use crate::mqtt4::{ConnectReturnCode, LastWill, PacketIdentifier, Protocol, QoS, SubscribeReturnCodes, SubscribeTopic}; use std::io::Cursor; #[test] diff --git a/rumq-core/src/mqtt4/mod.rs b/rumq-core/src/mqtt4/mod.rs new file mode 100644 index 00000000..ab6de547 --- /dev/null +++ b/rumq-core/src/mqtt4/mod.rs @@ -0,0 +1,112 @@ +mod asyncdeserialize; +mod asyncserialize; +mod deserialize; +mod packets; +mod serialize; +mod topic; + +pub mod codec; + +pub use asyncdeserialize::AsyncMqttRead; +pub use asyncserialize::AsyncMqttWrite; +pub use deserialize::MqttRead; +pub use packets::*; +pub use serialize::MqttWrite; +pub use topic::*; + +use crate::Error; + +#[repr(u8)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd)] +pub enum QoS { + AtMostOnce = 0, + AtLeastOnce = 1, + ExactlyOnce = 2, +} + +#[repr(u8)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum PacketType { + Connect = 1, + Connack, + Publish, + Puback, + Pubrec, + Pubrel, + Pubcomp, + Subscribe, + Suback, + Unsubscribe, + Unsuback, + Pingreq, + Pingresp, + Disconnect, +} + +#[derive(Debug, Clone, PartialEq)] +pub enum Packet { + Connect(Connect), + Connack(Connack), + Publish(Publish), + Puback(PacketIdentifier), + Pubrec(PacketIdentifier), + Pubrel(PacketIdentifier), + Pubcomp(PacketIdentifier), + Subscribe(Subscribe), + Suback(Suback), + Unsubscribe(Unsubscribe), + Unsuback(PacketIdentifier), + Pingreq, + Pingresp, + Disconnect, +} + +/// 7 3 0 +/// +--------------------------+--------------------------+ +/// byte 1 | MQTT Control Packet Type | Flags for each type | +/// +--------------------------+--------------------------+ +/// | Remaining Bytes Len (1 - 4 bytes) | +/// +-----------------------------------------------------+ +/// +/// http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Figure_2.2_- + +pub fn qos(num: u8) -> Result { + match num { + 0 => Ok(QoS::AtMostOnce), + 1 => Ok(QoS::AtLeastOnce), + 2 => Ok(QoS::ExactlyOnce), + _ => Err(Error::UnsupportedQoS), + } +} + +pub fn packet_type(num: u8) -> Result { + match num { + 1 => Ok(PacketType::Connect), + 2 => Ok(PacketType::Connack), + 3 => Ok(PacketType::Publish), + 4 => Ok(PacketType::Puback), + 5 => Ok(PacketType::Pubrec), + 6 => Ok(PacketType::Pubrel), + 7 => Ok(PacketType::Pubcomp), + 8 => Ok(PacketType::Subscribe), + 9 => Ok(PacketType::Suback), + 10 => Ok(PacketType::Unsubscribe), + 11 => Ok(PacketType::Unsuback), + 12 => Ok(PacketType::Pingreq), + 13 => Ok(PacketType::Pingresp), + 14 => Ok(PacketType::Disconnect), + _ => Err(Error::UnsupportedPacketType(num)), + } +} + +pub fn connect_return(num: u8) -> Result { + match num { + 0 => Ok(ConnectReturnCode::Accepted), + 1 => Ok(ConnectReturnCode::BadUsernamePassword), + 2 => Ok(ConnectReturnCode::NotAuthorized), + 3 => Ok(ConnectReturnCode::RefusedIdentifierRejected), + 4 => Ok(ConnectReturnCode::RefusedProtocolVersion), + 5 => Ok(ConnectReturnCode::ServerUnavailable), + _ => Err(Error::InvalidConnectReturnCode(num)), + } +} diff --git a/rumq-core/src/packets.rs b/rumq-core/src/mqtt4/packets.rs similarity index 99% rename from rumq-core/src/packets.rs rename to rumq-core/src/mqtt4/packets.rs index 2250d018..d0aef9fd 100644 --- a/rumq-core/src/packets.rs +++ b/rumq-core/src/mqtt4/packets.rs @@ -1,6 +1,6 @@ use derive_more::From; -use crate::QoS; +use crate::mqtt4::QoS; use std::fmt; #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, From)] diff --git a/rumq-core/src/serialize.rs b/rumq-core/src/mqtt4/serialize.rs similarity index 98% rename from rumq-core/src/serialize.rs rename to rumq-core/src/mqtt4/serialize.rs index b716eb65..55bf322c 100644 --- a/rumq-core/src/serialize.rs +++ b/rumq-core/src/mqtt4/serialize.rs @@ -1,4 +1,4 @@ -use crate::*; +use crate::mqtt4::*; use byteorder::WriteBytesExt; @@ -183,8 +183,8 @@ impl MqttWrite for W {} #[cfg(test)] mod test { use super::MqttWrite; - use crate::{Connack, Connect, Packet, Publish, Subscribe}; - use crate::{ConnectReturnCode, LastWill, PacketIdentifier, Protocol, QoS, SubscribeTopic}; + use crate::mqtt4::{Connack, Connect, Packet, Publish, Subscribe}; + use crate::mqtt4::{ConnectReturnCode, LastWill, PacketIdentifier, Protocol, QoS, SubscribeTopic}; #[test] fn write_packet_connect_mqtt_protocol_works() { diff --git a/rumq-core/src/topic.rs b/rumq-core/src/mqtt4/topic.rs similarity index 100% rename from rumq-core/src/topic.rs rename to rumq-core/src/mqtt4/topic.rs diff --git a/rumq-core/src/mqtt5/mod.rs b/rumq-core/src/mqtt5/mod.rs new file mode 100644 index 00000000..e69de29b diff --git a/rumq-core/variable b/rumq-core/variable deleted file mode 100644 index ed8d9b22..00000000 --- a/rumq-core/variable +++ /dev/null @@ -1,25 +0,0 @@ - -running 13 tests -test deserialize::test::read_packet_connack_works ... ignored -test deserialize::test::read_packet_connect_mqtt_protocol ... ignored -test deserialize::test::read_packet_puback_works ... ignored -test deserialize::test::read_packet_publish_qos0_works ... ignored -test deserialize::test::read_packet_publish_qos1_works ... ignored -test deserialize::test::read_packet_suback_works ... ignored -test deserialize::test::read_packet_subscribe_works ... ignored -test deserialize::test::read_packet_unsubscribe_works ... ignored -test serialize::test::write_packet_connack_works ... ignored -test serialize::test::write_packet_connect_mqtt_protocol_works ... ignored -test serialize::test::write_packet_publish_at_least_once_works ... ignored -test serialize::test::write_packet_publish_at_most_once_works ... ignored -test serialize::test::write_packet_subscribe_works ... ignored - -test result: ok. 0 passed; 0 failed; 13 ignored; 0 measured; 0 filtered out - - -running 2 tests -test publish_deserialize_perf ... bench: 669 ns/iter (+/- 5) -test publish_serialize_perf ... bench: 317 ns/iter (+/- 1) - -test result: ok. 0 passed; 0 failed; 0 ignored; 2 measured - diff --git a/rumqd.conf b/rumqd.conf index a9d9ba8e..48ad1d5e 100644 --- a/rumqd.conf +++ b/rumqd.conf @@ -35,14 +35,3 @@ key_path = "tlsfiles/server.key.pem" # provide ca_path to enable client authentication ca_path = "tlsfiles/ca-chain.cert.pem" - - -[httppush] -url = "http://f061ff43.ngrok.io/api/v1" -# topic on which push client subscribes the router -# 'device-1' should be replaced by a wildcard in this particular case -topic = "/devices/+/action/status" - - -[httpserver] -port = 8080