Skip to content

Commit

Permalink
Fixed bug in Iterator.
Browse files Browse the repository at this point in the history
  • Loading branch information
turulix committed Aug 14, 2023
1 parent d0256f1 commit 9f05d79
Showing 1 changed file with 6 additions and 12 deletions.
18 changes: 6 additions & 12 deletions memphis/src/producer/memphis_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use log::{debug, error, info};
use crate::constants::memphis_constants::{
MemphisHeaders, MemphisNotificationType, MemphisSpecialStation,
};
use crate::helper::memphis_util::{get_internal_name, sanitize_name};
use crate::helper::memphis_util::sanitize_name;
use crate::helper::partition_iterator::PartitionIterator;
use crate::models::request::{CreateProducerRequest, DestroyProducerRequest};
use crate::models::response::CreateProducerResponse;
Expand Down Expand Up @@ -81,7 +81,7 @@ impl MemphisProducer {
Ok(producer)
}

pub async fn produce(&self, mut message: ComposableMessage) -> Result<(), ProducerError> {
pub async fn produce(&mut self, mut message: ComposableMessage) -> Result<(), ProducerError> {
if message.payload.is_empty() {
return Err(ProducerError::PayloadEmpty);
}
Expand All @@ -106,17 +106,14 @@ impl MemphisProducer {
.await
.map_err(ProducerError::SchemaValidationError)?;

match &self.partitions_iterator {
match &mut self.partitions_iterator {
None => {
if let Err(e) = self
.station
.memphis_client
.get_broker_connection()
.publish_with_headers(
format!(
"{}.final",
get_internal_name(&self.station.get_internal_name(None))
),
self.station.get_internal_subject_name(None),
message.headers,
message.payload,
)
Expand All @@ -140,10 +137,7 @@ impl MemphisProducer {
.memphis_client
.get_broker_connection()
.publish_with_headers(
format!(
"{}.final",
&self.station.get_internal_name(Some(*partition))
),
self.station.get_internal_subject_name(Some(partition)),
message.headers,
message.payload,
)
Expand Down Expand Up @@ -204,7 +198,7 @@ impl MemphisProducer {
};

if let Err(e) = schema_validator.validate(&message.payload) {
self.send_notification(&message, &e).await?;
self.send_notification(message, &e).await?;

if self.station.options.send_schema_failed_msg_to_dls {
self.send_message_to_dls(message, &e).await?;
Expand Down

0 comments on commit 9f05d79

Please sign in to comment.