[Java] 김영한의 실전 자바 - 고급 1편 섹션9 생산자 소비자 문제2
⬛ Lock Condition
ReentrantLock과 Condition을 사용하여 생산자와 소비자가 서로 독립적인 대기 집합을 사용하도록 분리한다. 이는 효율적인 대기를 가능하게 하여 불필요한 스레드 깨어남을 방지한다.
ReentrantLock 은 내부에 락과, 락 획득을 대기하는 스레드를 관리하는 대기 큐가 있다.
condition.await(): Object.wait()와 유사하며, 현재 스레드를 대기 상태로 보관하면서 ReentrantLock에서 획득한 락을 반납한다.
condition.signal(): Object.notify()와 유사하며, 대기 중인 스레드 하나를 깨워 대기 상태에서 빠져나오게 한다.
기존 코드를 ReentrantLock과 Condition을 사용하도록 바꾸면 아래와 같다.
🔶 예제코드
public class BoundedQueueV4 implements BoundedQueue{
private final Lock lock = new ReentrantLock();
// ReentrantLock의 스레드 대기 공간 생성
private final Condition condition = lock.newCondition();
private final Queue<String> queue = new ArrayDeque<>();
private final int max;
public BoundedQueueV4(int max) {
this.max = max;
}
@Override
public void put(String data) {
lock.lock();
try {
while (queue.size() == max) {
log("[put] 큐가 가득 참, 생산자 대기");
try {
condition.await();
log("[put] 생산자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
queue.offer(data);
log("[put] 생산자 데이터 저장, signal() 호출"); condition.signal();
} finally {
lock.unlock();
}
}
@Override
public String take() {
lock.lock();
try {
while (queue.isEmpty()) {
log("[take] 큐에 데이터가 없음, 소비자 대기");
try {
condition.await();
log("[take] 소비자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
String data = queue.poll();
log("[take] 소비자 데이터 획득, signal() 호출"); condition.signal();
return data;
} finally {
lock.unlock();
}
}
}
⬛ 생상자 소비자 대기공간 분리
대기공간을 분리해보자. 핵심은 생산자는 소비자를 깨우고, 소비자는 생산자를 깨운다는 점이다.
public class BoundedQueueV5 implements BoundedQueue{
private final Lock lock = new ReentrantLock();
private final Condition producerCond = lock.newCondition();
private final Condition consumerCond = lock.newCondition();
private final Queue<String> queue = new ArrayDeque<>();
private final int max;
public BoundedQueueV5(int max) {
this.max = max;
}
@Override
public void put(String data) {
lock.lock();
try {
while (queue.size() == max) { log("[put] 큐가 가득 참, 생산자 대기");
try {
producerCond.await(); // 생산자 스레드를 생산자 전용 스레드 대기에 보관
log("[put] 생산자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
queue.offer(data);
log("[put] 생산자 데이터 저장, consumerCond.signal() 호출");
consumerCond.signal(); // 소비자 전용 스레드 대기 공간에 신호
} finally {
lock.unlock();
}
}
@Override
public String take() {
try {
while (queue.isEmpty()) {
log("[take] 큐에 데이터가 없음, 소비자 대기");
try {
consumerCond.await(); // 소비자 스레드를 소비자 전용 스레드 대기 공간에 보관
log("[take] 소비자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
String data = queue.poll();
log("[take] 소비자 데이터 획득, producerCond.signal() 호출");
producerCond.signal(); // 생산자 전용 스레드 대기 공간에 신호
return data;
} finally {
lock.unlock();
}
}
}
단순한 예시 그림을 보자. 아래와같은 상황에서 출발한다고해보자.(위 코드와는 관련이 없다.)
🔶 생산자 실행 상황
🔶 소비자 실행 상황
🟢 Object.notify() vs Condition.signal() 차이
Object.notify()는 누구나 깨어날 수 있는 반면, Condition.signal()은 먼저 대기한 스레드가 깨어날 가능성이 더 높은 방식이다.
Object.notify():
- 무작위로 깨어남: 대기 중인 스레드 중 아무나 하나를 깨운다.누가 깨어날지는 정해져 있지 않으며, JVM에 따라 다르다.
- 순서 없음: 들어온 순서와 관계없이 깨어날 수 있다.
- synchronized 블록 내에서 사용: notify()는 synchronized로 보호된 코드 안에서만 호출 가능하다.
Condition.signal():
- 순서에 따라 깨어남: 대기 중인 스레드 중 하나를 깨우되, 보통은 FIFO(First In, First Out) 방식으로 깨어난다. 즉, 먼저 기다린 스레드가 먼저 깨어난다.
- 더 예측 가능: Condition은 보통 Queue 구조를 사용하므로, 스레드가 깨어나는 순서를 좀 더 예측할 수 있다.
- ReentrantLock과 함께 사용: signal()은 ReentrantLock과 함께 사용된다.
⬛ 스레드 대기
스레드 대기 규칙은 아래와 같다.
- 일할 수 없을 때는 대기실에서 차례를 기다린다. (락 획득대기, BLOCKED)
- 누군가 일을 끝내면, 신호를 받아야만 스레드 대기실에서 나가서 일할 수 있다. (wait() 대기, WAITING)
사실, wait() 대기 집합도 존재한다.락 대기 집합은 자바 내부에 구현되어 있기 때문에 모니터 락과 같이 개발자가 확인하기는 어렵다.
개념상,락 대기 집합이 1차 대기소이고, 스레드 대기 집합이 2차 대기소이다.
스레드는 2중 감옥을 모두 탈 출해야 임계 영역을 수행할 수 있다.
⬛ 생산자 소비자 문제 디벨롭 과정
1. BoundedQueueV1:
- 스토리: 가게 선반(큐)이 있는데, 선반이 꽉 차면 생산자는 물건을 채울 수 없고, 물건이 없으면 소비자는 아무것도 가져갈 수 없어요.
- 결과: 물건이 많으면 버리고, 없으면 손님(소비자)이 기다릴 수밖에 없는 상황이 발생!
2. BoundedQueueV2:
- 스토리: 이번에는 선반이 꽉 차거나 비면 스레디(생산자/소비자)가 계속 선반을 확인해요. 선반이 비면 생산자, 꽉 차면 소비자가 기다리는데, 가게 문을 닫고 기다려서 다른 사람이 접근할 수가 없어요.
- 결과: 기다리기만 하고 나머지 사람들은 일을 못 해요! 마을이 멈춰요.
3. BoundedQueueV3:
- 스토리: 가게에 신호기가 생겼어요. 물건을 채우면 소비자에게 "물건 채워졌어요!"라고 신호를 보내고, 반대로 소비자가 물건을 가져가면 생산자에게 "빈 자리 생겼어요!"라고 알려줘요.
- 결과: 효율이 좀 더 좋아졌지만, 신호기가 누구를 먼저 불러야 할지 헷갈릴 때가 있어요. 생산자가 소비자를 불러야 하는데 가끔 또 다른 생산자를 깨우기도 해서 비효율 발생!
4. BoundedQueueV4 (V5를 위한 중간코드):
- 스토리: 가게를 좀 더 효율적으로 만들기 위해 신호기 시스템을 개선했어요. 생산자 대기실과 소비자 대기실을 따로 만들었어요. 이제는 생산자와 소비자가 각자 신호를 받아야 할 때만 정확히 불려요.
- 결과: 선반이 비면 생산자, 가득 차면 소비자가 불려서 훨씬 효율적이에요!
5. BoundedQueueV5:
- 스토리: 신호기를 업그레이드해서 생산자 전용 신호기와 소비자 전용 신호기를 만들었어요. 이제 생산자는 소비자에게, 소비자는 생산자에게 정확히 신호를 보내서 혼란이 없어요.
- 결과: 생산자-소비자 문제를 완벽하게 해결! 이제 이 시스템은 다양한 프로젝트에 재사용할 수 있을 만큼 완성도 높은 시스템이 되었어요.
⬛ BlockingQueue
BlockingQueue는 스레드가 데이터를 추가하거나 제거하는 작업이 차단(Blocking)될 수 있는 멀티스레드용 큐이다.
이 큐는 생산자-소비자 문제를 해결하기 위한 도구로, 다음과 같은 상황에서 스레드가 차단된다.
- 데이터 추가 차단: 큐가 가득 차면 데이터를 추가하려는 스레드는 공간이 생길 때까지 대기
- 데이터 획득 차단: 큐가 비어 있으면 데이터를 가져가려는 스레드는 데이터가 들어올 때까지 대기
BlockingQueue는큐가 가득 차거나 비었을 때 대기, 예외 발생, 특정 시간 대기 등의 처리를 지원한다. 이를 통해 생산자와 소비자가 서로의 작업이 완료될 때까지 적절히 대기하고 처리할 수 있다.
대표적인 구현체는 다음과같다.
- ArrayBlockingQueue: 고정된 크기의 배열 기반 큐.
- LinkedBlockingQueue: 크기를 고정할 수도 있고, 무한대로 사용할 수도 있는 링크 기반 큐.
🔶 BlockingQueue 인터페이스
package java.util.concurrent;
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);
boolean offer(E e);
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
boolean remove(Object o);
//...
}
🔶 BlockingQueue 구현 예시
public class BoundedQueueV6_1 implements BoundedQueue {
private BlockingQueue<String> queue;
public BoundedQueueV6_1(int max) {
queue = new ArrayBlockingQueue<>(max);
}
public void put(String data) {
try {
queue.put(data);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public String take() {
try {
return queue.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public String toString() {
return queue.toString();
}
}