본문 바로가기
java

더 자바(java 8) - CompletableFuture

by 쭈꾸마뇽 2021. 5. 26.

Concurrent 소프트웨어

  • 동시에 여러 작업을 할 수 있는 소프트웨어
  • ex) 웹 브라우저로 유튜브를 보면서 키보드로 문서에 타이핑을 할 수 있다
  • ex) 녹화를 하면서 인텔리J로 코딩을 하고 워드에 적어둔 문서를 보거나 수정할 수 있다

Java에서 지원하는 Concurrent 프로그래밍

  • 멀티프로세싱
  • 멀티쓰레드

Java 멀디쓰레드 프로그래밍

  • Thread
public class App {
    public static void main(String[] args) throws InterruptedException {
        MyThread myThread = new MyThread();
        myThread.start();
    }

    static class MyThread extends Thread {
        public void run() {
            System.out.println("Hello : " + Thread.currentThread().getName());
        }
    }
}
  • Runnable 구현 또는 람다
public class App {
    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
            try {
                Thread.sleep(3000L);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        });
        thread.start();
    }
}

쓰레드 주요 기능

  • 현재 쓰레드 멈춰두기 (sleep): 다른 쓰레드가 처리할 수 있도록 기회를 주지만 그렇다고 락을 놔주진 않는다. (잘못하면 데드락 걸릴 수 있다)
  • 다른 쓰레드 깨우기 (interupt): 다른 쓰레드를 깨워서 interruptedExeption을 발생 시킨다. 그 에러가 발생했을 때 할 일은 코딩하기 나름. 종료 시킬 수도 있고 계속 하던 일 할 수도 있고.
  • 다른 쓰레드 기다리기 (join): 다른 쓰레드가 끝날 때까지 기다린다.

Executores

고수준(High-Level) Concurrency 프로그래밍

  • 쓰레드를 만들고 관리하는 작업을 애플리케이션에서 분리 -> Executors에 위임

Executors가 하는일

  • 쓰레드 만들기: 애플리케이션이 사용할 쓰레드 풀을 만들어 관리한다.
  • 쓰레드 관리: 쓰레드 생명 주기를 관리한다.
  • 작업 처리 및 실행: 쓰레드로 실행할 작업을 제공할 수 있는 API를 제공한다.

주요 인터페이스

  • Executor: execute(Runnable)
  • ExecutorService: Executor 상속 받은 인터페이스로, Callable도 실행할 수 있으며, Executor를 종료 시키거나, 여러 Callable을 동시에 실행하는 등의 기능을 제공한다.
  • ScheduledExecutorService: ExecutorService를 상속 받은 인터페이스로 특정 시간 이후에 또는 주기적으로 작업을 실행할 수 있다.

ExecutorService로 작업 실행하기

public class App2 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.submit(() -> { // 프로세스가 시작되고 종료되지 않기때문에 명시적으로 종료해줘야함
            System.out.println("Thread : " + Thread.currentThread().getName());
        });
}

쓰레드가 실행된 뒤 멈추지 않음

ExecutorsService로 멈추기

executorService.shutdown();  // 처리중인 작업 기다렸다가 종료
executorService.shutdownNow()  // 당장 종료

다수의 쓰레드로 작업하기

public class App2 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);  // 두개의 쓰레드로 생성
        executorService.submit(getRunnable("this"));
        executorService.submit(getRunnable("is"));
        executorService.submit(getRunnable("the"));
        executorService.submit(getRunnable("java"));
        executorService.submit(getRunnable("class"));

        executorService.shutdown();  // 명시적 종료
    }

    private static Runnable getRunnable(String message) {
        return () -> System.out.println(message + " : " + Thread.currentThread().getName());
    }
}

쓰레드의 종류가 2개인것을 확인할 수 있다


Callable과 Future

Callable

  • Runnable과 유사하지만 작업의 결과를 받을 수 있다.

Future

  • 비동기적인 작업의 현재 상태를 조회하거나 결과를 가져올 수 있다.

결과 가져오기 : get()

  • 블로킹 콜이다.
  • 타임아웃을 설정할 수 있다.
