Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Linting of Files #18

Merged
merged 5 commits into from
Jul 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/CD.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ jobs:

- name: Build
run: |
cargo build --release
cargo build --release --all-features

- name: Publish
run: |
cargo login ${{ secrets.RUST_API_KEY }}
cargo publish --token ${{ secrets.RUST_API_KEY }}
cargo publish --token ${{ secrets.RUST_API_KEY }} --all-features
10 changes: 3 additions & 7 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,10 @@ concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
cancel-in-progress: true

env:
RUSTFLAGS: "-D warnings"

defaults:
run:
shell: bash


jobs:
test:
runs-on: ubuntu-latest
Expand All @@ -47,7 +43,7 @@ jobs:

- name: Setup Memphis
run: |
curl -sSL https://raw.githubusercontent.com/memphisdev/memphis-docker/master/docker-compose.yml > docker-compose.yml
curl -sSL https://raw.githubusercontent.com/memphisdev/memphis-docker/master/docker-compose-latest.yml > docker-compose.yml
docker-compose up -d

- name: Install Rust
Expand All @@ -57,7 +53,7 @@ jobs:

- name: Run Tests
run: |
cargo test --all
cargo test --workspace --all-targets --all-features

check_format:
name: check (format)
Expand Down Expand Up @@ -125,4 +121,4 @@ jobs:
rustup component add clippy

- name: Check lint
run: cargo doc
run: cargo doc --all-features
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
[workspace]
resolver = "2"
members = [
"memphis"
]
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ services:
- POSTGRESQL_DATABASE=memphis
- POSTGRESQL_PORT_NUMBER=5005
memphis:
image: "memphisos/memphis:stable"
image: "memphisos/memphis:latest"
depends_on:
memphis-metadata:
condition: service_healthy
Expand Down
3 changes: 3 additions & 0 deletions memphis/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,6 @@ async-trait = "0.1.17"
hex = { version = "0.4.3", features = ["serde"] }

jsonschema = { version = "0.17", optional = true }

[dev-dependencies]
tokio-test = "0.4.2"
3 changes: 3 additions & 0 deletions memphis/src/constants/memphis_constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ pub(crate) enum MemphisSpecialStation {
ConsumerDestructions,
StationDestructions,

#[allow(dead_code)]
SchemaAttachments,
#[allow(dead_code)]
SchemaDetachments,

Notifications,
Expand Down Expand Up @@ -67,6 +69,7 @@ impl IntoHeaderName for MemphisHeaders {

pub(crate) enum MemphisSubscriptions {
DlsPrefix,
#[allow(dead_code)]
SchemaUpdatesPrefix,
}

Expand Down
2 changes: 1 addition & 1 deletion memphis/src/consumer/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use async_nats::Error;

use crate::consumer::MemphisMessage;

#[derive(Clone)]
#[derive(Clone, Debug)]
pub enum MemphisEvent {
MessageReceived(MemphisMessage),
StationUnavailable(Arc<Error>),
Expand Down
8 changes: 5 additions & 3 deletions memphis/src/consumer/incoming_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,17 @@ impl MemphisMessage {
Ok(_) => Ok(()),
Err(_) => Err(()),
};
}
if let Some(_cg_name) = headers.get("$memphis_pm_cg_name") {
} else if let Some(_cg_name) = headers.get("$memphis_pm_cg_name") {
return match self.msg.ack_with(AckKind::Nak(Some(delay))).await {
Ok(_) => Ok(()),
Err(_) => Err(()),
};
}
}
Err(())
match self.msg.ack_with(AckKind::Nak(Some(delay))).await {
Ok(_) => Ok(()),
Err(_) => Err(()),
}
}
}

Expand Down
82 changes: 49 additions & 33 deletions memphis/src/consumer/memphis_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use std::time::Duration;
use async_nats::jetstream::consumer::PullConsumer;
use async_nats::{Error, Message};
use futures_util::StreamExt;
use log::{debug, error, info, trace};
use tokio::sync::broadcast::{channel, Receiver, Sender};
use log::{debug, error, info, trace, warn};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio_util::sync::CancellationToken;

use crate::constants::memphis_constants::{MemphisSpecialStation, MemphisSubscriptions};
Expand All @@ -24,12 +24,7 @@ pub struct MemphisConsumer {
memphis_client: MemphisClient,
options: MemphisConsumerOptions,
cancellation_token: CancellationToken,

/// The receiver for the MemphisEvents.
/// This is used to communicate with the MemphisConsumer.
/// The MemphisConsumer will send events to this receiver.
pub message_receiver: Receiver<MemphisEvent>,
message_sender: Sender<MemphisEvent>,
message_sender: Option<UnboundedSender<MemphisEvent>>,
}

