Modern Java Ch17. Reactive Programming 이해하기
java observer 패턴 이해하기
https://futurecreator.github.io/2018/06/04/java-observer-pattern/
수년 전까지 대규모 애플리케이션은 수십 대의 서버 , 기가바이트의 데이터, 수초의 응답 시간, 몇 시간의 유지보수 시간 등의 특징을 가졌다.
오늘날에는 다음과 같은 적어도 세 가지 이유로 상황이 변하고 있다.
- 빅 데이터 : 보통 빅데이터는 페타바이트 단위로 구서오디며 매일 증가.
- 다양한 환경 : 모바일 디바이스에서부터 수천개의 멀티코어 프로세서로 실행되는 클라우드 기반 클러스터에 이르기까지 다양한 환경에 애플리케이션 배포
- 사용 패턴 : 1년 내내 항상 서비스를 이용할 수 있으며, 밀리초 단위의 응답 시간을 기대
예전 소프트웨어 아키텍처로는 이런 요구사항을 만족시킬 수 없었다.
인터넷 트래픽을 가장 많이 일으키는 디바이스가 모바일인 요즘은 이런 양상이 두드러지고 있으며 사물인터넷(IOT) 같은 기기들로 가까운 미래에는 상황이 더욱 심화될 것이다.
※ IOT: 다른 사물과 데이터를 송수신할 수 있는 센서와 소프트웨어, 기타 기술을 장착하고 서로 연결된 사물
리액티브 프로그래밍에서는 다양한 시스템과 소스에서 들어오는 데이터 항목 스트림을 비동기적으로 처리하고 합처서 이런 문제를 해결. 이런 패러다임에 맞게 설계된 애플리케이션은 발생한 데이터항목을 바로 처리함으로써 사용자에게 높은 응답성 제공.
리액티브 매니패스토
리액티브 매니패스토(https://www.reactivemanifesto.org/)는 리액티브 애플리케이션과 시스템 개발의 핵심 원칙을 공식적으로 정의.
- 반응성(responsive)
- 리액티브 시스템은 빠를 뿐 아니라 더 중요한 특징으로 일정하고 예상할 수 있는 반응 시간 제공.
- 사용자에게 신뢰성 있는 빠른 응답을 제공하는 것을 의미
- 회복성(resilient)
- 장애가 발생해도 시스템은 반응해야한다.
- 회복성을 달성할 수 있는 다양한 기법 제시
- 회복력이 있다는 것은 장애가 발생하더라도 , 부분적으로 고장이 나더라도 전체가 고장 나지 않고 빠르게 복구하는 능력을 의미한다.
- 탄력성(elastic)
- 애플리케이션의 생명주기 동안 다양한 작업 부하로 반응성이 위협받을 수 있다.
- 무서운 작업 부하가 발생하면 자동으로 관련 컴포넌트에 할당된 자원 수를 늘림.
- 메시지 주도(Message-driven)
- 회복성과 탄력성을 지원하려면 약한 결합, 고립, 위치 투명성등을 지원할 수 있도록 시스템을 구성하는 컴포넌트의 경계를 명확하게 정의해야함
- 비동기 메시지를 전달해 컴포넌트끼리의 통신이 이루어짐
- 회복성(장애를 메시지로 처리)과 탄력성(주고 받은 메시지의 수를 감시하고 메시지의 양에 따라 적절하게 리소스를 할당)을 얻을 수 있다.
- 비동기 메시지 전달을 통해 위치 투명성 느슨한 결합 ,논 블로킹 통신을 지향
4가지 요소는 모두 reactive (반응이 빠른) 시스템을 만들기 위한 요소들이고 각 요소들은 상호 보완적이다.
애플리케이션 수준의 리액티브
애플리케이션 수준 컴포넌트의 리액티브 프로그래밍의 기능은 비동기로 작업을 수행할 수 있다는 점이다.
자바 개발자에게 동시성 = 많은 쓰레드 였다.
쓰레드 별로 다른 일을 하도록하면 쓰레드 갯수만 늘리면 동시에 여러 일을 처리하게 할 수 있었으니까.
그런데 시스템이 점점 분산되고(MSA) API 호출, 데이터 액세스등의 이유로 IO 수행시간이 늘어났다.(=쓰레드 점유)
많은 쓰레드로 해결을 할 때는 몇가지 문제가 있다.
CPU와 메모리가 충분해도 쓰레드가 부족하면 처리율이 내려가고, 쓰레드를 늘리면 CPU와 메모리에 엄청난 부하가 간다.
쓰레드를 변경할 때 사용되는 비용이 CPU 에 부하를 주기때문에 이 역시 문제다. 쓰레드는 그래서 상대적으로 비싸고 희귀한 자원이다.
리액티브 프레임워크와 라이브러리는 쓰레드를 퓨처, 액터, 일련의 콜백을 발생시키는 이벤트 루프등과 공유하고 처리할 이벤트를 변환하고 관리한다. 이 기술은 쓰레드보다 가볍다 🕊 !
거기에 별도로 지정된 스레드 풀에서 블록 동작을 실행시켜서, 스레드 블록의 문제를 해결한다.
메인 풀의 모든 스레드는 방해받지 않고 실행되므로 가장 최적의 상황에서 동작할 수 있다.
비교적 짧은 시간 동안만 유지되는 데이터 스트림에 기반한 연산을 수행하며 보통 이벤트 주도로 분류.
시스템 수준의 리액티브
리액티브 시스템은 여러 애플리케이션이 하나의 일관적이며 회복이 가능한 플랫폼을 구성해주는 아키텍처를 말한다.
시스템 수준에서는, 애플리케이션을 조립하고 상호 소통을 조절한다. 이런 과정에는 메시지 주도 (message-driven) 이 사용된다.
메시지는 정의된 목적지 하나만 바라보고 가는 반면, 이벤트는 옵저버들이 모두 수신한다는게 다른점이다.
리액티브 시스템에서는 수신자, 발신자가 수신 메시지와 발신 메시지와 강하게 결합하지 않고, 독립적인 구조를 유지하도록 메시지를 비동기로 처리한다.
그래야 시스템이 (장애로부터의) 회복성 ,(높은 부하로부터의) 탄력성 에서도 반응성을 유지할 수 있다.
시스템에서 장애가 발생했을 때, 리액티브 시스템은 성능이 저하되는 것이 아니라 문제를 완전히 고립시켜서 시스템을 복구한다.
예를 들어 에러 전파를 방지하고, 메시지 방향성을 바꾸어 다른 컴포넌트로 보내는 등 감독자 역할을 수행해서 문제를 고립시킬 수 있다.
이렇게 하여, 컴포넌트 자체로 문제가 한정되고, 외부로는 안정성을 보장할 수 있다.
회복성은 고립, 비결합 이 핵심이다.
그리고 탄력성의 핵심은 위치 투명성이다 위치 투명성은 리액티브 시스템의 모든 컴포넌트가 다른 모든 서비스와 통신할 수 있음을 의미한다.
위치에 상관없이 모두 서로 통신이 가능하기때문에 시스템을 복제할 수 있으며, 작업 부하에 따라 자동으로 애플리케이션을 확장할 수 있다.
리액티브 스트림과 플로 API
리액티브 프로그래밍은 리액티브 스트림을 사용하는 프로그래밍이다.
리액티브 스트림은 무한의 비동기 데이터를 순서대로, 블락하지 않는 역압력을 전재해 처리하는 표준기술.
역압력이란 발행-구독 프로토콜에서 이벤트 스트림의 구독자가 발행자가 이벤트를 제공하는 속도보다 느린 속도로 이벤트를 소비하면서 문제가 발생하지 않도록 보장하는 것.
부하가 발생한 컴포넌트는 이벤트 발생속도를 늦추라고 알리거나, 얼마나 많은 이벤트를 수신할 수 있는지 알리거나,
다른 데이터를 받기전에 기존의 데이터를 처리하는 데 얼마나 시간이 걸리는지를 업스트림 발행자에게 알릴수 있어야한다.
스트림 처리의 비동기적인 특성상 역압력 기능의 내장은 필수이다.
실제 비동기 작업이 실행되는동안 암묵적으로 블록API로 인해 역압력이 제공되는 것인다, 그 작업이 완료될 때까지 다른 유용한 작업을 실행할 수 없으므로 기다리면서 많은 자원을 낭비하게 된다.
반면 비동기 API를 이용하면 하드웨어 사용률을 극대화할 수 있지만, 다른 느린 다운스트림 컴포넌트에 너무 큰 부하를 줄 가능성도 생긴다.
따라서 역압력이나 제어 흐름 기법을 이용해 이런 상황을 방지해야한다.
이런 기법은 데이터 수신자가 스레드를 블록하지 않고도 데이터 수신자가 처리할 수 없을 만큼 많은 데이터를 받는 일을 방지하는 프로토콜을 제공한다.
넷플릭스, 레드햇, 트웨터 등 리액티브 스트림 프로젝트에서는 모든 리액티브 스트림 구현이 제공해야 하는 최소 기능집합을 네 개의 관련 인터페이스로 정의했다. java9의 새로운 java.util.concurrent.Flow 클래스, 리액터(Reactor, 피보탈), RxJava(넷플릭스) 등 많은 서드 파티 라이브러리에서 구현한다.
Flow 클래스 소개
리액티브 스트림 프로그래밍 발행-구독 모델을 지원할 수 있도록 Flow클래스는 중첩된 인터페이스 네 개를 포함한다.
- Publisher
- Subscriber
- Subsciption
- Processor
Publisher가 항목을 발행하면 Subscriber가 한 개씩 또는 한 번에 여러 항목을 소비하는데 Subscription이 이 과정을 관리할 수 있도록 제공한다.
Publisher는 수많은 일련의 이벤트를 제공할 수있지만 Subscriber의 요구사항에 따라 역압력 기법에 의해 이벤트 제공 속도가 제한된다.
SubScriber는 Publisher가 발행한 이벤트의 리스너로 자신을 등록할 수 있다.
Subscription은 Publisher와 Subscriber 사이의 제어흐름, 역압력을 관리한다.
@FunctionalInterface
public static interface Publisher<T> {
public void subscribe(Subscriber<? super T> subscriber);
}
public static interface Subscriber<T> {
public void onSubscribe(Subscription subscription);
public void onNext(T item);
public void onError(Throwable throwable);
public void onComplete();
}
public static interface Subscription {
public void request(long n);
public void cancel();
}
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}
이들 이벤트는 다음 프로토콜에서 정의한 순서로 지정된 메서드 호출을 통해 발행되어야한다.
onSubscribe onNext* (onError | onComplete)
onSubscribe 메서드가 항상 처음 호출되고 이어서 onNext가 여러번 호출될 수 있음을 의미.
이벤트 스트림은 영원히 지속되거나 onComplete 콜백을 통해 더 이상의 데이터가 없고 종료됨을 알릴 수 있으며,
Publisher에 장애가 발생했을 때는 onError를 호출할 수 있다.
subScriber가 Publisher에 자신을 등록할 때 Publisher는 처음으로 onSubscribe 메서드를 호출해 Subscription 객체를 전달한다.
Subscription의 request메서드는 Publisher에게 주어진 개수의 이벤트를 처리할 준비가 되었음을 알릴 수 있다.
cancel메서드는 Subscription을 취소, Publisher에게 더 이상 이벤트를 받지 않음을 통지한다.
Processor 인터페이스는 단지 Publisher와 Subscriber을 상속받을 뿐 아무 메서드도 추가하지 않는다.
이 인터페이스는 리액티브 스트림에서 처리하는 이벤트의 변환 단계를 나타냄.
Processor가 에러를 수신하면 이로부터 회복하거나(Subscription은 취소로 간주) 즉시 onError신호로 모든 Subscriber에 에러를 전파할 수 있다.
Subscriber가 Subscription을 취소하면 Processor는 자신의 업스트림 Subscription도 취소함으로 취소 신호를 전파해야한다.
java9 플로 API에서는 Subscriber 인터페이스의 모든 메서드 구현이 Publisher를 블록하지 않도록 강제하지만 이들 메서드가 이벤트를 동기적으로 처리해야하는지 아니면 비동기적으로 처리해야하는지는 지정하지 않는다.
하지만 이들 인터페이스에 정의된 모든 메서드는 void를 반환하므로 온전히 비동기 방식으로 이들 메서드를 구현할 수 있다.
이들 인터페이스 규칙
- Publishers는 반드시 Subscription의 request 메서드에 정의된 개수 이하의 요소만 Subscriber에 전달해야한다.
- Subscriber는 요소를 받아 처리할 수 있음을 Publisher에 알려야한다. 이런 방식으로 역압력을 행사할 수 있다. onComplete나 onError를 처리하는 상황에서 Subscriber는 Publisher나 Subscription의 어떤 메서드도 호출할 수 없으며 Subscription이 취소되었다고 가정해야한다.
Subscriber는 Subscription의 request 메서드 호출이 없어도 언제든 종료 시그널을 받을 준비가 되어있어야 하며 cancel 메서드가 호출된 이후에라도 한 개이상의 onNext를 받을 준비가 되어있어야한다. - Publisher와 Subscriber는 정확하게 Subscription을 공유해야하며 각각의 고유한 역할을 수행해야한다.
그러려면 onNext메서드에서 Subscriber는 request메서드를 동기적으로 호출할 수 있어야한다.
cancel 메서드는 몇 번을 호출해도 한 번 호출한 것과 같은 효과를 가져야하며, 여러 번 이메서드를 호출해도 다른 추가 호출에 별 영향이 없도록 스레드에 안전해야한다.
플로 API에서 정의하는 인터페이스를 구현한 애플리케션의 생명주기
첫 번째 리액티브 애플리케이션 만들기
Flow 클래스에 정의된 인터페이스 대부분은 직접 구현하도록 의도된 것이 아니다.
java9라이브러리는 이 인터페이스를 구현하는 클래스를 제공하지 않는다.
인터페이스를 직접 구현해보면서 프로그래밍의 구조를 만드는데 도움이 될 순있지만 더 빨리 구현하는데는 적합하지 않다.
API만들 당시 Akka, RxJava등 다양한 리액티브 스트림의 자바 코드 라이브러리가 이미 존재했기 때문이다.
같은 발행-구독 사상에 기반해 구현한 것이 아닌 이들 라이브러리는 독립적으로 개발되었고 서로 다른 이름 규칙과 API를 사용했다.
JAVA9의 표준화 과정에서 java.util.concurrent.Flow 인터페이스를 기반으로 리액티브 개념을 구현하도록 진화한 것이다.
Flow명세는 이들 라이브라리가 준수해야 할 규칙과 서로 협동하고 소통할 수 있는 공용어를 제시한다.
- 목표 ! :: java9 Flow API를 직접 이용하는 애플리케이션을 개발하면서 어떻게 동작하는지 확인해보자.
- TempInfo : 원격 온도계를 흉내냄(0에서 99 사이의 화씨 온도를 임의로 만들어 연속적으로 보고)
- TempSubscriber : 레포트를 관찰하면서 각 도시에 설치된 센서에서 보고한 온도 스트림 출력
현재 보고된 온도를 전달하는 자바 빈
public class TempInfo {
public static final Random rand = new Random();
private final String town;
private final int temp;
public TempInfo(String town, int temp) {
this.temp = temp;
this.town = town;
}
public static TempInfo fetch(String town) {
if (rand.nextInt(10) == 0) {
throw new RuntimeException("Error!"); //10분의 1확률로 온도가져오기 작업 실패
}
return new TempInfo(town, rand.nextInt(100));
}
@Override
public String toString() {
return town + " : " + temp;
}
public int getTemp() {
return temp;
}
public String getTown() {
return town;
}
}
간단한 도메인 모델을 정의한 다음에는 Subscriber가 요청할 때마다 해당 도시의 온도를 전송하도록 Subscrition 구현
Subscriber에게 TempInfo를 전송하는 Subscription
public class TempSubscription implements Subscription {
private final Subscriber<? super TempInfo> subscriber;
private final String town;
// ? super TempInfo : TempInfo의 조상클래스만 가능
// ? extends TempInfo : TempInfo를 상속받은 클래스만 가능
public TempSubscription(Subscriber<? super TempInfo> subscriber, String town) {
this.subscriber = subscriber;
this.town = town;
}
@Override
public void request(long n) {
for (long i = 0L; i < n; i++) { // Subscriber가 만든 요청을 한개씩 반복
try {
subscriber.onNext(TempInfo.fetch(town));
} catch (Exception e) {
subscriber.onError(e); // 온도 가져오기를 실패하면 Subscriber로 에러 전달.
break;
}
}
}
@Override
public void cancel() {
subscriber.onComplete(); // 구독이 취소되면 완료신호를 Subscriber로 전달
}
}
새 요소를 얻을 때마다 Subscription이 전달한 온도를 출력하고 새 레포트를 요청하는 Subscriber 클래스 구현
public class TempSubscriber implements Subscriber<TempInfo> {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(TempInfo item) {
System.out.println(item);
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.out.println(throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Done!");
}
}
리액티브 애플리케이션이 실제 동작할 수 있도록 Publisher를 만들고 TempSubscriber을 이용해 Publisher에 구독하도록 Main클래스 구현
public class Main {
public static void main(String[] args) {
// 뉴욕에 새 Publisher를 만들고 TempSubscriber를 구독시킴
getTemperatures("New York").subscribe(new TempSubscriber());
}
private static Publisher<TempInfo> getTemperatures(String town) {
// 구독한 Subscriber에게 TempSubscrition을 전송하는 Publisher를 반환
return subscriber -> subscriber.onSubscribe(new TempSubscription(subscriber, town));
}
}
람다의 시그니처가 Publisher의 함수형 인터페이스와 같은 시그니처를 가지므로 람다를 Publisher로 바꿀 수 있다.
StackOverFlow
임의로 에러를 발생시키는 코드를 없앤다면 무슨일이 일어날까?
JVM은 각 쓰레드의 각 스택에게 메모리를 할당. 그리고 메소드를 부르려는 시도로 메모리가 꽉찰 경우 JVM은 에러를 발생시킴.
스택 오버플로가 발생하는 문제를 Executor를 구현해서 다른 스레드에서 실행하는 방법이 있다.
private static final ExecutorService executor = Executors.newSingleThreadExecutor();
@Override
public void request(long n) {
executor.submit(() -> {
for (long i = 0L; i < n; i++) { // Subscriber가 만든 요청을 한개씩 반복
try {
System.out.println(Thread.currentThread().getName());
subscriber.onNext(TempInfo.fetch(town));
} catch (Exception e) {
subscriber.onError(e); // 온도 가져오기를 실패하면 Subscriber로 에러 전달.
break;
}
}
});
}
Processor로 데이터 변환하기
Processor는 Subscriber이며 동시에 Publisher이다.
Publisher의 목적은 Publisher를 구독한 다음 수신한 데이터를 가공해 다시 제공하는 것.
! 화씨로 제공된 데이터를 섭씨로 변환해 다시 방출하는 예제를 보자.
public class TempProcessor implements Processor<TempInfo, TempInfo> {//TempInfo를 다른 TempInfo로 변환하는 프로세서
private Subscriber<? super TempInfo> subscriber;
@Override
public void onSubscribe(Subscription subscription) {
subscriber.onSubscribe(subscription);
}
@Override
public void onNext(TempInfo temp) {
subscriber.onNext(new TempInfo(temp.getTown(), (temp.getTemp() - 32) * 5 / 9)); //섭씨로 변환한 다음 TempInfo를 다시 전송
}
@Override
public void onError(Throwable throwable) {
subscriber.onError(throwable);
}
@Override
public void onComplete() {
subscriber.onComplete();
}
@Override
public void subscribe(Subscriber<? super TempInfo> subscriber) {
this.subscriber = subscriber;
}
}
onNext는 화씨를 섭씨로 변환한 다음 온도를 제전송.
private static Publisher<TempInfo> getTemperatures(String town) {
// 구독한 Subscriber에게 TempSubscrition을 전송하는 Publisher를 반환
return subscriber -> {
TempProcessor processor = new TempProcessor();
processor.subscribe(subscriber);
processor.onSubscribe(new TempSubscription(processor, town));
};
}
'Study > Modern-Java-In-Action' 카테고리의 다른 글
Modern Java Ch16. 안정적 비동기 프로그래밍 (0) | 2023.01.29 |
---|---|
Modern Java Ch15. completableFuture와 Reactive 프로그래밍 (0) | 2023.01.25 |
Modern Java Ch13. Default Method (0) | 2023.01.24 |
Modern Java Ch12. 날짜와 시간 API (0) | 2023.01.24 |
Modern Java Ch11. Optional Class (0) | 2023.01.24 |
댓글