public class App3 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        Callable<String> hello = () -> {
            Thread.sleep(3000L);
            return "Hello";
        };

        Future<String> helloFuture = executorService.submit(hello);
        System.out.println("Started!");
        System.out.println(helloFuture.isDone());

        helloFuture.get();  // 블로킹
//        helloFuture.cancel(false);

        System.out.println(helloFuture.isDone());
        System.out.println("End!!");

        executorService.shutdown();
    }
}

3초뒤 true와 End가 출력된다.

작업 상태 확인하기 isDone()

  • 완료 했으면 true 아니면 false를 리턴한다

작업 취소하기 cancel()

  • 취소 했으면 true 못했으면 false를 리턴한다.
  • parameter로 true를 전달하면 현재 진행중인 쓰레드를 interrupt하고 그러지 않으면 현재 진행중인 작업이 끝날때까지 기다린다

여러 작업 동시에 실행하기 invokeAll()

  • 동시에 실행한 작업 중에 제일 오래 걸리는 작업 만큼 시간이 걸린다
public static void main(String[] args) throws InterruptedException, ExecutionException {
//        ExecutorService executorService = Executors.newSingleThreadExecutor();
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        Callable<String> hello = () -> {
            Thread.sleep(3000L);
            return "Hello";
        };

        Callable<String> java = () -> {
            Thread.sleep(2000L);
            return "Java";
        };

        Callable<String> kang = () -> {
            Thread.sleep(1000L);
            return "Kang";
        };

        List<Future<String>> futures = executorService.invokeAll(Arrays.asList(hello, java, kang)); // 등록된 Callable이 모두 끝날때까지 기다린다. -> 6초뒤 다음이 실행됨
        for (Future<String> f : futures) {
            System.out.println(f.get());
        }

        executorService.shutdown();
    }

모든 작업이 끝날때까지 기다린 후 아래 동작이 실행된다

  • newFixedThreadPool(3); 으로 쓰레드를 3개 돌리면 3개의 쓰레드가 각각 작업을 하기때문에 위 코드에서는 약 3초가 걸린다.
  • newSingleThreadExecutor(); 로 싱글 스레드를 돌리면 모든 작업을 혼자서 진행하기 때문에 각 시간을 모두 합한 시간이 흘러야 다음 작업이 진행된다.

여러 작업 중에 하나라도 먼저 응답이 오면 끝내기 invokeAny()

  • 동시에 실행한 작업 중에 제일 짧게 걸리는 작업 만큼 시간이 걸린다.
  • 블록킹 콜이다.
public static void main(String[] args) throws InterruptedException, ExecutionException {
//        ExecutorService executorService = Executors.newSingleThreadExecutor();
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        Callable<String> hello = () -> {
            Thread.sleep(3000L);
            return "Hello";
        };

        Callable<String> java = () -> {
            Thread.sleep(2000L);
            return "Java";
        };

        Callable<String> kang = () -> {
            Thread.sleep(1000L);
            return "Kang";
        };

        String s = executorService.invokeAny(Arrays.asList(hello, java, kang));  // 제일 빨리 끝나는 친구를 리턴한다
        System.out.println(s);

        executorService.shutdown();
    }

제일 먼저 끝나는 작업의 결과를 가져온다


CompletableFuture

자바에서 비동기(Asynchronous) 프로그래밍을 가능케하는 인터페이스.

  • Future를 사용해서도 어느정도 가능했지만 하기 힘들 일들이 많았다

Future로는 하기 어렵던 작업들

  • Future를 외부에서 완료 시킬 수 없다. 취소하거나, get()에 타임아웃을 설정할 수는 있다.
  • 블로킹 코드(get())를 사용하지 않고서는 작업이 끝났을 때 콜백을 실행할 수 없다.
CompletableFuture<String> future = new CompletableFuture<>();
future.complete("kang");
System.out.println(future.get());
  • 여러 Future를 조합할 수 없다, 예) Event 정보 가져온 다음 Event에 참석하는 회원 목록 가져오기
  • 예외 처리용 API를 제공하지 않는다

CompletableFuture

CompletableFuture<String> future1 = CompletableFuture.completedFuture("kang");
System.out.println(future1.get());

