Java

[Java] 김영한의 실전 자바 - 고급 1편 섹션9 생산자 소비자 문제2

고쩡이 2024. 9. 26. 11:27

본문은 인프런 김영한T 자바 고급편1을 공부하고 정리한 내용입니다:)

⬛ Lock Condition

ReentrantLockCondition을 사용하여 생산자와 소비자가 서로 독립적인 대기 집합을 사용하도록 분리한다. 이는 효율적인 대기를 가능하게 하여 불필요한 스레드 깨어남을 방지한다.

 

ReentrantLock 은 내부에 락과, 락 획득을 대기하는 스레드를 관리하는 대기 큐가 있다.

condition.await(): Object.wait()와 유사하며, 현재 스레드를 대기 상태로 보관하면서 ReentrantLock에서 획득한 락을 반납한다.
condition.signal(): Object.notify()와 유사하며, 대기 중인 스레드 하나를 깨워 대기 상태에서 빠져나오게 한다.

 

기존 코드를 ReentrantLock과 Condition을 사용하도록 바꾸면 아래와 같다.

 

🔶 예제코드

public class BoundedQueueV4 implements BoundedQueue{

	private final Lock lock = new ReentrantLock();
	// ReentrantLock의 스레드 대기 공간 생성
	private final Condition condition = lock.newCondition();

	private final Queue<String> queue = new ArrayDeque<>();
	private final int max;

	public BoundedQueueV4(int max) {
		this.max = max;
	}

	@Override
	public void put(String data) {
		lock.lock();
		try {
			while (queue.size() == max) {
				log("[put] 큐가 가득 참, 생산자 대기");
				try {
					condition.await();
					log("[put] 생산자 깨어남");
				} catch (InterruptedException e) {
					throw new RuntimeException(e);
				}
			}
			queue.offer(data);
			log("[put] 생산자 데이터 저장, signal() 호출"); condition.signal();
		} finally {
			lock.unlock();
		}
	}

	@Override
	public String take() {
		lock.lock();
		try {
			while (queue.isEmpty()) {
				log("[take] 큐에 데이터가 없음, 소비자 대기");
				try {
					condition.await();
					log("[take] 소비자 깨어남");
				} catch (InterruptedException e) {
					throw new RuntimeException(e);
				}
			}
			String data = queue.poll();
			log("[take] 소비자 데이터 획득, signal() 호출"); condition.signal();
			return data;
		} finally {
			lock.unlock();
		}
	}
}

⬛ 생상자 소비자 대기공간 분리

대기공간을 분리해보자. 핵심은 생산자는 소비자를 깨우고, 소비자는 생산자를 깨운다는 점이다.

public class BoundedQueueV5 implements BoundedQueue{
	private final Lock lock = new ReentrantLock();
	private final Condition producerCond = lock.newCondition();
	private final Condition consumerCond = lock.newCondition();

	private final Queue<String> queue = new ArrayDeque<>();
	private final int max;

	public BoundedQueueV5(int max) {
		this.max = max;
	}

	@Override
	public void put(String data) {
		lock.lock();
		try {
			while (queue.size() == max) { log("[put] 큐가 가득 참, 생산자 대기");
				try {
					producerCond.await(); // 생산자 스레드를 생산자 전용 스레드 대기에 보관
					log("[put] 생산자 깨어남");
				} catch (InterruptedException e) {
					throw new RuntimeException(e);
				}
			}
			queue.offer(data);
			log("[put] 생산자 데이터 저장, consumerCond.signal() 호출");
			consumerCond.signal(); // 소비자 전용 스레드 대기 공간에 신호
		} finally {
			lock.unlock();
		}
	}

	@Override
	public String take() {
		try {
			while (queue.isEmpty()) {
				log("[take] 큐에 데이터가 없음, 소비자 대기");
				try {
					consumerCond.await(); // 소비자 스레드를 소비자 전용 스레드 대기 공간에 보관
					log("[take] 소비자 깨어남");
				} catch (InterruptedException e) {
					throw new RuntimeException(e);
				}
			}
			String data = queue.poll();
			log("[take] 소비자 데이터 획득, producerCond.signal() 호출");
			producerCond.signal(); // 생산자 전용 스레드 대기 공간에 신호
			return data;
		} finally {
			lock.unlock();
		}
	}
}

 

단순한 예시 그림을 보자. 아래와같은 상황에서 출발한다고해보자.(위 코드와는 관련이 없다.)

 

🔶 생산자 실행 상황

생산자는 데이터 생성후 소비자 대기공간에 signal()을 통해 알려준다.:)

🔶 소비자 실행 상황

소비자는 데이터 소비후 생산자 대기공간에 signal()을 통해 알려준다.:)

 

🟢 Object.notify() vs Condition.signal() 차이

Object.notify()는 누구나 깨어날 수 있는 반면, Condition.signal()은 먼저 대기한 스레드가 깨어날 가능성이 더 높은 방식이다.

Object.notify():

  • 무작위로 깨어남: 대기 중인 스레드 중 아무나 하나를 깨운다.누가 깨어날지는 정해져 있지 않으며, JVM에 따라 다르다.
  • 순서 없음: 들어온 순서와 관계없이 깨어날 수 있다.
  • synchronized 블록 내에서 사용: notify()는 synchronized로 보호된 코드 안에서만 호출 가능하다.

