⬛ 생산자 소비자 문제
🟢 생상자 소비자 문제 (producer-consumer problem)
생산자 스레드와 소비자 스레드가 특정 자원을 함께 생산하고, 소비하면서 발생하는 문제
🟢 주요 개념
- 생산자(Producer): 데이터를 생성하는 스레드로, 예를 들어 파일에서 데이터를 읽거나 네트워크에서 데이터를 받아오는 역할
- 소비자(Consumer): 생산된 데이터를 사용하는 스레드로, 데이터를 처리하거나 저장하는 역할
- 버퍼(Buffer): 생산자가 생성한 데이터를 임시로 저장하는 공간으로, 크기가 한정되어 있으며 생산자와 소비자가 이 공간을 통해 데이터를 주고받음
⬛ 예제1 - 기본적 생산자 소비자 문제
구현:
- BoundedQueueV1이라는 한정된 버퍼를 사용
- 버퍼에 데이터가 가득 차거나 비어 있을 때, 특별한 처리 없이 데이터를 버리거나 null을 반환하는 방식
주요 내용:
- BoundedQueue 인터페이스: 데이터를 저장하고 가져가는 두 가지 메서드인 put()과 take()를 정의
- BoundedQueueV1:
- put(): 버퍼가 가득 찬 경우 데이터를 버림
- take(): 버퍼가 비어 있으면 null 반환
- 생산자와 소비자 스레드:
- 생산자(ProducerTask): 데이터를 생성하여 버퍼에 넣는다. 버퍼가 가득 차면 데이터를 버린다.
- 소비자(ConsumerTask): 버퍼에서 데이터를 꺼내 사용한다. 버퍼가 비어 있으면 null을 반환한다.
단점:
- 생산자가 데이터를 버리거나, 소비자가 빈 데이터를 받아올 수 있어 비효율적
🔶 예제 코드
public class BoundedQueueV1 implements BoundedQueue{
private final Queue<String> queue = new ArrayDeque<>();
private final int max;
public BoundedQueueV1(int max) {
this.max = max;
}
@Override
public synchronized void put(String data) {
if (queue.size() == max) {
log("[put] 큐가 가득 참, 버림: " + data);
return;
}
queue.offer(data);
}
@Override
public synchronized String take() {
if (queue.isEmpty()) return null;
return queue.poll();
}
@Override
public String toString() {
return queue.toString();
}
}
⬛ 예제2 - 기본적 생산자 소비자 문제
구현:
- BoundedQueueV2에서는 버퍼가 가득 찬 경우 생산자 스레드가 대기하고, 버퍼가 비어 있는 경우 소비자 스레드가 대기하는 방식으로 개선했다.
주요 내용:
- BoundedQueueV2 (대기 메커니즘 사용):
- put(): 버퍼가 가득 차면 생산자는 대기(sleep())하여 소비자가 데이터를 가져가 빈 공간이 생길 때까지 기다린다.
- take(): 버퍼가 비면 소비자는 대기(sleep())하여 생산자가 데이터를 넣을 때까지 기다린다.
- 주요 변경점:
- 생산자가 데이터를 버리지 않고, 소비자가 빈 데이터를 받지 않는 방식으로 개선했다.
단점:
- sleep()을 이용한 대기 방식은 비효율적이며, 스레드가 일정한 주기로 계속 상태를 확인해야 하므로 성능에 영향을 줄 수 있다.
- 스레드가 락을 가지고 있는 상태에서 대기하기 때문에, 무한 대기 문제가 발생한다.
🔶 예제 코드
public class BoundedQueueV2 implements BoundedQueue {
private final Queue<String> queue = new ArrayDeque<>();
private final int max;
public BoundedQueueV2(int max) {this.max = max;}
public synchronized void put(String data) {
while (queue.size() == max) {
log("[put] 큐가 가득 참, 생산자 대기");
sleep(1000);
}
queue.offer(data);
}
public synchronized String take() {
while (queue.isEmpty()) {
log("[take] 큐에 데이터가 없음, 소비자 대기");
sleep(1000);
}
return queue.poll();
}
@Override
public String toString() {
return queue.toString();
}
}
위 내용을 더 자세히보자.
버퍼가 다 찬 상태에서 생산자가 진입한다고 가정해보자. 그러면 생산자는 우선 락을 획득하고, 버퍼가 비워질때까지 대기한다.
하지만 소비자가 락을 획득해 버퍼의 데이터를 소비하려면, 생산자 스레드가 먼저 락을 반납해야 한다.
결국 생산자는 절대 비워지지 않는 큐를 계속 확인하는 것이다.
결국 위의 문제는, 락을 가지고 대기하는 스레드가 대기하는 동안 다른 스레드에게 락을 양보할 수 있다면 해결할 수 있을 것이다.
⬛ Object - wait, notify 개념
🟢 wait(), notify()
⚪️ Object.wait()
- 현재 스레드가 가지고 있는 락(모니터 락)을 반납하고 대기 상태(WAITING)로 전환
- notify() 또는 notifyAll()이 호출될 때까지 대기 상태로 유지
⚪️ Object.notify()
기능
- 대기 중인 스레드 중 하나를 깨워 락을 획득할 수 있는 기회 제공
- 여러 스레드가 대기 중인 경우 그 중 한 개의 스레드만이 깨워짐
⚪️ Object.notifyAll()
기능
- 대기 중인 모든 스레드를 깨워 락을 획득할 기회 제공
요약하면, wait()는 스레드가 락을 반납하고 대기 상태로 들어가며, notify()는 대기 중인 하나의 스레드를 깨우고, notifyAll()은 모든 대기 중인 스레드를 깨운다. 이들 모두 synchronized 블록 또는 메서드 내에서 호출해야 한다.
⬛ 예제3 - Object : wait, notify를 이용한 동기화 구현
구현:
- 자바의 Object 클래스에 내장된 wait()와 notify() 메서드를 활용하여 대기 및 알림 시스템을 보다 효율적으로 구현
- 동기화된 임계 영역 내에서 스레드가 안전하게 대기하고 깨어나도록 설계
주요 내용:
- BoundedQueueV3 (대기와 알림):
- wait(): 버퍼가 가득 차면 생산자는 wait()로 대기 상태에들어가고, 버퍼가 비면 소비자도 wait()로 대기
- notify(): 소비자가 데이터를 가져가면 notify()로 생산자에게 알리고, 생산자가 데이터를 넣으면 소비자에게 notify()로 알림
장점:
- wait()와 notify()를 활용한 대기는 스레드가 자원을 사용할 준비가 되었을 때만 깨어나므로, 시스템 자원을 절약 가능
🔶 예제 코드
public class BoundedQueueV3 implements BoundedQueue {
private final Queue<String> queue = new ArrayDeque<>();
private final int max;
public BoundedQueueV3(int max) {
this.max = max;
}
public synchronized void put(String data) {
while (queue.size() == max) {
log("[put] 큐가 가득 참, 생산자 대기");
try {
wait(); // RUNNABLE -> WAITING, 락 반납
log("[put] 생산자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
queue.offer(data);
log("[put] 생산자 데이터 저장, notify() 호출");
notify(); // 대기 스레드, WAIT -> BLOCKED
// notifyAll(); // 모든 대기 스레드, WAIT -> BLOCKED
}
public synchronized String take() {
while (queue.isEmpty()) {
log("[take] 큐에 데이터가 없음, 소비자 대기");
try {
wait();
log("[take] 소비자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
String data = queue.poll();
log("[take] 소비자 데이터 획득, notify() 호출");
notify(); // 대기 스레드, WAIT -> BLOCKED
// notifyAll(); // 모든 대기 스레드, WAIT -> BLOCKED
return data;
}
@Override
public String toString() {
return queue.toString();
}
}
🟢 스레드 대기 집합(wait set)
- 대기상태에 들어간 스레드를 관리한다.
- 모든 객체는 락(모니터 락)과 대기 집합을 가지고 있다.
예를들어, 버퍼가 가득차 생산자가 대기하는 상황을보자.
아래처럼 wait()을 호출하면 락을 반납하고, 스레드 상태가 RUNNABLE→WAITING으로 변경된다.
또다른 케이스로, 소비자가 먼저 실행되어 대기하는 상황을 보자.
생산자는 데이터 생성후 notify()로 대기 스레드 중 하나를 깨운다.
대기 스레드는 WAITING→BLOCKED 상태로 대기하며 락을 기다린다.
c1이 락을 획득하면, 데이터 생성 후 스레드 대기집합에 알린다. 이때 대기집합엔 생산자 스레드가 아니라 소비자 스레드만 있다.
따라서 의도 와는 다르게 소비자 스레드인 c2가 대기 상태에서 깨어난다. c2는 깨어났지만 데이터가 없기에 다시 대기집합에 들어간다.
만약 소비자인 c2 입장에서 생산자, 소비자 스레드를 선택해서 깨울 수 있다면, 소비자인 c3 를 깨우지는 않았을 것이다. 하지만 notify() 는 이런 선택을 할 수 없다.
⬛ Object : wait, notify - 한계
notify()를 호출할 때, 임의의 스레드가 선택되기 때문에, 잘못된 스레드가 깨워져서 불필요하게 자원이 낭비될 수 있다.
- 소비자가 같은 소비자를 깨우는 상황: 큐에 데이터가 없는데, 대기 중인 소비자가 또 다른 소비자를 깨우는 비효율.
- 생산자가 같은 생산자를 깨우는 상황: 큐가 가득 찼는데, 대기 중인 생산자가 또 다른 생산자를 깨우는 비효율.
🟢 스레드 기아 (thread starvation)
notify()가 랜덤으로 스레드를 깨우기 때문에, 실행 순서를 계속 얻지 못해서 스레드가 실행되지 않는 상황이 올 수 있다.
이를 해결하기위해, notifyAll() 을 사용하면 스레드 대기 집합에 있는 모든 스레드를 한번에 다 깨울 수 있다.
notifyAll() 호출시, 모든 스레드가 깨어나며 임계영역으로 들어가 락을 획득하려한다.
락을 획득하지못하면 다시 스레드 대기집합에 들어간다.
만약 c1이 먼저 락을 획득하면 큐에 데이터가없으므로 다시 스레드 대기 집합에들어간다. (c2~c5 도 마찬가지)
결국 p1이 가장 늦게 락 획득을 시도해도, 마지막엔 p1만남기에 락을 획득한다.
⬛ notify() 한계
생산자 스레드는 데이터를 생성한 후 대기 중인 소비자 스레드에게 알림을 보내야 하고, 소비자 스레드는 데이터를 소비한 후 대기 중인 생산자 스레드에게 알림을 보내도록 했다.
하지만, 생산자와 소비자가 같은 대기 집합에 속해 있고, notify()는 특정 스레드를 지정하지 못해 비효율적이다.
notifyAll()을 사용하면 모든 대기 중인 스레드를 깨우지만, 이는 원하지 않는 스레드까지 깨어난다.
이러한 문제를 해결하기 위한 방법이 필요하다...!
'Java' 카테고리의 다른 글
[Java] 김영한의 실전 자바 - 고급 1편 섹션9 생산자 소비자 문제2 (1) | 2024.09.26 |
---|---|
[Java] 김영한의 실전 자바 - 고급 1편 섹션11 동시성 컬렉션 (0) | 2024.09.23 |
[Java] 김영한의 실전 자바 - 고급 1편 섹션7 고급 동기화 - concurrent.Lock (4) | 2024.08.28 |
[Java] 김영한의 실전 자바 - 고급 1편 섹션6 동기화 - synchronized (0) | 2024.08.19 |
[Java] 김영한의 실전 자바 - 고급 1편, 멀티스레드와 동시성 섹션2,3 스레드 제어와 생명주기 (0) | 2024.07.28 |