diff --git a/.github/workflows/CD.yml b/.github/workflows/CD.yml index ab24157..5c32e60 100644 --- a/.github/workflows/CD.yml +++ b/.github/workflows/CD.yml @@ -31,9 +31,9 @@ jobs: - name: Build run: | - cargo build --release + cargo build --release --all-features - name: Publish run: | cargo login ${{ secrets.RUST_API_KEY }} - cargo publish --token ${{ secrets.RUST_API_KEY }} + cargo publish --token ${{ secrets.RUST_API_KEY }} --all-features diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 5cf0583..edd8282 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -17,14 +17,10 @@ concurrency: group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} cancel-in-progress: true -env: - RUSTFLAGS: "-D warnings" - defaults: run: shell: bash - jobs: test: runs-on: ubuntu-latest @@ -47,7 +43,7 @@ jobs: - name: Setup Memphis run: | - curl -sSL https://raw.githubusercontent.com/memphisdev/memphis-docker/master/docker-compose.yml > docker-compose.yml + curl -sSL https://raw.githubusercontent.com/memphisdev/memphis-docker/master/docker-compose-latest.yml > docker-compose.yml docker-compose up -d - name: Install Rust @@ -57,7 +53,7 @@ jobs: - name: Run Tests run: | - cargo test --all + cargo test --workspace --all-targets --all-features check_format: name: check (format) @@ -125,4 +121,4 @@ jobs: rustup component add clippy - name: Check lint - run: cargo doc + run: cargo doc --all-features diff --git a/Cargo.toml b/Cargo.toml index 840d5f5..2a77dca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,4 @@ [workspace] -resolver = "2" members = [ "memphis" ] diff --git a/docker-compose.yml b/docker-compose.yml index 25bc650..5435480 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -16,7 +16,7 @@ services: - POSTGRESQL_DATABASE=memphis - POSTGRESQL_PORT_NUMBER=5005 memphis: - image: "memphisos/memphis:stable" + image: "memphisos/memphis:latest" depends_on: memphis-metadata: condition: service_healthy diff --git a/memphis/Cargo.toml b/memphis/Cargo.toml index 5d71470..6baf05e 100644 --- a/memphis/Cargo.toml +++ b/memphis/Cargo.toml @@ -49,3 +49,6 @@ async-trait = "0.1.17" hex = { version = "0.4.3", features = ["serde"] } jsonschema = { version = "0.17", optional = true } + +[dev-dependencies] +tokio-test = "0.4.2" diff --git a/memphis/src/constants/memphis_constants.rs b/memphis/src/constants/memphis_constants.rs index 2f5bc17..4349d41 100644 --- a/memphis/src/constants/memphis_constants.rs +++ b/memphis/src/constants/memphis_constants.rs @@ -10,7 +10,9 @@ pub(crate) enum MemphisSpecialStation { ConsumerDestructions, StationDestructions, + #[allow(dead_code)] SchemaAttachments, + #[allow(dead_code)] SchemaDetachments, Notifications, @@ -67,6 +69,7 @@ impl IntoHeaderName for MemphisHeaders { pub(crate) enum MemphisSubscriptions { DlsPrefix, + #[allow(dead_code)] SchemaUpdatesPrefix, } diff --git a/memphis/src/consumer/event.rs b/memphis/src/consumer/event.rs index 29d37f6..449e8fa 100644 --- a/memphis/src/consumer/event.rs +++ b/memphis/src/consumer/event.rs @@ -4,7 +4,7 @@ use async_nats::Error; use crate::consumer::MemphisMessage; -#[derive(Clone)] +#[derive(Clone, Debug)] pub enum MemphisEvent { MessageReceived(MemphisMessage), StationUnavailable(Arc), diff --git a/memphis/src/consumer/incoming_message.rs b/memphis/src/consumer/incoming_message.rs index 9e4df74..f3b30c2 100644 --- a/memphis/src/consumer/incoming_message.rs +++ b/memphis/src/consumer/incoming_message.rs @@ -76,15 +76,17 @@ impl MemphisMessage { Ok(_) => Ok(()), Err(_) => Err(()), }; - } - if let Some(_cg_name) = headers.get("$memphis_pm_cg_name") { + } else if let Some(_cg_name) = headers.get("$memphis_pm_cg_name") { return match self.msg.ack_with(AckKind::Nak(Some(delay))).await { Ok(_) => Ok(()), Err(_) => Err(()), }; } } - Err(()) + match self.msg.ack_with(AckKind::Nak(Some(delay))).await { + Ok(_) => Ok(()), + Err(_) => Err(()), + } } } diff --git a/memphis/src/consumer/memphis_consumer.rs b/memphis/src/consumer/memphis_consumer.rs index bb67b30..9c954d0 100644 --- a/memphis/src/consumer/memphis_consumer.rs +++ b/memphis/src/consumer/memphis_consumer.rs @@ -4,8 +4,8 @@ use std::time::Duration; use async_nats::jetstream::consumer::PullConsumer; use async_nats::{Error, Message}; use futures_util::StreamExt; -use log::{debug, error, info, trace}; -use tokio::sync::broadcast::{channel, Receiver, Sender}; +use log::{debug, error, info, trace, warn}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio_util::sync::CancellationToken; use crate::constants::memphis_constants::{MemphisSpecialStation, MemphisSubscriptions}; @@ -24,12 +24,7 @@ pub struct MemphisConsumer { memphis_client: MemphisClient, options: MemphisConsumerOptions, cancellation_token: CancellationToken, - - /// The receiver for the MemphisEvents. - /// This is used to communicate with the MemphisConsumer. - /// The MemphisConsumer will send events to this receiver. - pub message_receiver: Receiver, - message_sender: Sender, + message_sender: Option>, } impl MemphisConsumer { @@ -70,14 +65,11 @@ impl MemphisConsumer { info!("Consumer '{}' created successfully", &options.consumer_name); - let (tx, rx) = channel(100); - let consumer = Self { memphis_client, options, cancellation_token: CancellationToken::new(), - message_sender: tx, - message_receiver: rx, + message_sender: None, }; consumer.ping_consumer(); @@ -97,29 +89,39 @@ impl MemphisConsumer { /// ```rust /// use memphis_rust_community::memphis_client::MemphisClient; /// use memphis_rust_community::consumer::MemphisConsumerOptions; + /// use memphis_rust_community::station::MemphisStationsOptions; /// /// #[tokio::main] /// async fn main() { + /// /// let client = MemphisClient::new("localhost:6666", "root", "memphis").await.unwrap(); - /// let consumer_options = MemphisConsumerOptions::new("my-station", "my-consumer") + /// + /// let station_options = MemphisStationsOptions::new("test_station"); + /// let station = client.create_station(station_options).await.unwrap(); + /// + /// let consumer_options = MemphisConsumerOptions::new("test_consumer") /// .with_generate_unique_suffix(true); - /// let mut consumer = client.create_consumer(consumer_options).await.unwrap(); + /// let mut consumer = station.create_consumer(consumer_options).await.unwrap(); + /// + /// let mut message_receiver = consumer.consume().await.unwrap(); + /// + /// tokio::spawn(async move { + /// loop{ + /// let msg = message_receiver.recv().await; + /// // Do something with the message + /// break; + /// } + /// }); /// - /// consumer.consume().await.unwrap(); - /// tokio::spawn(async move { - /// loop{ - /// let msg = consumer.message_receiver.recv().await; - /// // Do something with the message - /// break; - /// } - /// }); /// } /// ``` - pub async fn consume(&self) -> Result<(), Error> { + pub async fn consume(&mut self) -> Result, Error> { let cloned_token = self.cancellation_token.clone(); let cloned_client = self.memphis_client.clone(); let cloned_options = self.options.clone(); - let cloned_sender = self.message_sender.clone(); + + let (sender, receiver) = unbounded_channel::(); + self.message_sender = Some(sender.clone()); // Memphis will create a stream with the name of the station. // On this Stream it will create a consumer with the name of the consumer Group. @@ -156,7 +158,10 @@ impl MemphisConsumer { trace!( "Message received from Memphis. (Subject: {}, Sequence: {})", msg.subject, - msg.info().expect("NONE").stream_sequence + match msg.info() { + Ok(info) => info.stream_sequence, + Err(_e) => 0, + } ); let memphis_message = MemphisMessage::new( msg, @@ -164,7 +169,10 @@ impl MemphisConsumer { cloned_options.consumer_group.clone(), cloned_options.max_ack_time_ms, ); - let _res = cloned_sender.send(MemphisEvent::MessageReceived(memphis_message)); + let res = sender.send(MemphisEvent::MessageReceived(memphis_message)); + if res.is_err() { + error!("Error while sending message to the receiver. {:?}", res.err()); + } } } }); @@ -173,7 +181,7 @@ impl MemphisConsumer { "Successfully started consuming messages from Memphis with consumer '{}' on group: '{}'", self.options.consumer_name, self.options.consumer_group ); - Ok(()) + Ok(receiver) } /// # Starts consuming DLS messages from Memphis. @@ -188,9 +196,9 @@ impl MemphisConsumer { /// /// # Returns /// A [Receiver] that will receive the DLS messages. - pub async fn consume_dls(&self) -> Result>, Error> { + pub async fn consume_dls(&self) -> Result>, Error> { //TODO: Remove Arc once async_nats is updated to >=0.30.0 (https://github.com/nats-io/nats.rs/pull/975) - let (s, r) = channel::>(100); + let (s, r) = unbounded_channel::>(); let subject = format!( "{}{}_{}", MemphisSubscriptions::DlsPrefix.to_string(), @@ -258,9 +266,17 @@ impl MemphisConsumer { let cloned_sender = self.message_sender.clone(); tokio::spawn(async move { - fn send_message(sender: &Sender, event: MemphisEvent) { - let _res = sender.send(event); + fn send_message(sender: &Option>, event: MemphisEvent, consumer_name: &str) { + match sender { + None => { + warn!("Consumer {} tried to send event, without the Sender being initialised", consumer_name); + } + Some(s) => { + let _res = s.send(event); + } + } } + let name = get_effective_consumer_name(&cloned_options); while !cloned_token.is_cancelled() { let stream = match cloned_client .get_jetstream_context() @@ -269,7 +285,7 @@ impl MemphisConsumer { { Ok(s) => s, Err(e) => { - send_message(&cloned_sender, MemphisEvent::StationUnavailable(Arc::new(e))); + send_message(&cloned_sender, MemphisEvent::StationUnavailable(Arc::new(e)), name.as_str()); error!("Station {} is unavailable. (Ping)", &cloned_options.station_name.clone()); tokio::time::sleep(Duration::from_secs(30)).await; continue; @@ -279,7 +295,7 @@ impl MemphisConsumer { match stream.consumer_info(get_effective_consumer_name(&cloned_options)).await { Ok(_) => {} Err(e) => { - send_message(&cloned_sender, MemphisEvent::ConsumerUnavailable(Arc::new(e))); + send_message(&cloned_sender, MemphisEvent::ConsumerUnavailable(Arc::new(e)), name.as_str()); error!( "Consumer '{}' on group '{}' is unavailable. (Ping)", &cloned_options.consumer_name, &cloned_options.consumer_group diff --git a/memphis/src/consumer/memphis_consumer_options.rs b/memphis/src/consumer/memphis_consumer_options.rs index e3aef69..d7a44cd 100644 --- a/memphis/src/consumer/memphis_consumer_options.rs +++ b/memphis/src/consumer/memphis_consumer_options.rs @@ -6,7 +6,7 @@ /// /// #[tokio::main] /// async fn main() { -/// let options = MemphisConsumerOptions::new("station_name", "consumer_name") +/// let options = MemphisConsumerOptions::new("station_name") /// .with_consumer_group("consumer_group") /// .with_generate_unique_suffix(true) /// .with_pull_interval_ms(1000) @@ -14,7 +14,7 @@ /// } #[derive(Debug, Clone)] pub struct MemphisConsumerOptions { - pub station_name: String, + pub(crate) station_name: String, pub consumer_name: String, pub consumer_group: String, pub pull_interval_ms: i32, @@ -46,19 +46,13 @@ impl Default for MemphisConsumerOptions { } impl MemphisConsumerOptions { - pub fn new(station_name: &str, consumer_name: &str) -> Self { + pub fn new(consumer_name: &str) -> Self { MemphisConsumerOptions { - station_name: station_name.to_string(), consumer_name: consumer_name.to_string(), ..Default::default() } } - pub fn with_station_name(mut self, station_name: String) -> Self { - self.station_name = station_name; - self - } - pub fn with_consumer_name(mut self, consumer_name: String) -> Self { self.consumer_name = consumer_name; self diff --git a/memphis/src/consumer/mod.rs b/memphis/src/consumer/mod.rs index 07d6cb3..a939d66 100644 --- a/memphis/src/consumer/mod.rs +++ b/memphis/src/consumer/mod.rs @@ -12,6 +12,7 @@ mod incoming_message; mod memphis_consumer; mod memphis_consumer_options; +/// Get the effective consumer name. If the consumer group is empty, the consumer name is used. Otherwise, the consumer group is used. fn get_effective_consumer_name(options: &MemphisConsumerOptions) -> String { if options.consumer_group.is_empty() { get_internal_name(&options.consumer_name) diff --git a/memphis/src/lib.rs b/memphis/src/lib.rs index 730b07c..5cef5d5 100644 --- a/memphis/src/lib.rs +++ b/memphis/src/lib.rs @@ -5,18 +5,23 @@ //! ```rust //! use memphis_rust_community::memphis_client::MemphisClient; //! use memphis_rust_community::consumer::MemphisConsumerOptions; +//! use memphis_rust_community::station::MemphisStationsOptions; //! //! #[tokio::main] //! async fn main() { //! let client = MemphisClient::new("localhost:6666", "root", "memphis").await.unwrap(); -//! let consumer_options = MemphisConsumerOptions::new("my-station", "my-consumer") +//! +//! let station_options = MemphisStationsOptions::new("my-station"); +//! let station = client.create_station(station_options).await.unwrap(); +//! +//! let consumer_options = MemphisConsumerOptions::new("my-consumer") //! .with_generate_unique_suffix(true); -//! let mut consumer = client.create_consumer(consumer_options).await.unwrap(); +//! let mut consumer = station.create_consumer(consumer_options).await.unwrap(); //! -//! consumer.consume().await.unwrap(); +//! let mut message_receiver = consumer.consume().await.unwrap(); //! tokio::spawn(async move { //! loop{ -//! let msg = consumer.message_receiver.recv().await; +//! let msg = message_receiver.recv().await; //! // Do something with the message //! break; //! } diff --git a/memphis/src/memphis_client.rs b/memphis/src/memphis_client.rs index 0c4f7d5..323b750 100644 --- a/memphis/src/memphis_client.rs +++ b/memphis/src/memphis_client.rs @@ -20,24 +20,10 @@ use crate::station_settings::StationSettingsStore; /// /// ```rust /// use memphis_rust_community::memphis_client::MemphisClient; -/// use memphis_rust_community::consumer::MemphisConsumerOptions; /// /// #[tokio::main] /// async fn main() { /// let client = MemphisClient::new("localhost:6666", "root", "memphis").await.unwrap(); -/// let consumer_options = MemphisConsumerOptions::new("my-station", "my-consumer") -/// .with_generate_unique_suffix(true); -/// let mut consumer = client.create_consumer(consumer_options).await.unwrap(); -/// -/// // Start consuming messages -/// consumer.consume().await.unwrap(); -/// tokio::spawn(async move { -/// loop{ -/// let msg = consumer.message_receiver.recv().await; -/// // Do something with the message -/// break; -/// } -/// }); /// } /// ``` #[derive(Clone)] @@ -70,16 +56,16 @@ impl MemphisClient { /// } pub async fn new(memphis_host: &str, memphis_username: &str, memphis_password: &str) -> Result { let uuid = Uuid::new_v4(); - let name = format!("{}::{}", &uuid, memphis_username); + let connection_name = format!("{}::{}", &uuid, memphis_username); // TODO: Replace 1 with account_id - let broker_settings = MemphisClient::create_settings(format!("{}${}", memphis_username, 1).as_str(), memphis_password, name.clone()); + let broker_settings = MemphisClient::create_settings(format!("{}${}", memphis_username, 1).as_str(), memphis_password, connection_name.clone()); let connection = match async_nats::connect_with_options(memphis_host, broker_settings).await { Ok(c) => c, Err(e) => { if e.to_string().contains("authorization violation") { - let broker_settings = MemphisClient::create_settings(memphis_username, memphis_password, name.clone()); + let broker_settings = MemphisClient::create_settings(memphis_username, memphis_password, connection_name); let connection = async_nats::connect_with_options(memphis_host, broker_settings).await; match connection { Ok(c) => c, @@ -96,7 +82,7 @@ impl MemphisClient { Ok(MemphisClient { jetstream_context: Arc::new(jetstream::new(connection.clone())), broker_connection: Arc::new(connection), - username: Arc::new(name), + username: Arc::new(memphis_username.to_string()), connection_id: Arc::new(uuid.to_string()), station_settings: Arc::new(StationSettingsStore::new()), }) @@ -167,48 +153,3 @@ impl MemphisClient { .name(name) } } - -#[cfg(feature = "consumers")] -mod consumers { - use crate::consumer::{ConsumerError, MemphisConsumer, MemphisConsumerOptions}; - use crate::memphis_client::MemphisClient; - - impl MemphisClient { - /// Creates a consumer for the given station and returns a MemphisConsumer - /// You need to call **consume()** on the MemphisConsumer to start consuming messages. - /// # Arguments - /// * `consumer_options` - [MemphisConsumerOptions](MemphisConsumerOptions) - /// - /// # Example - /// ```rust - /// use memphis_rust_community::memphis_client::MemphisClient; - /// use memphis_rust_community::consumer::MemphisConsumerOptions; - /// - /// #[tokio::main] - /// async fn main() { - /// let client = MemphisClient::new("localhost:6666", "root", "memphis").await.unwrap(); - /// let consumer_options = MemphisConsumerOptions::new("my-station", "my-consumer") - /// .with_generate_unique_suffix(true); - /// - /// let mut consumer = client.create_consumer(consumer_options).await.unwrap(); - /// // Start consuming messages - /// consumer.consume().await.unwrap(); - /// } - pub async fn create_consumer(&self, consumer_options: MemphisConsumerOptions) -> Result { - MemphisConsumer::new(self.clone(), consumer_options).await - } - } -} - -#[cfg(feature = "producers")] -mod producers { - use crate::memphis_client::MemphisClient; - use crate::producer::{MemphisProducer, MemphisProducerOptions}; - use crate::RequestError; - - impl MemphisClient { - pub async fn create_producer(&self, producer_options: MemphisProducerOptions) -> Result { - MemphisProducer::new(self.clone(), producer_options).await - } - } -} diff --git a/memphis/src/producer/composable_message.rs b/memphis/src/producer/composable_message.rs index 87ea704..73dba8f 100644 --- a/memphis/src/producer/composable_message.rs +++ b/memphis/src/producer/composable_message.rs @@ -1,7 +1,7 @@ use async_nats::header::{IntoHeaderName, IntoHeaderValue}; use async_nats::HeaderMap; use bytes::Bytes; -use serde::{Serialize, Serializer}; +use serde::Serialize; #[derive(Debug, Default, Serialize)] pub struct ComposableMessage { diff --git a/memphis/src/producer/memphis_producer.rs b/memphis/src/producer/memphis_producer.rs index 2eaa511..c7cb16a 100644 --- a/memphis/src/producer/memphis_producer.rs +++ b/memphis/src/producer/memphis_producer.rs @@ -54,7 +54,7 @@ impl MemphisProducer { } #[cfg(feature = "schemaverse")] - self.validate_message(&message).await.map_err(|e| ProducerError::SchemaValidationError(e))?; + self.validate_message(&message).await.map_err(ProducerError::SchemaValidationError)?; if let Err(e) = self .memphis_client diff --git a/memphis/src/producer/memphis_producer_options.rs b/memphis/src/producer/memphis_producer_options.rs index 7db7909..729a844 100644 --- a/memphis/src/producer/memphis_producer_options.rs +++ b/memphis/src/producer/memphis_producer_options.rs @@ -1,5 +1,5 @@ pub struct MemphisProducerOptions { - pub station_name: String, + pub(crate) station_name: String, pub producer_name: String, pub generate_unique_suffix: bool, } @@ -15,10 +15,9 @@ impl Default for MemphisProducerOptions { } impl MemphisProducerOptions { - pub fn new(station_name: &str, consumer_name: &str) -> Self { + pub fn new(producer_name: &str) -> Self { MemphisProducerOptions { - station_name: station_name.to_string(), - producer_name: consumer_name.to_string(), + producer_name: producer_name.to_string(), ..Default::default() } } diff --git a/memphis/src/schemaverse/schema/json.rs b/memphis/src/schemaverse/schema/json.rs index a58308f..4ab645b 100644 --- a/memphis/src/schemaverse/schema/json.rs +++ b/memphis/src/schemaverse/schema/json.rs @@ -1,6 +1,5 @@ use bytes::Bytes; use std::borrow::Cow; -use std::fmt::Display; use jsonschema::{Draft, JSONSchema, ValidationError}; use thiserror::Error; @@ -24,7 +23,7 @@ impl JsonSchemaValidator { impl SchemaValidator for JsonSchemaValidator { fn validate(&self, message: &Bytes) -> Result<(), SchemaValidationError> { - let deserialized = serde_json::from_slice(message).map_err(|e| JsonSchemaError::from(e))?; + let deserialized = serde_json::from_slice(message).map_err(JsonSchemaError::from)?; if let Err(mut e) = self.schema.validate(&deserialized) { let Some(error) = e.next() else { @@ -38,7 +37,7 @@ impl SchemaValidator for JsonSchemaValidator { } fn from_bytes(bytes: &Bytes) -> Result { - let deserialized = serde_json::from_slice(bytes).map_err(|e| JsonSchemaError::from(e))?; + let deserialized = serde_json::from_slice(bytes).map_err(JsonSchemaError::from)?; Ok(Self::new(deserialized)?) } @@ -49,7 +48,7 @@ fn validation_error_to_owned(e: ValidationError) -> ValidationError<'static> { instance: Cow::Owned(e.instance.into_owned()), kind: e.kind, instance_path: e.instance_path.to_owned(), - schema_path: e.schema_path.to_owned(), + schema_path: e.schema_path, } } diff --git a/memphis/src/schemaverse/schema/mod.rs b/memphis/src/schemaverse/schema/mod.rs index 0b3776d..073d7e9 100644 --- a/memphis/src/schemaverse/schema/mod.rs +++ b/memphis/src/schemaverse/schema/mod.rs @@ -18,7 +18,7 @@ pub trait SchemaValidator: Send + Sync { where Self: Sized, { - let bytes = tokio::fs::read(path).await.map_err(|e| SchemaValidationError::ReadFileError(e))?; + let bytes = tokio::fs::read(path).await.map_err(SchemaValidationError::ReadFileError)?; Self::from_bytes(&bytes.into()) } } diff --git a/memphis/src/schemaverse/schema_type.rs b/memphis/src/schemaverse/schema_type.rs index 7130dad..9260723 100644 --- a/memphis/src/schemaverse/schema_type.rs +++ b/memphis/src/schemaverse/schema_type.rs @@ -17,7 +17,6 @@ impl ToString for SchemaType { SchemaType::GraphQL => "graphql".to_string(), #[cfg(feature = "validator_protobuf")] SchemaType::Protobuf => "protobuf".to_string(), - _ => panic!("unknown SchemaType"), } } } diff --git a/memphis/src/station/memphis_station.rs b/memphis/src/station/memphis_station.rs index c93cf8b..6d89e1a 100644 --- a/memphis/src/station/memphis_station.rs +++ b/memphis/src/station/memphis_station.rs @@ -56,4 +56,58 @@ impl MemphisStation { Ok(()) } + + pub fn get_name(&self) -> &str { + &self.options.station_name + } +} + +#[cfg(feature = "consumers")] +mod consumers { + use crate::consumer::{ConsumerError, MemphisConsumer, MemphisConsumerOptions}; + use crate::station::MemphisStation; + + impl MemphisStation { + /// Creates a consumer for the given station and returns a MemphisConsumer + /// You need to call **consume()** on the MemphisConsumer to start consuming messages. + /// # Arguments + /// * `consumer_options` - [MemphisConsumerOptions](MemphisConsumerOptions) + /// + /// # Example + /// ```rust + /// use memphis_rust_community::memphis_client::MemphisClient; + /// use memphis_rust_community::consumer::MemphisConsumerOptions; + /// + /// #[tokio::main] + /// async fn main() { + /// use memphis_rust_community::station::MemphisStationsOptions; + /// let client = MemphisClient::new("localhost:6666", "root", "memphis").await.unwrap(); + /// + /// let station_options = MemphisStationsOptions::new("my-station"); + /// let station = client.create_station(station_options).await.unwrap(); + /// + /// let consumer_options = MemphisConsumerOptions::new("my-consumer").with_generate_unique_suffix(true); + /// let mut consumer = station.create_consumer(consumer_options).await.unwrap(); + /// + /// let msg_receiver = consumer.consume().await.unwrap(); + /// } + pub async fn create_consumer(&self, mut consumer_options: MemphisConsumerOptions) -> Result { + consumer_options.station_name = self.options.station_name.clone(); + MemphisConsumer::new(self.memphis_client.clone(), consumer_options).await + } + } +} + +#[cfg(feature = "producers")] +mod producer { + use crate::producer::{MemphisProducer, MemphisProducerOptions}; + use crate::station::MemphisStation; + use crate::RequestError; + + impl MemphisStation { + pub async fn create_producer(&self, mut producer_options: MemphisProducerOptions) -> Result { + producer_options.station_name = self.options.station_name.clone(); + MemphisProducer::new(self.memphis_client.clone(), producer_options).await + } + } } diff --git a/memphis/src/station/memphis_station_options.rs b/memphis/src/station/memphis_station_options.rs index 2777a48..3737395 100644 --- a/memphis/src/station/memphis_station_options.rs +++ b/memphis/src/station/memphis_station_options.rs @@ -12,23 +12,25 @@ pub struct MemphisStationsOptions { pub tiered_storage_enabled: bool, } -#[derive(Debug)] +#[derive(Debug, Default)] pub enum RetentionType { + #[default] MessageAgeSec, Messages, Bytes, } -#[derive(Debug)] +#[derive(Debug, Default)] pub enum StorageType { - Disk, + #[default] + File, Memory, } impl MemphisStationsOptions { - pub fn new(station_name: String) -> Self { + pub fn new(station_name: &str) -> Self { MemphisStationsOptions { - station_name, + station_name: station_name.to_string(), ..Default::default() } } @@ -93,18 +95,6 @@ impl Default for MemphisStationsOptions { } } -impl Default for RetentionType { - fn default() -> Self { - RetentionType::MessageAgeSec - } -} - -impl Default for StorageType { - fn default() -> Self { - StorageType::Disk - } -} - impl ToString for RetentionType { fn to_string(&self) -> String { match self { @@ -119,7 +109,7 @@ impl ToString for RetentionType { impl ToString for StorageType { fn to_string(&self) -> String { match self { - StorageType::Disk => "disk", + StorageType::File => "file", StorageType::Memory => "memory", } .to_string() diff --git a/memphis/src/station_settings.rs b/memphis/src/station_settings.rs index afc0ce6..30d04db 100644 --- a/memphis/src/station_settings.rs +++ b/memphis/src/station_settings.rs @@ -23,10 +23,12 @@ impl StationSettingsStore { Default::default() } + #[allow(dead_code)] pub async fn get_settings(&self, station_name: &str) -> Option { self.settings.read().await.get(station_name).cloned() } + #[allow(dead_code)] pub async fn set_settings(&self, station_name: impl ToString, settings: StationSettings) { self.settings.write().await.insert(station_name.to_string(), Arc::new(settings)); } diff --git a/memphis/tests/common.rs b/memphis/tests/common.rs new file mode 100644 index 0000000..aa3d8b8 --- /dev/null +++ b/memphis/tests/common.rs @@ -0,0 +1,65 @@ +use memphis_rust_community::consumer::{MemphisConsumer, MemphisConsumerOptions}; +use memphis_rust_community::memphis_client::MemphisClient; +use memphis_rust_community::producer::{MemphisProducer, MemphisProducerOptions}; +use memphis_rust_community::station::{MemphisStation, MemphisStationsOptions, StorageType}; + +#[allow(dead_code)] +pub async fn connect_to_memphis() -> MemphisClient { + let client = MemphisClient::new("localhost:6666", "root", "memphis").await; + + assert!(client.is_ok(), "Connecting to Memphis should be possible."); + + client.unwrap() +} + +#[allow(dead_code)] +pub async fn create_random_station(client: &MemphisClient) -> MemphisStation { + let random_station_name = uuid::Uuid::new_v4().to_string(); + eprintln!("random_station_name: {}", random_station_name); + + let station_options = MemphisStationsOptions::new(&random_station_name).with_storage_type(StorageType::Memory); + + let station = client.create_station(station_options).await; + + assert!(station.is_ok(), "Creating Station should be possible."); + + station.unwrap() +} + +#[allow(dead_code)] +pub async fn create_random_consumer(station: &MemphisStation) -> MemphisConsumer { + let random_consumer_name = uuid::Uuid::new_v4().to_string(); + + let consumer_options = MemphisConsumerOptions::new(&random_consumer_name) + .with_max_ack_time_ms(5000) + .with_max_msg_deliveries(2); + + let consumer = station.create_consumer(consumer_options).await; + + assert!(consumer.is_ok(), "Creating Consumer should be possible."); + + consumer.unwrap() +} + +#[allow(dead_code)] +pub async fn create_random_producer(station: &MemphisStation) -> MemphisProducer { + let random_producer_name = uuid::Uuid::new_v4().to_string(); + + let producer_options = MemphisProducerOptions::new(&random_producer_name); + + let producer = station.create_producer(producer_options).await; + + assert!(producer.is_ok(), "Creating Producer should be possible."); + + producer.unwrap() +} + +#[allow(dead_code)] +pub async fn create_random_setup() -> (MemphisClient, MemphisStation, MemphisConsumer, MemphisProducer) { + let client = connect_to_memphis().await; + let station = create_random_station(&client).await; + let consumer = create_random_consumer(&station).await; + let producer = create_random_producer(&station).await; + + (client, station, consumer, producer) +} diff --git a/memphis/tests/message_tests.rs b/memphis/tests/message_tests.rs new file mode 100644 index 0000000..234bc21 --- /dev/null +++ b/memphis/tests/message_tests.rs @@ -0,0 +1,171 @@ +use memphis_rust_community::consumer::MemphisEvent; +use memphis_rust_community::producer::ComposableMessage; + +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::mpsc::error::TryRecvError; +use tokio::sync::Mutex; +use tokio_test::assert_ok; + +mod common; + +use common::*; + +#[tokio::test] +async fn send_receive_message() { + let client = connect_to_memphis().await; + let station = create_random_station(&client).await; + + let mut consumer = create_random_consumer(&station).await; + let mut receiver = consumer.consume().await.unwrap(); + + let producer = create_random_producer(&station).await; + let payload = "Hello World!"; + + let res = producer + .produce(ComposableMessage::new().with_payload(payload).with_header("TestHeader", "TestValue")) + .await; + assert_ok!(res, "Sending a Message should be possible."); + + let msg = receiver.recv().await.unwrap(); + match msg { + MemphisEvent::MessageReceived(m) => { + assert_eq!(m.get_data_as_string().unwrap().as_str(), payload); + assert_eq!(m.get_headers().clone().unwrap().get("TestHeader").unwrap().as_str(), "TestValue"); + m.ack().await.unwrap(); + } + _ => panic!("Received Event should be a MessageReceived Event."), + } +} + +#[tokio::test] +async fn message_resend_test() { + let client = connect_to_memphis().await; + let station = create_random_station(&client).await; + + let mut consumer = create_random_consumer(&station).await; + let mut receiver = consumer.consume().await.unwrap(); + let mut dls_receiver = consumer.consume_dls().await.unwrap(); + + let dls_received = Arc::new(Mutex::new(false)); + let dls_received_clone = dls_received.clone(); + + let handle = tokio::spawn(async move { + dls_receiver.recv().await.unwrap(); + eprintln!("Received Message from DLS"); + *dls_received_clone.lock().await = true; + }); + + let producer = create_random_producer(&station).await; + let payload = "This should be send twice!"; + + let res = producer.produce(ComposableMessage::new().with_payload(payload)).await; + assert_ok!(res, "Sending a Message should be possible."); + + let msg = receiver.recv().await.unwrap(); + match msg { + MemphisEvent::MessageReceived(m) => { + assert_eq!(m.get_data_as_string().unwrap().as_str(), payload); + } + _ => panic!("Received Event should be a MessageReceived Event."), + } + let msg = receiver.recv().await.unwrap(); + match msg { + MemphisEvent::MessageReceived(m) => { + assert_eq!(m.get_data_as_string().unwrap().as_str(), payload); + m.ack().await.unwrap(); + } + _ => panic!("Received Event should be a MessageReceived Event."), + } + + tokio::time::sleep(Duration::from_secs(7)).await; + assert!(!*dls_received.lock().await, "The Message should not be received by the DLS"); + handle.abort(); +} + +#[tokio::test] +async fn message_delay_test() { + let (_, _station, mut consumer, producer) = create_random_setup().await; + + let payload = "This should be delayed!"; + + producer.produce(ComposableMessage::new().with_payload(payload)).await.unwrap(); + + let mut receiver = consumer.consume().await.unwrap(); + let msg = receiver.recv().await.unwrap(); + match msg { + MemphisEvent::MessageReceived(m) => { + let res = m.delay(Duration::from_secs(15)).await; + assert_ok!(res, "Delaying a Message should be possible."); + } + _ => panic!("Received Event should be a MessageReceived Event."), + } + tokio::time::sleep(Duration::from_secs(7)).await; + let msg = receiver.try_recv(); + match msg { + Ok(_) => { + panic!("Received Event should be an Error Event. Got a Message"); + } + Err(e) => { + assert_eq!(e, TryRecvError::Empty); + } + } + tokio::time::sleep(Duration::from_secs(10)).await; + let msg = receiver.try_recv(); + match msg { + Ok(m) => match m { + MemphisEvent::MessageReceived(m) => { + assert_eq!(m.get_data_as_string().unwrap().as_str(), payload); + m.ack().await.unwrap(); + } + _ => panic!("Received Event should be a MessageReceived Event."), + }, + Err(e) => { + panic!("Received Event should be an Error Event. Got an Error: {:?}", e) + } + } +} + +#[tokio::test] +async fn max_messages_test() { + let (_, _, mut consumer, producer) = create_random_setup().await; + let mut receiver = consumer.consume().await.unwrap(); + + let now = std::time::Instant::now(); + + let message_count = 100_000; + + for i in 0..message_count { + let res = producer + .produce( + ComposableMessage::new() + .with_payload(format!("Message {}", i)) + .with_header("id", format!("{}", i).as_str()), + ) + .await; + assert_ok!(res, "Sending a Message should be possible."); + } + eprintln!("Sending {} Messages took: {:?}", message_count, now.elapsed()); + + let now = std::time::Instant::now(); + let mut counter = 0; + while let Some(msg) = receiver.recv().await { + match msg { + MemphisEvent::MessageReceived(m) => { + assert_eq!(m.get_data_as_string().unwrap().as_str(), format!("Message {}", &counter)); + counter += 1; + m.ack().await.unwrap(); + if m.get_headers().clone().unwrap().get("id").unwrap().as_str() == format!("{}", message_count - 1) { + break; + } + } + _ => panic!("Received Event should be a MessageReceived Event."), + } + } + if counter != message_count { + panic!("Not all Messages were received. Only {} of {}", counter, message_count); + } + eprintln!("Receiving {} Messages took: {:?}", message_count, now.elapsed()); +} + +//TODO: Test for Messages in DLS once Memphis automatically resends them. diff --git a/memphis/tests/station_tests.rs b/memphis/tests/station_tests.rs new file mode 100644 index 0000000..ff864b4 --- /dev/null +++ b/memphis/tests/station_tests.rs @@ -0,0 +1,24 @@ +use memphis_rust_community::station::{MemphisStationsOptions, StorageType}; +use tokio_test::assert_ok; +mod common; +use common::*; + +#[tokio::test] +async fn test_station_creation() { + let random_station_name = uuid::Uuid::new_v4().to_string(); + eprintln!("random_station_name: {}", random_station_name); + + let client = connect_to_memphis().await; + + let station_options = MemphisStationsOptions::new(&random_station_name).with_storage_type(StorageType::File); + let station = client.create_station(station_options).await; + assert_ok!(&station, "Creating Station should be possible."); + + let station_options = MemphisStationsOptions::new(&random_station_name).with_storage_type(StorageType::File); + let station = client.create_station(station_options).await; + assert_ok!(station, "Creating Station with same Settings should be possible."); + + let station_options = MemphisStationsOptions::new(&random_station_name).with_storage_type(StorageType::Memory); + let station = client.create_station(station_options).await; + assert_ok!(station, "Creating Station with different Settings should be possible."); +} diff --git a/tests/empty.rs b/tests/empty.rs deleted file mode 100644 index 4653555..0000000 --- a/tests/empty.rs +++ /dev/null @@ -1,2 +0,0 @@ -#[tokio::test] -async fn test() {}