Study/Mastering-Spring-5-Study

ch13. 리액티브 프로그래밍(Reactive Programming)

hongeeii 2023. 1. 17.
728x90
반응형

리액티브 프로그래밍

리액티브 프로그래밍은 함수형 프로그래밍을 기반으로 대체 스타일을 제공한다.


함수형 프로그래밍

함수형 프로그래밍은 하나의 프로그래밍 패러다임으로 정의되는 일련의 코딩 접근 방식이며,

자료처리를 수학적 함수의 계산으로 취급하고 상태와 가변 데이터를 멀리하는 프로그래밍 패러다임을 의미한다.

최근의 프로그래밍 패러다임은 크게 아래와 같이 구분할 수 있다.

  • 명령형 프로그래밍 : 무엇(What)을 할 것인지 나타내기보다 어떻게(How) 할 건지를 설명하는 방식
    • 절차지향 프로그래밍 : 수행되어야 할 순차적인 처리 과정을 포함하는 방식 (C, C++)
    • 객체지향 프로그래밍 : 객체들의 집합으로 프로그램의 상호작용을 표현 (C++, java, C#)
  • 선언형 프로그래밍 : 어떻게 할건지(How)를 나타내기보다 무엇(What)을 할 건지를 설명하는 방식
    • 함수형 프로그래밍 : 순수 함수를 조합하고 소프트웨어를 만드는 방식 (클로저, 하스켈, 리스프)

명령형 프로그래밍을 기반으로 개발했던 개발자들은 개발하는 소프트웨어의 크기가 커짐에 따라,

복잡하게 엉켜있는 스파게티 코드를 유지보수하는 것이 매우 힘들다는 것을 깨닫게 되었다.

그리고 이를 해결하기 위해 함수형 프로그래밍이라는 프로그래밍 패러다임에 관심을 갖게 되었다.

함수형 프로그래밍은 거의 모든 것을 순수 함수로 나누어 문제를 해결하는 기법으로,

작은 문제를 해결학디 위한 함수를 작성하여 가독성을 높이고 유지보수를 용이하게 해준다.

[함수형을 적용하지 않은 코드]

public class WordCount {

    private static List<String> WORDS = Arrays.asList("TONY", "a", "hULK", "B", "america", "X", "nebula", "Korea");

    private static Map<String, Integer> wordPrefixFreq() {
        Map<String, Integer> wordCountMap = new HashMap<>();
        String prefix;
        Integer count;

        for (String word : WORDS) {
            prefix = word.substring(0, 1);
            count = wordCountMap.get(prefix);
            if (count == null) {
                wordCountMap.put(prefix, 1);
            } else {
                wordCountMap.put(prefix, count + 1);
            }
        }

        return wordCountMap;
    }

    public static void main(String[] args) {
        final Map<String, Integer> map = wordPrefixFreq();
        map.keySet()
            .forEach(k -> System.out.println(k + ": " + map.get(k)));
    }
}

함수형 프로그래밍을 적용하지 않은 코드에서는 List를 루프를 돌면서 접두사를 잘라내고 그 갯수를 Map에 저장하고 있다.

위의 코드는 최선일 것 같아 보이지만 함수형 프로그래밍 기법을 적용하면 더욱 간결하고 가독성있게 코드를 변경할 수 있다.

[함수형을 적용한 코드]

public class WordCount {
    private static List<String> WORDS = Arrays.asList("TONY", "a", "hULK", "B", "america", "X", "nebula", "Korea");

    private static Map<String, Integer> wordPrefixFreq() {
        Map<String, Integer> wordCountMap = new HashMap<>();
        WORDS.stream()
            .map(w -> w.substring(0, 1))
            .forEach(prefix -> wordCountMap.merge(prefix, 1, (oldValue, newValue) -> (newValue += oldValue)));
        return wordCountMap;
    }

    public static void main(String[] args) {
        final Map<String, Integer> map = wordPrefixFreq();
        map.keySet()
            .forEach(k -> System.out.println(k + ": " + map.get(k)));
    }
}

위의 코드는 stream()을 통해 함수형 프로그래밍을 위한 Stream 객체를 생성하고 있고,

map()을 통해 Stream 객체의 단어들을 prefix로 변형시키고 있다.

그리고 foreach를 통해서 prefix를 보고 map에 값을 추가하고 있다.


마이크로 서비스 아키텍처는 메시지 기반 통신을 선호한다.

리액티브 프로그래밍의 중요한 특징은 이벤트(또는 메시지)를 중심으로 애플리케이션을 구축하는 것이다.

몇 년 전부터 대부분의 애플리케이션에는 다음과 같은 특징이 있다.

  • 수 초 단위의 응답 시간
  • 여러 시간의 오프라인 유지 관리
  • 소량의 데이터

시대가 바뀌면서 오늘 날에는 다음과 같은 특징이 있다.

  • 밀리초 단위의 응답 시간
  • 100% 가용성
  • 데이터 볼륨이 기하 급수적으로 증가

지난 몇 년 동안 이러한 새로운 도전 과제를 해결하기 위해 다양한 접근 방식이 등장했다.

리액티브 프로그래밍은 실제로 새로운 현상은 아니짐나 문제를 성공적으로 해결한 접근법 중 하나다.


리액티브 방식으로 구축된 시스템은 보다 유연하고 느슨하게 연결되며 확장 가능하므로 개발하거나 변경이 쉽다.

이들은 장애에 훨씬 더 관대해 장애가 발생하면 큰 장애를 일으키지 않고 간결하게 대처한다.

리액티브 시스템은 반응이 뛰어나 사용자에게 효과적인 대화식 피드백을 제공한다.


리액티브 시스템의 특성

 

back pressure 

전통적인 접근 방식

전통적인 접근 방식은 조사해서 주가가 변경됐는지 여부를 확인한다.

페이지가 랜더링되면 일정한 간격으로 최신 가격을 확인하기 위해 AJAX 요청을 주가 서비스에 보낸다.

웹 페이지에 주가 변동 정보가 없으므로 주가가 변경됐는지 여부에 관계없이 이러한 호출을 수행해야 한다.

리액티브 접근 방식

리액티브 접근 방식은 발생하는 이벤트에 반응할 수 있도록 관련된 여러 구성요소를 연결하는 것을 포함한다.

주가 웹 페이지가 로드되면 웹 페이지는 주가 서비스의 이벤트를 등록한다.

주가 변동 이벤트가 발생하면 이벤트가 트리거 돼 최신 주가는 웹 페이지에서 업데이트 된다.

리액티브 방식은 일반적으로 세 단계로 이뤄진다.

  1. 이벤트 구독
  2. 이벤트 발생
  3. 가입 취소

주가 웹 페이지가 처음 로드되면 주가 변동 이벤트를 구독한다.

특정 주식에 주가 변동 이벤트가 발생하면 이벤트의 모든 구독자에 대해 새 이벤트가 트리거된다.

웹 페이지가 닫히거나 새로 고쳐지면 구독자가 가입 취소 요청을 보낸다.

전통적인 접근 방식과 리액티브 방식의 비교

전통적인 접근 방식에서는 변경사항을 조사한다.

즉 주가 변동 여부에 관계없이 전체 시퀀스가 매분(또는 지정된 간격)마다 트리거된다.

리액티브 방식에서는 일단 이벤트에 등록하면 주가가 변경될 때만 시퀀스가 트리거된다.

구현

리액티브 스트림

public interface Subscriber<T> {
   public void onSubscribe(Subscription s);
   public void onNext(T t);
   public void onError(Throwable t);
   public void onComplete();
}

public interface Publisher<T> {
   public void subscribe(Subscriber<? super T> s);
}

public interface Subscription {
   public void request(long n);
   public void cancel();
}
  1. Subscriber가 Publisher에게 subscribe하면 Publisher가 데이터 또는 시퀀스 전달.
  2. 전달하기 전에 Publisher는 Subscribe에 정의된 onSubscribe()를 호출하고, Subscriber는 request(n)를 호출하여 몇 개의 데이터를 보내달라고 요청.
  3. 이때 Subscription을 사용하는데, Request(n)를 호출하여 데이터 전송 요청을 하게 되면 Publisher에서는 0에서 N개의 데이터 또는 시퀀스를 Subscriber에 전달.
  4. 이 과정에서 에러가 발생하면 onError()를 호출, 데이터(시퀀스)전달이 완료가 되면 onComplete()호출.
  5. Subscriber가 Publisher에 Request하는 과정을 보통 Back-Pressure라고 표현하는데, Push하는 데이터(시퀀스)의 흐름을 제어할 수 있다. Request(1)을 호출하면 1개만 보내도록�요청할 수 있고, Request(MAX)를 호출하면 최대값에 해당하는 데이터를 요청.

Publisher

끊임없이 data를 생성한다. Mono/FLux가 Spring Reactor가 구현한 Publisher의 역할로 이해할 수 있다.

Subscriber

data를 요청해서 받아간다. reactive programming의 핵심 중 하나는 back pressure이다.

data를 소비하는 쪽에서 충분히 여유가 있을 때 요청하여 받아가는 형태로 이해할 수 있다.

Subscription

publisher와 subscriber사이에 위치하여 이벤트를 통하여 data를 전달해 주는 역할로 이해할 수 있다.

리액터 프레임워크

Mono : 0~1개의 결과만을 처리하기 위한 Reactor의 객체

Mono<String> noData = Mono.empty();
Mono<String> data = Mono.just("foo");

Flux : 0~N개인 여러 개의 결과를 처리하는 객체

 Flux<String> seq1 = Flux.just("foo", "bar", "foobar");
 List<String> iterable = Arrays.asList("foo", "bar", "foobar");
 Flux<String> seq2 = Flux.fromIterable(iterable);
  • Flux는 Iterable한 데이터들을 mono로 바꿔서 나누고 합침으로 해서 Subscriber에게 전달. 즉, mono가 뭉치면 Flux

Mono

  1. Mono의 경우 다수의 item이 없기 때문에 onComplete와 onError 메소드를 제공한다. (onNext는 없음)
  2. Mono를 사용하면 완료 개념만 있는 값이 없는 비동기 프로세스를 나타낼 수 있다. 하나를 만들려면 empty Mono를 사용할 수 있다.
  3. Optional과 유사하고 null을 사용하지 않아야 하기 때문에 Void를 제공하여 empty 객체를 쓰게 한다.

Flux

  1. 첫 줄의 Flux는 Timeline을 의미하고 왼쪽에서 오른쪽으로 시간이 흐른다.
  2. 첫 줄의 동그라미 3개는 Flux에서 방출된 아이템이다.
  3. 첫 줄의 마지막 수직 라인은 Flux가 성공적으로 종료되었음을 나타낸다.
  4. 점선과 중간의 box는 Flux에 변환이 적용되고 있음을 나타내며 박스 안의 본문은 해당 변환이 어떤 것인지 나타낸다.
  5. 마지막 줄은 변환된 결과 Flux 이다.
  6. 만약 변환 과정 중 어떤 이유로 인해 Flux가 중단된 경우 수직 라인 대신 X로 표시를 한다.

Mono를 사용해 하나의 요소 방출

    @Test
    void test() throws InterruptedException {
        Mono<String> stubMonoWithADelay = Mono.just("Ranga")
            .delayElement(Duration.ofSeconds(5));

        stubMonoWithADelay.subscribe(t -> log.info(t));

        Thread.sleep(10000);
    }

위의 코드는 5초후 하나의 요소를 방출한다.

람다식을 사용하는 대신 Consumer를 명시적으로 정의할 수 있다.

@FunctionalInterface
public interface Consumer<T> {
    void accept(T t);
}

accept를 재정의해서 사용한다.

class MonoTest implements Consumer<String> {

    @Override
    public void accept(String t) {
        log.info("Received {} at {}", t, new Date());
    }

    @Test
    void monoExample() throws InterruptedException {
        Mono<String> stubMonoWithDelay = Mono.just("Ranga")
            .delayElement(Duration.ofSeconds(5));

        stubMonoWithDelay.subscribe(new MonoTest());

        Thread.sleep(10000);
    }
}

  • stubMonoWithDelay.subscribe(new MonoTest()) : 이벤트를 구독하기 위해 MonoTest의 인스터스를 생성한다.

다음 그림처럼 출력된다.

모노나 플럭스의 이벤트를 수신하는 구독자를 여러 명 보유할 수 있다.

다음 코드는 추가 구독자를 만드는 방법을 보여준다.

public class WelComeTest implements Consumer<String> {

    @Override
    public void accept(String t) {
        log.info("Welcome {}", t);
    }

    @Test
    void MonoExample() throws InterruptedException {
        Mono<String> stubMonoWithDelay = Mono.just("Ranga")
            .delayElement(Duration.ofSeconds(5));

        stubMonoWithDelay.subscribe(new MonoTest());
        stubMonoWithDelay.subscribe(new WelComeTest());

        Thread.sleep(10000);
    }
}
  • stubMonoWithDelay.subscribe(new WelComeTest()) : WelcomeTest의 인스터스를 Mono의 이벤트 구독자로 추가한다.

출력은 다음 그림처럼 표시된다.

Flux를 사용해 여러 요소 방출

[한 명의 구독자가 있는 Flux]

@Test
    void simpleFluxStream() {
        Flux<String> stubFluxStream = Flux.just("Jane", "Joe");
        stubFluxStream.subscribe(new MonoTest());
    }
    
  • Flux.just("Jane", "Joe") : Flux.just 메소드를 사용해 Flux를 생성한다.
  • stubFluxStream.subscribe(new MonoTest()) : Flux에 구독자로 MonoTest 인스턴스를 등록한다.

[두 명의 구독자가 있는 Flux]

private static List<String> streamOfNames = Arrays.asList("Ranga", "Adam", "Joe", "Doe", "Jane");

    @Test
    void fluxStreamWithDelay() throws InterruptedException {
        Flux<String> stubFluxWithNames = Flux.fromIterable(streamOfNames)
            .delayElements(Duration.ofMillis(1000));
        stubFluxWithNames.subscribe(new MonoTest());
        stubFluxWithNames.subscribe(new WelComeTest());
        Thread.sleep(10000);
    }

  • Flux.fromIterable(streamOfNames).delayElements(Duration.ofMillis(1000)) : 지정된 문자열 리스트에서 Flux를 생성한다. 요소는 지정된 시간으로 지연 방출된다.

스프링 웹 리액티브

스프링 리액티브 컨트롤러는 스프링 MVC 컨트롤러 생성과 매우 비슷한 방법으로 생성할 수 있다.

@RestController
public class StockPriceEventController {
    @GetMapping("/stocks/price/{stockCode}")
    Flux<String> retrieveStockPriceHardcoded(@PathVariable("stockCode") String stockCode) {
        return Flux.interval(Duration.ofSeconds(5))
            .map(i -> getCurrentDate() + " : " + getRandomNumber(100, 125))
            .log();
    }

    private int getRandomNumber(int min, int max) {
        return ThreadLocalRandom.current()
            .nextInt(min, max + 1);
    }

    private String getCurrentDate() {
        return (new Date()).toString();
    }
}

/////////////////////// javaScript

    function registerEventSourceAndAddResponseTo(uri, elementId) {
        var stringEvents = document.getElementById(elementId);
        var stringEventSource = new EventSource(uri);
        stringEventSource.onmessage = function (e) {
            var newElement = document.createElement("li");
            newElement.innerHTML = e.data;
            stringEvents.appendChild(newElement);
        }
    }

    function addEvent(evnt, elem, func) {
        if (typeof (EventSource) !== "undefined") {
            elem.addEventListener(evnt, elem, func)
        }
        else {
            elem[evnt] = func;
        }
    }

    function add() {
        console.log("확인");
        registerEventSourceAndAddResponseTo("/stocks/price/IBM", "display");
    }

[뷰]

[로그]

브라우저 창이 닫히면 cancel() 메소드가 호출돼 스트림이 종료된다.

리액티브 프로그래밍의 이점을 충분히 활용하려면 엔드-투-엔드 통신이 리액티브해야 한다. 즉, 이벤트 스트림을 기반으로 해야 한다.

End-to-End

RestTemplate, WebClient

스프링 어플리케이션에서 HTTP 요청할 때 사용하는 방법으로 RestTemplate과 WebClient가 있다.

스프링 5.0 이전까지는 클라이언트에서 HTTP 접근을 위해 사용한 것은 RestTemplate 이었다.

스프링 5.0 에서 WebClient가 나왔고 현재는 WebClient를 사용하기를 권고하고 있다.

RestTemplate

RestTemplate 이란? 스프링 3.0 에서 부터 지원하는 RestTemplate은 HTTP 통신에 유용하게 쓸 수 있는 템플릿이다.

REST 서비스를 호출하도록 설계되어 HTTP 프로토콜의 메소드 (GET, POST, DELETE, PUT)에 맞게 여러 메소드를 제공한다.

RestTemplate은 다음과 같은 특징이 있다.

RestTemplate 특징

  • 통신을 단순화하고 RESTful 원칙을 지킨다.
  • 멀티쓰레드 방식을 사용
  • Blocking 방식을 사용

Thread pool은 요청자 어플리케이션 구동시에 미리 만들어 놓는다.

Request는 먼저 Queue에 쌓이고 가용한 쓰레드가 있으면 그 쓰레드에 할당되어 처리된다.

즉, 1요청 당 1쓰레드가 할당된다.

각 쓰레드에서는 Blocking방식으로 처리되어 응답이 올 때까지 그 쓰레드는 다른 요청에 할당될 수 없습니다.

Rest Template 동작원리 

출처: https://ddoriya.tistory.com/entry/RestTemplate-VS-WebClient [또리야 개발하자]

  1. 어플리케이션이 RestTemplate를 생성하고 URI, HTTP 메소드 등의 헤더를 담아 요청한다.
  2. RestTemplate은 HttpMessageConverter를 사용하여 requestEntity를 요청메시지로 변환한다.
  3. RestTemplate은 ClientHtttpRequestFactory로 부터 ClientHttpRequest를 가져와서 요청을 보낸다.
  4. ClientHttpRequest는 요청메시지를 만들어 HTTP 프로토콜을 통해 서버와 통신한다.
  5. RestTemplate은 ResponseErrorHandler로 오류를 확인하고 있다면 처리로직을 태운다.
  6. ResponseErrorHandler는 오류가 있다면 ClientHttpResponse에서 응답데이터를 가져와서 처리한다.
  7. RestTemplate은 HttpMessageConverter를 이용해서 응답메시지를 java object로 변환한다.
  8. 어플리케이션에 반환된다.

RestTemplate 사용

@Configuration
public class RestTemplateClient {

    @Bean
    public RestTemplate restTemplate(RestTemplateBuilder builder) {
        return builder.build();
    }
}

RestTemplate을 생성할 때는 builder를 통하여 생성해 줄 수 있다.

builder를 통해 생성하고 스프링 빈으로 사용할 수 있도록 설정해준다.

public class RestTemplateTestClass {
    @Autowired 
    RestTemplate restTemplate;

    public TestClass(RestTemplate restTemplate){
            this.restTemplate = restTemplate;
    }

    public String getSthFromServer(){
        return restTemplate.getForObject("https://example.com",String.class);
    }
}

RestTemplate을 사용하기 위해서는 restTemplate.메소드명() 을 사용하면 된다.

// RestTemplate의 처리방식 예제 코드
// 요청 당 20개의 RestTemplate client를 만들고, 최대 50개까지 증가할 수 있다.

@Configuration
public class RestTemplateConfig {
	public RestTemplate getRestTemplate(int defaultMaxPerRoute, int maxTotal) {
		PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager();
		connManager.setDefaultMaxPerRoute(defaultMaxPerRoute);
		connManager.setMaxTotal(maxTotal);

		HttpClient client = HttpClientBuilder.create().setConnectionManager(connManager).build();

		HttpComponentsClientHttpRequestFactory factory = new HttpComponentsClientHttpRequestFactory(client);
		factory.setConnectTimeout(3000);
		factory.setReadTimeout(3000);

		return new RestTemplate(factory);

	}

	@Bean
	public RestTemplate coffeeRestTemplate() {
		return getRestTemplate(20, 50);
	}
}

요청을 처리할 쓰레드가 있으면 아무런 문제가 없지만, 쓰레드가 다 차는 경우 이후의 요청은 Queue에 대기하게 된다.

대부분의 문제는 네트워킹이나 DB와의 통신에서 생기는데 이런 문제가 여러 쓰레드에서 발생하면

가용한 쓰레듯수가 현저하게 줄어들게 되고, 결국 전체 서비스는 매우 느려지게 된다.

WebClient

Web Client란? WebClient는 스프링 5.0에서 추가된 인터페이스다.

스프링 5.0 이전에는 비동기 클라이언트로 AsyncRestTemplate를 사용했었다.

하지만 스프링 5.0 이후부터는 Deprecated 되었다.

스프링 5.0 이후는 WebClient 사용을 권장한다.

WebClient는 다음과 같은 특징이 있다.

WebClient 특징

  • 싱글 스레드 방식을 사용
  • Non-Blocking 방식을 사용
  • JSON, XML을 쉽게 응답받는다.

각 요청은 Event Loop내에 Job으로 등록이 된다.

Event Loop는 각 Job을 제공자에게 요청한 후, 결과를 기다리지 않고 다른 Job을 처리한다.

Event Loop는 제공자로부터 CallBack으로 응답이 오면, 그 결과를 요청자에게 제공한다.

WebClient는 이렇게 이벤트에 반응형으로 동작하도록 설계됐다.

그래서 반응성, 탄력성, 가용성, 비동기성을 보장하는 Spring React 프레임워크를 사용한다.

또한, React Web 프레임워크인 Spring WebFlux에서 Http Client로 사용된다.

Web Client 사용

  1. Static Factory Methods
  • WebClient.create()
  • WebClient.create(String baseUrl)
WebClient webClient = WebClient.create();

WebClient wClient = WebClient.create("http://localhost:9011");
  1. WebClient.builder() Builder 사용
  • uriBuilderFactory : base URL을 사용하기 위한 Customized UriBuilderFactory
  • defaultUriVariables : URI 템플릿을 확장할 때, 사용하는 기본 값
  • defaultHeader : 모든 요청에 대한 기본 헤더
  • defaultCookie : 모든 요청에 대한 쿠키 설정
  • defaultRequest : 모든 요청을 customize하기 위한 Consumer
  • filter : 모든 요청에 대한 Client filter
  • exchangeStrategies: HTTP message reader/writer customization
  • clientConnector : HTTP client library settings
WebClient webClient2 = WebClient.builder()
       .baseUrl("http://localhost:9011")
       .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
       .build();

Default 값이나 Filter 또는 ConnectionTimeOut 같은 값을 지정하여 생성하기 위해서는 Builder 클래스를 통해 생성하는 것이 좋다.

  • WebClient.create()
    • 이렇게 직접 만들면, Spring auto config 적용 받지 않는다.
    • 따라서 application.properties 설정도 적용되지 않는다.
  • WebClient.Builder
    • Spring Boot는 WebClient를 기본 Bean으로 제공하고 있지 않음.
    • 그렇다면 올바른 방법은 Spring으로 부터 WebClient.Bulider를 DI 받아 WebClient를 각 컴포넌트에서 생성.
    • Spring auto config 적용된다.
    • 내부적으로는 Spring 전역 ObjectMapper 사용하게 된다.

RestTemplate과 WebClient의 차이

RestTemplate과 WebClient의 가장 큰 차이점은 Non-Blocking과 비동기화 가능 여부일 것이다.

결국 이러한 차이점이 스프링에서 RestTemplate을 사용하는 것보다 WebClient의 사용을 권장하는 이유라고 생각한다.

성능비교

Boot1은 RestTemplate을 사용하고 Boot2는 WebClient를 사용한다.

동시 사용자가 1000명 까지는 처리속도가 거의 비슷하지만 그 이후에서는 Boot1이 급격하게 느려지는 것을 볼 수 있다.

동시 사용자의 규모가 어느정도 있다면 WebClient를 선택하는 것이 바람직하다.

Spring Document에서는 동기 방식이 코드 작성, 이해, 디버깅하기 더 쉽다고 한다.

즉, 높은 생산성을 가진다는 말과 같은 것으로 보인다.

그렇기에 이해타산을 잘 따져서 선택해야 할 필요가 있다.

https://happycloud-lee.tistory.com/220

728x90
반응형

추천 글