CompletableFuture

1. Introduction

CompletableFuture is the evolution of the Future API. Unlike a plain Future, it lets us chain follow-up steps, and by default its asynchronous methods run on ForkJoinPool.commonPool(), a thread pool that implements work-stealing.
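To make the difference concrete, here is a minimal sketch that computes the same value with a plain Future and with a CompletableFuture. The class and method names (FutureVsCompletableFutureDemo, withFuture, withCompletableFuture) are made up for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureVsCompletableFutureDemo {

    // Plain Future: we must block on get() before we can transform the value.
    static int withFuture() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = executor.submit(() -> 21);
            return future.get() * 2;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    // CompletableFuture: the transformation is declared as a follow-up step.
    // No executor is passed, so the default asynchronous pool is used.
    static int withCompletableFuture() {
        return CompletableFuture
                .supplyAsync(() -> 21)
                .thenApply(value -> value * 2)
                .join(); // join() only so the demo can return the value
    }

    public static void main(String[] args) {
        System.out.println(withFuture());
        System.out.println(withCompletableFuture());
    }
}
```

Both methods return 42. The second one never needs an explicit ExecutorService, and the doubling step is attached to the future instead of being written after a blocking call.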

2. API

2.1 RunAsync

runAsync does not return a value. Use it for operations whose result we don’t need and that we want processed asynchronously in the ForkJoinPool (logging, batch operations, notifications, etc.).

It returns a CompletableFuture<Void>.

Since it doesn’t return a result and can be followed by thenRunAsync, the steps of a chain don’t have to be executed by the same thread:

    for(int i=1; i <= 2; i++) {
        CompletableFuture.runAsync(() -> {
            System.out.println("Step 1 executor"+ Thread.currentThread().getName());
        }).thenRunAsync(()->{
            System.out.println("Step 2 executor -> "+ Thread.currentThread().getName());
        });
    }

So in a chain of tasks (Step 1 -> then -> Step 2), the thread that executes Step 1 doesn’t have to be the one that executes Step 2. This is where the ForkJoinPool comes into play. Output:

Step 1 executorForkJoinPool.commonPool-worker-1
Step 1 executorForkJoinPool.commonPool-worker-2
Step 2 executor -> ForkJoinPool.commonPool-worker-2
Step 2 executor -> ForkJoinPool.commonPool-worker-4
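
If we had used thenRun instead, each chain would typically have stayed on a single thread, because thenRun executes the next step on the thread that completed the previous one (or on the calling thread, if the previous step had already finished when thenRun was called):
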
    for(int i=1; i <= 2; i++) {
        CompletableFuture.runAsync(() -> {
            System.out.println("Step 1 executor"+ Thread.currentThread().getName());
        }).thenRun(()->{
            System.out.println("Step 2 executor -> "+ Thread.currentThread().getName());
        });
    }

Output:

Step 1 executorForkJoinPool.commonPool-worker-1
Step 1 executorForkJoinPool.commonPool-worker-2
Step 2 executor -> ForkJoinPool.commonPool-worker-2
Step 2 executor -> ForkJoinPool.commonPool-worker-1
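
One caveat about thenRun: it is not tied to a pool thread. If the previous step has already completed when thenRun is called, the callback runs right away on the calling thread. The sketch below shows this with an already-completed future; the class and method names are invented for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class ThenRunThreadDemo {

    // Name of the thread that runs a thenRun callback attached to a future
    // that is already complete: it is the thread that calls thenRun.
    static String threadOfThenRunOnCompletedFuture() {
        AtomicReference<String> name = new AtomicReference<>();
        CompletableFuture<Void> done = CompletableFuture.completedFuture(null);
        done.thenRun(() -> name.set(Thread.currentThread().getName())).join();
        return name.get();
    }

    // Name of the thread that runs a thenRunAsync callback: the callback is
    // always handed to the default asynchronous executor.
    static String threadOfThenRunAsync() {
        AtomicReference<String> name = new AtomicReference<>();
        CompletableFuture<Void> done = CompletableFuture.completedFuture(null);
        done.thenRunAsync(() -> name.set(Thread.currentThread().getName())).join();
        return name.get();
    }

    public static void main(String[] args) {
        System.out.println("caller       -> " + Thread.currentThread().getName());
        System.out.println("thenRun      -> " + threadOfThenRunOnCompletedFuture());
        System.out.println("thenRunAsync -> " + threadOfThenRunAsync());
    }
}
```

Run from main, the thenRun line should report the main thread, while the thenRunAsync line reports a different thread.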

2.2 SupplyAsync

supplyAsync works like runAsync, but it returns a result:

        for(int i=1; i <= 2; i++) {
            CompletableFuture.supplyAsync(() -> {
                System.out.println("Step 3 executor"+ Thread.currentThread().getName());
                return "A";
            }).thenAcceptAsync(step1Result -> {
                System.out.println("Step 4 executor"+ Thread.currentThread().getName());
                System.out.println("welcome "+step1Result);
            });
        }

Since we used the asynchronous variant thenAcceptAsync, any thread from the pool can continue the chain.

Output:

Step 3 executorForkJoinPool.commonPool-worker-2
Step 3 executorForkJoinPool.commonPool-worker-1
Step 4 executorForkJoinPool.commonPool-worker-2
Step 4 executorForkJoinPool.commonPool-worker-3
welcome A
welcome A

If we had used the non-asynchronous variant, thenAccept, each chain would normally stay on the thread that started it:

        for(int i=1; i <= 2; i++) {
            CompletableFuture.supplyAsync(() -> {
                System.out.println("Step 3 executor"+ Thread.currentThread().getName());
                return "A";
            }).thenAccept(step1Result -> {
                System.out.println("Step 4 executor"+ Thread.currentThread().getName());
                System.out.println("welcome "+step1Result);
            });
        }

Output:

Step 3 executorForkJoinPool.commonPool-worker-2
Step 3 executorForkJoinPool.commonPool-worker-1
Step 4 executorForkJoinPool.commonPool-worker-2
Step 4 executorForkJoinPool.commonPool-worker-1
welcome A
welcome A
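
The examples above only print the result. When the caller needs the value itself, the chain can transform it with thenApply and read it with join(). Below is a minimal sketch; SupplyAsyncResultDemo and greet are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

class SupplyAsyncResultDemo {

    // Produces "A" asynchronously, transforms it, and waits for the result.
    static String greet() {
        return CompletableFuture
                .supplyAsync(() -> "A")
                .thenApply(step1Result -> "welcome " + step1Result)
                .join(); // blocks the caller until the chain has finished
    }

    public static void main(String[] args) {
        System.out.println(greet()); // prints "welcome A"
    }
}
```

join() behaves like get() but throws an unchecked CompletionException instead of checked exceptions, which keeps short examples compact.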

3. Implementing an API

Spring Boot with embedded Tomcat/Netty is capable of detecting the return type of our endpoint and processing the request in asynchronous mode.

It behaves this way when we return a CompletableFuture<?>.

As we have seen, we need a CompletableFuture that carries a value, so runAsync is not suitable here; we return the result of supplyAsync instead:

import java.util.concurrent.CompletableFuture;
...

    @GetMapping("/op")
    public CompletableFuture<String> doExtensiveOperation() {
        return CompletableFuture.supplyAsync(()->{
            String result = myService.doTask();
            return result;
        });
    }

During execution we will see that the ForkJoinPool is used.

4. Task Nesting

Reactive and asynchronous programming forces us to split our code into subtasks, so that the pool can orchestrate and execute those mini-tasks.

That’s why the CompletableFuture API offers subscription methods, exception handling, etc.

Let’s start with an example that uses a CountDownLatch (a java.util.concurrent synchronizer, not part of the CompletableFuture API) to make the main thread wait until every chain has finished:

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(2); // number of task chains

        for(int i=1; i <= 2; i++) {
            CompletableFuture.runAsync(() -> {
                System.out.println("Step 1 executor"+ Thread.currentThread().getName());
            }).thenRun(()->{
                System.out.println("Step 2 executor -> "+ Thread.currentThread().getName());
                countDownLatch.countDown(); // one chain finished
            });
        }

        countDownLatch.await(); // blocks until the count reaches zero (both chains done)
    }
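
The same wait can also be expressed with the CompletableFuture API itself. This sketch collects each chain and blocks on CompletableFuture.allOf instead of using a latch. The names AllOfDemo and runChains, and the counter used to check the result, are assumptions made for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class AllOfDemo {

    // Starts the given number of two-step chains, waits for all of them,
    // and returns how many chains reached their second step.
    static int runChains(int numberOfChains) {
        AtomicInteger finished = new AtomicInteger();
        List<CompletableFuture<Void>> chains = new ArrayList<>();

        for (int i = 1; i <= numberOfChains; i++) {
            chains.add(CompletableFuture
                    .runAsync(() -> System.out.println(
                            "Step 1 executor -> " + Thread.currentThread().getName()))
                    .thenRun(() -> {
                        System.out.println(
                                "Step 2 executor -> " + Thread.currentThread().getName());
                        finished.incrementAndGet();
                    }));
        }

        // allOf completes once every chain has completed; join() waits for that.
        CompletableFuture.allOf(chains.toArray(new CompletableFuture[0])).join();
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println("chains finished: " + runChains(2));
    }
}
```

With allOf the number of tasks no longer has to be hard-coded, and a failure in any chain surfaces as an exception from join() instead of leaving the waiting thread stuck on a latch that is never counted down.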

WIP -> write about thenApply and all those things
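
As a starting point, here is a minimal sketch of three commonly used methods: thenApply transforms a result, thenCombine merges two independent futures, and exceptionally supplies a fallback when a step fails. The class name, method names, and values are made up for the example.

```java
import java.util.concurrent.CompletableFuture;

class ChainingDemo {

    // thenApply maps "hello" to its length (5); thenCombine adds the
    // result of a second, independent future (10).
    static int transformAndCombine() {
        CompletableFuture<Integer> length = CompletableFuture
                .supplyAsync(() -> "hello")
                .thenApply(String::length);
        CompletableFuture<Integer> other = CompletableFuture.supplyAsync(() -> 10);
        return length.thenCombine(other, Integer::sum).join();
    }

    // exceptionally replaces a failed result with a fallback value.
    static String recover() {
        return CompletableFuture
                .<String>supplyAsync(() -> {
                    throw new IllegalStateException("boom");
                })
                .exceptionally(ex -> "fallback")
                .join();
    }

    public static void main(String[] args) {
        System.out.println(transformAndCombine()); // prints 15
        System.out.println(recover());             // prints "fallback"
    }
}
```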

5. Considerations

Despite work-stealing, when a task blocks, the thread running that “mini-task” cannot be reused until the blocking call returns.

An example would be this:

    var task1 = CompletableFuture.supplyAsync(()->{
        try {
            // sleep is a static method, so call it on Thread directly
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            // restore the interrupt flag instead of swallowing it
            Thread.currentThread().interrupt();
        }
        return "OK";
    });

The thread processing the lambda that contains the sleep will be idle, yet the ForkJoinPool cannot reuse it for other tasks while it sleeps.
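
One common way to limit the impact is to send blocking work to a dedicated executor, using the supplyAsync overload that accepts an Executor. This is a sketch under assumptions: the pool size, the thread name "blocking-io-worker", and the class and method names are all illustrative choices.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class BlockingExecutorDemo {

    // Runs a blocking task on its own pool and returns the name of the
    // thread that executed it.
    static String runBlockingTask() {
        // Dedicated pool for blocking work (size 2 is an arbitrary choice).
        ExecutorService blockingPool = Executors.newFixedThreadPool(2, runnable -> {
            Thread thread = new Thread(runnable, "blocking-io-worker");
            thread.setDaemon(true);
            return thread;
        });
        try {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(200); // stands in for a blocking call
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return Thread.currentThread().getName();
            }, blockingPool).join();
        } finally {
            blockingPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("executed by " + runBlockingTask());
    }
}
```

The sleeping thread is still idle, but it now belongs to a pool sized for blocking work, so the common pool threads stay available for short, CPU-bound mini-tasks.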

https://www.baeldung.com/java-completablefuture-runasync-supplyasync