Study/Modern-Java-In-Action

Modern Java Ch07. Stream 병렬 처리와 성능

hongeeii 2023. 1. 21.
728x90
반응형

병렬 데이터 처리와 성능

자바 7이 등장하기 전에는 데이터 컬렉션을 병렬로 처리하기가 어려웠다. 우선 데이터를 서브파트로 분할해야 한다. 그리고 분할된 서브파트를 각각의 스레드로 할당한다. 스레드로 할당한 다음에는 의도치 않은 레이스 컨디션(경쟁 상태)이 발생하지 않도록 적절한 동기화를 추가해야 하며, 마지막으로 부분 결과를 합쳐야 한다.

자바 7은 더 쉽게 병렬화를 수행하면서 에러를 최소화 할 수 있도록 포크/조인 프레임워크기능을 제공 한다.

병렬 스트림

스트림을 이용하면 순차 스트림을 병렬 스트림으로 자연스럽게 바꿀 수 있다. 자바 7에 추가된 포크/조인 프레임워크와 내부적인 병렬 스트림 처리는 어떤 관계가 있는지 살펴본다. 병렬 스트림이 내부적으로 어떻게 처리되는지 알아야만 스트림을 잘못 사용하는 상황을 피할 수 있다. 컬렉션에 parallelStream을 호출하면 병렬 스트림이 생성된다. 병렬 스트림이란 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림이다. 병렬 스트림을 이용하면 모든 멀티코어 프로세서가 각각의 청크를 처리하도록 할당할 수 있다.

순차 스트림을 병렬 스트림으로 변환

순차 스트림에 parallel 메서드를 호출하면 기존의 함수령 리듀싱 연산(숫자 합계 연산)이 병렬로 처리된다.

public long parallelSum(long n){
	return Stream.iterate(1L, i -> i + 1)
		     .limit(n)
		     .parallel() // 스트림을 병렬 스트림으로 변환
		     .reduce(0L, Long::sum);
}

순차 스트림에 parallel을 호출해도 스트림 자체에는 아무 변화도 일어나지 않는다. 내부적으로는 parallel을 호출하면 이후 연산이 병렬로 수행해야 함을 의미하는 불리언 플래그가 설정된다. 반대로 sequential로 병렬 스트림을 순차 스트림으로 바꿀 수 있다. 이 두 메서드를 이용해서 어떤 연산을 병렬로 실행하고 어떤 연산을 순차로 실행할지 제어할 수 있다.

stream.parallel()
      .filter(...)
      .sequential()
      .map(...)
      .parallel()
      .reduce();

parallel과 sequential 두 메서드 중 최종적으로 호출된 메서드가 전체 파이프라인에 영향을 미친다. 위 코드는 파이프라인의 마지막 호출은 parallel이므로 파이프라인은 전체적으로 병렬로 실행된다.

병렬 스트림은 내부적으로 ForkJoinPool을 사용한다. 기본적으로 ForkJoinPool은 프로세서 수, 즉 Runtime.getRuntime().avilableProccessors()가 반환하는 값에 상응하는 스레드를 갖는다.

스트림 성능 측정

성능을 최적화할 때는 세 가지 황금 규칙을 기억해야 한다.

  1. 측정
  2. 측정
  3. 측정

반복 작업은 병렬로 수행할 수 있는 독립 단위로 나누기가 어렵다.

병렬 프로그래밍을 오용(예를 들어 병렬과 거리가 먼 반복 작업)하면 오히려 전체 프로그램의 성능이 더 나빠질 수도 있다.

병렬화를 이용하기 위해선 특화된 메서드를 사용해야 한다. 특화되지 않은 스트림을 처리할 때는 오토박싱, 언박싱 등의 오버헤드를 수반하기 때문이다. 상황에 따라서는 어떤 알고리즘을 병렬화하는 것보다 적절한 자료구조를 선택하는 것이 더 중요하다는 사실을 단적으로 보여준다.

멀티코어 간의 데이터 이동은 우리 생각보다 비싸다. 따라서 코어 간에 데이터 전송 시간보다 훨씬 오래 걸리는 작업만 병렬로 다른 코어에서 수행하는 것이 바람직하다. 그리고 스트림을 병렬화해서 코드 실행 속도를 빠르게 하고 싶으면 항상 병렬화를 올바르게 사용하고 있는지 확인해야 한다.

