Skip to content

Commit

Permalink
Fix Tests on Stable.
Browse files Browse the repository at this point in the history
  • Loading branch information
turulix committed Aug 12, 2023
1 parent e63729f commit a19187f
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 27 deletions.
5 changes: 2 additions & 3 deletions docker-compose.yml → docker-compose-latest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ services:
memphis-metadata:
image: memphisos/memphis-metadata:docker-15.2.0-debian-11-r27
healthcheck:
test: [ "CMD-SHELL", "pg_isready -U memphis -p 5005" ]
test: ["CMD-SHELL", "pg_isready -U memphis -p 5005"]
interval: 5s
timeout: 5s
retries: 5
Expand Down Expand Up @@ -36,7 +36,6 @@ services:
environment:
- ROOT_PASSWORD=memphis
- DOCKER_ENV=true
- ANALYTICS=true
- USER_PASS_BASED_AUTH=true
- CONNECTION_TOKEN=memphis
- METADATA_DB_HOST=memphis-metadata
Expand Down Expand Up @@ -66,4 +65,4 @@ services:
networks:
memphis:
ipam:
driver: default
driver: default
68 changes: 68 additions & 0 deletions docker-compose-stable.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
services:
memphis-metadata:
image: memphisos/memphis-metadata:docker-15.2.0-debian-11-r27
healthcheck:
test: ["CMD-SHELL", "pg_isready -U memphis -p 5005"]
interval: 5s
timeout: 5s
retries: 5
networks:
- memphis
ports:
- '5005:5005'
environment:
- POSTGRESQL_USERNAME=memphis
- POSTGRESQL_PASSWORD=memphis
- POSTGRESQL_DATABASE=memphis
- POSTGRESQL_PORT_NUMBER=5005
memphis:
image: "memphisos/memphis:stable"
depends_on:
memphis-metadata:
condition: service_healthy
healthcheck:
test: wget http://127.0.0.1:9000 --spider || exit 1
interval: 10s
retries: 30
start_period: 5s
restart: on-failure
pull_policy: always
networks:
- memphis
ports:
- "9000:9000"
- "6666:6666"
- "7770:7770"
environment:
- ROOT_PASSWORD=memphis
- DOCKER_ENV=true
- USER_PASS_BASED_AUTH=true
- CONNECTION_TOKEN=memphis
- METADATA_DB_HOST=memphis-metadata
volumes:
- /var/run/docker.sock:/var/run/docker.sock

memphis-rest-gateway:
image: "memphisos/memphis-rest-gateway:latest"
depends_on:
memphis:
condition: service_healthy
restart: on-failure
pull_policy: always
networks:
- memphis
ports:
- "4444:4444"
environment:
- JWT_SECRET=JWT_TEST_PURPOSE
- REFRESH_JWT_SECRET=REFRESH_JWT_TEST_PURPOSE
- USER_PASS_BASED_AUTH=true
- CONNECTION_TOKEN=memphis
- ROOT_USER=root
- ROOT_PASSWORD=memphis
- MEMPHIS_HOST=memphis
- HTTP_PORT=4444
networks:
memphis:
ipam:
driver: default
20 changes: 19 additions & 1 deletion memphis/src/models/response/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,26 @@ use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
pub(crate) struct CreateProducerResponse {
pub(crate) partitions_update: PartitionUpdate,
pub(crate) partitions_update: Option<PartitionUpdate>,
pub(crate) schema_update: SchemaUpdate,
pub(crate) error: String,
pub(crate) schemaverse_to_dls: bool,
pub(crate) send_notification: bool,
}

#[derive(Serialize, Deserialize, Debug)]
pub(crate) struct SchemaUpdate {
pub(crate) schema_name: String,
pub(crate) active_version: ActiveVersion,
#[serde(rename = "type")]
pub(crate) type_name: String,
}
#[derive(Serialize, Deserialize, Debug)]
pub(crate) struct ActiveVersion {
pub(crate) version_number: u32,
pub(crate) descriptor: String,
pub(crate) schema_content: String,
pub(crate) message_struct_name: String,
}

#[derive(Serialize, Deserialize, Debug)]
Expand Down
20 changes: 13 additions & 7 deletions memphis/src/producer/memphis_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,19 @@ impl MemphisProducer {
.map_err(|e| RequestError::MemphisError(e.to_string()))?;

let producer = match serde_json::from_str::<CreateProducerResponse>(res) {
Ok(x) => Self {
station,
options,
partitions_iterator: Some(PartitionIterator::new(
x.partitions_update.partitions_list,
)),
},
Ok(x) => {
let partitions_iterator = if let Some(partitions) = x.partitions_update {
Some(PartitionIterator::new(partitions.partitions_list))
} else {
None
};

Self {
station,
options,
partitions_iterator,
}
}
Err(e) => {
if res.is_empty() {
Self {
Expand Down
23 changes: 17 additions & 6 deletions memphis/tests/consumer_tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
mod common;

use async_nats::jetstream::context::GetStreamError;
use async_nats::jetstream::stream::Stream;
use common::*;
use memphis_rust_community::consumer::{MemphisConsumerOptions, MemphisEvent};
use memphis_rust_community::producer::ComposableMessage;
Expand All @@ -24,12 +26,21 @@ async fn create_consumer() {
.await
);

let stream = assert_ok!(
client
.get_jetstream_context()
.get_stream(station.get_internal_name(Some(1)))
.await
);
let stream = match client
.get_jetstream_context()
.get_stream(station.get_internal_name(Some(1)))
.await
{
Ok(s) => s,
Err(_e) => {
assert_ok!(
client
.get_jetstream_context()
.get_stream(station.get_internal_name(None))
.await
)
}
};

let consumer1_info = assert_ok!(stream.consumer_info("no-group").await);
assert!(consumer1_info.name.eq(&consumer1.get_internal_name()));
Expand Down
23 changes: 13 additions & 10 deletions memphis/tests/message_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod common;

use common::*;
use memphis_rust_community::station::MemphisStationsOptions;
use memphis_rust_community::station::StorageType::Memory;

#[tokio::test]
async fn send_receive_message() {
Expand Down Expand Up @@ -218,11 +219,21 @@ async fn partition_sending_receiving() {
let station = assert_ok!(
client
.create_station(
MemphisStationsOptions::new(&random_station_name).with_partition_number(10)
MemphisStationsOptions::new(&random_station_name)
.with_storage_type(Memory)
.with_partition_number(10)
)
.await
);

let mut consumer = assert_ok!(
station
.create_consumer(MemphisConsumerOptions::new("consumer"))
.await
);

let mut receiver = assert_ok!(consumer.consume().await);

let mut producer = assert_ok!(
station
.create_producer(MemphisProducerOptions::new("producer"))
Expand All @@ -241,15 +252,7 @@ async fn partition_sending_receiving() {
assert_ok!(res, "Sending a Message should be possible.");
}

let mut consumer = assert_ok!(
station
.create_consumer(MemphisConsumerOptions::new("consumer"))
.await
);

tokio::time::sleep(Duration::from_secs(2)).await;

let mut receiver = assert_ok!(consumer.consume().await);
tokio::time::sleep(Duration::from_secs(10)).await;

for _ in 0..20 {
assert_ok!(receiver.try_recv());
Expand Down

0 comments on commit a19187f

Please sign in to comment.