From f93cd4ffae8bc9afcb3062ea984a251428dc2b07 Mon Sep 17 00:00:00 2001 From: Steven Sheehy Date: Thu, 1 Jul 2021 12:26:34 -0500 Subject: [PATCH] Fix RedisEntityListenerTest.onSlowPublish Signed-off-by: Steven Sheehy --- .../entity/redis/RedisEntityListenerTest.java | 82 +++++++++---------- 1 file changed, 40 insertions(+), 42 deletions(-) diff --git a/hedera-mirror-importer/src/test/java/com/hedera/mirror/importer/parser/record/entity/redis/RedisEntityListenerTest.java b/hedera-mirror-importer/src/test/java/com/hedera/mirror/importer/parser/record/entity/redis/RedisEntityListenerTest.java index 5fbc22c0d66..068cb9c088a 100644 --- a/hedera-mirror-importer/src/test/java/com/hedera/mirror/importer/parser/record/entity/redis/RedisEntityListenerTest.java +++ b/hedera-mirror-importer/src/test/java/com/hedera/mirror/importer/parser/record/entity/redis/RedisEntityListenerTest.java @@ -20,17 +20,14 @@ * ‍ */ -import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.common.util.concurrent.Uninterruptibles; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; +import java.time.Duration; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -39,6 +36,10 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.data.redis.core.RedisOperations; import org.springframework.data.redis.core.SessionCallback; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Sinks; +import reactor.core.scheduler.Schedulers; +import reactor.test.StepVerifier; import com.hedera.mirror.importer.MirrorProperties; import com.hedera.mirror.importer.domain.EntityId; @@ -51,7 +52,7 @@ @ExtendWith(MockitoExtension.class) class RedisEntityListenerTest { - private static final long TIMEOUT_MILLIS = 2000L; + private static final Duration TIMEOUT = Duration.ofSeconds(2L); @Mock private RedisOperations redisOperations; @@ -72,45 +73,38 @@ void setup() { @Test void onSlowPublish() { - CountDownLatch latch = new CountDownLatch(1); - AtomicInteger saveCount = new AtomicInteger(0); - // given - List messages = new ArrayList<>(); - for (int i = 0; i < redisProperties.getQueueCapacity() + 2; i++) { - messages.add(topicMessage()); - } + int publishCount = redisProperties.getQueueCapacity() + 2; + Sinks.Many sink = Sinks.many().multicast().directBestEffort(); + Flux publisher = Flux.range(1, publishCount).doOnNext(i -> { + submitAndSave(topicMessage()); + entityListener.onCleanup(new EntityBatchCleanupEvent(this)); + }); // when when(redisOperations.executePipelined(any(SessionCallback.class))).then((callback) -> { - latch.await(); + Uninterruptibles.sleepUninterruptibly(Duration.ofMillis(50L)); + sink.tryEmitNext(callback); return null; }); - // drive entityListener to handle topic messages in a different thread cause it will block - new Thread(() -> { - try { - for (TopicMessage message : messages) { - submitAndSave(message); - entityListener.onCleanup(new EntityBatchCleanupEvent(this)); - saveCount.getAndIncrement(); - } - } catch (InterruptedException e) { - // ignore - } - }).start(); - //then - //Thread is blocked because queue is full, and publisher is blocked on first message. - verify(redisOperations, timeout(TIMEOUT_MILLIS * 5).times(1)) + StepVerifier redisVerifier = sink.asFlux() + .subscribeOn(Schedulers.parallel()) + .as(StepVerifier::create) + .expectNextCount(publishCount) + .thenCancel() + .verifyLater(); + + publisher.publishOn(Schedulers.parallel()) + .as(StepVerifier::create) + .expectNextCount(publishCount) + .expectComplete() + .verify(TIMEOUT); + + redisVerifier.verify(TIMEOUT); + verify(redisOperations, timeout(TIMEOUT.toMillis() * 5).times(publishCount)) .executePipelined(any(SessionCallback.class)); - assertThat(saveCount.get()).isEqualTo(redisProperties.getQueueCapacity() + 1); - - latch.countDown(); - //All messages should be queued and published - verify(redisOperations, timeout(TIMEOUT_MILLIS * 5).times(redisProperties.getQueueCapacity() + 2)) - .executePipelined(any(SessionCallback.class)); - assertThat(saveCount.get()).isEqualTo(redisProperties.getQueueCapacity() + 2); } @Test @@ -122,21 +116,21 @@ void onDuplicateTopicMessages() throws InterruptedException { //submitAndSave two messages, verify publish logic called twice submitAndSave(topicMessage1); submitAndSave(topicMessage2); - verify(redisOperations, timeout(TIMEOUT_MILLIS).times(2)) + verify(redisOperations, timeout(TIMEOUT.toMillis()).times(2)) .executePipelined(any(SessionCallback.class)); //submitAndSave two duplicate messages, verify publish was not attempted Mockito.reset(redisOperations); submitAndSave(topicMessage1); submitAndSave(topicMessage2); - verify(redisOperations, timeout(TIMEOUT_MILLIS).times(0)) + verify(redisOperations, timeout(TIMEOUT.toMillis()).times(0)) .executePipelined(any(SessionCallback.class)); //submitAndSave third new unique message, verify publish called once. Mockito.reset(redisOperations); submitAndSave(topicMessage3); entityListener.onCleanup(new EntityBatchCleanupEvent(this)); - verify(redisOperations, timeout(TIMEOUT_MILLIS).times(1)) + verify(redisOperations, timeout(TIMEOUT.toMillis()).times(1)) .executePipelined(any(SessionCallback.class)); } @@ -156,8 +150,12 @@ protected TopicMessage topicMessage() { return topicMessage; } - private void submitAndSave(TopicMessage topicMessage) throws InterruptedException { - entityListener.onTopicMessage(topicMessage); - entityListener.onSave(new EntityBatchSaveEvent(this)); + private void submitAndSave(TopicMessage topicMessage) { + try { + entityListener.onTopicMessage(topicMessage); + entityListener.onSave(new EntityBatchSaveEvent(this)); + } catch (Exception e) { + Thread.currentThread().interrupt(); + } } }