Saltar a contenido

CompletableFuture

1. Introduccion

CompletableFuture es la evolucion del API Future y se diferencia de este ultimo en que es implementado por ThreadPool de tipo ForkJoinPool que implementan work-stolen.

2. API

2.1 RunAsync

No retorna un valor y debe ser utilizado para operaciones que no requerimos obtener resultado y que queremos que se procesen en modo asincrono en el ForkJoinPool (Como por ejemplo login, operaciones batch, notificaciones, etc.. ).

Su api retorna un CompletableFuture

Al no retornar resultado y dar la posibilidad de thenRunAsync, no es necesario que sus ejecuciones la realice el mismo hilo.

    for(int i=1; i <= 2; i++) {
        CompletableFuture.runAsync(() -> {
            System.out.println("Step 1 executor"+ Thread.currentThread().getName());
        }).thenRunAsync(()->{
            System.out.println("Step 2 executor -> "+ Thread.currentThread().getName());
        });
    }
Asi vemos que en la anidacion de tareas (Step1 ->(then)-> Step2) no tiene porque ser el mismo hilo el que ejecute el Step1 que el Step2. Aqui entra en juego el ForkJoinPool. Output:

Step 1 executorForkJoinPool.commonPool-worker-1
Step 1 executorForkJoinPool.commonPool-worker-2
Step 2 executor -> ForkJoinPool.commonPool-worker-2
Step 2 executor -> ForkJoinPool.commonPool-worker-4
Si hubieramos ejecutado thenRun si que el hilo de ejecucion de la cadena de tareas habria sido el mismo para cada cadena de ejecuciones:
    for(int i=1; i <= 2; i++) {
        CompletableFuture.runAsync(() -> {
            System.out.println("Step 1 executor"+ Thread.currentThread().getName());
        }).thenRun(()->{
            System.out.println("Step 2 executor -> "+ Thread.currentThread().getName());
        });
    }
output
Step 1 executorForkJoinPool.commonPool-worker-1
Step 1 executorForkJoinPool.commonPool-worker-2
Step 2 executor -> ForkJoinPool.commonPool-worker-2
Step 2 executor -> ForkJoinPool.commonPool-worker-1

2.2 SupplyAsync

Es igual que runAsycn, pero este retorna un resultado:

        for(int i=1; i <= 2; i++) {
            CompletableFuture.supplyAsync(() -> {
                System.out.println("Step 3 executor"+ Thread.currentThread().getName());
                return "A";
            }).thenAcceptAsync(step1Result -> {
                System.out.println("Step 4 executor"+ Thread.currentThread().getName());
                System.out.println("bienvenido "+step1Result);
            });
        }
Como hemos usado la opcion asincrona thenAcceptAsync cualquier hilo del pool puede continuar la cadena de ejecucion:

output:

Step 3 executorForkJoinPool.commonPool-worker-2
Step 3 executorForkJoinPool.commonPool-worker-1
Step 4 executorForkJoinPool.commonPool-worker-2
Step 4 executorForkJoinPool.commonPool-worker-3
bienvenido A
bienvenido A
Si hubieramos usado sin asincronia, el hilo que empezo cada cadena se mantendrá:

        for(int i=1; i <= 2; i++) {
            CompletableFuture.supplyAsync(() -> {
                System.out.println("Step 3 executor"+ Thread.currentThread().getName());
                return "A";
            }).thenAccept(step1Result -> {
                System.out.println("Step 4 executor"+ Thread.currentThread().getName());
                System.out.println("bienvenido "+step1Result);
            });
        }

output

Step 3 executorForkJoinPool.commonPool-worker-2
Step 3 executorForkJoinPool.commonPool-worker-1
Step 4 executorForkJoinPool.commonPool-worker-2
Step 4 executorForkJoinPool.commonPool-worker-1
bienvenido A
bienvenido A

3. Implementando un API

SpringBoot con tomcat/netty embebido es capaz de detectar el tipo de respuesta de nuestro endpoint y correr el servlet en modo asincrono.

Este comportamiento lo tendra cuando devolvamos un CompletableFuture<?>.

Como hemos visto, necesitamos devolver un CompletableFuturo que retorne algo, por lo que no podemos hacer uso de runAsync ya que noes su proposito, y por lo tanto deberemos retornar un suppyAsync:

import java.util.concurrent.CompletableFuture;
...

    @GetMapping("/op")
    public CompletableFuture<String> doExtensiveOperation() {
        return CompletableFuture.supplyAsync(()->{
            String result = myService.doTask();
            return result;
        });
    }

Veremos en la ejecucion que se hace uso de ForkJoinPool

4. Anidacion de tareas

La programacion reactiva y asincrona nos obliga a escribir nuestro codigo troceado en distintas subtareas, de manera que el pool sea capaz de orquestar y ejecutar esas minitareas en el Pool.

Por eso el API de completable future nos ofrece metodos de subscription, control de excepciones, etc…

Veamos algunos: CountDownLatch

public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(2); //number of task

        for(int i=1; i <= 2; i++) {
            CompletableFuture.runAsync(() -> {
                System.out.println("Step 1 executor"+ Thread.currentThread().getName());
            }).thenRun(()->{
                System.out.println("Step 2 executor -> "+ Thread.currentThread().getName());
                countDownLatch.countDown();
            });
        }

        countDownLatch.await(); //await for all (2), await for zero count
    }

WIP -> escribir sobre el thenapply y todas esas cosas

5. Consideraciones

A pesar de hacer uso de work-stolen cuando una ejecucion tiene un comportamiento bloqueante, el hilo que esta haciendo esa “minitarea” no podra ser reutilizado.

Un ejemplo seria este:

var task1 = CompletableFuture.supplyAsync(()->{
            try {
                Thread.currentThread().sleep(4000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            return "OK";
        });
El hilo que esta procesando la lambda que tiene el sleep estara ocioso pero no reutilizable por el forkjoinPool.

https://www.baeldung.com/java-completablefuture-runasync-supplyasync