온라인 학습/더 자바, Java 8 강의

6. CompletableFuture - Concurrent 프로그래밍, Executors, Callable, Future, CompletableFuture

60cod 2022. 11. 6. 20:55

<1> 자바 Concurrent 프로그래밍

 

 

 

 

  • 자바 Concurrent 프로그래밍이란?
    • 동시에 여러 애플리케이션을 쓰거나 쓸 수 있게 만들거나 한 애플리케이션 안에서도 동시에 여러 일들이 진행될 때.
    • = 동시성 프로그래밍, 동시 프로그래밍, 병행 프로그래밍
    • cf. CVS(동시 버전 시스템)이 Concurrent Version System

 

 

 

 

  • Concurrent 소프트웨어
    • 동시에 여러 작업을 할 수 있는 소프트웨어
    • 예) 웹 브라우저로 유튜브를 보면서 키보드로 문서에 타이핑을 할 수 있다.
    • 예) 녹화를 하면서 인텔리J로 코딩을 하고 워드에 적어둔 문서를 보거나 수정할 수 있다.

 

 

 

 

  • 자바에서 지원하는 컨커런트 프로그래밍
    • 멀티프로세싱 (ProcessBuilder) : 한 프로세스에서 다른 프로세스를 만드는 게 가능
    • 멀티쓰레드 (이번 강좌에서 중점적으로 다룰 것)

 

 

 

 

  • Java  프로세스의 기본 쓰레드는 main 쓰레드이다.

 

 

 

 

  • 자바 멀티쓰레드 프로그래밍
    • 하나의 쓰레드에서 다른 쓰레드를 만들 수 있는데, 크게 두 가지 방법이 존재한다.
    • 프로세스와 쓰레드의 차이 알고 가기 https://60cod.tistory.com/352

 

 

1. Thread 상속 (불편한 방법)

- 순서 상 Thread : Thread-0가 Hello보다 먼저 출력되어야 할 것 같지만,

쓰레드의 자원 할당은 OS에 의해 결정되기 때문에 어떤 쓰레드가 먼저 실행될 것인지 순서를 보장할 수 없다.

 

 

 

2. Runnable 구현 또는 람다

- 쓰레드를 쓰긴 하나, 쓰레드의 생성자에 Runnable을 준다.

- Runnable은 void이기 때문에 리턴값이 없다. (섹션1 참고)

// 자바8 (람다식 사용)
Thread thread = new Thread(() -> {
	System.out.println("Thread : " + Thread.currentThread().getName());
});
thread.start();
 
// 자바8 이전 방식
Thread thread = new Thread(new Runnable() {
	@Override
	public void run() {
		System.out.println("Thread : " + Thread.currentThread().getName());
	}
});
thread.start();

// 출력
// Thread: Thread-0

 

 

 

 

  • 쓰레드 주요 기능 3가지
  • sleep : 현재 쓰레드 멈춰두기
    - 주어진 시간 동안 일시 정지 상태가 되고 다시 실행 대기 상태로 돌아간다.
    - 다른 쓰레드가 처리할 수 있도록 우선권을 주지만 그렇다고 락을 놔주진 않는다. (잘못하면 데드락 걸릴 수 있으므로)
    - try~catch문으로 묶는 이유는, 일시 정지 상태에서 주어진 시간이 되기 전에 interrupt() 메소드가 호출되면 InterruptedException예외가 발생하기 때문이다. (main 메서드 안에서는 try~catch문 없이도 동작한다.)
Thread thread = new Thread(() -> {
	try {
		Thread.sleep(1000); // 다른 쓰레드 먼저 처리함
	} catch (InterruptedException e) {
		e.printStackTrace();
	}
 
	System.out.println("Thread: " + Thread.currentThread().getName());
});
thread.start();
 
System.out.println("Hello: " + Thread.currentThread().getName());
 
// 출력
// Hello: main
// Thread: Thread-0

위와 같은 상황에서 거의 무조건 다른 쓰레드인 main 쓰레드가 먼저 처리한다.

 

 

 

  • interrupt : 다른 쓰레드 깨우기
    - 스레드가 일시 정지 상태일 때 InterruptedExeption 예외를 발생 시킨다.
    - 실행 대기 또는 실행 상태에 있을 때는 발생하지 않기 때문에 일시 정지 상태가 아니라면 interrupt() 메소드 호출은 의미가 없다. 그래서 try 문 안에서 일단 재운 것.
    - InterruptedException : 자는 동안 누군가가 이 쓰레드를 깨우면 그 안의 내용을 실행한다.
    - interrupt 자체는 종료 기능이 아니고 깨우는 기능이지만, 에러가 발생했을 때 return 시켜서 다른 쓰레드를 종료시키는 식으로 사용할 수 있다. 
