Skip to content

Commit

Permalink
chore(EndpointDataReferenceReceiverRegistry): make the transfer proce…
Browse files Browse the repository at this point in the history
…ss fail if no receivers are registered (eclipse-edc#2330)
  • Loading branch information
majadlymhmd authored Dec 14, 2022
1 parent 52df192 commit 0eb4b03
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import org.eclipse.edc.connector.transfer.spi.edr.EndpointDataReferenceReceiver;
import org.eclipse.edc.connector.transfer.spi.edr.EndpointDataReferenceReceiverRegistry;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;
import org.jetbrains.annotations.NotNull;
Expand All @@ -40,14 +41,18 @@ public void registerReceiver(@NotNull EndpointDataReferenceReceiver receiver) {

@Override
public @NotNull CompletableFuture<Result<Void>> receiveAll(@NotNull EndpointDataReference edr) {
return receivers.stream()
.map(receiver -> receiver.send(edr))
.collect(asyncAllOf())
.thenApply(results -> results.stream()
.filter(Result::failed)
.findFirst()
.map(failed -> Result.<Void>failure(failed.getFailureMessages()))
.orElse(Result.success()))
.exceptionally(throwable -> Result.failure("Failed to receive endpoint data reference: " + throwable.getMessage()));
if (receivers.isEmpty()) {
return CompletableFuture.failedFuture(new EdcException("There are no registered receivers."));
} else {
return receivers.stream()
.map(receiver -> receiver.send(edr))
.collect(asyncAllOf())
.thenApply(results -> results.stream()
.filter(Result::failed)
.findFirst()
.map(failed -> Result.<Void>failure(failed.getFailureMessages()))
.orElse(Result.success()))
.exceptionally(throwable -> Result.failure("Failed to receive endpoint data reference: " + throwable.getMessage()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@

import org.eclipse.edc.connector.transfer.spi.edr.EndpointDataReferenceReceiver;
import org.eclipse.edc.connector.transfer.spi.edr.EndpointDataReferenceReceiverRegistry;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.result.Result;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -42,12 +44,14 @@ public void setUp() {
receiver1 = mock(EndpointDataReferenceReceiver.class);
receiver2 = mock(EndpointDataReferenceReceiver.class);
registry = new EndpointDataReferenceReceiverRegistryImpl();
registry.registerReceiver(receiver1);
registry.registerReceiver(receiver2);

}

@Test
void receiveAll_success() {
registry.registerReceiver(receiver1);
registry.registerReceiver(receiver2);

var edr = EndpointDataReferenceFixtures.createEndpointDataReference();

when(receiver1.send(any())).thenReturn(CompletableFuture.completedFuture(Result.success()));
Expand All @@ -63,6 +67,9 @@ void receiveAll_success() {

@Test
void receiveAll_failsBecauseReceiverReturnsFailedResult_shouldReturnFailedResult() {
registry.registerReceiver(receiver1);
registry.registerReceiver(receiver2);

var edr = EndpointDataReferenceFixtures.createEndpointDataReference();
var errorMsg = "Test error message";

Expand All @@ -82,6 +89,9 @@ void receiveAll_failsBecauseReceiverReturnsFailedResult_shouldReturnFailedResult

@Test
void receiveAll_failsBecauseReceiverThrows_shouldReturnFailedResult() {
registry.registerReceiver(receiver1);
registry.registerReceiver(receiver2);

var edr = EndpointDataReferenceFixtures.createEndpointDataReference();
var errorMsg = "Test error message";

Expand All @@ -98,4 +108,14 @@ void receiveAll_failsBecauseReceiverThrows_shouldReturnFailedResult() {
assertThat(res.getFailureMessages().stream().anyMatch(s -> s.contains(errorMsg))).isTrue();
});
}

@Test
void receiveAll_throwsExceptionIfNoReceiversRegistered() {
var edr = EndpointDataReferenceFixtures.createEndpointDataReference();
var future = registry.receiveAll(edr);
assertThat(future).failsWithin(1, TimeUnit.SECONDS)
.withThrowableOfType(ExecutionException.class)
.withRootCauseInstanceOf(EdcException.class)
.withMessageContaining("no registered receivers.");
}
}

0 comments on commit 0eb4b03

Please sign in to comment.