Skip to content

Commit

Permalink
???
Browse files Browse the repository at this point in the history
  • Loading branch information
turulix committed Oct 11, 2023
1 parent 5d7ac1e commit db7f534
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 10 deletions.
1 change: 1 addition & 0 deletions examples/basic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ uuid = {version = "1.4.1", features = ["v4"]}
anyhow = "1.0.75"
log = "0.4.20"
env_logger = "0.10.0"
chrono = {version = "0.4.31", features = ["serde"]}
37 changes: 34 additions & 3 deletions examples/basic/src/consumer.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use crate::producer::{LogData, LogLevel};
use anyhow::Error;
use log::{error, info};
use log::{debug, error, info, warn};
use memphis_rust_community::consumer::MemphisConsumerOptions;
use memphis_rust_community::station::MemphisStation;
use std::string::FromUtf8Error;

pub async fn start_consumer(station: &MemphisStation) -> Result<(), Error> {
let consumer_options = MemphisConsumerOptions::new("log-consumer");
let consumer_options = MemphisConsumerOptions::new("log-consumer")
.with_consumer_group("log-consumer-group")
.with_max_ack_time_ms(1000);
let mut consumer = station.create_consumer(consumer_options).await?;

// We need to map the Err here, since async_nats uses a Box Error type...
Expand All @@ -13,7 +17,34 @@ pub async fn start_consumer(station: &MemphisStation) -> Result<(), Error> {
tokio::spawn(async move {
while let Some(msg) = receiver.recv().await {
// Do something with the message here.
info!("Received: {:?}", msg);
debug!("Received: {:?}", msg);
let json_data = match msg.get_data_as_string() {
Ok(x) => x,
Err(e) => {
error!("Error while getting data as string: {:?}", e);
continue;
}
};
let log_message: LogData = match serde_json::from_str(&json_data) {
Ok(x) => x,
Err(e) => {
error!("Error while deserializing message: {:?}", e);
continue;
}
};

match log_message.level {
LogLevel::Info => {
info!("({}) {}", log_message.date, log_message.message)
}
LogLevel::Warning => {
warn!("({}) {}", log_message.date, log_message.message)
}
LogLevel::Error => {
error!("({}) {}", log_message.date, log_message.message)
}
}

if let Err(e) = msg.ack().await {
error!("Error while acking message: {:?}", e);
}
Expand Down
7 changes: 6 additions & 1 deletion examples/basic/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use crate::consumer::start_consumer;
use crate::producer::start_producer;
use crate::settings::Settings;
use log::info;
use memphis_rust_community::memphis_client::MemphisClient;
use memphis_rust_community::station::MemphisStationsOptions;
use memphis_rust_community::station::RetentionType::{AckBased, Messages};

mod consumer;
mod producer;
Expand All @@ -21,10 +23,13 @@ async fn main() -> Result<(), anyhow::Error> {
)
.await?;

let station_options = MemphisStationsOptions::new("logs");
let station_options = MemphisStationsOptions::new("logs")
.with_retention_type(Messages)
.with_retention_value(100);
let station = memphis_client.create_station(station_options).await?;

start_consumer(&station).await?;
start_producer(&station).await?;

info!("Press CTRL-C to stop the application.");
tokio::signal::ctrl_c().await?;
Expand Down
23 changes: 18 additions & 5 deletions examples/basic/src/producer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::Error;
use chrono::{DateTime, Utc};
use log::error;
use memphis_rust_community::producer::{ComposableMessage, MemphisProducerOptions, ProducerError};
use memphis_rust_community::station::MemphisStation;
Expand All @@ -16,19 +17,30 @@ pub enum LogLevel {
pub struct LogData {
pub level: LogLevel,
pub message: String,
pub date: DateTime<Utc>,
}

pub async fn start_producer(station: &MemphisStation) -> Result<(), Error> {
let producer_options = MemphisProducerOptions::new("amazing-service");
let producer_options =
MemphisProducerOptions::new("amazing-service").with_generate_unique_suffix(true);
let mut producer = station.create_producer(producer_options).await?;

tokio::spawn(async move {
let mut counter = 0;
loop {
tokio::time::sleep(Duration::from_secs(10)).await;
tokio::time::sleep(Duration::from_secs(1)).await;
// Some different types of messages.
let level = if counter % 5 == 0 {
LogLevel::Warning
} else if counter % 3 == 0 {
LogLevel::Error
} else {
LogLevel::Info
};
let message = LogData {
level: LogLevel::Info,
level,
message: format!("Something incredible happened we counted to: {}", counter),
date: Utc::now(),
};
let json_message = match serde_json::to_string(&message) {
Ok(x) => x,
Expand All @@ -37,12 +49,13 @@ pub async fn start_producer(station: &MemphisStation) -> Result<(), Error> {
continue;
}
};
let composable_message = ComposableMessage::new().with_payload(json_message);
let composable_message = ComposableMessage::new()
.with_payload(json_message)
.with_msg_id(counter.to_string());
if let Err(e) = producer.produce(composable_message).await {
error!("Error while producing message: {:?}", e);
}
counter += 1;
tokio::time::sleep(Duration::from_secs(5)).await;
}
});

Expand Down
2 changes: 1 addition & 1 deletion memphis/src/consumer/memphis_consumer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::time::Duration;

use async_nats::jetstream::consumer::PullConsumer;
use async_nats::jetstream::consumer::{OrderedPullConsumer, PullConsumer, PushConsumer};

use async_nats::{Error, Message};
use futures_util::StreamExt;
Expand Down
1 change: 1 addition & 0 deletions memphis/src/producer/memphis_producer.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use async_nats::client::FlushError;
use log::{error, info, trace};

use crate::constants::memphis_constants::{
Expand Down

0 comments on commit db7f534

Please sign in to comment.