Thread thread2 = new Thread(() -> {
	while(true) {  // 무한루프
		System.out.println("Thread 반복: " + Thread.currentThread().getName());
		
		try {
			Thread.sleep(1000);  // 재우기
		} catch (InterruptedException e) {
			System.out.println("exit!");
			return;  // 이거 안 하면 무한루프 계속됨
		}
	}
}); 
thread2.start();
 
System.out.println("Hello: " + Thread.currentThread().getName());  // main 쓰레드
Thread.sleep(3000);  // 3초 후 종료
thread2.interrupt();  //인터럽트 발생(종료)

// 출력:
// Hello: main
// Thread 반복: Thread-0
// Thread 반복: Thread-0
// Thread 반복: Thread-0
// exit!

1초에 한 번씩 루프 돌면서 쓰레드를 찍는다.

그러다가 interrupt 했을 때, Runnable은 리턴값이 없으므로 return 하면 쓰레드가 종료된다.

그 에러가 발생했을 때 할 일은 코딩하기 나름이다. 지금처럼 종료 시킬 수도 있고 아니면 계속 하던 일 할 수도 있고..

위 출력 결과는 main 쓰레드가 먼저 실행되었지만 먼저 말했듯이 순서는 보장되지 않는다.

 

 

 

  •  join : 다른 쓰레드 기다리기 
    - 다른 쓰레드가 끝날 때까지 기다린다.
Thread thread3 = new Thread(() -> {
	System.out.println("Thread: " + Thread.currentThread().getName());
	try {
		Thread.sleep(3000);
	} catch (InterruptedException e) {
		throw new IllegalStateException(e);
	}
});
thread3.start();

System.out.println("Hello: " + Thread.currentThread().getName());
thread3.join();  // main 쓰레드가 thread3이 끝날 때까지 기다린다.(3초)
System.out.println(thread3 + " is finished");
 
// Hello: main
// Thread: Thread-0
// (3초 뒤)
// Thread[Thread-0,5,] is finished

 

 

join도 sleep과 마찬가지로 interrupt로 깨울 수 있다. 

복잡해지기 때문에 멀티 쓰레드 프로그래밍은 이게 문제다..

그래서 Executors가 등장!

 

 

 


<2> Executors

 

 

 

  • 고수준 (High-Level) Concurrency 프로그래밍
    • 쓰레드를 만들고 관리하는 작업을 애플리케이션에서 분리하고
    • 그런 기능을 Executors에게 위임.

 

 

 

  • Executors가 하는 일
    • 쓰레드 만들기 : 애플리케이션이 사용할 Thread Pool을 만들어 관리한다. → 스레드 생성 비용 줄인다.
    • 쓰레드 관리 : 쓰레드 생명 주기를 관리한다.
    • 작업 처리 및 실행 : 쓰레드로 실행할 작업을 제공할 수 있는 API를 제공한다.


예)

2개의 스레드로 5개의 작업 실행하기

 

출력:

메인 애플리케이션에서 작업 5개를 ExecutorService에 보내면, 안에 Thread Pool이 있고 그 안에 스레드가 2개 있다. 스레드가 바빠서 처리를 못하는 건 Blocking Queue에 쌓아두고 스레드가 작업을 마치면 큐에서 대기중인 작업을 받아서 처리한다.

스레드 풀을 사용하면 스레드를 생성하는 비용이 덜 든다는 장점이 있다.

 

 

 

  • 주요 인터페이스
    • Executor : execute(Runnable) <- 기능이 Runnable 실행 하나 뿐
    • ExecutorService : Executor 상속 받은 인터페이스로, Runnable과 Callable 실행할 수 있으며, Executor를 종료시키거나, 여러 Callable을 동시에 실행하는 등의 기능을 제공한다.
    • ScheduledExecutorService : ExecutorService를 상속 받은 인터페이스로, schedule 메소드를 제공해서 특정 시간 이후에 또는 주기적으로 작업을 실행할 수 있다.

 

 

 

  • ExecutorService로 작업 실행하기
ExecutorService executorService = Executors.newSingleThreadExecutors();  // 스레드를 하나만 쓰는 executor
executorService.submit(() -> {
	System.out.println("Thread " + Thread.currentThread().getName());
});
// 출력:
// Thread pool-1-thread-1

이때 주의할 점!

