자바에서 병렬 스트림을 사용하는 경우
해당 포스팅은 해당 게시글을 토대로 서술되었습니다.
Java 8 에서는 컬렉션을 데이터 스트림으로 쉽게 반복할 수 있는 Stream API를 도입했습니다. Stream은 병렬로 실행할 수 있고 여러 프로세서 코어를 사용하는 것도 매우 쉽습니다. 그에 따라 더 많은 코어에 작업을 할당하여 나누는 것이 항상 더 빠르다고 생각하기 쉽습니다. 하지만 그렇지 않은 경우도 있습니다.
해당 포스팅에서는 순차 스트림과 병렬 스트림의 차이점에 대해서 살펴보겠습니다. 먼저 병렬 스트림에서 사용하는 default fork-join pool을 살펴보겠습니다.
또한, 메모리 지역 및 분할/병합 비용을 포함하여 병렬 스트림 사용의 성능 영향을 다뤄보겠습니다.
마지막으로 순차 스트림을 병렬 스트림으로 변환하는 것이 합리적인 경우를 살펴봅니다.
자바에서의 Stream
자바의 Stream은 단순히 데이터 소스를 감싸는 Wrapper이므로 편리한 방식으로 데이터에 대한 대량 작업을 수행할 수 있습니다. 데이터를 저장하거나 기본 데이터 소스를 변경하지 않고 데이터 파이프라인에서 함수형 처리를 지원합니다.
순차 스트림
기본적으로, 명시적으로 병렬로 지정되지 않는 한 자바의 모든 Stream 작업은 순차적으로 처리됩니다. 순차스트림은 단일 스레드로 처리됩니다.
List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
listOfNumbers.stream().forEach(number ->
System.out.println(number + " " + Thread.currentThread().getName())
);
해당 코드의 출력결과는 쉽게 예측할 수 있습니다. 리스트의 요소는 항상 순서대로 출력됩니다.
실행결과 :
1 main
2 main
3 main
4 main
병렬 스트림을 사용하면 별도의 코어에서 병렬로 코드를 실행할 수 있습니다. 최종 결과는 각 개발 결과를 조합한 결과입니다.
하지만 병렬 스트림의 실행 순서는 개발자가 통제할 수 없습니다. 프로그램을 실행할 때마다 변경될 수 있습니다.
List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
listOfNumbers.parallelStream().forEach(number ->
System.out.println(number + " " + Thread.currentThread().getName())
);
4 ForkJoinPool.commonPool-worker-3
2 ForkJoinPool.commonPool-worker-5
1 ForkJoinPool.commonPool-worker-7
3 main
Fork-Join Framework
병렬 스트림은 Fork-Join 프레임워크와 common pool of worker 스레드를 사용합니다. fork-join 프레임워크는 여러 스레드간의 작업 관리를 처리하기 위해 Java7의 java.util.concurrent에 추가되었습니다.
소스 분할
fork-join 프레임워크는 worker 스레드간에 소스 데이터를 분할하고 작업 완료시 콜백을 처리하는 역할을 합니다.
정수의 합을 병렬로 계산하는 예를 살펴보겠습니다.
우리는 reduce 메소드를 사용하여 0에서 시작하는 대신 시작 합계에 5를 더할 것입니다.
List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
int sum = listOfNumbers.parallelStream().reduce(5, Integer::sum);
assertThat(sum).isNotEqualTo(15);
순차스트림에서의 이 작업 결과는 15입니다. 그러나 reduce 작업이 병렬로 처리되기 때문에 실제 모든 worker 스레드에서 숫자 5가 합산됩니다.
실제 결과는 공통 fork-join 풀에서 사용되는 스레드 수에 따라 다를 수 있습니다. 해당 문제를 해결하기 위해서는, 병렬 스트림 외부에 숫자 5를 더해야합니다.
해결된 소스코드.
List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
int sum = listOfNumbers.parallelStream().reduce(0, Integer::sum) + 5;
assertThat(sum).isEqualTo(15);
따라서 어떤 작업을 병렬로 수행할 수 있는지에 대해서는 항상 주의해야 합니다.
common thread pool
common 스레드풀의 스레드 수는 프로세서 코어 수와 같습니다. 그러나 API를 사용하면 JVM 매개변수를 전달하여 사용할 스레드수를 지정할 수 있습니다.
-D java.util.concurrent.ForkJoinPool.common.parallelism=4
이것은 전역으로 설정하는 것이며 common thread pool을 사용하는 모든 병렬 스트림 및 기타 모든 fork-join 작업에 영향을 미친다는 점을 기억해야 합니다. 전역설정으로 인해 더 많은 스레드가 필요한 부분에 4개로 제한하는 꼴이 되버리니까요. 매우 타당한 이유가 있지 않는 한 해당 매개변수를 수정하지 않는 것이 좋습니다.
custom thread pool
기본 common thread pool 외에도 custom thread pool에서 병렬 스트림을 실행할 수도 있습니다.
List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
ForkJoinPool customThreadPool = new ForkJoinPool(4);
int sum = customThreadPool.submit(
() -> listOfNumbers.parallelStream().reduce(0, Integer::sum)).get();
customThreadPool.shutdown();
assertThat(sum).isEqualTo(10);
Oracle은 common Thread pool을 사용하는 것을 권장하고 있습니다. custom thread pool에서 병렬 스트림을 실행하는데에도 마찬가지로 타당한 이유가 있어야 할 것 입니다.
성능상의 영향
병렬 처리는 다중 코어를 온전히 활용할 수 있습니다. 그러나 다중 스레드 관리, 메모리 지역성, 소스 분할 및 결과 병합에 대한 오버헤드도 고려해야 합니다.
오버헤드(Overhead)
integer Stream에 대한 예를 살펴보겠습니다. 순차 스트림과 병렬 스트림의 reduce 메소드에 대한 성능 테스트를 실행하겠습니다.
IntStream.rangeClosed(1, 100).reduce(0, Integer::sum);
IntStream.rangeClosed(1, 100).parallel().reduce(0, Integer::sum);
이런 간단한 합계에서 순차 스트림을 병렬 스트림으로 변환하면 성능이 저하됩니다.
그 이유는 때때로 스레드, 소스 및 결과를 관리하는 오버헤드가 실제 작업을 수행하는 것보다 cost가 많이 드는 작업이기 때문입니다.
분할 비용(Splitting Costs)
데이터 소스(각각의 스트림 단위)를 균등하게 분할하는 것은 병렬 처리를 하는데 필요한 cost이지만 일부 데이터 소스는 다른 데이터 소스보다 더 잘 분할됩니다.
private static final List<Integer> arrayListOfNumbers = new ArrayList<>();
private static final List<Integer> linkedListOfNumbers = new LinkedList<>();
// 해당 코드로 100만의 정수 목록을 초기화합니다.
static {
IntStream.rangeClosed(1, 1_000_000).forEach(i -> {
arrayListOfNumbers.add(i);
linkedListOfNumbers.add(i);
});
}
두가지 유형의 List 구현체에서 순차 및 병렬 reduce 작업에 대한 성능 테스트를 실행해보겠습니다.
arrayListOfNumbers.stream().reduce(0, Integer::sum)
arrayListOfNumbers.parallelStream().reduce(0, Integer::sum);
linkedListOfNumbers.stream().reduce(0, Integer::sum);
linkedListOfNumbers.parallelStream().reduce(0, Integer::sum);
아래의 결과는 순차 스트림을 병렬 스트림으로 변환하는 것이 ArrayList에 대해서만 성능 이점을 가져온다는 것을 보여줍니다.
LinkedList는 병렬처리를 했지만 Score와 Error 값이 증가한 것을 볼 수 있는 반면, ArrayList는 Score와 Error 값이 감소하였습니다.
그 이유는 Array라는 특징이 저렴하게 소스를 분할할 수 있겠지만 LinkedList에는 이러한 속성이 없기 때문입니다. TreeMap과 HashSet은 LinkedList보다 잘 분할되지만 Array만큼은 분할되지 않습니다.
병합 비용(Merging Costs)
병렬 계산을 하기 위해 소스를 분할할 때마다 결국 결과를 병합해야 합니다. 서로 다른 병합 작업으로 sum 및 grouping을 사용하여 순차 및 병렬 스트림에서의 성능 테스트를 실행해보겠습니다.
arrayListOfNumbers.stream().reduce(0, Integer::sum);
arrayListOfNumbers.stream().parallel().reduce(0, Integer::sum);
arrayListOfNumbers.stream().collect(Collectors.toSet());
arrayListOfNumbers.stream().parallel().collect(Collectors.toSet());
아래의 결과는 순차 스트림을 병렬 스트림으로 변환하는 것이 sum 연산에 대해서만 성능상의 이점을 가져온다는 것을 보여줍니다.
병합 작업은 reduce 와 sum 과 같은 일부 작업의 경우 정말 가볍게 동작하지만 set과 map과 같은 형태로 그룹화하는 것은 병합 작업이 상당히 무겁게 동작할 수 있습니다.
Memory Locality
최신 컴퓨터는 정교한 멀티 레벨 캐시를 활용하여 자주 사용하는 데이터를 프로세서 가까이에 저장합니다. 선형 메모리 엑세스 패턴이 감지되면 하드웨어는 데이터가 곧 필요할 것이라는 가정 하에 다음 데이터 라인을 미리 가져옵니다.(prefetch)
병렬 처리는 프로세서 코어가 유용한 작업을 지속적으로 처리할 수 있을 때 성능 이점을 가져옵니다. cache miss를 기다리는 것은 유용한 작업이 아니기 때문에 메모리 대역폭을 제한 요소로 고려해야 합니다.
하나는 primitive 타입을 사용하고 다른 하나는 wrapper 타입을 사용하는 두개의 배열을 사용하여 이를 증명해보겠습니다.
private static final int[] intArray = new int[1_000_000];
private static final Integer[] integerArray = new Integer[1_000_000];
static {
IntStream.rangeClosed(1, 1_000_000).forEach(i -> {
intArray[i-1] = i;
integerArray[i-1] = i;
});
}
두 배열에서 순차 및 병렬 reduce 작업에 대한 성능 테스트를 실행해보겠습니다.
Arrays.stream(intArray).reduce(0, Integer::sum);
Arrays.stream(intArray).parallel().reduce(0, Integer::sum);
Arrays.stream(integerArray).reduce(0, Integer::sum);
Arrays.stream(integerArray).parallel().reduce(0, Integer::sum);
아래의 결과는 순차 스트림을 병렬 스트림으로 변환하는 것이 primitive 배열을 사용할 때 약간 더 많은 성능 이점을 가져온다는 것을 보여줍니다. (둘 다 성능상에 이점은 있었지만 primitive 타입이 더 컸다.)
결과적으로 primitive int 타입의 값이 가장 score가 적습니다.
기본 Array는 Java에서 가장 최고의 locality를 제공합니다. 일반적으로 데이터 구조에 포인터가 많을수록 참조 객체를 가져오기 위해 메모리에 더 많은 압력을 가합니다.(여기서 포인터라는 것은 참조로 이해하면 좋을 것 같습니다. primitive 타입은 값 자체를 다루는 타입이고 Wrapper 타입은 결국 인스턴스가 들어있는 참조이기 때문입니다.)
여러 코어가 동시에 메모리에서 데이터를 가져오기 때문에 병렬화에 부정적인 영향을 미칠 수 있습니다.
NQ 모델
오라클은 병렬 처리가 성능 향상을 제공할 수 있는지 여부를 결정하는데 도움이 되는 간단한 모델을 제시했습니다. NQ 모델에서는 N은 소스 데이터의 요소의 수를 나타내고 Q는 데이터 요소당 수행된 계산의 양을 나타냅니다.
N * Q의 곲이 클수록 병렬화로 인해 성능 향상의 가능성이 높아 집니다. 숫자 합산과 같이 사소한 Q가 있는 문제의 경우 경험상 N은 10,000보다 커야 합니다. (List 요소의 size() 값이 1만은 넘어야 성능효과가 미친다는 것.)
따라서 계산 수가 증가하면 병렬 처리를 통해 성능을 높이는 데 필요한 데이터 크기가 줄어듭니다.
-> 복잡한 단계의 계산일수록 상대적으로 N(요소의 수)값이 낮아져도 성능 향상에 대한 효과를 얻을 수 있다는 표현인듯?
파일 검색 비용
// 순차 스트림 처리
Files.walk(Paths.get("src/main/resources/")).map(Path::normalize).filter(Files::isRegularFile)
.filter(path -> path.getFileName().toString().endsWith(".txt")).collect(Collectors.toList());
// 병렬 스트림 처리
Files.walk(Paths.get("src/main/resources/")).parallel().map(Path::normalize).filter(Files::
isRegularFile).filter(path -> path.getFileName().toString().endsWith(".txt")).
collect(Collectors.toList());
아래의 결과에 따르면 순차 스트림을 병렬 스트림으로 변환하여 더 많은 수의 파일을 검색할 때 약간 더 많은 성능이점을 얻을 수 있습니다.
병렬 스트림을 사용하는 경우
지금까지 살펴본 것처럼 병렬 스트림을 사용할 때에는 매우 신중하게 사용해야합니다.
병렬 처리는 특정 사용 사레에서 성능 이점을 가져올 수 있습니다. 그러나 병렬 스트림은 마법의 성능 이점을 가져오지는 않습니다. 따라서 개발중에는 순차 스트림을 계속 기본으로 사용해야 합니다.
실제 성능 요구 사항이 있을 때 순차 스트림을 병렬 스트림으로 변환할 수 있습니다. 이러한 요구사항이 주어지면 먼저 성능 측정을 실행하고 가능한 최적화 전략으로 병렬 처리를 고려할 수 있습니다.
많은 양의 데이터와 요소당 소행되는 많은 계산은 병렬 처리가 좋은 옵션이 될 수 있음을 나타냅니다.
반면에 적은 양의 데이터, 고르지 않은 소스 분할, 값비싼 병합 작업 및 열악한 메모리 지역성은 병렬 실행에 대한 잠재적인 문제를 나타낼 수 있습니다.
결론
해당 포스팅에서는 자바에서 순차 스트림과 병렬 스트림의 차이점에 대해서 살펴보았습니다. 병렬 스트림이 기본 fork-join pool과 해당 worker 스레드를 사용한다는 것을 배웠습니다.
그런 다음 병렬 스트림이 항상 성능 이점을 가져오는 것은 아님을 확인했습니다. 다중 스레드 관리, 메모리 지역성, 소스 분할 및 결과 병합의 오버헤드를 고려해보았습니다. 여기서 Array가 가능한 최고의 지역성을 제공하고 저렴하고 균등하게 분할할 수 있기 때문에 병렬 처리를 위한 훌륭한 데이터 소스라는 것을 알았습니다.
마지막으로 NQ모델을 살펴보고 실제 성능 요구사항이 있는 경우에만 병렬 스트림을 사용하는 것이 좋다는 것을 알았습니다.
감사합니다.