⬛ 스레드를 직접 사용할 때의 문제점
개발자A는 하나하나 스레드를 만들어서 일을 처리하고 있다. 스레드를 만들고, 요청을 처리하고, 또 다른 스레드를 만들고...
하지만 문제가 발생한다. 스레드를 만들 때마다 CPU가 점점 느려지고, 메모리 사용량이 크게 늘어나기 때문이다. 스레드 하나당 많은 메모리를 차지하고, 관리도 어렵다.
또한 A가 만들 수 있는 스레드 수는 한정되어있다. 따라서 최대 스레드 수까지만 스레드를 생성할 수 있게 하고싶다.
⬛ 스레드 풀
🟢 스레드 풀
- 컬렉션에 스레드를 보관하고 재사용
- 처리 작업이 없으면 대기(WAITING),작업 요청이 오면 RUNNABLE
하지만 스레드 풀의 관리가 복잡해지고, 작업의 효율을 더 높여야 할 필요가 생겼다.
⬛ Executor 프레임워크 소개
Executor 프레임워크는 멀티스레드 최고의 도구로, 스레드 풀, 작업 제출, 관리까지 모든 것을 깔끔하게 처리해주는 프레임워크이다.
더 이상 복잡하게 스레드를 관리할 필요가 없다.
Executor는 작업을 실행하는 가장 기본적인 역할을 한다.
ExecutorService는 Executor보다 더 많은 기능을 제공하는데, 작업을 제출하고, 상태를 관리하고, 완료되면 결과도 받아들일 수 있다!
비유하자면, Executor는 단순히 작업을 실행하는 데 그치지만, ExecutorService는 작업의 상태와 결과까지도 신경 쓰는 훨씬 강력한 도우미이다.
이제 실제로 작업을 처리하는 주인공(Executorservice의 구현체), ThreadPoolExecutor가 등장한다. 이는 스레드를 풀에서 꺼내어 작업을 효율적으로 처리하는 역할을 한다.
🟢 ThreadPoolExecutor(ExecutorService) 두가지 요소
- 스레드 풀: 스레드를 관리한다.
- BlockingQueue: 작업을 보관한다. 생산자 소비자 문제 해결을 위해 BlockingQueue 사용.
🔶 Executor 인터페이스
package java.util.concurrent;
public interface Executor {
void execute(Runnable command);
}
🔶 ExecutorService 인터페이스
- 해당 인터페이스의 대표 구현체로는 ThreadPoolExecutor 를 쓴다.
public interface ExecutorService extends Executor, AutoCloseable {
<T> Future<T> submit(Callable<T> task);
@Override
default void close(){...}
...
}
로그 출력 유틸리티를 만들어보자. 아래와 같이 Executor 프레임워크 상태를 확인할 수 있다.
public abstract class ExecutorUtils {
public static void printState(ExecutorService executorService) {
if (executorService instanceof ThreadPoolExecutor poolExecutor) {
int pool = poolExecutor.getPoolSize(); // 스레드 풀에서 관리되는 스레드 수
int active = poolExecutor.getActiveCount(); // 작업을 수행하는 스레드 수
int queuedTasks = poolExecutor.getQueue().size(); // 큐에 대기중인 작업 수
long completedTask = poolExecutor.getCompletedTaskCount(); // 완료된 작업 수
log("[pool=" + pool + ", active=" + active + ", queuedTasks=" +
queuedTasks + ", completedTasks=" + completedTask + "]");
} else {
log(executorService);
}
}
}
스레드 풀에 스레드를 미리 만들어두지 않다가, 최초의 작업이 들어오면 이때 작업을 처리하기 위해 스레드를 만든다.
작업이 완료되면 스레드 풀에 스레드를 반납한다. 스레드를 반납하면 스레드는 대기(WAITING) 상태로 스레드 풀에 대기한다.
(참고로 실제 반납되는게 아니라, 스레드 상태가 변경되는것이다.)
그 후 요청이 오면 스레드 풀에서 스레드를 꺼내 재사용한다.
그리고 close()를 호출하면 ThreadPoolExecutor가 종료되고, 스레드 풀에 대기하는 스레드도 함께 제거한다.
⬛ Runnable의 불편함
public interface Runnable {
void run();
}
Runnable은 체크 예외(checked exception)을 던질 수 없다.
또한 반환값이 없어 join()을 사용해 스레드를 종료대기한 뒤 멤버변수를 찾아 가져와야한다.
반환 값을 간단히 return으로 반환하고 요청스레드가 이를 바로 받을 수 있다면 훨 간단해질 수 있을 것 같다.
⬛ Future와 Callable 의 등장
이를 해결하기 위해 Future와 Callable이 등장한다.
- Future: "걱정 마! 난 작업의 미래 결과를 약속할 수 있어! 작업이 끝나면 바로 결과를 줄게!"
- Callable: "난 반환값도 있고, 예외 처리도 가능하니까 더 쉽게 작업할 수 있어!"
Future와 Callable이 등장하면서, 스레드의 결과를 쉽게 받을 수 있고, 작업이 완료되기 전까지 기다릴 필요 없이 다른 일도 처리할 수 있게 되었다. 결과가 필요할 때 Future.get()을 호출하면 바로 받아볼 수 있다.
🔶 Callable 인터페이스
Runnable과 달리 Callable은 값을 반환할 수 있고 예외를 던질 수 있다.
public interface Callable<V> {
V call() throws Exception;
}
이제 아래 Runnable 예제를 Callable로 바꿔보자.
public class RunnableMain {
public static void main(String[] args) throws InterruptedException{
MyRunnable task = new MyRunnable();
Thread thread = new Thread(task, "Thread-1");
thread.start();
thread.join();
int result = task.value;
log("result value = " + result);
}
static class MyRunnable implements Runnable {
int value;
@Override
public void run() {
log("Runnable 시작");
sleep(2000);
value = new Random().nextInt(10);
log("create value = " + value);
log("Runnable 완료");
}
}
}
🔶 Callable 예제
public class CallableMainV1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(1); // 스레드 풀 생성
Future<Integer> future = es.submit(new MyCallable());
Integer result = future.get();
log("result value = " + result);
es.close();
}
static class MyCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
log("Callable 시작");
sleep(2000);
int value = new Random().nextInt();
log("create value = " + value);
log("Callable 완료");
return value;
}
}
}
🟢 실행흐름
여기서 중요한점은, Future는 즉시 반환된다는 점이다.
future.get()을 호출했을때 완료상태가 아니면 요청스레드는 결과를 얻기 위해 대기(Blocking)한다.
블로킹 상태라 지정된 작업이 완료될때까지 다른 작업을 수행할 수 없다.
작업을 완료하면 Future에 결과를 담고, Future상태를 완료로 변경한 후 요청스레드를 깨운다.
요청 스레드를 깨우면, 요청스레드는 WAITING → RUNNABLE 상태로 변한다.
Future를 반환할땐 아래와 같이 결과를 받는다.
Future<Integer> future = es.submit(new MyCallable()); // 여기는 블로킹 아님
future.get(); // 여기서 블로킹
아래와 같이 결과를 바로 반환하도록 설계하면 다음 결과를 반환할때까지 기다려야한다.
Integer result = es.submit(new MyCallable()); // 여기서 블로킹
하지만 Future는 바로 결과를 반환하고 결과가 필요한 나중에 get()으로 결과를 꺼낼수 잇다.
즉, 먼저 작업을 요청해두고, 결과를 기다리지 않고 바로 다음 작업을 시작할 수 있다.
⬛ Future Interface
public interface Future<V> {
// 완료되지 않은 작업을 취소
boolean cancel(boolean mayInterruptIfRunning);
// 작업이 취소되었는지 확인
boolean isCancelled();
// 작업이 완료되었는지 확인
boolean isDone();
// 작업완료 대기(블로킹), 완료되면 결과 반환
V get() throws InterruptedException, ExecutionException;
// get과 동일, 시간 초과시 예외 발생
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
enum State {
RUNNING,
SUCCESS,
FAILED,
CANCELLED
}
default State state() {}
}
⬛ ExecutorService - 작업 컬렉션 처리
invokeAll() : 모든 Callable 작업 제출 후, 완료까지 기다림
invokeAny(): 하나의 Callable 작업 완료 기다리고, 가장 먼저 완료된 작업 결과 반환. 나머지 작업은 취소
'Java' 카테고리의 다른 글
[Java] 김영한의 실전 자바 - 고급 2편 섹션2 I/O 기본1,2 (0) | 2024.10.28 |
---|---|
[Java] 김영한의 실전 자바 - 고급 1편 섹션12 스레드 풀과 Executor 프레임워크2 (1) | 2024.09.30 |
[Java] 김영한의 실전 자바 - 고급 1편 섹션9 생산자 소비자 문제2 (1) | 2024.09.26 |
[Java] 김영한의 실전 자바 - 고급 1편 섹션11 동시성 컬렉션 (0) | 2024.09.23 |
[Java] 김영한의 실전 자바 - 고급 1편 섹션8 생산자 소비자 문제1 (0) | 2024.09.20 |