ExecutorService는 작업을 실행하고나서 다음 작업이 들어올 때까지 계속 대기하고 있으므로 명시적으로 종료를 시켜줘야 한다. 

 

 

 

  • ExecutorService로 작업 멈추기
    • shutdown() : 현재 진행중인 작업 마치고 종료 (graceful shutdown)
    • shutdownNow() : 당장 종료
executorService.shutdown();  
executorService.shutdownNow();

 

 

 

 

private static void main(String[] args) {
    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
    executorService.schedule(getRunnable("Hello"), 5, TimeUnit.SECONDS);
    
    executorService.shutdown();
}

private static Runnable getRunnable(String message) {
    return () -> System.out.println(message + Thread.currentThread().getName());
}

출력:

(5초 후)

Hellopool-1-thread-1

 

 

반복해서 실행시키고 싶을 때는 schedule 대신 scheduleAtFixedRate 쓰고 몇 초마다 반복할 건지 써준다.

// 반복 실행
private static void main(String[] args) {
    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
    executorService.scheduleAtFixedRate(getRunnable("Hello"), 1, 2, TimeUnit.SECONDS);  // 1초 기다렸다가 2초에 한 번씩
    // 셧다운 없이 해야 함
}

private static Runnable getRunnable(String message) {
    return () -> System.out.println(message + Thread.currentThread().getName());
}

출력:

(1초 후) (2초마다 계속)

Hellopool-1-thread-1

Hellopool-1-thread-1

...

 

 

 

  • Fork/Join 프레임워크
    • ExecutorService의 구현체로 손쉽게 멀티 프로세서를 활용할 수 있게끔 도와준다. 
    • 맨 마지막에 들어온 게 먼저 나가는 Dequeue 사용한다. → 스레드가 할 일이 없으면 스레드가 직접 디큐에서 할 일을 가져와서 처리한다.
    • 작업 단위를 쪼갠 서브 task를 다른 스레드에 분산시켜서 작업을 처리하고, 모아서(join) 결과값을 도출해낸다.

 


<3> Callable과 Future

 

 

 

  • Callable
    • Runnable과 유사하지만, void인 Runnable과 달리 작업의 결과를 받을 수 있다.
    • return 가능

 

 

 

  • 결과를 가져오기 get
    • 블록킹 콜이다.
    • 타임아웃(최대한으로 기다릴 시간)을 설정할 수 있다.

get 이전까지는 안 기다리고 코드가 실행된다.

그러다가 get을 만난 순간 멈추고, 결과값을 가져올 때까지 2초 기다린다.

그래서 collable을 썼다고 애플리케이션이 다 빨라지는 게 아니다. 

 

 

 

  • 작업 상태 확인하기 isDone()
    • 완료 했으면 true, 아니면 false를 리턴한다.

 

 

 

  • 작업 취소하기 cancel()
    • 취소 했으면 true, 못했으면 false를 리턴한다.
    • parameter로 true를 전달하면 현재 진행중인 쓰레드를 interrupt하고 그러지 않으면 현재 진행중인 작업이 끝날때까지 기다린다.

cancel 했는데 get으로 작업 가져오라고 하니까 에러남.

 

 

 

  • 여러 작업 동시에 실행하기 invokeAll()
    • 동시에 실행한 작업 중에 제일 오래 걸리는 작업 만큼 시간이 걸린다.

 

 

 

  • 여러 작업 중에 하나라도 먼저 응답이 오면 끝내기 invokeAny()
    • 동시에 실행한 작업 중에 제일 짧게 걸리는 작업 만큼 시간이 걸린다.
    • 블록킹 콜이다.

 

출력:

keesun

 

싱글스레드로 했더니 Hello가 먼저 나왔다.

스레드를 4개로 늘려주고 해야 제일 짧게 걸리는 keesun이 먼저 나온다.

 

 

 

  • Future
    • 비동기적인 작업현재 상태를 조회하거나 결과를 가져올 수 있다.

 

 


<4> CompletableFuture

 

 

 

  • 자바에서 비동기(Asynchronous) 프로그래밍을 가능케하는 인터페이스.
    • Future를 사용해서도 어느정도 가능했지만 하기 힘든 일들이 많았다.

 

 

 

  • Future로는 하기 어렵던 작업들
    • Future를 외부에서 완료 시킬 수 없다. 취소하거나, get()에 타임아웃을 설정할 수는 있다.
    • 블로킹 코드(get())를 사용하지 않고서는 작업이 끝났을 때 콜백을 실행할 수 없다.
    • 여러 Future를 조합할 수 없다, 예) Event 정보 가져온 다음 Event에 참석하는 회원 목록 가져오기
    • 예외 처리용 API를 제공하지 않는다.

 

 

 

  • CompletableFuture
    • Implements Future
    • Implements CompletionStage
