Skip to content

Commit

Permalink
Connection stream (#13)
Browse files Browse the repository at this point in the history
* Make connection part of the stream and remove reconnection feature

* Update versions

* Add discord badge

* Remove discord header
  • Loading branch information
Ravi Teja authored Dec 11, 2019
1 parent 8591cb2 commit cdc6393
Show file tree
Hide file tree
Showing 11 changed files with 104 additions and 144 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# rumq [![img](https://github.com/tekjar/rumq/workflows/CI/badge.svg)](https://github.com/tekjar/rumq/actions)
# rumq
[![img](https://github.com/tekjar/rumq/workflows/CI/badge.svg)](https://github.com/tekjar/rumq/actions)
[![img](https://img.shields.io/discord/633193308033646605?style=flat-square)](https://discord.gg/mpkSqDg)

MQTT ecosystem in rust which strives to be simple, robust and performant

Expand Down
Empty file added rg.diff
Empty file.
2 changes: 1 addition & 1 deletion rumq-cli/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rumq-cli"
version = "0.1.0-alpha.1"
version = "0.1.0-alpha.2"
description = "Commandline mqtt utilities to replace mosquitto_sub and mosquitto_pub"
license = "MIT"
repository = "https://github.com/tekjar/rumq"
Expand Down
4 changes: 2 additions & 2 deletions rumq-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rumq-client"
version = "0.1.0-alpha.1"
version = "0.1.0-alpha.2"
description = "An efficeint and robust mqtt client for your connected devices"
license = "MIT"
repository = "https://github.com/tekjar/rumq"
Expand All @@ -16,7 +16,7 @@ async-stream = "0.2"
futures-util = "0.3"
webpki = "0.21"
tokio-rustls = "0.12"
rumq-core = { path = "../rumq-core", version = "0.1.0-alpha.1" }
rumq-core = { path = "../rumq-core", version = "0.1.0-alpha.2" }
log = "0.4"
pin-project = "0.4"

Expand Down
6 changes: 3 additions & 3 deletions rumq-client/examples/gcloud.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::ops::Add;
use std::env;
use std::fs;

use rumq_client::{self, MqttOptions, ReconnectOptions, Request, MqttEventLoop, eventloop};
use rumq_client::{self, MqttOptions, Request, MqttEventLoop, eventloop};
use serde::{Serialize, Deserialize};
use jsonwebtoken::{encode, Algorithm, Header};
use futures_util::stream::StreamExt;
Expand All @@ -21,7 +21,7 @@ async fn main() {

let (requests_tx, requests_rx) = channel(1);
let mqttoptions = gcloud();
let mut eventloop = eventloop(mqttoptions, requests_rx).await.unwrap();
let mut eventloop = eventloop(mqttoptions, requests_rx);

thread::spawn(move || {
requests(requests_tx);
Expand All @@ -33,7 +33,7 @@ async fn main() {
}

async fn stream_it(eventloop: &mut MqttEventLoop) {
let mut stream = eventloop.stream(ReconnectOptions::Never);
let mut stream = eventloop.stream();

while let Some(item) = stream.next().await {
println!("{:?}", item);
Expand Down
6 changes: 3 additions & 3 deletions rumq-client/examples/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use tokio::sync::mpsc::{channel, Sender};
use tokio::task;
use tokio::time;

use rumq_client::{self, MqttOptions, ReconnectOptions, Request, MqttEventLoop, QoS, eventloop};
use rumq_client::{self, MqttOptions, Request, MqttEventLoop, QoS, eventloop};
use std::time::Duration;

#[tokio::main(basic_scheduler)]
Expand All @@ -15,7 +15,7 @@ async fn main() {
let (requests_tx, requests_rx) = channel(10);
let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
mqttoptions.set_keep_alive(5).set_throttle(Duration::from_secs(1));
let mut eventloop = eventloop(mqttoptions, requests_rx).await.unwrap();
let mut eventloop = eventloop(mqttoptions, requests_rx);

thread::spawn(move || {
requests(requests_tx);
Expand All @@ -29,7 +29,7 @@ async fn main() {


async fn stream_it(eventloop: &mut MqttEventLoop) {
let mut stream = eventloop.stream(ReconnectOptions::Always(Duration::from_secs(5)));
let mut stream = eventloop.stream();

while let Some(item) = stream.next().await {
println!("{:?}", item);
Expand Down
6 changes: 3 additions & 3 deletions rumq-client/examples/smoke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use tokio::sync::mpsc::{channel, Sender};
use tokio::task;
use tokio::time;

use rumq_client::{self, MqttOptions, ReconnectOptions, Request, MqttEventLoop, eventloop};
use rumq_client::{self, MqttOptions, Request, MqttEventLoop, eventloop};
use std::time::Duration;

#[tokio::main(basic_scheduler)]
Expand All @@ -15,7 +15,7 @@ async fn main() {
let (requests_tx, requests_rx) = channel(10);
let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
mqttoptions.set_keep_alive(5).set_throttle(Duration::from_secs(1));
let mut eventloop = eventloop(mqttoptions, requests_rx).await.unwrap();
let mut eventloop = eventloop(mqttoptions, requests_rx);

thread::spawn(move || {
requests(requests_tx);
Expand All @@ -29,7 +29,7 @@ async fn main() {


async fn stream_it(eventloop: &mut MqttEventLoop) {
let mut stream = eventloop.stream(ReconnectOptions::Always(Duration::from_secs(5)));
let mut stream = eventloop.stream();

while let Some(item) = stream.next().await {
println!("{:?}", item);
Expand Down
Loading

0 comments on commit cdc6393

Please sign in to comment.