From db7f5348585c3e5a92b5fc6cfbb31c48786c9ce4 Mon Sep 17 00:00:00 2001 From: Tim Date: Wed, 11 Oct 2023 13:20:16 +0200 Subject: [PATCH] ??? --- examples/basic/Cargo.toml | 1 + examples/basic/src/consumer.rs | 37 ++++++++++++++++++++++-- examples/basic/src/main.rs | 7 ++++- examples/basic/src/producer.rs | 23 +++++++++++---- memphis/src/consumer/memphis_consumer.rs | 2 +- memphis/src/producer/memphis_producer.rs | 1 + 6 files changed, 61 insertions(+), 10 deletions(-) diff --git a/examples/basic/Cargo.toml b/examples/basic/Cargo.toml index dcb83fc..c45eaf9 100644 --- a/examples/basic/Cargo.toml +++ b/examples/basic/Cargo.toml @@ -14,3 +14,4 @@ uuid = {version = "1.4.1", features = ["v4"]} anyhow = "1.0.75" log = "0.4.20" env_logger = "0.10.0" +chrono = {version = "0.4.31", features = ["serde"]} diff --git a/examples/basic/src/consumer.rs b/examples/basic/src/consumer.rs index 427a934..134ce18 100644 --- a/examples/basic/src/consumer.rs +++ b/examples/basic/src/consumer.rs @@ -1,10 +1,14 @@ +use crate::producer::{LogData, LogLevel}; use anyhow::Error; -use log::{error, info}; +use log::{debug, error, info, warn}; use memphis_rust_community::consumer::MemphisConsumerOptions; use memphis_rust_community::station::MemphisStation; +use std::string::FromUtf8Error; pub async fn start_consumer(station: &MemphisStation) -> Result<(), Error> { - let consumer_options = MemphisConsumerOptions::new("log-consumer"); + let consumer_options = MemphisConsumerOptions::new("log-consumer") + .with_consumer_group("log-consumer-group") + .with_max_ack_time_ms(1000); let mut consumer = station.create_consumer(consumer_options).await?; // We need to map the Err here, since async_nats uses a Box Error type... @@ -13,7 +17,34 @@ pub async fn start_consumer(station: &MemphisStation) -> Result<(), Error> { tokio::spawn(async move { while let Some(msg) = receiver.recv().await { // Do something with the message here. - info!("Received: {:?}", msg); + debug!("Received: {:?}", msg); + let json_data = match msg.get_data_as_string() { + Ok(x) => x, + Err(e) => { + error!("Error while getting data as string: {:?}", e); + continue; + } + }; + let log_message: LogData = match serde_json::from_str(&json_data) { + Ok(x) => x, + Err(e) => { + error!("Error while deserializing message: {:?}", e); + continue; + } + }; + + match log_message.level { + LogLevel::Info => { + info!("({}) {}", log_message.date, log_message.message) + } + LogLevel::Warning => { + warn!("({}) {}", log_message.date, log_message.message) + } + LogLevel::Error => { + error!("({}) {}", log_message.date, log_message.message) + } + } + if let Err(e) = msg.ack().await { error!("Error while acking message: {:?}", e); } diff --git a/examples/basic/src/main.rs b/examples/basic/src/main.rs index 3cb0a63..2984f16 100644 --- a/examples/basic/src/main.rs +++ b/examples/basic/src/main.rs @@ -1,8 +1,10 @@ use crate::consumer::start_consumer; +use crate::producer::start_producer; use crate::settings::Settings; use log::info; use memphis_rust_community::memphis_client::MemphisClient; use memphis_rust_community::station::MemphisStationsOptions; +use memphis_rust_community::station::RetentionType::{AckBased, Messages}; mod consumer; mod producer; @@ -21,10 +23,13 @@ async fn main() -> Result<(), anyhow::Error> { ) .await?; - let station_options = MemphisStationsOptions::new("logs"); + let station_options = MemphisStationsOptions::new("logs") + .with_retention_type(Messages) + .with_retention_value(100); let station = memphis_client.create_station(station_options).await?; start_consumer(&station).await?; + start_producer(&station).await?; info!("Press CTRL-C to stop the application."); tokio::signal::ctrl_c().await?; diff --git a/examples/basic/src/producer.rs b/examples/basic/src/producer.rs index 97993d2..30b34d8 100644 --- a/examples/basic/src/producer.rs +++ b/examples/basic/src/producer.rs @@ -1,4 +1,5 @@ use anyhow::Error; +use chrono::{DateTime, Utc}; use log::error; use memphis_rust_community::producer::{ComposableMessage, MemphisProducerOptions, ProducerError}; use memphis_rust_community::station::MemphisStation; @@ -16,19 +17,30 @@ pub enum LogLevel { pub struct LogData { pub level: LogLevel, pub message: String, + pub date: DateTime, } pub async fn start_producer(station: &MemphisStation) -> Result<(), Error> { - let producer_options = MemphisProducerOptions::new("amazing-service"); + let producer_options = + MemphisProducerOptions::new("amazing-service").with_generate_unique_suffix(true); let mut producer = station.create_producer(producer_options).await?; tokio::spawn(async move { let mut counter = 0; loop { - tokio::time::sleep(Duration::from_secs(10)).await; + tokio::time::sleep(Duration::from_secs(1)).await; + // Some different types of messages. + let level = if counter % 5 == 0 { + LogLevel::Warning + } else if counter % 3 == 0 { + LogLevel::Error + } else { + LogLevel::Info + }; let message = LogData { - level: LogLevel::Info, + level, message: format!("Something incredible happened we counted to: {}", counter), + date: Utc::now(), }; let json_message = match serde_json::to_string(&message) { Ok(x) => x, @@ -37,12 +49,13 @@ pub async fn start_producer(station: &MemphisStation) -> Result<(), Error> { continue; } }; - let composable_message = ComposableMessage::new().with_payload(json_message); + let composable_message = ComposableMessage::new() + .with_payload(json_message) + .with_msg_id(counter.to_string()); if let Err(e) = producer.produce(composable_message).await { error!("Error while producing message: {:?}", e); } counter += 1; - tokio::time::sleep(Duration::from_secs(5)).await; } }); diff --git a/memphis/src/consumer/memphis_consumer.rs b/memphis/src/consumer/memphis_consumer.rs index f23c1f1..d36b743 100644 --- a/memphis/src/consumer/memphis_consumer.rs +++ b/memphis/src/consumer/memphis_consumer.rs @@ -1,6 +1,6 @@ use std::time::Duration; -use async_nats::jetstream::consumer::PullConsumer; +use async_nats::jetstream::consumer::{OrderedPullConsumer, PullConsumer, PushConsumer}; use async_nats::{Error, Message}; use futures_util::StreamExt; diff --git a/memphis/src/producer/memphis_producer.rs b/memphis/src/producer/memphis_producer.rs index 6699172..cc55d32 100644 --- a/memphis/src/producer/memphis_producer.rs +++ b/memphis/src/producer/memphis_producer.rs @@ -1,3 +1,4 @@ +use async_nats::client::FlushError; use log::{error, info, trace}; use crate::constants::memphis_constants::{