impl MemphisConsumer {
Expand Down Expand Up @@ -70,14 +65,11 @@ impl MemphisConsumer {

info!("Consumer '{}' created successfully", &options.consumer_name);

let (tx, rx) = channel(100);

let consumer = Self {
memphis_client,
options,
cancellation_token: CancellationToken::new(),
message_sender: tx,
message_receiver: rx,
message_sender: None,
};

consumer.ping_consumer();
Expand All @@ -97,29 +89,39 @@ impl MemphisConsumer {
/// ```rust
/// use memphis_rust_community::memphis_client::MemphisClient;
/// use memphis_rust_community::consumer::MemphisConsumerOptions;
/// use memphis_rust_community::station::MemphisStationsOptions;
///
/// #[tokio::main]
/// async fn main() {
///
/// let client = MemphisClient::new("localhost:6666", "root", "memphis").await.unwrap();
/// let consumer_options = MemphisConsumerOptions::new("my-station", "my-consumer")
///
/// let station_options = MemphisStationsOptions::new("test_station");
/// let station = client.create_station(station_options).await.unwrap();
///
/// let consumer_options = MemphisConsumerOptions::new("test_consumer")
/// .with_generate_unique_suffix(true);
/// let mut consumer = client.create_consumer(consumer_options).await.unwrap();
/// let mut consumer = station.create_consumer(consumer_options).await.unwrap();
///
/// let mut message_receiver = consumer.consume().await.unwrap();
///
/// tokio::spawn(async move {
/// loop{
/// let msg = message_receiver.recv().await;
/// // Do something with the message
/// break;
/// }
/// });
///
/// consumer.consume().await.unwrap();
/// tokio::spawn(async move {
/// loop{
/// let msg = consumer.message_receiver.recv().await;
/// // Do something with the message
/// break;
/// }
/// });
/// }
/// ```
pub async fn consume(&self) -> Result<(), Error> {
pub async fn consume(&mut self) -> Result<UnboundedReceiver<MemphisEvent>, Error> {
let cloned_token = self.cancellation_token.clone();
let cloned_client = self.memphis_client.clone();
let cloned_options = self.options.clone();
let cloned_sender = self.message_sender.clone();

let (sender, receiver) = unbounded_channel::<MemphisEvent>();
self.message_sender = Some(sender.clone());

// Memphis will create a stream with the name of the station.
// On this Stream it will create a consumer with the name of the consumer Group.
Expand Down Expand Up @@ -156,15 +158,21 @@ impl MemphisConsumer {
trace!(
"Message received from Memphis. (Subject: {}, Sequence: {})",
msg.subject,
msg.info().expect("NONE").stream_sequence
match msg.info() {
Ok(info) => info.stream_sequence,
Err(_e) => 0,
}
);
let memphis_message = MemphisMessage::new(
msg,
cloned_client.clone(),
cloned_options.consumer_group.clone(),
cloned_options.max_ack_time_ms,
);
let _res = cloned_sender.send(MemphisEvent::MessageReceived(memphis_message));
let res = sender.send(MemphisEvent::MessageReceived(memphis_message));
if res.is_err() {
error!("Error while sending message to the receiver. {:?}", res.err());
}
}
}
});
Expand All @@ -173,7 +181,7 @@ impl MemphisConsumer {
"Successfully started consuming messages from Memphis with consumer '{}' on group: '{}'",
self.options.consumer_name, self.options.consumer_group
);
Ok(())
Ok(receiver)
}

/// # Starts consuming DLS messages from Memphis.
Expand All @@ -188,9 +196,9 @@ impl MemphisConsumer {
///
/// # Returns
/// A [Receiver] that will receive the DLS messages.
pub async fn consume_dls(&self) -> Result<Receiver<Arc<Message>>, Error> {
pub async fn consume_dls(&self) -> Result<UnboundedReceiver<Arc<Message>>, Error> {
//TODO: Remove Arc once async_nats is updated to >=0.30.0 (https://github.com/nats-io/nats.rs/pull/975)
let (s, r) = channel::<Arc<Message>>(100);
let (s, r) = unbounded_channel::<Arc<Message>>();
let subject = format!(
"{}{}_{}",
MemphisSubscriptions::DlsPrefix.to_string(),
Expand Down Expand Up @@ -258,9 +266,17 @@ impl MemphisConsumer {
let cloned_sender = self.message_sender.clone();

tokio::spawn(async move {
fn send_message(sender: &Sender<MemphisEvent>, event: MemphisEvent) {
let _res = sender.send(event);
fn send_message(sender: &Option<UnboundedSender<MemphisEvent>>, event: MemphisEvent, consumer_name: &str) {
match sender {
None => {
warn!("Consumer {} tried to send event, without the Sender being initialised", consumer_name);
}
Some(s) => {
let _res = s.send(event);
}
}
}
let name = get_effective_consumer_name(&cloned_options);
while !cloned_token.is_cancelled() {
let stream = match cloned_client
.get_jetstream_context()
Expand All @@ -269,7 +285,7 @@ impl MemphisConsumer {
{
Ok(s) => s,
Err(e) => {
send_message(&cloned_sender, MemphisEvent::StationUnavailable(Arc::new(e)));
send_message(&cloned_sender, MemphisEvent::StationUnavailable(Arc::new(e)), name.as_str());
error!("Station {} is unavailable. (Ping)", &cloned_options.station_name.clone());
tokio::time::sleep(Duration::from_secs(30)).await;
continue;
Expand All @@ -279,7 +295,7 @@ impl MemphisConsumer {
match stream.consumer_info(get_effective_consumer_name(&cloned_options)).await {
Ok(_) => {}
Err(e) => {
send_message(&cloned_sender, MemphisEvent::ConsumerUnavailable(Arc::new(e)));
send_message(&cloned_sender, MemphisEvent::ConsumerUnavailable(Arc::new(e)), name.as_str());
error!(
"Consumer '{}' on group '{}' is unavailable. (Ping)",
&cloned_options.consumer_name, &cloned_options.consumer_group
Expand Down
12 changes: 3 additions & 9 deletions memphis/src/consumer/memphis_consumer_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@
///
/// #[tokio::main]
/// async fn main() {
/// let options = MemphisConsumerOptions::new("station_name", "consumer_name")
/// let options = MemphisConsumerOptions::new("station_name")
/// .with_consumer_group("consumer_group")
/// .with_generate_unique_suffix(true)
/// .with_pull_interval_ms(1000)
/// .with_batch_size(10);
/// }
#[derive(Debug, Clone)]
pub struct MemphisConsumerOptions {
pub station_name: String,
pub(crate) station_name: String,
pub consumer_name: String,
pub consumer_group: String,
pub pull_interval_ms: i32,
Expand Down Expand Up @@ -46,19 +46,13 @@ impl Default for MemphisConsumerOptions {
}

impl MemphisConsumerOptions {
pub fn new(station_name: &str, consumer_name: &str) -> Self {
pub fn new(consumer_name: &str) -> Self {
MemphisConsumerOptions {
station_name: station_name.to_string(),
consumer_name: consumer_name.to_string(),
..Default::default()
}
}

pub fn with_station_name(mut self, station_name: String) -> Self {
self.station_name = station_name;
self
}

pub fn with_consumer_name(mut self, consumer_name: String) -> Self {
self.consumer_name = consumer_name;
self
Expand Down
1 change: 1 addition & 0 deletions memphis/src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod incoming_message;
mod memphis_consumer;
mod memphis_consumer_options;

/// Get the effective consumer name. If the consumer group is empty, the consumer name is used. Otherwise, the consumer group is used.
fn get_effective_consumer_name(options: &MemphisConsumerOptions) -> String {
if options.consumer_group.is_empty() {
get_internal_name(&options.consumer_name)
Expand Down
13 changes: 9 additions & 4 deletions memphis/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,23 @@
//! ```rust
//! use memphis_rust_community::memphis_client::MemphisClient;
//! use memphis_rust_community::consumer::MemphisConsumerOptions;
//! use memphis_rust_community::station::MemphisStationsOptions;
//!
//! #[tokio::main]
//! async fn main() {
//! let client = MemphisClient::new("localhost:6666", "root", "memphis").await.unwrap();
//! let consumer_options = MemphisConsumerOptions::new("my-station", "my-consumer")
//!
//! let station_options = MemphisStationsOptions::new("my-station");
//! let station = client.create_station(station_options).await.unwrap();
//!
//! let consumer_options = MemphisConsumerOptions::new("my-consumer")
//! .with_generate_unique_suffix(true);
//! let mut consumer = client.create_consumer(consumer_options).await.unwrap();
//! let mut consumer = station.create_consumer(consumer_options).await.unwrap();
//!
//! consumer.consume().await.unwrap();
//! let mut message_receiver = consumer.consume().await.unwrap();
//! tokio::spawn(async move {
//! loop{
//! let msg = consumer.message_receiver.recv().await;
//! let msg = message_receiver.recv().await;
//! // Do something with the message
//! break;
//! }
Expand Down
Loading