Skip to content

Commit

Permalink
Embedding (#35)
Browse files Browse the repository at this point in the history
* Move deactivation into route

* Feature to send bulk messages from the router

* Broker struct to access router

* Move tokio main to cli

* Remove http specifics and expose types to better support embedding

* Suback and unsuback in notifications

* Handle unexpected eof in codec

* Yield 'connected' notification after a connection

* Move codec to core

* Move codec to a module

* move everything to mqtt4 module to laydown mqtt5

* Make broker compatible with the new core

* Make the client compatible with the new core

* Use codec instead of Async traits in the client
  • Loading branch information
Ravi Teja authored Feb 16, 2020
1 parent 30725be commit 8c20d58
Show file tree
Hide file tree
Showing 27 changed files with 314 additions and 472 deletions.
1 change: 0 additions & 1 deletion rumq-broker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ tokio = { version = "0.2", features = ["full"] }
tokio-util = { version = "0.2", features = ["codec"] }
futures-util = { version = "0.3", features = ["sink"] }
tokio-rustls = "0.12"
hyper = "0.13"
rumq-core = { path = "../rumq-core", version = "0.1.0-alpha.4" }
derive_more = "0.99"
log = "0.4"
Expand Down
5 changes: 3 additions & 2 deletions rumq-broker/src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use derive_more::From;
use rumq_core::{connack, Packet, Connect, ConnectReturnCode};
use rumq_core::mqtt4::{connack, Packet, Connect, ConnectReturnCode};
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::mpsc::error::SendError;
use tokio::stream::iter;
Expand Down Expand Up @@ -124,7 +124,8 @@ impl<S: Network> Connection<S> {

// eventloop which processes packets and router messages
let mut incoming = &mut self.stream;
let mut incoming = time::throttle(Duration::from_millis(10), &mut incoming);
let mut incoming = time::throttle(Duration::from_millis(1), &mut incoming);

loop {
let mut timeout = time::delay_for(keep_alive);
let (done, routermessage) = select(&mut incoming, &mut self.this_rx, keep_alive, &mut timeout).await?;
Expand Down
86 changes: 0 additions & 86 deletions rumq-broker/src/httppush.rs

This file was deleted.

45 changes: 0 additions & 45 deletions rumq-broker/src/httpserver.rs

This file was deleted.

93 changes: 40 additions & 53 deletions rumq-broker/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#![recursion_limit="512"]

#[macro_use]
extern crate log;

Expand All @@ -14,6 +12,9 @@ use tokio_rustls::rustls::internal::pemfile::{certs, rsa_private_keys};
use tokio_rustls::rustls::TLSError;
use tokio_rustls::rustls::{AllowAnyAuthenticatedClient, NoClientAuth, RootCertStore, ServerConfig};
use tokio_rustls::TlsAcceptor;
use futures_util::sink::Sink;
use futures_util::stream::Stream;
use rumq_core::mqtt4::{codec, Packet};

use serde::Deserialize;

Expand All @@ -25,11 +26,11 @@ use std::time::Duration;
use std::thread;

mod connection;
mod httppush;
mod httpserver;
mod router;
mod state;
mod codec;
mod router;

pub use rumq_core as core;
pub use router::{RouterMessage, Connection};

#[derive(From, Debug)]
pub enum Error {
Expand All @@ -49,19 +50,6 @@ pub enum Error {
#[derive(Debug, Deserialize, Clone)]
pub struct Config {
servers: Vec<ServerSettings>,
httppush: HttpPush,
httpserver: HttpServer,
}

#[derive(Debug, Deserialize, Clone)]
pub struct HttpPush {
url: String,
topic: String,
}

#[derive(Debug, Deserialize, Clone)]
pub struct HttpServer {
port: u16,
}

#[derive(Debug, Deserialize, Clone)]
Expand Down Expand Up @@ -107,7 +95,7 @@ async fn tls_connection<P: AsRef<Path>>(ca_path: Option<P>, cert_path: P, key_pa
Ok(acceptor)
}

pub async fn accept_loop(config: Arc<ServerSettings>, router_tx: Sender<(String, router::RouterMessage)>) -> Result<(), Error> {
async fn accept_loop(config: Arc<ServerSettings>, router_tx: Sender<(String, router::RouterMessage)>) -> Result<(), Error> {
let addr = format!("0.0.0.0:{}", config.port);
let connection_config = config.clone();

Expand Down Expand Up @@ -165,6 +153,10 @@ pub async fn accept_loop(config: Arc<ServerSettings>, router_tx: Sender<(String,
}
}


pub trait Network: Stream<Item = Result<Packet, rumq_core::Error>> + Sink<Packet, Error = io::Error> + Unpin + Send {}
impl<T> Network for T where T: Stream<Item = Result<Packet, rumq_core::Error>> + Sink<Packet, Error = io::Error> + Unpin + Send {}

#[tokio::main(core_threads = 1)]
async fn router(rx: Receiver<(String, router::RouterMessage)>) {
let mut router = router::Router::new(rx);
Expand All @@ -173,51 +165,46 @@ async fn router(rx: Receiver<(String, router::RouterMessage)>) {
}
}

#[tokio::main(core_threads = 4)]
pub async fn start(config: Config) {
pub struct Broker {
config: Config,
router_handle: Sender<(String, router::RouterMessage)>,
}

pub fn new(config: Config) -> Broker {
let (router_tx, router_rx) = channel(100);

// router to route data between connections. creates an extra copy but
// might not be a big deal if we prevent clones/send fat pointers and batch
thread::spawn(move || {
router(router_rx)
});

let http_router_tx = router_tx.clone();
// TODO: Remove clone on main config
let httpserver_config = Arc::new(config.clone());
task::spawn(async move { httpserver::start(httpserver_config, http_router_tx).await });

// TODO: Remove clone on main config
let status_router_tx = router_tx.clone();
let httppush_config = Arc::new(config.clone());
task::spawn(async move {
let out = httppush::start(httppush_config, status_router_tx).await;
error!("Http routine stopped. Result = {:?}", out);
});

let mut servers = Vec::new();
for server in config.servers.into_iter() {
let config = Arc::new(server);

let fut = accept_loop(config, router_tx.clone());
let o = task::spawn(async {
error!("Accept loop returned = {:?}", fut.await);
});
Broker {
config,
router_handle: router_tx
}
}

servers.push(o);
impl Broker {
pub fn new_router_handle(&self) -> Sender<(String, router::RouterMessage)> {
self.router_handle.clone()
}

join_all(servers).await;
}
pub async fn start(&mut self) -> Vec<Result<(), task::JoinError>> {
let mut servers = Vec::new();
let server_configs = self.config.servers.split_off(0);

for server in server_configs.into_iter() {
let config = Arc::new(server);
let fut = accept_loop(config, self.router_handle.clone());
let o = task::spawn(async {
error!("Accept loop returned = {:?}", fut.await);
});

use futures_util::sink::Sink;
use futures_util::stream::Stream;
use rumq_core::Packet;
servers.push(o);
}

pub trait Network: Stream<Item = Result<Packet, rumq_core::Error>> + Sink<Packet, Error = io::Error> + Unpin + Send {}
impl<T> Network for T where T: Stream<Item = Result<Packet, rumq_core::Error>> + Sink<Packet, Error = io::Error> + Unpin + Send {}
join_all(servers).await
}
}

#[cfg(test)]
mod test {
Expand Down
22 changes: 10 additions & 12 deletions rumq-broker/src/router.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use derive_more::From;
use rumq_core::{has_wildcards, matches, QoS, Packet, Connect, Publish, Subscribe, Unsubscribe};
use rumq_core::mqtt4::{has_wildcards, matches, publish, QoS, Packet, Connect, Publish, Subscribe, Unsubscribe};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::mpsc::error::TrySendError;
use tokio::select;
Expand Down Expand Up @@ -27,22 +27,22 @@ pub enum RouterMessage {
/// Client id and connection handle
Connect(Connection),
/// Packet
Packet(rumq_core::Packet),
Packet(Packet),
/// Packets
Packets(VecDeque<rumq_core::Packet>),
Packets(VecDeque<Packet>),
/// Disconnects a client from active connections list. Will handling
Death(String),
/// Pending messages of the previous connection
Pending(VecDeque<Publish>)
}

pub struct Connection {
pub connect: rumq_core::Connect,
pub connect: Connect,
pub handle: Option<Sender<RouterMessage>>
}

impl Connection {
pub fn new(connect: rumq_core::Connect, handle: Sender<RouterMessage>) -> Connection {
pub fn new(connect: Connect, handle: Sender<RouterMessage>) -> Connection {
Connection {
connect,
handle: Some(handle)
Expand All @@ -59,7 +59,7 @@ impl fmt::Debug for Connection {
#[derive(Debug)]
struct ActiveConnection {
pub state: MqttState,
pub outgoing: VecDeque<rumq_core::Packet>,
pub outgoing: VecDeque<Packet>,
tx: Sender<RouterMessage>
}

Expand Down Expand Up @@ -136,7 +136,8 @@ impl Router {
loop {
select! {
o = self.data_rx.recv() => {
let (id, mut message) = o.unwrap();
let (id, mut message) = o.unwrap();
debug!("In router message. Id = {}, {:?}", id, message);
match self.reply(id.clone(), &mut message) {
Ok(Some(message)) => self.forward(&id, message),
Ok(None) => (),
Expand All @@ -151,7 +152,7 @@ impl Router {
error!("Routing error = {:?}", e);
}
}
o = interval.next() => {
_ = interval.next() => {
for (_, connection) in self.active_connections.iter_mut() {
let pending = connection.outgoing.split_off(0);
if pending.len() > 0 {
Expand All @@ -173,8 +174,6 @@ impl Router {
/// inactive connections
/// No routing modifications here
fn reply(&mut self, id: String, message: &mut RouterMessage) -> Result<Option<RouterMessage>, Error> {
debug!("Incoming router message. Id = {}, {:?}", id, message);

match message {
RouterMessage::Connect(connection) => {
let handle = connection.handle.take().unwrap();
Expand All @@ -192,7 +191,6 @@ impl Router {
fn route(&mut self, id: String, message: RouterMessage) -> Result<(), Error> {
match message {
RouterMessage::Packet(packet) => {
debug!("Routing router message. Id = {}, {:?}", id, packet);
match packet {
Packet::Publish(publish) => self.match_subscriptions(&id, publish),
Packet::Subscribe(subscribe) => self.add_to_subscriptions(id, subscribe),
Expand Down Expand Up @@ -474,7 +472,7 @@ impl Router {
let message = mem::replace(&mut will.message, "".to_owned());
let qos = will.qos;

let publish = rumq_core::publish(topic, qos, message);
let publish = publish(topic, qos, message);
self.match_subscriptions(&id, publish);
}

Expand Down
Loading

0 comments on commit 8c20d58

Please sign in to comment.