CompletableFuture
1. Introduction
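CompletableFuture is the evolution of the Future API: it implements both Future and CompletionStage, so follow-up steps can be chained as callbacks instead of blocking on get(). Unless another executor is supplied, its asynchronous methods run on ForkJoinPool.commonPool(), a thread pool that implements work-stealing.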
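To make the difference from a plain Future concrete, here is a minimal sketch. The class and method names are made up for illustration; only the JDK calls are real.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureVsCompletableFuture {

    // Plain Future: the caller has to block on get() before it can transform the value.
    static String withFuture() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = executor.submit(() -> "hello");
            return future.get().toUpperCase();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    // CompletableFuture: the transformation is registered as a callback.
    // join() is only used here so that the example can return the value.
    static String withCompletableFuture() {
        return CompletableFuture.supplyAsync(() -> "hello")
                .thenApply(String::toUpperCase)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(withFuture());
        System.out.println(withCompletableFuture());
    }
}
```

Both methods produce the same value; the difference is where the transformation is expressed.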
2. API
2.1 RunAsync
runAsync does not return a value. It is meant for operations whose result we don't need and that we want processed asynchronously in the ForkJoinPool (logging, batch operations, notifications, etc.).
It returns a CompletableFuture<Void>.
A follow-up step can be chained with thenRunAsync, which submits that step to the pool as a new task, so it does not have to run on the same thread as the first step:
for (int i = 1; i <= 2; i++) {
    CompletableFuture.runAsync(() -> {
        System.out.println("Step 1 executor" + Thread.currentThread().getName());
    }).thenRunAsync(() -> {
        System.out.println("Step 2 executor -> " + Thread.currentThread().getName());
    });
}
output:
Step 1 executorForkJoinPool.commonPool-worker-1
Step 1 executorForkJoinPool.commonPool-worker-2
Step 2 executor -> ForkJoinPool.commonPool-worker-2
Step 2 executor -> ForkJoinPool.commonPool-worker-4
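With thenRun instead of thenRunAsync, the second step is not resubmitted to the pool. It normally runs on the thread that completed the first step (or on the calling thread if the first step has already finished):
for (int i = 1; i <= 2; i++) {
    CompletableFuture.runAsync(() -> {
        System.out.println("Step 1 executor" + Thread.currentThread().getName());
    }).thenRun(() -> {
        System.out.println("Step 2 executor -> " + Thread.currentThread().getName());
    });
}
output:
Step 1 executorForkJoinPool.commonPool-worker-1
Step 1 executorForkJoinPool.commonPool-worker-2
Step 2 executor -> ForkJoinPool.commonPool-worker-2
Step 2 executor -> ForkJoinPool.commonPool-worker-1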
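The snippets above are fire-and-forget. Common-pool workers are daemon threads, so a short standalone program may need to wait for the chain explicitly. A minimal sketch, with a made-up class name and a shared list in place of println so the order can be checked:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class RunAsyncJoinDemo {

    // Runs two chained steps and returns the order in which they executed.
    static List<String> runSteps() {
        List<String> log = new CopyOnWriteArrayList<>();

        CompletableFuture<Void> chain = CompletableFuture
                .runAsync(() -> log.add("step 1"))
                .thenRunAsync(() -> log.add("step 2"));

        // Without join(), a short main method could exit before the
        // daemon worker threads have run the steps.
        chain.join();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runSteps()); // prints [step 1, step 2]
    }
}
```

join() returns null for a CompletableFuture<Void>; it is used here only to wait for completion.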
2.2 SupplyAsync
supplyAsync works like runAsync, but it takes a Supplier<T> and returns a CompletableFuture<T> that carries the result:
for (int i = 1; i <= 2; i++) {
    CompletableFuture.supplyAsync(() -> {
        System.out.println("Step 3 executor" + Thread.currentThread().getName());
        return "A";
    }).thenAcceptAsync(step1Result -> {
        System.out.println("Step 4 executor" + Thread.currentThread().getName());
        System.out.println("welcome " + step1Result);
    });
}
output:
Step 3 executorForkJoinPool.commonPool-worker-2
Step 3 executorForkJoinPool.commonPool-worker-1
Step 4 executorForkJoinPool.commonPool-worker-2
Step 4 executorForkJoinPool.commonPool-worker-3
welcome A
welcome A
The same chain with thenAccept instead of thenAcceptAsync:
for (int i = 1; i <= 2; i++) {
    CompletableFuture.supplyAsync(() -> {
        System.out.println("Step 3 executor" + Thread.currentThread().getName());
        return "A";
    }).thenAccept(step1Result -> {
        System.out.println("Step 4 executor" + Thread.currentThread().getName());
        System.out.println("welcome " + step1Result);
    });
}
output:
Step 3 executorForkJoinPool.commonPool-worker-2
Step 3 executorForkJoinPool.commonPool-worker-1
Step 4 executorForkJoinPool.commonPool-worker-2
Step 4 executorForkJoinPool.commonPool-worker-1
welcome A
welcome A
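Because supplyAsync produces a value, two independent futures can also be combined. The sketch below uses thenCombine for that; the class and method names are illustrative, not part of the original example.

```java
import java.util.concurrent.CompletableFuture;

class SupplyAsyncDemo {

    // Starts two independent suppliers and combines their results once both are done.
    static String greet() {
        CompletableFuture<String> greeting = CompletableFuture.supplyAsync(() -> "welcome");
        CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "A");

        return greeting
                .thenCombine(name, (g, n) -> g + " " + n)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(greet()); // prints "welcome A"
    }
}
```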
3. Implementing an API
Spring Boot with an embedded Tomcat or Netty can detect the return type of our endpoint and handle the request asynchronously (on a servlet container, by running the servlet in asynchronous mode).
It behaves this way when the endpoint returns a CompletableFuture<?>.
As we have seen, the endpoint has to return a CompletableFuture that carries a value. runAsync is not meant for that, so we return the future created by supplyAsync:
import java.util.concurrent.CompletableFuture;
...
@GetMapping("/op")
public CompletableFuture<String> doExtensiveOperation() {
    // The supplier runs asynchronously; the request thread is released meanwhile.
    return CompletableFuture.supplyAsync(() -> {
        String result = myService.doTask();
        return result;
    });
}
If we log the thread name during execution, we will see that a ForkJoinPool worker is used.
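The common pool is only the default. supplyAsync also has an overload that takes an Executor, which may be useful if request work should run on its own pool. The following is a plain-Java sketch without Spring; doTask() is a hypothetical stand-in for myService.doTask(), and the thread name "api-worker" is an arbitrary choice.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class DedicatedExecutorDemo {

    // Hypothetical stand-in for myService.doTask(): reports the executing thread.
    static String doTask() {
        return Thread.currentThread().getName();
    }

    // Runs the task on a caller-supplied executor instead of the common pool.
    static String runOnDedicatedPool() {
        ExecutorService executor = Executors.newFixedThreadPool(2, runnable -> {
            Thread thread = new Thread(runnable, "api-worker");
            thread.setDaemon(true);
            return thread;
        });
        try {
            return CompletableFuture
                    .supplyAsync(DedicatedExecutorDemo::doTask, executor)
                    .join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnDedicatedPool()); // prints "api-worker"
    }
}
```

In a real application the executor would normally be created once and shared, not built per call as in this sketch.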
4. Task Nesting
Reactive and asynchronous programming forces us to split our code into small subtasks so that the pool can orchestrate and execute them.
That's why the CompletableFuture API offers subscription (callback) methods, exception handling, etc.
Let's see an example in which a CountDownLatch makes the main thread wait until every task has finished:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

public static void main(String[] args) throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(2); // number of tasks
    for (int i = 1; i <= 2; i++) {
        CompletableFuture.runAsync(() -> {
            System.out.println("Step 1 executor" + Thread.currentThread().getName());
        }).thenRun(() -> {
            System.out.println("Step 2 executor -> " + Thread.currentThread().getName());
            countDownLatch.countDown(); // one task finished
        });
    }
    countDownLatch.await(); // blocks until the count reaches zero (both tasks done)
}
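The same wait can also be expressed with the CompletableFuture API itself. Here is a sketch of the latch example rewritten with CompletableFuture.allOf; the class name and the counter are illustrative additions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class AllOfDemo {

    // Starts the given number of two-step tasks and waits for all of them.
    static int runAll(int taskCount) {
        AtomicInteger finished = new AtomicInteger();
        List<CompletableFuture<Void>> tasks = new ArrayList<>();

        for (int i = 1; i <= taskCount; i++) {
            tasks.add(CompletableFuture
                    .runAsync(() -> System.out.println(
                            "Step 1 executor -> " + Thread.currentThread().getName()))
                    .thenRun(finished::incrementAndGet));
        }

        // allOf completes once every future in the array has completed.
        CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println("finished tasks: " + runAll(2));
    }
}
```

Compared with the latch, nothing has to be counted down by hand inside the tasks.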
WIP -> write about thenApply and all those things
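As a starting point for the chaining and exception-handling methods mentioned in this section, here is a small sketch of thenApply, thenCompose and exceptionally. The class, method names and inputs are invented for illustration.

```java
import java.util.concurrent.CompletableFuture;

class ChainingDemo {

    // thenApply transforms the result; thenCompose chains another asynchronous step.
    static String transform(String input) {
        return CompletableFuture.supplyAsync(() -> input)
                .thenApply(String::trim)
                .thenCompose(s -> CompletableFuture.supplyAsync(() -> s.toUpperCase()))
                .join();
    }

    // exceptionally supplies a fallback value when an earlier stage fails.
    static int parseOrDefault(String text, int fallback) {
        return CompletableFuture.supplyAsync(() -> Integer.parseInt(text))
                .exceptionally(error -> fallback)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(transform("  a  "));                   // prints "A"
        System.out.println(parseOrDefault("42", -1));             // prints 42
        System.out.println(parseOrDefault("not a number", -1));   // prints -1
    }
}
```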
5. Considerations
Despite work-stealing, a thread that is executing a blocking “mini-task” cannot be reused for other tasks until the blocking call returns.
An example would be this:
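var task1 = CompletableFuture.supplyAsync(() -> {
    try {
        // sleep is a static method; it blocks the worker thread for 4 seconds
        Thread.sleep(4000);
    } catch (InterruptedException e) {
        // restore the interrupt flag instead of swallowing the interruption
        Thread.currentThread().interrupt();
    }
    return "OK";
});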
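The effect can be measured with a small sketch. It uses a private ForkJoinPool with two workers instead of the common pool, so the result does not depend on the machine's core count. The class name, pool size and sleep time are assumptions chosen for the demonstration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;

class BlockingDemo {

    // Runs `tasks` sleeping tasks on a pool with `parallelism` workers and
    // returns the elapsed wall-clock time in milliseconds.
    static long elapsedMillis(int parallelism, int tasks, long sleepMillis) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            long start = System.nanoTime();
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                futures.add(CompletableFuture.supplyAsync(() -> {
                    try {
                        Thread.sleep(sleepMillis); // blocks the worker thread
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    return "OK";
                }, pool));
            }
            futures.forEach(CompletableFuture::join);
            return (System.nanoTime() - start) / 1_000_000;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Four blocking tasks on two workers need about two rounds of sleeping.
        System.out.println(elapsedMillis(2, 4, 200) + " ms");
    }
}
```

With two workers and four tasks, the second pair can only start once the first pair has finished sleeping. The run therefore takes about twice the sleep time, even though the threads are idle while they sleep.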
https://www.baeldung.com/java-completablefuture-runasync-supplyasync