Condition.signal():

  • 순서에 따라 깨어남: 대기 중인 스레드 중 하나를 깨우되, 보통은 FIFO(First In, First Out) 방식으로 깨어난다. 즉, 먼저 기다린 스레드가 먼저 깨어난다.
  • 더 예측 가능: Condition은 보통 Queue 구조를 사용하므로, 스레드가 깨어나는 순서를 좀 더 예측할 수 있다.
  • ReentrantLock과 함께 사용: signal()은 ReentrantLock과 함께 사용된다.

⬛ 스레드 대기

스레드 대기 규칙은 아래와 같다.

  • 일할 수 없을 때는 대기실에서 차례를 기다린다. (락 획득대기, BLOCKED)
  • 누군가 일을 끝내면, 신호를 받아야만 스레드 대기실에서 나가서 일할 수 있다. (wait() 대기, WAITING)

사실, wait() 대기 집합도 존재한다.락 대기 집합은 자바 내부에 구현되어 있기 때문에 모니터 락과 같이 개발자가 확인하기는 어렵다.

개념상,락 대기 집합이 1차 대기소이고, 스레드 대기 집합이 2차 대기소이다.

스레드는 2중 감옥을 모두 탈 출해야 임계 영역을 수행할 수 있다.

이전 그림과 같은 상태를 좀 더 자세히 그린 그림이다.:)

 

⬛ 생산자 소비자 문제 디벨롭 과정

1. BoundedQueueV1:

  • 스토리: 가게 선반(큐)이 있는데, 선반이 꽉 차면 생산자는 물건을 채울 수 없고, 물건이 없으면 소비자는 아무것도 가져갈 수 없어요.
  • 결과: 물건이 많으면 버리고, 없으면 손님(소비자)이 기다릴 수밖에 없는 상황이 발생!

2. BoundedQueueV2:

  • 스토리: 이번에는 선반이 꽉 차거나 비면 스레디(생산자/소비자)가 계속 선반을 확인해요. 선반이 비면 생산자, 꽉 차면 소비자가 기다리는데, 가게 문을 닫고 기다려서 다른 사람이 접근할 수가 없어요.
  • 결과: 기다리기만 하고 나머지 사람들은 일을 못 해요! 마을이 멈춰요.

3. BoundedQueueV3:

  • 스토리: 가게에 신호기가 생겼어요. 물건을 채우면 소비자에게 "물건 채워졌어요!"라고 신호를 보내고, 반대로 소비자가 물건을 가져가면 생산자에게 "빈 자리 생겼어요!"라고 알려줘요.
  • 결과: 효율이 좀 더 좋아졌지만, 신호기가 누구를 먼저 불러야 할지 헷갈릴 때가 있어요. 생산자가 소비자를 불러야 하는데 가끔 또 다른 생산자를 깨우기도 해서 비효율 발생!

4. BoundedQueueV4 (V5를 위한 중간코드):

  • 스토리: 가게를 좀 더 효율적으로 만들기 위해 신호기 시스템을 개선했어요. 생산자 대기실소비자 대기실을 따로 만들었어요. 이제는 생산자와 소비자가 각자 신호를 받아야 할 때만 정확히 불려요.
  • 결과: 선반이 비면 생산자, 가득 차면 소비자가 불려서 훨씬 효율적이에요!

5. BoundedQueueV5:

  • 스토리: 신호기를 업그레이드해서 생산자 전용 신호기소비자 전용 신호기를 만들었어요. 이제 생산자는 소비자에게, 소비자는 생산자에게 정확히 신호를 보내서 혼란이 없어요.
  • 결과: 생산자-소비자 문제를 완벽하게 해결! 이제 이 시스템은 다양한 프로젝트에 재사용할 수 있을 만큼 완성도 높은 시스템이 되었어요.

⬛ BlockingQueue

BlockingQueue는 스레드가 데이터를 추가하거나 제거하는 작업이 차단(Blocking)될 수 있는 멀티스레드용 큐이다.

이 큐는 생산자-소비자 문제를 해결하기 위한 도구로, 다음과 같은 상황에서 스레드가 차단된다.

  • 데이터 추가 차단: 큐가 가득 차면 데이터를 추가하려는 스레드는 공간이 생길 때까지 대기
  • 데이터 획득 차단: 큐가 비어 있으면 데이터를 가져가려는 스레드는 데이터가 들어올 때까지 대기

BlockingQueue는큐가 가득 차거나 비었을 때 대기, 예외 발생, 특정 시간 대기 등의 처리를 지원한다. 이를 통해 생산자와 소비자가 서로의 작업이 완료될 때까지 적절히 대기하고 처리할 수 있다.

대표적인 구현체는 다음과같다.

  • ArrayBlockingQueue: 고정된 크기의 배열 기반 큐.
  • LinkedBlockingQueue: 크기를 고정할 수도 있고, 무한대로 사용할 수도 있는 링크 기반 큐.

🔶 BlockingQueue 인터페이스

 package java.util.concurrent;
 
 public interface BlockingQueue<E> extends Queue<E> {
     boolean add(E e);
     boolean offer(E e);
     void put(E e) throws InterruptedException;
     boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
     E take() throws InterruptedException;
     E poll(long timeout, TimeUnit unit) throws InterruptedException;
     boolean remove(Object o);
     //...
}

 

🔶 BlockingQueue 구현 예시

public class BoundedQueueV6_1 implements BoundedQueue {
    private BlockingQueue<String> queue;
    public BoundedQueueV6_1(int max) {
        queue = new ArrayBlockingQueue<>(max);
}
    public void put(String data) {
        try {
            queue.put(data);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
}
    public String take() {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
}
    @Override
    public String toString() {
        return queue.toString();
    }
}