Study/Java

webflux 기초 실습 (with - kakao tech)

hongeeii 2023. 12. 11.
728x90
반응형

사용하면서 알게 된 Reactor, 예제 코드로 살펴보기 – tech.kakao.com

 

사용하면서 알게 된 Reactor, 예제 코드로 살펴보기

Reactor는 Pivotal의 오픈소스 프로젝트로, JVM 위에서 동작하는 논블럭킹 애플리케이션을 만들기 위한 리액티브 라이브러리입니다. Reactor는 RxJava 2와 함께 Reactive Stream의 구현체이기도 하고, Spring Fra

tech.kakao.com

위 링크를 보고 실습했습니다.  

 

 

바구니 속 과일 종류(중복 없이) 및 각 종류별 개수 나누기

FruitInfo가 주어짐.

  final List<String> basket1 =
                Arrays.asList(new String[] {"kiwi", "orange", "lemon", "orange", "lemon", "kiwi"});
  final List<String> basket2 =
                Arrays.asList(new String[] {"banana", "lemon", "lemon", "kiwi"});
  final List<String> basket3 = Arrays
                .asList(new String[] {"strawberry", "orange", "lemon", "grape", "strawberry"});
  final List<List<String>> baskets = Arrays.asList(basket1, basket2, basket3);
  final Flux<List<String>> basketFlux = Flux.fromIterable(baskets);

class FruitInfo {
    private final List<String> distinctFruits;
    private final Map<String, Long> countFruits;

    public FruitInfo(List<String> distinctFruits, Map<String, Long> countFruits) {
        this.distinctFruits = distinctFruits;
        this.countFruits = countFruits;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o)
            return true;
        if (o == null || getClass() != o.getClass())
            return false;

        FruitInfo fruitInfo = (FruitInfo) o;

        if (distinctFruits != null ? !distinctFruits.equals(fruitInfo.distinctFruits)
                : fruitInfo.distinctFruits != null)
            return false;
        return countFruits != null ? countFruits.equals(fruitInfo.countFruits)
                : fruitInfo.countFruits == null;
    }

    @Override
    public int hashCode() {
        int result = distinctFruits != null ? distinctFruits.hashCode() : 0;
        result = 31 * result + (countFruits != null ? countFruits.hashCode() : 0);
        return result;
    }

    @Override
    public String toString() {
        return "FruitInfo{" +
                "distinctFruits=" + distinctFruits +
                ", countFruits=" + countFruits +
                '}';
    }
}

 

1번 방법

WebFlux API만 썻지, 모두 다 MAIN Thread에서 동기, 블락킹 방식으로 동작하는 것을 볼 수 있다.

=> spring5의 webFlux에서 제공하는 webClient를 사용하면 다른 스레드로 이미 바뀌어 비동기로 동작

 

* concatMap, flatMapSequential -> 순서 보장, flatMap -> 순서 보장 안됌

 

 

 

2번 방법 - 병렬 스케줄러

subscribeOn으로 스케줄러 전환하기

 