병렬 스트림의 올바른 사용법

병렬 스트림을 잘못 사용하면서 발생하는 많은 문제는 공유된 상태를 바꾸는 알고리즘을 사용하기 때문에 일어난다.

public long sideEffectSum(long n){
	Accmulator acc = new Accmulator();
	LongStream.rangeClosed(1, n).forEach(acc::add);
	return acc.total;
}

public class Accmulator{
	public long total = 0;
	public void add(long value) {total += value;}
}

위 코드는 병렬로 실행하면 참사가 일어난다. 특히 total을 접근할 때마다 (다수의 스레드에서 동시에 데이터 접근하는) 데이터 레이스 문제가 일어난다. 동기화로 문제를 해결하다보면 결국 병렬화라는 특성이 없어져 버릴 것이다.

병렬화로 실행하면 여러 스레드에서 동시에 누적자, 즉 total += value를 실행하면서 이런 문제가 발생한다.

병렬 스트림을 사용했을 때 이상한 결과에 당황하지 않으려면 상태 공유에 따른 부작용을 피해야 한다.

java volatile

  • volatile keyword는 java 변수를 Main Memory에 저장하겠다라는 것을 명시하는 것이다.
  • 매번 변수의 값을 Read할 때마다 CPU cache에 저장된 값이 아닌 Main Memory에서 읽는 것이다.
  • 또한 변수의 값을 Write할 때마다 Main Memory에까지 작성하는 것이다. 

volatile 변수를 사용하고 있지 않는 Multi Thread 어플리케이션에서는 task를 수행하는 동안 성능 향상을 위해 Main Memory에서 읽은 변수 값을 CPU cache에 저장하게 된다. 만약에 Multi Thread환경에서 Thread가 변수 값을 읽어올 때 각각의 CPU cache에 저장된 값이 다르기 때문에 변수 값 불일치문제가 발생하게 된다.

Multi Thread 환경에서 하나의 Thread만 read & write하고 나머지 Thread가 read하는 상황에서 가장 최신의 값을 보장한다. 하나의 Thread가 아닌 여러 Thread가 write하는 상황에서는 적합하지 않다.

병렬 스트림 효과적으로 사용하기

  • 확신이 서지 않으면 직접 측정하라.
  • 박싱을 주의하라.
  • 순차 스트림보다 병렬 스트림에서 성능이 떨어지는 연산이 있다. (특히 limit나 findFirst처럼 요소의 순서에 의존하는 연산)
  • 스트림에서 수행하는 전체 파이프라인 연산 비용을 고려하라.
  • 소량의 데이터에서는 병렬 스트림이 도움 되지 않는다.
  • 스트림을 구성하는 자료구조가 적절한지 확인하라.
  • 스트림의 특성과 파이프라인의 중간 연산이 스트림의 특성을 어떻게 바꾸는지에 따라 분해 과정의 성능이 달라질 수 있다.
  • 최종 연산의 병합 과정 비용을 살펴보라.

포크/조인 프레임워크

포크/조인 프레임워크는 병렬화할 수 있는 작업을 재귀적으로 작은 작업으로 분할한 다음에 서브태스크 각각의 결과를 합쳐서 전체 결과를 만들도록 설계되었다. 포크/조인 프레임워크에서는 서브태스크를 스레드 풀의 작업자 스레드에 분산 할당하는 ExecutorService 인터페이스를 구현한다.

포크(Fork) : 다른 프로세스 혹은 스레트(테스크태스크)를 여러 개로 쪼개서 새롭게 생성

조인(Join) : 포크해서 실행한 프로세스 혹은 스레드(태스크)의 결과를 취합한다.

일을 나눌 수 있다면 여러개로 쪼개서 실행(포크), 하고 최종적으로 실행이 완료된 내용을(조인)해서 결과를 만들어 낸다.

만약 듀얼코어에서 작업을 두 개로 쪼갰는데, 하나의 실행 시간이 10초이고 나머지 하나가 1초라면 전체 시간이 10초가 걸린다. 위 예제야 단순 배열을 분할하므로 반반씩 정확히 나누는게 가능하지만 실제 작업들에서는 단순하지가 않아 각 작업간의 시간 차이가 발생할 가능성이 항상 존재한다. 따라서 코어가 2개라고 해서 2개로 나누면 나머지 하나의 코어는 idle 상태가 되어 놀게 된다.

