Concurrent 소프트웨어
- 동시에 여러 작업을 할 수 있는 소프트웨어
- ex) 웹 브라우저로 유튜브를 보면서 키보드로 문서에 타이핑을 할 수 있다
- ex) 녹화를 하면서 인텔리J로 코딩을 하고 워드에 적어둔 문서를 보거나 수정할 수 있다
Java에서 지원하는 Concurrent 프로그래밍
- 멀티프로세싱
- 멀티쓰레드
Java 멀디쓰레드 프로그래밍
- Thread
public class App {
public static void main(String[] args) throws InterruptedException {
MyThread myThread = new MyThread();
myThread.start();
}
static class MyThread extends Thread {
public void run() {
System.out.println("Hello : " + Thread.currentThread().getName());
}
}
}
- Runnable 구현 또는 람다
public class App {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
});
thread.start();
}
}
쓰레드 주요 기능
- 현재 쓰레드 멈춰두기 (sleep): 다른 쓰레드가 처리할 수 있도록 기회를 주지만 그렇다고 락을 놔주진 않는다. (잘못하면 데드락 걸릴 수 있다)
- 다른 쓰레드 깨우기 (interupt): 다른 쓰레드를 깨워서 interruptedExeption을 발생 시킨다. 그 에러가 발생했을 때 할 일은 코딩하기 나름. 종료 시킬 수도 있고 계속 하던 일 할 수도 있고.
- 다른 쓰레드 기다리기 (join): 다른 쓰레드가 끝날 때까지 기다린다.
Executores
고수준(High-Level) Concurrency 프로그래밍
- 쓰레드를 만들고 관리하는 작업을 애플리케이션에서 분리 -> Executors에 위임
Executors가 하는일
- 쓰레드 만들기: 애플리케이션이 사용할 쓰레드 풀을 만들어 관리한다.
- 쓰레드 관리: 쓰레드 생명 주기를 관리한다.
- 작업 처리 및 실행: 쓰레드로 실행할 작업을 제공할 수 있는 API를 제공한다.
주요 인터페이스
- Executor: execute(Runnable)
- ExecutorService: Executor 상속 받은 인터페이스로, Callable도 실행할 수 있으며, Executor를 종료 시키거나, 여러 Callable을 동시에 실행하는 등의 기능을 제공한다.
- ScheduledExecutorService: ExecutorService를 상속 받은 인터페이스로 특정 시간 이후에 또는 주기적으로 작업을 실행할 수 있다.
ExecutorService로 작업 실행하기
public class App2 {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(() -> { // 프로세스가 시작되고 종료되지 않기때문에 명시적으로 종료해줘야함
System.out.println("Thread : " + Thread.currentThread().getName());
});
}
ExecutorsService로 멈추기
executorService.shutdown(); // 처리중인 작업 기다렸다가 종료
executorService.shutdownNow() // 당장 종료
다수의 쓰레드로 작업하기
public class App2 {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2); // 두개의 쓰레드로 생성
executorService.submit(getRunnable("this"));
executorService.submit(getRunnable("is"));
executorService.submit(getRunnable("the"));
executorService.submit(getRunnable("java"));
executorService.submit(getRunnable("class"));
executorService.shutdown(); // 명시적 종료
}
private static Runnable getRunnable(String message) {
return () -> System.out.println(message + " : " + Thread.currentThread().getName());
}
}
Callable과 Future
Callable
- Runnable과 유사하지만 작업의 결과를 받을 수 있다.
Future
- 비동기적인 작업의 현재 상태를 조회하거나 결과를 가져올 수 있다.
결과 가져오기 : get()
- 블로킹 콜이다.
- 타임아웃을 설정할 수 있다.
public class App3 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> hello = () -> {
Thread.sleep(3000L);
return "Hello";
};
Future<String> helloFuture = executorService.submit(hello);
System.out.println("Started!");
System.out.println(helloFuture.isDone());
helloFuture.get(); // 블로킹
// helloFuture.cancel(false);
System.out.println(helloFuture.isDone());
System.out.println("End!!");
executorService.shutdown();
}
}
3초뒤 true와 End가 출력된다.
작업 상태 확인하기 isDone()
- 완료 했으면 true 아니면 false를 리턴한다
작업 취소하기 cancel()
- 취소 했으면 true 못했으면 false를 리턴한다.
- parameter로 true를 전달하면 현재 진행중인 쓰레드를 interrupt하고 그러지 않으면 현재 진행중인 작업이 끝날때까지 기다린다
여러 작업 동시에 실행하기 invokeAll()
- 동시에 실행한 작업 중에 제일 오래 걸리는 작업 만큼 시간이 걸린다
public static void main(String[] args) throws InterruptedException, ExecutionException {
// ExecutorService executorService = Executors.newSingleThreadExecutor();
ExecutorService executorService = Executors.newFixedThreadPool(3);
Callable<String> hello = () -> {
Thread.sleep(3000L);
return "Hello";
};
Callable<String> java = () -> {
Thread.sleep(2000L);
return "Java";
};
Callable<String> kang = () -> {
Thread.sleep(1000L);
return "Kang";
};
List<Future<String>> futures = executorService.invokeAll(Arrays.asList(hello, java, kang)); // 등록된 Callable이 모두 끝날때까지 기다린다. -> 6초뒤 다음이 실행됨
for (Future<String> f : futures) {
System.out.println(f.get());
}
executorService.shutdown();
}
- newFixedThreadPool(3); 으로 쓰레드를 3개 돌리면 3개의 쓰레드가 각각 작업을 하기때문에 위 코드에서는 약 3초가 걸린다.
- newSingleThreadExecutor(); 로 싱글 스레드를 돌리면 모든 작업을 혼자서 진행하기 때문에 각 시간을 모두 합한 시간이 흘러야 다음 작업이 진행된다.
여러 작업 중에 하나라도 먼저 응답이 오면 끝내기 invokeAny()
- 동시에 실행한 작업 중에 제일 짧게 걸리는 작업 만큼 시간이 걸린다.
- 블록킹 콜이다.
public static void main(String[] args) throws InterruptedException, ExecutionException {
// ExecutorService executorService = Executors.newSingleThreadExecutor();
ExecutorService executorService = Executors.newFixedThreadPool(3);
Callable<String> hello = () -> {
Thread.sleep(3000L);
return "Hello";
};
Callable<String> java = () -> {
Thread.sleep(2000L);
return "Java";
};
Callable<String> kang = () -> {
Thread.sleep(1000L);
return "Kang";
};
String s = executorService.invokeAny(Arrays.asList(hello, java, kang)); // 제일 빨리 끝나는 친구를 리턴한다
System.out.println(s);
executorService.shutdown();
}
CompletableFuture
자바에서 비동기(Asynchronous) 프로그래밍을 가능케하는 인터페이스.
- Future를 사용해서도 어느정도 가능했지만 하기 힘들 일들이 많았다
Future로는 하기 어렵던 작업들
- Future를 외부에서 완료 시킬 수 없다. 취소하거나, get()에 타임아웃을 설정할 수는 있다.
- 블로킹 코드(get())를 사용하지 않고서는 작업이 끝났을 때 콜백을 실행할 수 없다.
CompletableFuture<String> future = new CompletableFuture<>();
future.complete("kang");
System.out.println(future.get());
- 여러 Future를 조합할 수 없다, 예) Event 정보 가져온 다음 Event에 참석하는 회원 목록 가져오기
- 예외 처리용 API를 제공하지 않는다
CompletableFuture
CompletableFuture<String> future1 = CompletableFuture.completedFuture("kang");
System.out.println(future1.get());
비동기로 작업 실행하기
- 리턴값이 없는 경우: runAsync()
CompletableFuture<Void> voidFuture = CompletableFuture.runAsync(() -> {
System.out.println("Hello " + Thread.currentThread().getName());
});
voidFuture.get();
- 리턴값이 있는 경우: supplyAsync()
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello " + Thread.currentThread().getName());
return "Hello";
});
System.out.println(future2.get());
- 원하는 Executor(쓰레드풀)를 사용해서 실행할 수도 있다. (기본은 ForkJoinPool.commonPool()
콜백 제공하기
- thenApply(Function): 리턴값을 받아서 다른 값으로 바꾸는 콜백
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello " + Thread.currentThread().getName());
return "Hello";
}).thenApply((s) -> {
System.out.println(Thread.currentThread().getName());
return s.toUpperCase();
});
System.out.println(future2.get());
- thenAccept(Consumer): 리턴값을 또 다른 작업을 처리하는 콜백 (리턴없이)
CompletableFuture<Void> voidFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello " + Thread.currentThread().getName());
return "Hello";
}).thenAccept((s) -> {
System.out.println(Thread.currentThread().getName());
System.out.println(s.toUpperCase());
});
voidFuture1.get();
- thenRun(Runnable): 리턴값 받지 다른 작업을 처리하는 콜백
CompletableFuture<Void> voidFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello " + Thread.currentThread().getName());
return "Hello";
}).thenRun(() -> {
System.out.println(Thread.currentThread().getName());
});
voidFuture2.get();
- 콜백 자체를 또 다른 쓰레드에서 실행할 수 있다.
ExecutorService executorService = Executors.newFixedThreadPool(4);
CompletableFuture<Void> voidFuture3 = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello " + Thread.currentThread().getName());
return "Hello";
}, executorService).thenRunAsync(() -> {
System.out.println(Thread.currentThread().getName());
}, executorService);
voidFuture3.get();
조합하기
- thenCompose(): 두 작업이 서로 이어서 실행하도록 조합
public class App5 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello " + Thread.currentThread().getName());
return "Hello";
});
CompletableFuture<String> future = hello.thenCompose(App5::getWorld);
System.out.println(future.get());
}
private static CompletableFuture<String> getWorld(String message) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("World " + Thread.currentThread().getName());
return message + " World";
});
}
}
- thenCombine(): 두 작업을 독립적으로 실행하고 둘 다 종료 했을 때 콜백 실행
CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
System.out.println("World " + Thread.currentThread().getName());
return "World";
});
CompletableFuture<String> future1 = hello.thenCombine(world, (h, w) -> h + " " + w);
System.out.println(future1.get());
- allOf(): 여러 작업을 모두 실행하고 모든 작업 결과에 콜백 실행
CompletableFuture<Void> voidFuture = CompletableFuture.allOf(hello, world) // hello와 world의 리턴 타입이 동일하단 보장도 없고 에러가 있을수도 있음
.thenAccept(System.out::println);
System.out.println(voidFuture.get());
- anyOf(): 여러 작업 중에 가장 빨리 끝난 하나의 결과에 콜백 실행
예외처리
- exeptionally(Function)
boolean throwError = true;
CompletableFuture<String> hello1 = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello " + Thread.currentThread().getName());
if (throwError) {
throw new IllegalStateException();
}
return "Hello";
}).exceptionally(ex -> {
System.out.println(ex);
return "Error!";
});
System.out.println(hello1.get());
- handle(BiFunction):
CompletableFuture<String> hello2 = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello " + Thread.currentThread().getName());
if (throwError) {
throw new IllegalStateException();
}
return "Hello";
}).handle((result, ex) -> {
if (ex != null) {
System.out.println(ex);
return "Error!";
}
return result;
});
System.out.println(hello2.get());
'java' 카테고리의 다른 글
Design Patterns - Singleton (0) | 2021.11.11 |
---|---|
함수형 인터페이스를 이용한 Builder (0) | 2021.05.29 |
더 자바(java 8) - Stream, Optional, Date (0) | 2021.05.19 |
더 자바(java 8) - 인터페이스의 변화 (0) | 2021.05.11 |
더 자바(java 8) - 함수형 인터페이스와 람다 (0) | 2021.05.10 |
댓글