.subscribeOn(Schedulers.parallel()을 사용해서 CPU 코어 개수만큼 워커로 만들어 병렬로 실행

추가로 Main스레드가 끝나면, parallel 스케줄러는 데몬스레드라 일을 끝내지 못하고 종료되기 때문에 CountDownLatch를 이용하여 await()으로 main스레드를 기다리도록 함.

 

 

 

3번 방법 - basket당 하나의 스트림만 공유하며 과일 종류와 개수 뽑아내기

Flux.fromIterable(basket

distinctFruits와 countFruitsMono 모두 Flux.fromIterable(basket)에서 출발하기 때문에 병렬로 동작해도 baskets를 각각 순회하여 중복되는 동작이다.  

basket을 순회하는 공통작업을 하나의 스트림에서 하는 방법!!

 

Hot, Cold 개념

cold는 Flux나 Mono를 subscribe호출 전까지 아무런 동작을 하지 않고, subscribe를 호출하면 새로운 데이터를 생성한다.

 

지금까지 예제에서는 basket으로부터 값을 꺼내어 각각 따로 새로운 데이터를 생성하기 떄문에 중복된 작업을 새로 시작하게 동작된다.  

 

Hot은 구독하기 전부터 데이터의 스트림이 동작한다. 

Hot에 해당하는 스트림을 여러곳에서 구독을 하면, 현재 스트림에서 나오는 값을 구독하는 구독자들은 동일하게 받을 수 있다. 

 

Cold->Hot으로 바꾸는 연산자가 있다.

Cold에서 Hot으로 바꾸면 구독여부와 상관없이 값을 생성안하다가 특정 시점에 값을 생성하도록 제어하여 구독하는 구독자들이 동일한 값을 받을 수 있도록 할 수 있다.  

 

Connectable Flux

Connectable Flux는 Cold에서 Hot으로 바꾸기 위해 Connectable Flux로 변환하는 과정이 필요하다.

'publish' 라는 연산자를 호출하면 바꿀 수 있다.  

이렇게 변환된 Flux에서 connect()라는 메서드를 호출하면, 여러 구독자들이 Connectable Flux를 구독한 후 값을 생성하여 각 구독자에게 보내기 시작하게 하는 메서드이다.  

 

즉, 예제에서 distinctFruits와 countFruitsMono가 구독을 모두 완료한 후에, connect()를 호출하게 하면 된다.

  • autoConnect() : 최소 구독하는 구독자의 개수를 지정해서 자동으로 connect()호출
  • refCount() : autoConnect + 하나도 구독하는 곳이 없으면 기존 소스의 스트림도 구독 해제

 

 Flux<FruitInfo> concatMap = basketFlux.concatMap(basket -> {
            final Flux<String> source = Flux.fromIterable(basket).log().publish().autoConnect(2);
            final Mono<List<String>> distinctFruits = source.distinct().collectList();
            final Mono<Map<String, Long>> countFruitsMono = source
                    .groupBy(fruit -> fruit) // 바구니로 부터 넘어온 과일 기준으로 group을 묶는다.
                    .concatMap(groupedFlux -> groupedFlux.count()
                            .map(count -> {
                                final Map<String, Long> fruitCount = new LinkedHashMap<>();
                                fruitCount.put(groupedFlux.key(), count);
                                return fruitCount;
                            }) // 각 과일별로 개수를 Map으로 리턴
                    ) // concatMap으로 순서보장
                    .reduce((accumulatedMap, currentMap) -> new LinkedHashMap<String, Long>() { {
                        putAll(accumulatedMap);
                        putAll(currentMap);
                    }}); // 그동안 누적된 accumulatedMap에 현재 넘어오는 currentMap을 합쳐서 새로운 Map을 만든다. // map끼리 putAll하여 하나의 Map으로 만든다.
            return Flux.zip(distinctFruits, countFruitsMono, (distinct, count) -> new FruitInfo(distinct, count));
        });


        concatMap.subscribe(
                System.out::println,  // 값이 넘어올 때 호출 됨, onNext(T)
                error -> {
                    System.err.println(error);
                    countDownLatch.countDown();
                }, // 에러 발생시 출력하고 countDown, onError(Throwable)
                () -> {
                    System.out.println("complete");
                    countDownLatch.countDown();
                } // 정상적 종료시 countDown, onComplete()
            );
            log.info("end");
    }

위처럼 autoConnect(2)를 사용하니, 한 번만 공통 소스를 읽는 것을 확인하였다.  

=> 한번만 구독하도록 한다면, 아무일이 일어나지 않음.

모두 main스레드에서 실행이 되었다.  

 

Hot이후에 각 스트림만 비동기로 처리하기

.subscribeOn(Schedulers.parallel())를 사용하여 비동기 처리를 하면

처음엔 parallel-1과 parallel-2로 동작하는 듯했다가 결국 source는 parallel-1에서 실행.

2개의 구독자가 구독을 한 후 source가 parallel-1에서 시작되니 그 이후 동작도 다 parallel-1에서 실행되는 것. subscribeOn으로 스케줄러를 지정하여 스위칭이 되면 이후 스트림은 계속 그 스케줄러에 의해 동작된다.

 

 

728x90
반응형

추천 글