6. CompletableFuture - Concurrent 프로그래밍, Executors, Callable, Future, CompletableFuture
<1> 자바 Concurrent 프로그래밍
- 자바 Concurrent 프로그래밍이란?
- 동시에 여러 애플리케이션을 쓰거나 쓸 수 있게 만들거나 한 애플리케이션 안에서도 동시에 여러 일들이 진행될 때.
- = 동시성 프로그래밍, 동시 프로그래밍, 병행 프로그래밍
- cf. CVS(동시 버전 시스템)이 Concurrent Version System
- Concurrent 소프트웨어
- 동시에 여러 작업을 할 수 있는 소프트웨어
- 예) 웹 브라우저로 유튜브를 보면서 키보드로 문서에 타이핑을 할 수 있다.
- 예) 녹화를 하면서 인텔리J로 코딩을 하고 워드에 적어둔 문서를 보거나 수정할 수 있다.
- 자바에서 지원하는 컨커런트 프로그래밍
- 멀티프로세싱 (ProcessBuilder) : 한 프로세스에서 다른 프로세스를 만드는 게 가능
- 멀티쓰레드 (이번 강좌에서 중점적으로 다룰 것)
- Java 프로세스의 기본 쓰레드는 main 쓰레드이다.
- 자바 멀티쓰레드 프로그래밍
- 하나의 쓰레드에서 다른 쓰레드를 만들 수 있는데, 크게 두 가지 방법이 존재한다.
- 프로세스와 쓰레드의 차이 알고 가기 https://60cod.tistory.com/352
1. Thread 상속 (불편한 방법)
- 순서 상 Thread : Thread-0가 Hello보다 먼저 출력되어야 할 것 같지만,
쓰레드의 자원 할당은 OS에 의해 결정되기 때문에 어떤 쓰레드가 먼저 실행될 것인지 순서를 보장할 수 없다.
2. Runnable 구현 또는 람다
- 쓰레드를 쓰긴 하나, 쓰레드의 생성자에 Runnable을 준다.
- Runnable은 void이기 때문에 리턴값이 없다. (섹션1 참고)
// 자바8 (람다식 사용)
Thread thread = new Thread(() -> {
System.out.println("Thread : " + Thread.currentThread().getName());
});
thread.start();
// 자바8 이전 방식
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("Thread : " + Thread.currentThread().getName());
}
});
thread.start();
// 출력
// Thread: Thread-0
- 쓰레드 주요 기능 3가지
- sleep : 현재 쓰레드 멈춰두기
- 주어진 시간 동안 일시 정지 상태가 되고 다시 실행 대기 상태로 돌아간다.
- 다른 쓰레드가 처리할 수 있도록 우선권을 주지만 그렇다고 락을 놔주진 않는다. (잘못하면 데드락 걸릴 수 있으므로)
- try~catch문으로 묶는 이유는, 일시 정지 상태에서 주어진 시간이 되기 전에 interrupt() 메소드가 호출되면 InterruptedException예외가 발생하기 때문이다. (main 메서드 안에서는 try~catch문 없이도 동작한다.)
Thread thread = new Thread(() -> {
try {
Thread.sleep(1000); // 다른 쓰레드 먼저 처리함
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread: " + Thread.currentThread().getName());
});
thread.start();
System.out.println("Hello: " + Thread.currentThread().getName());
// 출력
// Hello: main
// Thread: Thread-0
위와 같은 상황에서 거의 무조건 다른 쓰레드인 main 쓰레드가 먼저 처리한다.
- interrupt : 다른 쓰레드 깨우기
- 스레드가 일시 정지 상태일 때 InterruptedExeption 예외를 발생 시킨다.
- 실행 대기 또는 실행 상태에 있을 때는 발생하지 않기 때문에 일시 정지 상태가 아니라면 interrupt() 메소드 호출은 의미가 없다. 그래서 try 문 안에서 일단 재운 것.
- InterruptedException : 자는 동안 누군가가 이 쓰레드를 깨우면 그 안의 내용을 실행한다.
- interrupt 자체는 종료 기능이 아니고 깨우는 기능이지만, 에러가 발생했을 때 return 시켜서 다른 쓰레드를 종료시키는 식으로 사용할 수 있다.
Thread thread2 = new Thread(() -> {
while(true) { // 무한루프
System.out.println("Thread 반복: " + Thread.currentThread().getName());
try {
Thread.sleep(1000); // 재우기
} catch (InterruptedException e) {
System.out.println("exit!");
return; // 이거 안 하면 무한루프 계속됨
}
}
});
thread2.start();
System.out.println("Hello: " + Thread.currentThread().getName()); // main 쓰레드
Thread.sleep(3000); // 3초 후 종료
thread2.interrupt(); //인터럽트 발생(종료)
// 출력:
// Hello: main
// Thread 반복: Thread-0
// Thread 반복: Thread-0
// Thread 반복: Thread-0
// exit!
1초에 한 번씩 루프 돌면서 쓰레드를 찍는다.
그러다가 interrupt 했을 때, Runnable은 리턴값이 없으므로 return 하면 쓰레드가 종료된다.
그 에러가 발생했을 때 할 일은 코딩하기 나름이다. 지금처럼 종료 시킬 수도 있고 아니면 계속 하던 일 할 수도 있고..
위 출력 결과는 main 쓰레드가 먼저 실행되었지만 먼저 말했듯이 순서는 보장되지 않는다.
- join : 다른 쓰레드 기다리기
- 다른 쓰레드가 끝날 때까지 기다린다.
Thread thread3 = new Thread(() -> {
System.out.println("Thread: " + Thread.currentThread().getName());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
});
thread3.start();
System.out.println("Hello: " + Thread.currentThread().getName());
thread3.join(); // main 쓰레드가 thread3이 끝날 때까지 기다린다.(3초)
System.out.println(thread3 + " is finished");
// Hello: main
// Thread: Thread-0
// (3초 뒤)
// Thread[Thread-0,5,] is finished
join도 sleep과 마찬가지로 interrupt로 깨울 수 있다.
복잡해지기 때문에 멀티 쓰레드 프로그래밍은 이게 문제다..
그래서 Executors가 등장!
<2> Executors
- 고수준 (High-Level) Concurrency 프로그래밍
- 쓰레드를 만들고 관리하는 작업을 애플리케이션에서 분리하고
- 그런 기능을 Executors에게 위임.
- Executors가 하는 일
- 쓰레드 만들기 : 애플리케이션이 사용할 Thread Pool을 만들어 관리한다. → 스레드 생성 비용 줄인다.
- 쓰레드 관리 : 쓰레드 생명 주기를 관리한다.
- 작업 처리 및 실행 : 쓰레드로 실행할 작업을 제공할 수 있는 API를 제공한다.
예)
2개의 스레드로 5개의 작업 실행하기
출력:
메인 애플리케이션에서 작업 5개를 ExecutorService에 보내면, 안에 Thread Pool이 있고 그 안에 스레드가 2개 있다. 스레드가 바빠서 처리를 못하는 건 Blocking Queue에 쌓아두고 스레드가 작업을 마치면 큐에서 대기중인 작업을 받아서 처리한다.
스레드 풀을 사용하면 스레드를 생성하는 비용이 덜 든다는 장점이 있다.
- 주요 인터페이스
- Executor : execute(Runnable) <- 기능이 Runnable 실행 하나 뿐
- ExecutorService : Executor 상속 받은 인터페이스로, Runnable과 Callable 실행할 수 있으며, Executor를 종료시키거나, 여러 Callable을 동시에 실행하는 등의 기능을 제공한다.
- ScheduledExecutorService : ExecutorService를 상속 받은 인터페이스로, schedule 메소드를 제공해서 특정 시간 이후에 또는 주기적으로 작업을 실행할 수 있다.
- ExecutorService로 작업 실행하기
ExecutorService executorService = Executors.newSingleThreadExecutors(); // 스레드를 하나만 쓰는 executor
executorService.submit(() -> {
System.out.println("Thread " + Thread.currentThread().getName());
});
// 출력:
// Thread pool-1-thread-1
이때 주의할 점!
ExecutorService는 작업을 실행하고나서 다음 작업이 들어올 때까지 계속 대기하고 있으므로 명시적으로 종료를 시켜줘야 한다.
- ExecutorService로 작업 멈추기
- shutdown() : 현재 진행중인 작업 마치고 종료 (graceful shutdown)
- shutdownNow() : 당장 종료
executorService.shutdown();
executorService.shutdownNow();
private static void main(String[] args) {
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
executorService.schedule(getRunnable("Hello"), 5, TimeUnit.SECONDS);
executorService.shutdown();
}
private static Runnable getRunnable(String message) {
return () -> System.out.println(message + Thread.currentThread().getName());
}
출력:
(5초 후)
Hellopool-1-thread-1
반복해서 실행시키고 싶을 때는 schedule 대신 scheduleAtFixedRate 쓰고 몇 초마다 반복할 건지 써준다.
// 반복 실행
private static void main(String[] args) {
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
executorService.scheduleAtFixedRate(getRunnable("Hello"), 1, 2, TimeUnit.SECONDS); // 1초 기다렸다가 2초에 한 번씩
// 셧다운 없이 해야 함
}
private static Runnable getRunnable(String message) {
return () -> System.out.println(message + Thread.currentThread().getName());
}
출력:
(1초 후) (2초마다 계속)
Hellopool-1-thread-1
Hellopool-1-thread-1
...
- Fork/Join 프레임워크
- ExecutorService의 구현체로 손쉽게 멀티 프로세서를 활용할 수 있게끔 도와준다.
- 맨 마지막에 들어온 게 먼저 나가는 Dequeue 사용한다. → 스레드가 할 일이 없으면 스레드가 직접 디큐에서 할 일을 가져와서 처리한다.
- 작업 단위를 쪼갠 서브 task를 다른 스레드에 분산시켜서 작업을 처리하고, 모아서(join) 결과값을 도출해낸다.
<3> Callable과 Future
- Callable
- Runnable과 유사하지만, void인 Runnable과 달리 작업의 결과를 받을 수 있다.
- return 가능
- 결과를 가져오기 get
- 블록킹 콜이다.
- 타임아웃(최대한으로 기다릴 시간)을 설정할 수 있다.
get 이전까지는 안 기다리고 코드가 실행된다.
그러다가 get을 만난 순간 멈추고, 결과값을 가져올 때까지 2초 기다린다.
그래서 collable을 썼다고 애플리케이션이 다 빨라지는 게 아니다.
- 작업 상태 확인하기 isDone()
- 완료 했으면 true, 아니면 false를 리턴한다.
- 작업 취소하기 cancel()
- 취소 했으면 true, 못했으면 false를 리턴한다.
- parameter로 true를 전달하면 현재 진행중인 쓰레드를 interrupt하고 그러지 않으면 현재 진행중인 작업이 끝날때까지 기다린다.
cancel 했는데 get으로 작업 가져오라고 하니까 에러남.
- 여러 작업 동시에 실행하기 invokeAll()
- 동시에 실행한 작업 중에 제일 오래 걸리는 작업 만큼 시간이 걸린다.
- 여러 작업 중에 하나라도 먼저 응답이 오면 끝내기 invokeAny()
- 동시에 실행한 작업 중에 제일 짧게 걸리는 작업 만큼 시간이 걸린다.
- 블록킹 콜이다.
출력:
keesun
싱글스레드로 했더니 Hello가 먼저 나왔다.
스레드를 4개로 늘려주고 해야 제일 짧게 걸리는 keesun이 먼저 나온다.
- Future
- 비동기적인 작업의 현재 상태를 조회하거나 결과를 가져올 수 있다.
<4> CompletableFuture
- 자바에서 비동기(Asynchronous) 프로그래밍을 가능케하는 인터페이스.
- Future를 사용해서도 어느정도 가능했지만 하기 힘든 일들이 많았다.
- Future로는 하기 어렵던 작업들
- Future를 외부에서 완료 시킬 수 없다. 취소하거나, get()에 타임아웃을 설정할 수는 있다.
- 블로킹 코드(get())를 사용하지 않고서는 작업이 끝났을 때 콜백을 실행할 수 없다.
- 여러 Future를 조합할 수 없다, 예) Event 정보 가져온 다음 Event에 참석하는 회원 목록 가져오기
- 예외 처리용 API를 제공하지 않는다.
- CompletableFuture
- Implements Future
- Implements CompletionStage
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = new CompletableFuture<>();
future.complete("tomato");
// future의 기본값을 "tomato"라고 정해주고 future의 작업 자체를 끝낸 것.
System.out.println(future.get()); // 출력: tomato
}
// static factory method 쓸 때
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.completedFuture("tomato");
System.out.println(future.get()); // 똑같이 출력
}
- 비동기로 작업 실행하기
- 리턴값이 없는 경우 : runAsync()
- 리턴값이 있는 경우 : supplyAsync()
- 원하는 Executor(쓰레드풀)를 사용해서 실행할 수도 있다. (기본은 ForkJoinPool.commonPool())
// runAsync (리턴값 없음)
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("Hello " + Thread.currentThread().getName());
}); // 여기까지는 정의만 한 거라 아무 일도 안 일어남. get을 해줘야 함.
future.get();
// 출력: Hello ForkJoinPool.commonPool-worker-3
// supplyAsync (리턴값 있음)
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello " + Thread.currentThread().getName());
return "Hello";
});
System.out.println(future.get()); // get 호출 안 하면 아무 일 안 일어남
// 출력:
// Hello ForkJoinPool.commonPool-worker-3
// Hello
- 콜백 제공하기
- thenApply(Function) : 리턴값을 받아서 다른 값으로 바꾸는 콜백
- thenAccept(Consumer) : 리턴값을 또 다른 작업을 처리하는 콜백 (리턴없이)
- thenRun(Runnable) : 리턴값 받지 않고 다른 작업을 처리하는 콜백
- 콜백 자체를 또 다른 쓰레드에서 실행할 수 있다.
// thenApply
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello " + Thread.currentThread().getName());
return "Hello";
}).thenApply((s) -> {
System.out.println(Thread.currentThread().getName());
return s.toUpperCase();
}); // 이전에는 콜백을 이렇게 get 호출 전에 정의하는 게 불가능했다.
System.out.println(future.get());
// 출력:
// Hello ForkJoinPool.commonPool-worker-3
// ForkJoinPool.commonPool-worker-3
// HELLO
// thenAccept
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello " + Thread.currentThread().getName());
return "Hello";
}).thenAccept((s) -> {
System.out.println(Thread.currentThread().getName());
System.out.println(s.toUpperCase());
});
future.get(); // 출력 결과 같음
// thenRun
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello " + Thread.currentThread().getName());
return "Hello";
}).thenRun(() -> {
System.out.println(Thread.currentThread().getName());
});
future.get();
// 출력:
// Hello ForkJoinPool.commonPool-worker-3
// ForkJoinPool.commonPool-worker-3
- 조합하기
- thenCompose() : 두 작업이 서로 이어서 실행하도록 조 합
- thenCombine() : 두 작업을 독립적으로 실행하고 둘 다 종료 했을 때 콜백 실행
- allOf() : 여러 작업을 모두 실행하고 모든 작업 결과에 콜백 실행
- anyOf() : 여러 작업 중에 가장 빨리 끝난 하나의 결과에 콜백 실행
예)
Hello 다음에 World가 나오게 하고 싶을 때 thenCompose
출력:
World 리턴하는 걸 메소드로 만들어버렸고,
Hello 리턴하는 거랑 World 리턴하는 거 둘을 연결한 하나의 future를 get 해서 출력했다.
예)
thenCombine
출력 결과는 같음
- 예외처리
- exeptionally(Function)
- handle(BiFunction)
출력:
출력:
* 백기선 님의 인프런 강의 <더 자바, Java 8>을 듣고 정리한 내용입니다.
강의 정보: https://www.inflearn.com/course/the-java-java8/
참고 :
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletionStage.html
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html
https://docs.oracle.com/javase/tutorial/essential/concurrency/executors.html
https://docs.oracle.com/javase/tutorial/essential/concurrency/
https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html#interrupt--