비동기로 작업 실행하기

  • 리턴값이 없는 경우: runAsync()
CompletableFuture<Void> voidFuture = CompletableFuture.runAsync(() -> {
	System.out.println("Hello " + Thread.currentThread().getName());
});
voidFuture.get();
  • 리턴값이 있는 경우: supplyAsync()
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    System.out.println("Hello " + Thread.currentThread().getName());
    return "Hello";
});
System.out.println(future2.get());
  • 원하는 Executor(쓰레드풀)를 사용해서 실행할 수도 있다. (기본은 ForkJoinPool.commonPool()

콜백 제공하기

  • thenApply(Function): 리턴값을 받아서 다른 값으로 바꾸는 콜백
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    System.out.println("Hello " + Thread.currentThread().getName());
    return "Hello";
}).thenApply((s) -> {
    System.out.println(Thread.currentThread().getName());
    return s.toUpperCase();
});
System.out.println(future2.get());
  • thenAccept(Consumer): 리턴값을 또 다른 작업을 처리하는 콜백 (리턴없이)
CompletableFuture<Void> voidFuture1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("Hello " + Thread.currentThread().getName());
    return "Hello";
}).thenAccept((s) -> {
    System.out.println(Thread.currentThread().getName());
    System.out.println(s.toUpperCase());
});
voidFuture1.get();
  • thenRun(Runnable): 리턴값 받지 다른 작업을 처리하는 콜백
CompletableFuture<Void> voidFuture2 = CompletableFuture.supplyAsync(() -> {
    System.out.println("Hello " + Thread.currentThread().getName());
    return "Hello";
}).thenRun(() -> {
    System.out.println(Thread.currentThread().getName());
});
voidFuture2.get();
  • 콜백 자체를 또 다른 쓰레드에서 실행할 수 있다.
ExecutorService executorService = Executors.newFixedThreadPool(4);

CompletableFuture<Void> voidFuture3 = CompletableFuture.supplyAsync(() -> {
    System.out.println("Hello " + Thread.currentThread().getName());
    return "Hello";
}, executorService).thenRunAsync(() -> {
    System.out.println(Thread.currentThread().getName());
}, executorService);
voidFuture3.get();

조합하기

  • thenCompose(): 두 작업이 서로 이어서 실행하도록 조합
public class App5 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        });

        CompletableFuture<String> future = hello.thenCompose(App5::getWorld);
        System.out.println(future.get());
    }

    private static CompletableFuture<String> getWorld(String message) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("World " + Thread.currentThread().getName());
            return message + " World";
        });
    }

}
  • thenCombine(): 두 작업을 독립적으로 실행하고 둘 다 종료 했을 때 콜백 실행
CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
    System.out.println("World " + Thread.currentThread().getName());
    return "World";
});

CompletableFuture<String> future1 = hello.thenCombine(world, (h, w) -> h + " " + w);
System.out.println(future1.get());
  • allOf(): 여러 작업을 모두 실행하고 모든 작업 결과에 콜백 실행
CompletableFuture<Void> voidFuture = CompletableFuture.allOf(hello, world) // hello와 world의 리턴 타입이 동일하단 보장도 없고 에러가 있을수도 있음
		.thenAccept(System.out::println);
System.out.println(voidFuture.get());
  • anyOf(): 여러 작업 중에 가장 빨리 끝난 하나의 결과에 콜백 실행

 

예외처리

  • exeptionally(Function)
boolean throwError = true;

CompletableFuture<String> hello1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("Hello " + Thread.currentThread().getName());
    if (throwError) {
        throw new IllegalStateException();
    }
    return "Hello";
}).exceptionally(ex -> {
    System.out.println(ex);
    return "Error!";
});
System.out.println(hello1.get());
  • handle(BiFunction):
CompletableFuture<String> hello2 = CompletableFuture.supplyAsync(() -> {
    System.out.println("Hello " + Thread.currentThread().getName());
    if (throwError) {
        throw new IllegalStateException();
    }
    return "Hello";
}).handle((result, ex) -> {
    if (ex != null) {
        System.out.println(ex);
        return "Error!";
}
    return result;
});
System.out.println(hello2.get());

댓글