From 69c2c2aef4def05b7b8ba0ca2d8ffceedea5613f Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Thu, 30 Nov 2017 18:01:52 -0800 Subject: [PATCH] #266 - Refactor reactive tests to use StepVerifier and test methods. We now use StepVerifier and RxJava's .test() methods instead of .block() calls. Using blocking methods is an anti pattern which should be avoided within tests. Test API comes with timeouts and protects tests from never completing. --- cassandra/reactive/pom.xml | 6 ++ ...ctiveCassandraTemplateIntegrationTest.java | 24 ++--- ...activePersonRepositoryIntegrationTest.java | 67 ++++---------- ...xJava2PersonRepositoryIntegrationTest.java | 73 +++++++-------- mongodb/reactive/pom.xml | 8 +- .../ReactiveMongoTemplateIntegrationTest.java | 36 +++----- ...activePersonRepositoryIntegrationTest.java | 91 ++++++++----------- ...xJava2PersonRepositoryIntegrationTest.java | 90 +++++++++--------- .../redis/commands/KeyCommandsTests.java | 31 +++---- 9 files changed, 189 insertions(+), 237 deletions(-) diff --git a/cassandra/reactive/pom.xml b/cassandra/reactive/pom.xml index dc86fae72..5816705d6 100644 --- a/cassandra/reactive/pom.xml +++ b/cassandra/reactive/pom.xml @@ -28,6 +28,12 @@ rxjava-reactive-streams + + io.projectreactor + reactor-test + test + + ${project.groupId} spring-data-cassandra-example-utils diff --git a/cassandra/reactive/src/test/java/example/springdata/cassandra/people/ReactiveCassandraTemplateIntegrationTest.java b/cassandra/reactive/src/test/java/example/springdata/cassandra/people/ReactiveCassandraTemplateIntegrationTest.java index 66baaddb1..53986d010 100644 --- a/cassandra/reactive/src/test/java/example/springdata/cassandra/people/ReactiveCassandraTemplateIntegrationTest.java +++ b/cassandra/reactive/src/test/java/example/springdata/cassandra/people/ReactiveCassandraTemplateIntegrationTest.java @@ -20,10 +20,10 @@ import example.springdata.cassandra.util.CassandraKeyspace; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; import rx.RxReactiveStreams; -import java.util.concurrent.CountDownLatch; - import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; @@ -52,14 +52,14 @@ public class ReactiveCassandraTemplateIntegrationTest { @Before public void setUp() { - template.truncate(Person.class) // + Flux truncateAndInsert = template.truncate(Person.class) // .thenMany(Flux.just(new Person("Walter", "White", 50), // new Person("Skyler", "White", 45), // new Person("Saul", "Goodman", 42), // new Person("Jesse", "Pinkman", 27))) // - .flatMap(template::insert) // - .then() // - .block(); + .flatMap(template::insert); + + StepVerifier.create(truncateAndInsert).expectNextCount(4).verifyComplete(); } /** @@ -67,22 +67,18 @@ public void setUp() { * the two counts ({@code 4} and {@code 6}) to the console. */ @Test - public void shouldInsertAndCountData() throws Exception { + public void shouldInsertAndCountData() { - CountDownLatch countDownLatch = new CountDownLatch(1); - - template.count(Person.class) // + Mono saveAndCount = template.count(Person.class) // .doOnNext(System.out::println) // .thenMany(Flux.just(new Person("Hank", "Schrader", 43), // new Person("Mike", "Ehrmantraut", 62))) .flatMap(template::insert) // .last() // .flatMap(v -> template.count(Person.class)) // - .doOnNext(System.out::println) // - .doOnTerminate(countDownLatch::countDown) // - .subscribe(); + .doOnNext(System.out::println); - countDownLatch.await(); + StepVerifier.create(saveAndCount).expectNext(6L).verifyComplete(); } /** diff --git a/cassandra/reactive/src/test/java/example/springdata/cassandra/people/ReactivePersonRepositoryIntegrationTest.java b/cassandra/reactive/src/test/java/example/springdata/cassandra/people/ReactivePersonRepositoryIntegrationTest.java index 2c962f1de..5fa464207 100644 --- a/cassandra/reactive/src/test/java/example/springdata/cassandra/people/ReactivePersonRepositoryIntegrationTest.java +++ b/cassandra/reactive/src/test/java/example/springdata/cassandra/people/ReactivePersonRepositoryIntegrationTest.java @@ -15,14 +15,10 @@ */ package example.springdata.cassandra.people; -import static org.assertj.core.api.Assertions.*; - import example.springdata.cassandra.util.CassandraKeyspace; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; - -import java.util.List; -import java.util.concurrent.CountDownLatch; +import reactor.test.StepVerifier; import org.junit.Before; import org.junit.ClassRule; @@ -51,34 +47,30 @@ public class ReactivePersonRepositoryIntegrationTest { @Before public void setUp() { - repository.deleteAll() // + Flux deleteAndInsert = repository.deleteAll() // .thenMany(repository.saveAll(Flux.just(new Person("Walter", "White", 50), // new Person("Skyler", "White", 45), // new Person("Saul", "Goodman", 42), // - new Person("Jesse", "Pinkman", 27)))) - .then() // - .block(); + new Person("Jesse", "Pinkman", 27)))); + + StepVerifier.create(deleteAndInsert).expectNextCount(4).verifyComplete(); } /** * This sample performs a count, inserts data and performs a count again using reactive operator chaining. */ @Test - public void shouldInsertAndCountData() throws Exception { - - CountDownLatch countDownLatch = new CountDownLatch(1); + public void shouldInsertAndCountData() { - repository.count() // + Mono saveAndCount = repository.count() // .doOnNext(System.out::println) // .thenMany(repository.saveAll(Flux.just(new Person("Hank", "Schrader", 43), // new Person("Mike", "Ehrmantraut", 62)))) // .last() // .flatMap(v -> repository.count()) // - .doOnNext(System.out::println) // - .doOnTerminate(countDownLatch::countDown) // - .subscribe(); + .doOnNext(System.out::println); - countDownLatch.await(); + StepVerifier.create(saveAndCount).expectNext(6L).verifyComplete(); } /** @@ -86,17 +78,11 @@ public void shouldInsertAndCountData() throws Exception { * prefetch define the amount of fetched records. */ @Test - public void shouldPerformConversionBeforeResultProcessing() throws Exception { - - CountDownLatch countDownLatch = new CountDownLatch(1); - - repository.findAll() // - .doOnNext(System.out::println) // - .doOnComplete(countDownLatch::countDown) // - .doOnError(throwable -> countDownLatch.countDown()) // - .subscribe(); + public void shouldPerformConversionBeforeResultProcessing() { - countDownLatch.await(); + StepVerifier.create(repository.findAll().doOnNext(System.out::println)) // + .expectNextCount(4) // + .verifyComplete(); } /** @@ -104,12 +90,7 @@ public void shouldPerformConversionBeforeResultProcessing() throws Exception { */ @Test public void shouldQueryDataWithQueryDerivation() { - - List whites = repository.findByLastname("White") // - .collectList() // - .block(); - - assertThat(whites).hasSize(2); + StepVerifier.create(repository.findByLastname("White")).expectNextCount(2).verifyComplete(); } /** @@ -117,11 +98,7 @@ public void shouldQueryDataWithQueryDerivation() { */ @Test public void shouldQueryDataWithStringQuery() { - - Person heisenberg = repository.findByFirstnameInAndLastname("Walter", "White") // - .block(); - - assertThat(heisenberg).isNotNull(); + StepVerifier.create(repository.findByFirstnameInAndLastname("Walter", "White")).expectNextCount(1).verifyComplete(); } /** @@ -129,12 +106,7 @@ public void shouldQueryDataWithStringQuery() { */ @Test public void shouldQueryDataWithDeferredQueryDerivation() { - - List whites = repository.findByLastname(Mono.just("White")) // - .collectList() // - .block(); - - assertThat(whites).hasSize(2); + StepVerifier.create(repository.findByLastname(Mono.just("White"))).expectNextCount(2).verifyComplete(); } /** @@ -143,10 +115,9 @@ public void shouldQueryDataWithDeferredQueryDerivation() { @Test public void shouldQueryDataWithMixedDeferredQueryDerivation() { - Person heisenberg = repository.findByFirstnameAndLastname(Mono.just("Walter"), "White") // - .block(); - - assertThat(heisenberg).isNotNull(); + StepVerifier.create(repository.findByFirstnameAndLastname(Mono.just("Walter"), "White")) // + .expectNextCount(1) // + .verifyComplete(); } } diff --git a/cassandra/reactive/src/test/java/example/springdata/cassandra/people/RxJava2PersonRepositoryIntegrationTest.java b/cassandra/reactive/src/test/java/example/springdata/cassandra/people/RxJava2PersonRepositoryIntegrationTest.java index 612f09ec7..2ec9ab639 100644 --- a/cassandra/reactive/src/test/java/example/springdata/cassandra/people/RxJava2PersonRepositoryIntegrationTest.java +++ b/cassandra/reactive/src/test/java/example/springdata/cassandra/people/RxJava2PersonRepositoryIntegrationTest.java @@ -15,16 +15,11 @@ */ package example.springdata.cassandra.people; -import static org.assertj.core.api.Assertions.*; - import example.springdata.cassandra.util.CassandraKeyspace; import io.reactivex.Completable; import io.reactivex.Flowable; import io.reactivex.Single; -import java.util.List; -import java.util.concurrent.CountDownLatch; - import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; @@ -60,16 +55,16 @@ public void setUp() throws Exception { new Person("Saul", "Goodman", 42), // new Person("Jesse", "Pinkman", 27))); - deleteAll.andThen(save).blockingLast(); + deleteAll.andThen(save).test().await().assertNoErrors(); } /** - * This sample performs a count, inserts data and performs a count again using reactive operator chaining. + * This sample performs a count, inserts data and performs a count again using reactive operator chaining. It prints + * the two counts ({@code 4} and {@code 6}) to the console. */ @Test - public void shouldInsertAndCountData() throws Exception { + public void shouldInsertAndCountData() { - CountDownLatch countDownLatch = new CountDownLatch(1); repository.count() // .doOnSuccess(System.out::println) // @@ -80,10 +75,11 @@ public void shouldInsertAndCountData() throws Exception { .toSingle() // .flatMap(v -> repository.count()) // .doOnSuccess(System.out::println) // - .doAfterTerminate(countDownLatch::countDown) // - .subscribe(); - - countDownLatch.await(); + .test() // + .awaitCount(1) // + .assertValue(6L) // + .assertNoErrors() // + .awaitTerminalEvent(); } /** @@ -91,17 +87,14 @@ public void shouldInsertAndCountData() throws Exception { * prefetch define the amount of fetched records. */ @Test - public void shouldPerformConversionBeforeResultProcessing() throws Exception { - - CountDownLatch countDownLatch = new CountDownLatch(1); + public void shouldPerformConversionBeforeResultProcessing() { repository.findAll() // .doOnNext(System.out::println) // - .doOnEach(it -> countDownLatch.countDown()) // - .doOnError(throwable -> countDownLatch.countDown()) // - .subscribe(); - - countDownLatch.await(); + .test() // + .awaitCount(4) // + .assertNoErrors() // + .awaitTerminalEvent(); } /** @@ -110,11 +103,11 @@ public void shouldPerformConversionBeforeResultProcessing() throws Exception { @Test public void shouldQueryDataWithQueryDerivation() { - List whites = repository.findByLastname("White") // - .toList() // - .blockingGet(); - - assertThat(whites).hasSize(2); + repository.findByLastname("White") // + .test() // + .awaitCount(2) // + .assertNoErrors() // + .awaitTerminalEvent(); } /** @@ -123,10 +116,11 @@ public void shouldQueryDataWithQueryDerivation() { @Test public void shouldQueryDataWithStringQuery() { - Person heisenberg = repository.findByFirstnameAndLastname("Walter", "White") // - .blockingGet(); - - assertThat(heisenberg).isNotNull(); + repository.findByFirstnameAndLastname("Walter", "White") // + .test() // + .awaitCount(1) // + .assertNoErrors() // + .awaitTerminalEvent(); } /** @@ -135,11 +129,11 @@ public void shouldQueryDataWithStringQuery() { @Test public void shouldQueryDataWithDeferredQueryDerivation() { - List whites = repository.findByLastname(Single.just("White")) // - .toList() // - .blockingGet(); - - assertThat(whites).hasSize(2); + repository.findByLastname(Single.just("White")) // + .test() // + .awaitCount(2) // + .assertNoErrors() // + .awaitTerminalEvent(); } /** @@ -148,9 +142,10 @@ public void shouldQueryDataWithDeferredQueryDerivation() { @Test public void shouldQueryDataWithMixedDeferredQueryDerivation() { - Person heisenberg = repository.findByFirstnameAndLastname(Single.just("Walter"), "White") // - .blockingGet(); - - assertThat(heisenberg).isNotNull(); + repository.findByFirstnameAndLastname(Single.just("Walter"), "White") // + .test() // + .awaitCount(1) // + .assertNoErrors() // + .awaitTerminalEvent(); } } diff --git a/mongodb/reactive/pom.xml b/mongodb/reactive/pom.xml index d0777f54d..d00ca95b5 100644 --- a/mongodb/reactive/pom.xml +++ b/mongodb/reactive/pom.xml @@ -16,7 +16,7 @@ - + org.springframework.boot spring-boot-starter-data-mongodb-reactive @@ -33,6 +33,12 @@ rxjava-reactive-streams + + io.projectreactor + reactor-test + test + + diff --git a/mongodb/reactive/src/test/java/example/springdata/mongodb/people/ReactiveMongoTemplateIntegrationTest.java b/mongodb/reactive/src/test/java/example/springdata/mongodb/people/ReactiveMongoTemplateIntegrationTest.java index 31886e7b3..6e5807f8b 100644 --- a/mongodb/reactive/src/test/java/example/springdata/mongodb/people/ReactiveMongoTemplateIntegrationTest.java +++ b/mongodb/reactive/src/test/java/example/springdata/mongodb/people/ReactiveMongoTemplateIntegrationTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 the original author or authors. + * Copyright 2016-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,9 +19,10 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; import rx.RxReactiveStreams; -import java.util.concurrent.CountDownLatch; +import java.util.Arrays; import org.junit.Before; import org.junit.Test; @@ -47,19 +48,15 @@ public class ReactiveMongoTemplateIntegrationTest { @Before public void setUp() { - template.collectionExists(Person.class) // - .flatMap(exists -> exists ? template.dropCollection(Person.class) : Mono.just(exists)) // - .flatMap(exists -> template.createCollection(Person.class)) // - .then() // - .block(); + StepVerifier.create(template.dropCollection(Person.class)).verifyComplete(); - template + Flux insertAll = template .insertAll(Flux.just(new Person("Walter", "White", 50), // new Person("Skyler", "White", 45), // new Person("Saul", "Goodman", 42), // - new Person("Jesse", "Pinkman", 27)).collectList()) - .then() // - .block(); + new Person("Jesse", "Pinkman", 27)).collectList()); + + StepVerifier.create(insertAll).expectNextCount(4).verifyComplete(); } /** @@ -67,29 +64,24 @@ public void setUp() { * the two counts ({@code 4} and {@code 6}) to the console. */ @Test - public void shouldInsertAndCountData() throws Exception { - - CountDownLatch countDownLatch = new CountDownLatch(1); + public void shouldInsertAndCountData() { - template.count(new Query(), Person.class) // + Mono count = template.count(new Query(), Person.class) // .doOnNext(System.out::println) // - .thenMany(template.save(Flux.just(new Person("Hank", "Schrader", 43), // + .thenMany(template.insertAll(Arrays.asList(new Person("Hank", "Schrader", 43), // new Person("Mike", "Ehrmantraut", 62)))) // .last() // .flatMap(v -> template.count(new Query(), Person.class)) // - .doOnNext(System.out::println) // - .doOnSuccess(it -> countDownLatch.countDown()) // - .doOnError(throwable -> countDownLatch.countDown()) // - .subscribe(); + .doOnNext(System.out::println);// - countDownLatch.await(); + StepVerifier.create(count).expectNext(6L).verifyComplete(); } /** * Note that the all object conversions are performed before the results are printed to the console. */ @Test - public void convertReactorTypesToRxJava2() throws Exception { + public void convertReactorTypesToRxJava2() { Flux flux = template.find(Query.query(Criteria.where("lastname").is("White")), Person.class); diff --git a/mongodb/reactive/src/test/java/example/springdata/mongodb/people/ReactivePersonRepositoryIntegrationTest.java b/mongodb/reactive/src/test/java/example/springdata/mongodb/people/ReactivePersonRepositoryIntegrationTest.java index 090bbd870..e8434abe9 100644 --- a/mongodb/reactive/src/test/java/example/springdata/mongodb/people/ReactivePersonRepositoryIntegrationTest.java +++ b/mongodb/reactive/src/test/java/example/springdata/mongodb/people/ReactivePersonRepositoryIntegrationTest.java @@ -20,12 +20,12 @@ import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; -import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CountDownLatch; +import org.bson.Document; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -35,6 +35,8 @@ import org.springframework.data.mongodb.core.ReactiveMongoOperations; import org.springframework.test.context.junit4.SpringRunner; +import com.mongodb.reactivestreams.client.MongoCollection; + /** * Integration test for {@link ReactivePersonRepository} using Project Reactor types and operators. * @@ -50,60 +52,50 @@ public class ReactivePersonRepositoryIntegrationTest { @Before public void setUp() { - operations.collectionExists(Person.class) // + Mono> recreateCollection = operations.collectionExists(Person.class) // .flatMap(exists -> exists ? operations.dropCollection(Person.class) : Mono.just(exists)) // .then(operations.createCollection(Person.class, CollectionOptions.empty() // .size(1024 * 1024) // .maxDocuments(100) // - .capped())) // - .block(); + .capped())); + + StepVerifier.create(recreateCollection).expectNextCount(1).verifyComplete(); - repository - .saveAll(Flux.just(new Person("Walter", "White", 50), // + Flux insertAll = operations.insertAll(Flux.just(new Person("Walter", "White", 50), // new Person("Skyler", "White", 45), // new Person("Saul", "Goodman", 42), // - new Person("Jesse", "Pinkman", 27))) // - .then() // - .block(); + new Person("Jesse", "Pinkman", 27)).collectList()); + + StepVerifier.create(insertAll).expectNextCount(4).verifyComplete(); } /** - * This sample performs a count, inserts data and performs a count again using reactive operator chaining. + * This sample performs a count, inserts data and performs a count again using reactive operator chaining. It prints + * the two counts ({@code 4} and {@code 6}) to the console. */ @Test - public void shouldInsertAndCountData() throws Exception { + public void shouldInsertAndCountData() { - CountDownLatch countDownLatch = new CountDownLatch(1); - - repository.count() // + Mono saveAndCount = repository.count() // .doOnNext(System.out::println) // .thenMany(repository.saveAll(Flux.just(new Person("Hank", "Schrader", 43), // new Person("Mike", "Ehrmantraut", 62)))) // .last() // .flatMap(v -> repository.count()) // - .doOnNext(System.out::println) // - .doOnSuccess(it -> countDownLatch.countDown()) // - .doOnError(throwable -> countDownLatch.countDown()) // - .subscribe(); + .doOnNext(System.out::println); - countDownLatch.await(); + StepVerifier.create(saveAndCount).expectNext(6L).verifyComplete(); } /** * Note that the all object conversions are performed before the results are printed to the console. */ @Test - public void shouldPerformConversionBeforeResultProcessing() throws Exception { - - CountDownLatch countDownLatch = new CountDownLatch(1); + public void shouldPerformConversionBeforeResultProcessing() { - repository.findAll() // - .doOnNext(System.out::println) // - .doOnComplete(countDownLatch::countDown) // - .doOnError(throwable -> countDownLatch.countDown()) // - .subscribe(); - - countDownLatch.await(); + StepVerifier.create(repository.findAll().doOnNext(System.out::println)) // + .expectNextCount(4) // + .verifyComplete(); } /** @@ -123,15 +115,21 @@ public void shouldStreamDataWithTailableCursor() throws Exception { Thread.sleep(100); - repository.save(new Person("Tuco", "Salamanca", 33)).subscribe(); + StepVerifier.create(repository.save(new Person("Tuco", "Salamanca", 33))) // + .expectNextCount(1) // + .verifyComplete(); Thread.sleep(100); - repository.save(new Person("Mike", "Ehrmantraut", 62)).subscribe(); + StepVerifier.create(repository.save(new Person("Mike", "Ehrmantraut", 62))) // + .expectNextCount(1) // + .verifyComplete(); Thread.sleep(100); disposable.dispose(); - repository.save(new Person("Gus", "Fring", 53)).subscribe(); + StepVerifier.create(repository.save(new Person("Gus", "Fring", 53))) // + .expectNextCount(1) // + .verifyComplete(); Thread.sleep(100); assertThat(people).hasSize(6); @@ -142,12 +140,7 @@ public void shouldStreamDataWithTailableCursor() throws Exception { */ @Test public void shouldQueryDataWithQueryDerivation() { - - List whites = repository.findByLastname("White") // - .collectList() // - .block(); - - assertThat(whites).hasSize(2); + StepVerifier.create(repository.findByLastname("White")).expectNextCount(2).verifyComplete(); } /** @@ -155,11 +148,7 @@ public void shouldQueryDataWithQueryDerivation() { */ @Test public void shouldQueryDataWithStringQuery() { - - Person heisenberg = repository.findByFirstnameAndLastname("Walter", "White") // - .block(); - - assertThat(heisenberg).isNotNull(); + StepVerifier.create(repository.findByFirstnameAndLastname("Walter", "White")).expectNextCount(1).verifyComplete(); } /** @@ -167,12 +156,7 @@ public void shouldQueryDataWithStringQuery() { */ @Test public void shouldQueryDataWithDeferredQueryDerivation() { - - List whites = repository.findByLastname(Mono.just("White")) // - .collectList() // - .block(); - - assertThat(whites).hasSize(2); + StepVerifier.create(repository.findByLastname(Mono.just("White"))).expectNextCount(2).verifyComplete(); } /** @@ -181,9 +165,8 @@ public void shouldQueryDataWithDeferredQueryDerivation() { @Test public void shouldQueryDataWithMixedDeferredQueryDerivation() { - Person heisenberg = repository.findByFirstnameAndLastname(Mono.just("Walter"), "White") // - .block(); - - assertThat(heisenberg).isNotNull(); + StepVerifier.create(repository.findByFirstnameAndLastname(Mono.just("Walter"), "White")) // + .expectNextCount(1) // + .verifyComplete(); } } diff --git a/mongodb/reactive/src/test/java/example/springdata/mongodb/people/RxJava2PersonRepositoryIntegrationTest.java b/mongodb/reactive/src/test/java/example/springdata/mongodb/people/RxJava2PersonRepositoryIntegrationTest.java index 88884ed69..bc05b2e1b 100644 --- a/mongodb/reactive/src/test/java/example/springdata/mongodb/people/RxJava2PersonRepositoryIntegrationTest.java +++ b/mongodb/reactive/src/test/java/example/springdata/mongodb/people/RxJava2PersonRepositoryIntegrationTest.java @@ -21,12 +21,12 @@ import io.reactivex.Single; import io.reactivex.disposables.Disposable; import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; -import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CountDownLatch; +import org.bson.Document; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -36,6 +36,8 @@ import org.springframework.data.mongodb.core.ReactiveMongoOperations; import org.springframework.test.context.junit4.SpringRunner; +import com.mongodb.reactivestreams.client.MongoCollection; + /** * Integration test for {@link RxJava2PersonRepository} using RxJava2 types. Note that {@link ReactiveMongoOperations} * is only available using Project Reactor types as the native Template API implementation does not come in multiple @@ -55,28 +57,31 @@ public class RxJava2PersonRepositoryIntegrationTest { @Before public void setUp() { - operations.collectionExists(Person.class) // + Mono> recreateCollection = operations.collectionExists(Person.class) // .flatMap(exists -> exists ? operations.dropCollection(Person.class) : Mono.just(exists)) // .then(operations.createCollection(Person.class, CollectionOptions.empty() // .size(1024 * 1024) // .maxDocuments(100) // - .capped())) // - .block(); + .capped())); + + StepVerifier.create(recreateCollection).expectNextCount(1).verifyComplete(); repository.saveAll(Flowable.just(new Person("Walter", "White", 50), // new Person("Skyler", "White", 45), // new Person("Saul", "Goodman", 42), // new Person("Jesse", "Pinkman", 27))) // - .blockingLast(); + .test() // + .awaitCount(4) // + .assertNoErrors() // + .awaitTerminalEvent(); } /** - * This sample performs a count, inserts data and performs a count again using reactive operator chaining. + * This sample performs a count, inserts data and performs a count again using reactive operator chaining. It prints + * the two counts ({@code 4} and {@code 6}) to the console. */ @Test - public void shouldInsertAndCountData() throws Exception { - - CountDownLatch countDownLatch = new CountDownLatch(1); + public void shouldInsertAndCountData() { Flowable people = Flowable.just(new Person("Hank", "Schrader", 43), // new Person("Mike", "Ehrmantraut", 62)); @@ -89,28 +94,26 @@ public void shouldInsertAndCountData() throws Exception { .toSingle() // .flatMap(v -> repository.count()) // .doOnSuccess(System.out::println) // - .doAfterTerminate(countDownLatch::countDown) // - .doOnError(throwable -> countDownLatch.countDown()) // - .subscribe(); - - countDownLatch.await(); + .test() // + .awaitCount(1) // + .assertValue(6L) // + .assertNoErrors() // + .awaitTerminalEvent(); } /** * Note that the all object conversions are performed before the results are printed to the console. */ @Test - public void shouldPerformConversionBeforeResultProcessing() throws Exception { - - CountDownLatch countDownLatch = new CountDownLatch(1); + public void shouldPerformConversionBeforeResultProcessing() { repository.findAll() // .doOnNext(System.out::println) // - .doOnComplete(countDownLatch::countDown) // - .doOnError(throwable -> countDownLatch.countDown()) // - .subscribe(); + .test() // + .awaitCount(4) // + .assertNoErrors() // + .awaitTerminalEvent(); - countDownLatch.await(); } /** @@ -130,15 +133,15 @@ public void shouldStreamDataWithTailableCursor() throws Exception { Thread.sleep(100); - repository.save(new Person("Tuco", "Salamanca", 33)).subscribe(); + repository.save(new Person("Tuco", "Salamanca", 33)).test().awaitTerminalEvent(); Thread.sleep(100); - repository.save(new Person("Mike", "Ehrmantraut", 62)).subscribe(); + repository.save(new Person("Mike", "Ehrmantraut", 62)).test().awaitTerminalEvent(); Thread.sleep(100); subscription.dispose(); - repository.save(new Person("Gus", "Fring", 53)).subscribe(); + repository.save(new Person("Gus", "Fring", 53)).test().awaitTerminalEvent(); Thread.sleep(100); assertThat(people).hasSize(6); @@ -150,11 +153,12 @@ public void shouldStreamDataWithTailableCursor() throws Exception { @Test public void shouldQueryDataWithQueryDerivation() { - List whites = repository.findByLastname("White") // - .toList() // - .blockingGet(); + repository.findByLastname("White") // + .test() // + .awaitCount(2) // + .assertNoErrors() // + .awaitTerminalEvent(); - assertThat(whites).hasSize(2); } /** @@ -163,10 +167,11 @@ public void shouldQueryDataWithQueryDerivation() { @Test public void shouldQueryDataWithStringQuery() { - Person heisenberg = repository.findByFirstnameAndLastname("Walter", "White") // - .blockingGet(); - - assertThat(heisenberg).isNotNull(); + repository.findByFirstnameAndLastname("Walter", "White") // + .test() // + .awaitCount(1) // + .assertNoErrors() // + .awaitTerminalEvent(); } /** @@ -175,11 +180,11 @@ public void shouldQueryDataWithStringQuery() { @Test public void shouldQueryDataWithDeferredQueryDerivation() { - List whites = repository.findByLastname(Single.just("White")) // - .toList() // - .blockingGet(); - - assertThat(whites).hasSize(2); + repository.findByLastname(Single.just("White")) // + .test() // + .awaitCount(2) // + .assertNoErrors() // + .awaitTerminalEvent(); } /** @@ -188,9 +193,10 @@ public void shouldQueryDataWithDeferredQueryDerivation() { @Test public void shouldQueryDataWithMixedDeferredQueryDerivation() { - Person heisenberg = repository.findByFirstnameAndLastname(Single.just("Walter"), "White") // - .blockingGet(); - - assertThat(heisenberg).isNotNull(); + repository.findByFirstnameAndLastname(Single.just("Walter"), "White") // + .test() // + .awaitCount(1) // + .assertNoErrors() // + .awaitTerminalEvent(); } } diff --git a/redis/reactive/src/test/java/example/springdata/redis/commands/KeyCommandsTests.java b/redis/reactive/src/test/java/example/springdata/redis/commands/KeyCommandsTests.java index 512ebb23e..cf58cca23 100644 --- a/redis/reactive/src/test/java/example/springdata/redis/commands/KeyCommandsTests.java +++ b/redis/reactive/src/test/java/example/springdata/redis/commands/KeyCommandsTests.java @@ -19,6 +19,7 @@ import example.springdata.redis.test.util.RequiresRedisServer; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; import java.nio.ByteBuffer; import java.time.Duration; @@ -37,6 +38,7 @@ import org.springframework.data.redis.connection.ReactiveStringCommands.SetCommand; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; +import org.springframework.data.redis.util.ByteUtils; import org.springframework.test.context.junit4.SpringRunner; /** @@ -75,13 +77,14 @@ public void iterateOverKeysMatchingPrefixUsingKeysCommand() { generateRandomKeys(50); - this.connection.keyCommands() // + Mono keyCount = connection.keyCommands() // .keys(ByteBuffer.wrap(serializer.serialize(KEY_PATTERN))) // .flatMapMany(Flux::fromIterable) // .doOnNext(byteBuffer -> System.out.println(toString(byteBuffer))) // .count() // - .doOnSuccess(count -> System.out.println(String.format("Total No. found: %s", count))) // - .block(); + .doOnSuccess(count -> System.out.println(String.format("Total No. found: %s", count))); + + StepVerifier.create(keyCount).expectNext(50L).verifyComplete(); } /** @@ -90,19 +93,19 @@ public void iterateOverKeysMatchingPrefixUsingKeysCommand() { @Test public void storeToListAndPop() { - Mono popResult = this.connection.listCommands() + Mono popResult = connection.listCommands() .brPop(Collections.singletonList(ByteBuffer.wrap("list".getBytes())), Duration.ofSeconds(5)); - Mono llen = this.connection.listCommands().lLen(ByteBuffer.wrap("list".getBytes())); + Mono llen = connection.listCommands().lLen(ByteBuffer.wrap("list".getBytes())); - this.connection.listCommands() // + Mono popAndLlen = connection.listCommands() // .rPush(ByteBuffer.wrap("list".getBytes()), Collections.singletonList(ByteBuffer.wrap("item".getBytes()))) .flatMap(l -> popResult) // .doOnNext(result -> System.out.println(toString(result.getValue()))) // .flatMap(result -> llen) // - .doOnNext(count -> System.out.println(String.format("Total items in list left: %s", count))) // - .then() // - .block(); + .doOnNext(count -> System.out.println(String.format("Total items in list left: %s", count)));// + + StepVerifier.create(popAndLlen).expectNext(0L).verifyComplete(); } private void generateRandomKeys(int nrKeys) { @@ -113,17 +116,11 @@ private void generateRandomKeys(int nrKeys) { .map(key -> SetCommand.set(key) // .value(ByteBuffer.wrap(UUID.randomUUID().toString().getBytes()))); - this.connection.stringCommands() // - .set(generator) // - .then() // - .block(); + StepVerifier.create(connection.stringCommands().set(generator)).expectNextCount(nrKeys).verifyComplete(); } private static String toString(ByteBuffer byteBuffer) { - - byte[] bytes = new byte[byteBuffer.remaining()]; - byteBuffer.get(bytes); - return new String(bytes); + return new String(ByteUtils.getBytes(byteBuffer)); } }