하지만 듀얼 코어에서 작업을 4개로 쪼갰다면 한 코어가 작업을 끝내고 다른 작업을 찾아 실행하게 된다. 이런 이유 때문에 작업을 작게 쪼갤수록 각 코어에 할당되는 작업의 총량이 균형 있게 배분된다.

포크 조인에서 작은 태스크 하나를 끝내면 다른 큐(Queue)에서 다른 태스크를 가져와서 실행하는데, 이러한 것을 작업 훔치기(work stealing) 이라고 한다.

결론적으로 작업 훔치기라는 메커니즘에 의해 적절한 크기로 분할된 많은 태스크를 포킹하는 것이 바람직하다.

RecursiveTask 활용

스레드 풀을 이용하려면 RecursiveTask의 서브클래스를 만들어야 한다. RecursiveTask를 정의하려면 추상 메서드 compute를 구현해야 한다.

public class Task extend RecursiveTask<V> {
  @Override
  protected V compute() {
    if (태스크가 충분히 작거나 더 이상 분할할 수 없으면) {
      순차적으로 태스크 계산
    } else {
	태스크를 두 서브태스크로 분할
	태스크가 다시 서브태스크로 분할되도록 이 메서드를 재귀적으로 호출함
	모든 서브태스크의 연산이 완료될 때까지 기다림
	각 서브태스크의 결과를 합침
    }
  }
}

// 호출시..
ForkJoinTask<V> task = new Task();
new ForkJoinPool().invoke(task);
// RecursiveTask를 상속받아 포크조인 프레임워크에서 사용할 테스크 생성
public class ForkJoinSumCalculator extends RecursiveTask<Long> {  
        
    private final long[] numbers; // 더할 숫자 배열
    private final int start; // 이 서브테스크에서 처리할 배열의 초기 위치와 최종 위치
    private final int end;
    public static final long THRESHOLD = 10_000; // 이 값 이하의 서브테스크는 더 이상 분할 x


    // 메인 테스크를 생성할 때 사용할 공개 생성자
    public ForkJoinSumCalculator(long[] numbers) { 
        this(numbers, 0, numbers.length);
    }

    // 메인 테스트의 서브테스크를 재귀적으로 만들 때 사용할 private 생성자
    private ForkJoinSumCalculator(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        int length = end - start; // 이 테스크에서 더할 배열의 길이
        if (length <= THRESHOLD) {
            // 기준값과 같거나 작으면 순차적으로 결과 계산
            return computeSequentially();
        } 
        // 배열의 첫 번째 절반을 더하도록 서브테스크 생성
        ForkJoinSumCalculator leftTask = 
                new ForkJoinSumCalculator(numbers, start, start+length/2);
        // ForkJoinPool의 다른 스레드로 새로 생성한 테스크를 비동기로 실행
        leftTask.fork();
        // 배열의 나머지 절반 더하는 서브테스크 생성
        ForkJoinSumCalculator rightTask =
                new ForkJoinSumCalculator(numbers, start + length/2, end);
        // 두번째 서브테스크 동기 실행. 추가 분할 발생할 수 있음
        Long rightResult = rightTask.compute();
        // 첫번째 서브테스크 결과를 읽거나 아직 결과가 없으면 기다림
        Long leftResult = leftTask.join();
        // 두 서브테스크의 결과를 조합한 값이 이 테스크의 결과값
        return rightResult + leftResult; 
    }

    // 더 분할할 수 없을 때 서브테스크의 결과를 계산하는 알고리즘
    private Long computeSequentially() {
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }
}

위는 n까지의 자연수 덧셈을 병렬로 수행하는 방법을 직관적으로 보여준다.

long[] numbers = LongStream.rangeClosed(1, n).toArray();
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
new ForkJoinPool().invoke(task);

일반적으로 애플리케이션에서는 둘 이상의 ForkJoinPool을 사용하지 않는다.

즉, 한 번만 인스턴스화해서 정적 필드에 싱글턴으로 저장한다.


