Skip to content

Commit

Permalink
Merge pull request #14340 from etrandafir93/features/BAEL-6579-comple…
Browse files Browse the repository at this point in the history
…table_future_thread_pool

BAEL-6579: completable future's thread pool
  • Loading branch information
davidmartinezbarua committed Jul 5, 2023
2 parents 39ae838 + 54eb5de commit de37f35
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.baeldung.concurrent.completablefuture.threadpool;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class CustomCompletableFuture<T> extends CompletableFuture<T> {
private static final Executor executor = Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, "Custom-Single-Thread"));

public static <TYPE> CustomCompletableFuture<TYPE> supplyAsync(Supplier<TYPE> supplier) {
CustomCompletableFuture<TYPE> future = new CustomCompletableFuture<>();
executor.execute(() -> {
try {
future.complete(supplier.get());
} catch (Exception ex) {
future.completeExceptionally(ex);
}
});
return future;
}

@Override
public Executor defaultExecutor() {
return executor;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package com.baeldung.concurrent.completablefuture.threadpool;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import org.junit.jupiter.api.Test;

public class CompletableFutureThreadPoolUnitTest {

@Test
void whenUsingNonAsync_thenUsesMainThread() throws ExecutionException, InterruptedException {
CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "Baeldung");

CompletableFuture<Integer> nameLength = name.thenApply(value -> {
printCurrentThread();
return value.length();
});

assertThat(nameLength.get()).isEqualTo(8);
}

@Test
void whenUsingNonAsync_thenUsesCallersThread() throws InterruptedException {
Runnable test = () -> {
CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "Baeldung");

CompletableFuture<Integer> nameLength = name.thenApply(value -> {
printCurrentThread();
return value.length();
});

try {
assertThat(nameLength.get()).isEqualTo(8);
} catch (Exception e) {
fail(e.getMessage());
}
};

new Thread(test, "test-thread").start();
Thread.sleep(100l);
}

@Test
void whenUsingAsync_thenUsesCommonPool() throws ExecutionException, InterruptedException {
CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "Baeldung");

CompletableFuture<Integer> nameLength = name.thenApplyAsync(value -> {
printCurrentThread();
return value.length();
});

assertThat(nameLength.get()).isEqualTo(8);
}

@Test
void whenUsingAsync_thenUsesCustomExecutor() throws ExecutionException, InterruptedException {
Executor testExecutor = Executors.newFixedThreadPool(5);
CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "Baeldung");

CompletableFuture<Integer> nameLength = name.thenApplyAsync(value -> {
printCurrentThread();
return value.length();
}, testExecutor);

assertThat(nameLength.get()).isEqualTo(8);
}

@Test
void whenOverridingDefaultThreadPool_thenUsesCustomExecutor() throws ExecutionException, InterruptedException {
CompletableFuture<String> name = CustomCompletableFuture.supplyAsync(() -> "Baeldung");

CompletableFuture<Integer> nameLength = name.thenApplyAsync(value -> {
printCurrentThread();
return value.length();
});

assertThat(nameLength.get()).isEqualTo(8);
}

private static void printCurrentThread() {
System.out.println(Thread.currentThread().getName());
}
}

0 comments on commit de37f35

Please sign in to comment.