CompletableFuture
1. Introduccion
CompletableFuture es la evolucion del API Future y se diferencia de este ultimo en que es implementado por ThreadPool de tipo ForkJoinPool que implementan work-stolen.
2. API
2.1 RunAsync
No retorna un valor y debe ser utilizado para operaciones que no requerimos obtener resultado y que queremos que se procesen en modo asincrono en el ForkJoinPool (Como por ejemplo login, operaciones batch, notificaciones, etc.. ).
Su api retorna un CompletableFuture
Al no retornar resultado y dar la posibilidad de thenRunAsync, no es necesario que sus ejecuciones la realice el mismo hilo.
for(int i=1; i <= 2; i++) {
CompletableFuture.runAsync(() -> {
System.out.println("Step 1 executor"+ Thread.currentThread().getName());
}).thenRunAsync(()->{
System.out.println("Step 2 executor -> "+ Thread.currentThread().getName());
});
}
Step 1 executorForkJoinPool.commonPool-worker-1
Step 1 executorForkJoinPool.commonPool-worker-2
Step 2 executor -> ForkJoinPool.commonPool-worker-2
Step 2 executor -> ForkJoinPool.commonPool-worker-4
for(int i=1; i <= 2; i++) {
CompletableFuture.runAsync(() -> {
System.out.println("Step 1 executor"+ Thread.currentThread().getName());
}).thenRun(()->{
System.out.println("Step 2 executor -> "+ Thread.currentThread().getName());
});
}
Step 1 executorForkJoinPool.commonPool-worker-1
Step 1 executorForkJoinPool.commonPool-worker-2
Step 2 executor -> ForkJoinPool.commonPool-worker-2
Step 2 executor -> ForkJoinPool.commonPool-worker-1
2.2 SupplyAsync
Es igual que runAsycn, pero este retorna un resultado:
for(int i=1; i <= 2; i++) {
CompletableFuture.supplyAsync(() -> {
System.out.println("Step 3 executor"+ Thread.currentThread().getName());
return "A";
}).thenAcceptAsync(step1Result -> {
System.out.println("Step 4 executor"+ Thread.currentThread().getName());
System.out.println("bienvenido "+step1Result);
});
}
output:
Step 3 executorForkJoinPool.commonPool-worker-2
Step 3 executorForkJoinPool.commonPool-worker-1
Step 4 executorForkJoinPool.commonPool-worker-2
Step 4 executorForkJoinPool.commonPool-worker-3
bienvenido A
bienvenido A
for(int i=1; i <= 2; i++) {
CompletableFuture.supplyAsync(() -> {
System.out.println("Step 3 executor"+ Thread.currentThread().getName());
return "A";
}).thenAccept(step1Result -> {
System.out.println("Step 4 executor"+ Thread.currentThread().getName());
System.out.println("bienvenido "+step1Result);
});
}
output
Step 3 executorForkJoinPool.commonPool-worker-2
Step 3 executorForkJoinPool.commonPool-worker-1
Step 4 executorForkJoinPool.commonPool-worker-2
Step 4 executorForkJoinPool.commonPool-worker-1
bienvenido A
bienvenido A
3. Implementando un API
SpringBoot con tomcat/netty embebido es capaz de detectar el tipo de respuesta de nuestro endpoint y correr el servlet en modo asincrono.
Este comportamiento lo tendra cuando devolvamos un CompletableFuture<?>.
Como hemos visto, necesitamos devolver un CompletableFuturo que retorne algo, por lo que no podemos hacer uso de runAsync ya que noes su proposito, y por lo tanto deberemos retornar un suppyAsync:
import java.util.concurrent.CompletableFuture;
...
@GetMapping("/op")
public CompletableFuture<String> doExtensiveOperation() {
return CompletableFuture.supplyAsync(()->{
String result = myService.doTask();
return result;
});
}
Veremos en la ejecucion que se hace uso de ForkJoinPool
4. Anidacion de tareas
La programacion reactiva y asincrona nos obliga a escribir nuestro codigo troceado en distintas subtareas, de manera que el pool sea capaz de orquestar y ejecutar esas minitareas en el Pool.
Por eso el API de completable future nos ofrece metodos de subscription, control de excepciones, etc…
Veamos algunos: CountDownLatch
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(2); //number of task
for(int i=1; i <= 2; i++) {
CompletableFuture.runAsync(() -> {
System.out.println("Step 1 executor"+ Thread.currentThread().getName());
}).thenRun(()->{
System.out.println("Step 2 executor -> "+ Thread.currentThread().getName());
countDownLatch.countDown();
});
}
countDownLatch.await(); //await for all (2), await for zero count
}
WIP -> escribir sobre el thenapply y todas esas cosas
5. Consideraciones
A pesar de hacer uso de work-stolen cuando una ejecucion tiene un comportamiento bloqueante, el hilo que esta haciendo esa “minitarea” no podra ser reutilizado.
Un ejemplo seria este:
var task1 = CompletableFuture.supplyAsync(()->{
try {
Thread.currentThread().sleep(4000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return "OK";
});
https://www.baeldung.com/java-completablefuture-runasync-supplyasync