포크/조인 프레임워크를 제대로 사용하는 방법

포크/조인 프레임워크는 쉽게 사용할 수 있는 편이지만 항상 주의를 기울여야 한다.

다음은 포크/조인 프레임워크를 효과적으로 사용하는 방법이다.

  • 두 서브태스크가 모두 시작된 다음에 join을 호출해야 한다.

그렇지 않으면 각각의 서브태스크가 다른 태스크가 끄타길 기다리는 일이 발생하며 원래 순차 알고리즘보다 느리고 복잡한 프로그램이 되어버릴 수 있다.

  • RecursiveTask 내에서는 ForkJoinPool의 invoke 메서드를 사용하지 말아야 한다.

대신 compute나 fork 메서드를 직접 호출할 수 있다. 순차 코드에서 병렬 계산을 시작할 때만 invoke를 사용한다.

  • 서브태스크에 fork 메서드를 호출해서 ForkJoinPool의 일정을 조절할 수 있다.

양쪽 작업 모두 fork 메서드를 호출하는 것이 자연스러울 것 같지만 한쪽 작업에 fork를 호출하는 것 보다는 compute를 호출하는 것이 효율적이다. 그러면 두 서브 태스크의 한 태스크에는 같은 스레드를 재사용할 수 있으므로 풀에서 불필요한 태스크를 할당하는 오버헤드를 피할 수 있다.

  • 포크/조인 프레임워크를 이용하는 병렬 계산은 디버깅하기 어렵다.

포크/조인 프레임워크에서는 fork라 불리는 다른 스레드에서 compute를 호출하므로 스택 트레이스가 도움이 되지 않는다.

  • 포크/조인 프레임워크를 사용하는 것이 순차 처리보다 무조건 빠르지 않다.

각 서브태스크의 실행시간은 새로운 태스크를 포킹하는 데 드는 시간보다 길어야 포크/조인 프레임워크를 사용하는 것이 순차 처리보다 빠르다. 그러므로 상황에 맞게 결과를 측정하며 사용해야 한다.


Spliterator

Spliterator는 8버전에서 새롭게 소개된 인터페이스다. 소스의 원소를 traverse하고 partition하기 위한 인터페이스다.

Spliterator는 탐색하려는 데이터를 포함하는 스트림을 어떻게 병렬화할 것인지 정의한다.

Iterator처럼 Spliterator는 소스의 요소 탐색 기능을 제공한다는 점은 같지만 Spliterator는 병렬 작업에 특화되어 있다.

자바8은 컬렉션 프레임워크에 포함된 모든 자료구조에 사용할 수 있는 디폴트 Spliterator 구현을 제공하고 있다.

public interface Spliterator<T> {
	boolean tryAdvance(Consumer<? super T> action);
	Spliterator<T> trySplit();
	long estimateSize();
	int characteristics();
}

  • forEachRemaning
  • 더이상 splitting이 필요하지 않을 때 특정 action을 하는 함수이다.
  • 기본적으로 각 남아있는 Element에 따라 action을 취할 수 있고, 다 처리될 때 까지 현재 스레드에서 sequential하게 처리된다.
  • 디폴트로 구현된 메소드를 살펴보면 번복적으로 tryAdvance를 호출하여, spliterator 원소를 순차적으로 처리한다.
  • splitting을 수행하는 동안, spliterator는 순차 처리해도 될 정도로 원소의 수가 줄어들면, forEachRemaing 메소드를 통해 순차 처리한다.

Spliterator 특성

Spliterator는 characteristics라는 추상 메서드도 정의한다. Characteristics 메서드는 Spliterator 자체의 특성 집합을 포함하는 int를 반환한다.

Parallel Stream은 ForkJoinPool과 Spliterator를 이용하여 동시에 원소를 처리한다.

하지만 순차 처리의 실행시간과 비교 후에 병렬 스트림을 사용하는 것이 현명한 선택이다.

예를 들어 1,000,000개를 처리할 때 병렬로 처리하는게 유의미 할 수 있는데, 5000개 정도면 병렬 스트림은 오버헤드도 있기 때문에 순차 처리가 빠를 수 있다.

https://java-8-tips.readthedocs.io/en/stable/quickintro.html

728x90
반응형

추천 글