고급 3편 자바 섹션12 병렬 스트림
1️⃣ 단일 스트림 (Sequential Stream)
- 한 줄씩 순서대로 실행됨 → main 스레드 1개 사용
- 성능: 1초 작업 × 8개 → 8초 소요
int sum = IntStream.rangeClosed(1, 8)
.map(HeavyJob::heavyTask) // 1초 작업
.sum();
[main] calculate 1 → 10
...
[main] calculate 8 → 80
총 소요 시간: 약 8000ms
2️⃣ 스레드 직접 사용 (Thread 객체)
- 직접 Thread를 만들어 작업 분할 처리
- 예: thread-1, thread-2가 병렬 수행
Thread t1 = new Thread(() -> task(1, 4));
Thread t2 = new Thread(() -> task(5, 8));
t1.start(); t2.start();
t1.join(); t2.join(); // main 스레드 대기
3️⃣ 스레드 풀 사용 (ExecutorService)
- ExecutorService와 Callable로 스레드 풀 활용
- submit() → Future로 결과 받고, .get()으로 대기
ExecutorService pool = Executors.newFixedThreadPool(2);
Future<Integer> f1 = pool.submit(() -> task(1, 4));
Future<Integer> f2 = pool.submit(() -> task(5, 8));
int sum = f1.get() + f2.get();
pool.shutdown();
✅ 장점:
- 스레드 재사용으로 효율적
- Future로 결과 처리도 쉬움
❗ 단점: - 여전히 분할/합치기 코드를 직접 작성해야 함
4️⃣ Fork/Join 프레임워크 (RecursiveTask 사용)
- ForkJoinPool + RecursiveTask 활용
- 자동 분할, 작업 훔치기(Work Stealing)로 병렬성 향상
🔹 핵심 클래스
- ForkJoinPool: 병렬 작업 실행
- RecursiveTask<T>: 결과 반환 있음
- RecursiveAction: 결과 반환 없음
🔹 작업 클래스 작성
public class SumTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 4; // 임계값
private final List<Integer> list;
public SumTask(List<Integer> list) {
this.list = list;
}
@Override
protected Integer compute() {
if (list.size() <= THRESHOLD) {
return list.stream()
.mapToInt(HeavyJob::heavyTask)
.sum();
} else {
int mid = list.size() / 2;
SumTask left = new SumTask(list.subList(0, mid));
SumTask right = new SumTask(list.subList(mid, list.size()));
left.fork(); // 비동기 처리
int rightResult = right.compute(); // 현재 스레드 처리
int leftResult = left.join(); // 왼쪽 결과 기다림
return leftResult + rightResult;
}
}
}
🔹 실행 코드 (커스텀 풀 or 공용 풀)
ForkJoinPool pool = new ForkJoinPool(10);
List<Integer> list = IntStream.rangeClosed(1, 8).boxed().toList();
SumTask task = new SumTask(list);
int result = pool.invoke(task);
작업 훔치기 (Work Stealing)란?
할 일 없는 스레드가 바쁜 스레드의 작업을 몰래 훔쳐서 도와주는 것
- 병렬 작업을 나눠서 여러 스레드에 맡겼는데,어떤 스레드는 일찍 끝났고, 어떤 스레드는 아직도 바쁘다고해보자.
- 이때 남는 스레드가 바쁜 스레드의 작업을 도와주면 전체 속도가 빨라진다.
- 각 스레드는 자기 전용 작업 큐를 갖고 있다.
- 자신은 큐의 앞쪽(Top)에서 꺼내서 사용한다.
- 남의 작업은 큐의 뒤쪽(Bottom)에서 조심스럽게 하나만 가져와 충돌을 최소화한다.
- 작업이 크면 fork로 쪼개자
- 작업이 작으면 그냥 계산하자
- fork는 한 번만, 나머지는 compute + join
- 성능 튜닝 = 임계값 설정 + 실험
left.fork(); // 왼쪽 작업: 다른 스레드에게 맡기고
int right = right.compute(); // 오른쪽 작업: 지금 스레드에서 직접 실행
int left = left.join(); // 왼쪽 결과 기다리기
return left + right; // 결과 합치기
작업 크기 | 너무 작으면 오버헤드, 너무 크면 병렬 효과 ↓ | THRESHOLD 조절 (코어 수 기준으로 4~10배 작업 추천) |
작업 시간 균일성 | 시간이 다르면 나눠야 효율 ↑ | 작업 훔치기(Work Stealing) 효과 발휘 |
작업 복잡도 | 단순 연산은 굳이 안 나눠도 됨 | 1+2+3+4처럼 가벼운 건 직접 처리 |
시스템 자원 | CPU 코어 수, 메모리 등 고려 | availableProcessors()로 코어 수 확인 |
문맥 전환 비용 | 스레드 전환 많아지면 오히려 느림 | 너무 작은 단위로 나누지 말기 |
🛠️ fork() 사용 위치 | 하나만 fork하고 나머지는 compute() | 왼쪽 fork, 오른쪽 compute → 순서 유지 & 메모리 절약 |
5️⃣ Fork/Join 프레임워크 (공용풀사용)
공용 풀(Common Pool): 자바-8부터 도입된 ForkJoinPool.commonPool()을 통한 단일 인스턴스 풀
- 자동 생성·공유
- ForkJoinPool.commonPool()으로 꺼내 쓰기만 하면 되고, 직접 생성·종료할 필요가 없음.
- 애플리케이션 어디서든 같은 풀을 씀.
- CPU 코어 수 기반 스레드 수
- 시스템의 물리적 코어 수에서 1개를 뺀 만큼 워커 스레드를 만듬.
- 예) 코어 4개 → 스레드 3개 + 메인 스레드가 함께 일함
- 병렬 스트림과 연동
- Stream.parallel()을 쓰면 내부적으로 이 공용 풀이 동작해서 데이터를 자동으로 나누고 합침.
- 자원 효율성
- 여러 군데서 풀을 만들지 않고 한 군데서 공유하니, 메모리·스레드 관리가 간단해.
ForkJoinPool commonPool = ForkJoinPool.commonPool();
SumTask task = new SumTask(data); // RecursiveTask 구현
int result = task.invoke(); // 공용 풀로 작업 실행
log("최종 결과: " + result);
주요 포인트
- 별도 풀 생성/종료 코드 없이 간편
- CPU 바운드 작업에 최적화된 스레드 수 자동 설정
- 커스텀 풀 대비 리소스 효율성 높음
cf.) 커스텀풀과 공용풀의 차이
생성·관리 | 커스텀풀: 직접 생성·종료 필요 | 공용풀: 자동 생성·JVM 종료 시 정리 |
스레드 수 조정 | new ForkJoinPool(스레드수) | availableProcessors() - 1 만큼 자동 설정 |
재사용·공유 | 해당 코드 영역에서만 사용 | 애플리케이션 전체에서 공유 |
6️⃣ 자바 병렬 스트림
- 선언만으로 병렬 처리: IntStream.rangeClosed(...).parallel()...
- 내부적으로 Fork/Join 공용 풀 사용: 데이터 소스를 자동으로 쪼개고, 여러 워커 스레드가 동시에 처리한 뒤 결과를 합쳐 줌
int sum = IntStream.rangeClosed(1, 8)
.parallel() // 한 줄만 추가
.map(HeavyJob::heavyTask)
.sum();
장점
- 선언형 코드 한 줄로 멀티스레딩 구현
- 내부적으로 Spliterator가 데이터 분할·분배
7️⃣ 병렬 스트림 사용 시 주의점1 – CPU 바운드 전용
- Fork/Join 공용 풀은 계산 집약적 작업에 최적화
- I/O 바운드 작업에 사용 시
- 스레드 블로킹 → CPU 낭비
- 컨텍스트 스위칭 비용 증가
- 작업 훔치기 기법 무력화
- 결론: I/O 작업은 반드시 별도 스레드 풀(ExecutorService) 사용
8️⃣ 병렬 스트림 사용 시 주의점2 – 별도 풀 도입
I/O 바운드나 동시 요청이 많을 땐 공용 풀 대신 전용 풀 사용 권장
ExecutorService requestPool = Executors.newFixedThreadPool(100);
ExecutorService logicPool = Executors.newFixedThreadPool(400);
// 요청 처리 스레드(requestPool)에서 logicPool.submit(...) 호출
효과: 블로킹 대기 작업이 공용 풀을 점유하지 않고, 안정적인 처리 보장
CPU 연산과 I/O 작업을 각각 전용 풀에 나눠 담아서 처리하는 예제.
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
public class SeparatePoolsExample {
// 무거운 계산(CPU 바운드) 예시
static int heavyCalc(int x) {
// 예: 1초짜리 계산 시뮬레이션
try { Thread.sleep(1000); } catch (InterruptedException ignored) {}
return x * x;
}
// I/O 바운드 예시 (파일 읽기, 네트워크 등)
static void doIoTask(int id) {
// 예: 1초짜리 I/O 시뮬레이션
try { Thread.sleep(1000); } catch (InterruptedException ignored) {}
System.out.println("I/O 작업 완료: " + id);
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 1) CPU 작업 전용: 공용 Fork/Join 풀 사용
ForkJoinPool cpuPool = ForkJoinPool.commonPool();
// 예: 1~4를 제곱하는 CPU 연산을 parallelStream으로 처리
List<Integer> numbers = Arrays.asList(1, 2, 3, 4);
Future<?> cpuFuture = cpuPool.submit(() -> {
numbers.parallelStream()
.map(SeparatePoolsExample::heavyCalc)
.forEach(result -> System.out.println("계산 결과: " + result));
});
// 2) I/O 작업 전용: 별도 고정 스레드 풀 생성
ExecutorService ioPool = Executors.newFixedThreadPool(4);
for (int i = 1; i <= 4; i++) {
final int id = i;
ioPool.submit(() -> doIoTask(id));
}
// 3) 둘 다 끝날 때까지 대기
cpuFuture.get(); // CPU 작업 완료 대기
ioPool.shutdown();
ioPool.awaitTermination(5, TimeUnit.SECONDS);
System.out.println("모든 작업 완료");
}
}
9️⃣ CompletableFuture 주의사항
- CompletableFuture.runAsync(...) 기본으로 공용 풀 사용
- I/O 작업 시 반드시 커스텀 풀 지정
CompletableFuture.runAsync(task); // 공용 풀 → 위험
CompletableFuture.runAsync(task, es); // 안전: 별도 풀 지정
지정하지 않으면 병목·장애 발생
🔟 정리
- 단 한 줄(.parallel())로 선언형 병렬 처리 가능하나, 내부적으로는 Fork/Join 공용 풀 활용
- CPU 바운드 작업만으로 한정, I/O 바운드·동시 요청 시 별도 풀 사용
- Fork/Join 공용 풀과 private 풀, CompletableFuture 풀 등을 작업 특성과 시스템 환경에 맞춰 조합해 사용해야 한다
'Java' 카테고리의 다른 글
[Java] 김영한의 실전 자바 - 고급 3편 섹션12 함수형 프로그래밍 (1) | 2025.04.28 |
---|---|
[Java] 김영한의 실전 자바 - 고급 3편 섹션11 Optional (1) | 2025.04.17 |
[Java] 김영한의 실전 자바 - 고급 3편 섹션7 스트림API (1) | 2025.04.12 |
[Java] 김영한의 실전 자바 - 고급 3편 섹션6 메서드 참조 (0) | 2025.04.12 |
[Java] 김영한의 실전 자바 - 고급 3편 섹션4,5 람다활용,익명클래스와의 차이 (+정적 팩토리 메서드) (0) | 2025.04.07 |