public static void main(String[] args) throws ExecutionException, InterruptedException {
	CompletableFuture<String> future = new CompletableFuture<>();
	future.complete("tomato");  
	// future의 기본값을 "tomato"라고 정해주고 future의 작업 자체를 끝낸 것.

	System.out.println(future.get());  // 출력: tomato
}
// static factory method 쓸 때
public static void main(String[] args) throws ExecutionException, InterruptedException {
	CompletableFuture<String> future = CompletableFuture.completedFuture("tomato");
	System.out.println(future.get());  // 똑같이 출력
}

 

 

 

  • 비동기로 작업 실행하기
    • 리턴값이 없는 경우 : runAsync()
    • 리턴값이 있는 경우 : supplyAsync()
    • 원하는 Executor(쓰레드풀)를 사용해서 실행할 수도 있다. (기본은 ForkJoinPool.commonPool())
// runAsync (리턴값 없음)
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
	System.out.println("Hello " + Thread.currentThread().getName());
});  // 여기까지는 정의만 한 거라 아무 일도 안 일어남. get을 해줘야 함.
future.get();
// 출력: Hello ForkJoinPool.commonPool-worker-3
// supplyAsync (리턴값 있음)
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
	System.out.println("Hello " + Thread.currentThread().getName());
    return "Hello";
});  
System.out.println(future.get());  // get 호출 안 하면 아무 일 안 일어남
// 출력: 
// Hello ForkJoinPool.commonPool-worker-3
// Hello

 

ForkJoinPool 말고 원하는 쓰레드풀 사용

 

 

 

  • 콜백 제공하기
    • thenApply(Function) : 리턴값을 받아서 다른 값으로 바꾸는 콜백
    • thenAccept(Consumer) : 리턴값을 또 다른 작업을 처리하는 콜백 (리턴없이)
    • thenRun(Runnable) : 리턴값 받지 않고 다른 작업을 처리하는 콜백
    • 콜백 자체를 또 다른 쓰레드에서 실행할 수 있다.
// thenApply
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
	System.out.println("Hello " + Thread.currentThread().getName());
    return "Hello";
}).thenApply((s) -> {
	System.out.println(Thread.currentThread().getName());
	return s.toUpperCase();
});  // 이전에는 콜백을 이렇게 get 호출 전에 정의하는 게 불가능했다.
System.out.println(future.get());
// 출력:
// Hello ForkJoinPool.commonPool-worker-3
// ForkJoinPool.commonPool-worker-3
// HELLO
// thenAccept
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
	System.out.println("Hello " + Thread.currentThread().getName());
    return "Hello";
}).thenAccept((s) -> {
	System.out.println(Thread.currentThread().getName());
	System.out.println(s.toUpperCase());
}); 
future.get();  // 출력 결과 같음
// thenRun
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
	System.out.println("Hello " + Thread.currentThread().getName());
    return "Hello";
}).thenRun(() -> {
	System.out.println(Thread.currentThread().getName());
}); 
future.get();  
// 출력:
// Hello ForkJoinPool.commonPool-worker-3
// ForkJoinPool.commonPool-worker-3

 

 

 

  • 조합하기
    • thenCompose() : 두 작업이 서로 이어서 실행하도록 조 합
    • thenCombine() : 두 작업을 독립적으로 실행하고 둘 다 종료 했을 때 콜백 실행
    • allOf() : 여러 작업을 모두 실행하고 모든 작업 결과에 콜백 실행
    • anyOf() : 여러 작업 중에 가장 빨리 끝난 하나의 결과에 콜백 실행

예)

Hello 다음에 World가 나오게 하고 싶을 때 thenCompose

출력:

World 리턴하는 걸 메소드로 만들어버렸고,

Hello 리턴하는 거랑 World 리턴하는 거 둘을 연결한 하나의 future를 get 해서 출력했다.

 

예)

thenCombine

출력 결과는 같음

 

 

 

  • 예외처리
    • exeptionally(Function)
    • handle(BiFunction) 

 

에러 있을 때

출력:

 

에러 없을 때

출력:

 

 

 

 

 

 

* 백기선 님의 인프런 강의 <더 자바, Java 8>을 듣고 정리한 내용입니다.

강의 정보: https://www.inflearn.com/course/the-java-java8/

 

참고 :

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletionStage.html

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html

https://docs.oracle.com/javase/tutorial/essential/concurrency/executors.html

https://docs.oracle.com/javase/tutorial/essential/concurrency/
https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html#interrupt--