리액티브 스트림즈(Reactive Streams)란?
✅ 리액티브 스트림즈
리액티브 라이브러리를 어떻게 구현할지 정의해 놓은 표준 사양.
-> 데이터 스트림을 Non-Blocking이면서 비동기적인 방식으로 처리하기 위한 리액티브 라이브러리의 표준 사양
리액티브 스트림즈 구성요소
✅ 리액티브 스트림즈 컴포넌트
컴포넌트 | 설명 |
Publisher | 데이터를 생성하고 통지하는 역할. |
Subscriber | 구독한 Publisher로부터 통지된 데이터를 전달받아 처리하는 역할. |
Subscription | Publisher에 요청할 데이터 개수를 지정하거나 구독을 취소하는 역할. |
Processor | Publisher와 Subscriber의 기능을 모두 가지고 있다. |
- Subscriber는 전달받을 데이터를 구독한다. (subscribe)
- Publisher는 데이터를 통지할 준비가 되었음을 Subscriber에 알린다. (onSubscribe)
- Subscriber는 전달받기를 원하는 데이터의 개수를 Pulisher에 요청한다. (Subscription.request)
- Publisher는 요청받은 만큼의 데이터를 통지한다. (onNext)
- Publisher와 Subscriber 간에 데이터 통지, 데이터 수신, 데이터 요청의 과정을 반복하다가 Publisher가 모든 데이터를 통지하게 되면 데이터 전송이 완료되었음을 Subscriber에게 알린다. (onComplete)
- 만약 Publisher가 데이터를 처리하는 과정에서 에러가 발생하면 Subscriber에게 알린다. (onError)
✅ Subscriber가 데이터의 요청 개수를 지정하는 이유
실제로 Publisher와 Subscriber는 각각 다른 스레드에서 비동기적으로 상호작용하는 경우가 대부분이다.
이럴 경우 Publisher가 통지하는 속도가 Subscriver가 처리하는 속도보다 빠르면, 처리를 기다리는 데이터가 쌓이게 되고, 시스템 부하가 커지는 결과를 낳을 수 있다.
-> 이러한 문제를 방지하기 위해 Subscription.request를 통해 데이터 개수를 제어하는 것이다.
코드로 보는 리액티브 스트림즈 컴포넌트
리액티브 스트림즈 컴포넌트는 실제 코드에서 인터페이스 형태로 정의되며, 이 인터페이스들을 구현해서 해당 컴포넌트를 사용하게 된다.
Publisher
public interface Publisher<T> {
void subscribe(Subscriber<? super T> s);
}
Publisher는 subscribe 메서드 하나만 구현하면 된다. 파라미터로 전달받은 Subscriber를 등록하는 역할을 한다.
-> 개념상으로는 Subscriber가 구독하는 것이 맞는데, 실제 코드상에서는 Publisher가 Subscriber를 등록하는 형태로 구독이 이루어진다.
Subscriber
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
onSubscribe
구독 시작 시점에 어떤 처리를 하는 역할을 한다.
Publisher에게 요청할 데이터의 개수를 지정하거나 구독을 해지한다. (Subscription 객체를 통해 이루어진다)
onNext
Publisher가 통지한 데이터를 처리하는 역할을 한다.
onError
Publisher의 통지 과정에서 에러가 발생했을 때, 해당 에러를 처리하는 역할을 한다.
onComplete
Publisher가 데이터 통지를 완료했을 때 후처리 역할을 한다.
Subscription
public interface Subscription {
public void request(long n);
public void cancel();
}
request
Publisher에게 데이터의 개수를 요청한다.
cancel
구독을 해지한다.
✅ 코드 관점에서의 리액티브 스트림즈 동작 과정
앞선 그림에서 설명한 Publisher와 Subscriber의 동작 과정을 다시 설명하자면 다음과 같다.
- Publisher가 Subscriber 구현 객체를 subscribe 메서드의 파라미터로 전달한다.
- Publisher는 전달받은 Subscriber의 onSubscribe 메서드를 호출하면서 Subscruption 구현 객체를 Subscriber에게 전달한다.
- Subscriber는 Subscription 객체를 통해 전달받을 데이터의 개수를 요청한다.
- Publisher는 요청 개수만큼 onNext() 메서드를 호출해서 Subscriber에게 전달한다.
- Publisher는 통지할 데이터가 더 이상 없을 경우 onComplete 메서드를 호출해서 Subscriber에게 데이터 처리 종료를 알린다.
Processor
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
Processor는 구현해야 하는 메서드는 업속, Subscriber와 Publisher의 기능을 모두 가지고 있기 때문에, 둘을 상속한다.
리액티브 스트림즈 관련 용어 정의
✅ Signal
Publisher와 Subscriber 간에 주고받는 상호작용.
onSubscribe, onNext, onComplete, onError
Subscriber 인터페이스에 정의되지만, 실제 사용하는 주체는 Publisher이기 때문에, Publisher가 Subscriber에게 보내는 Signal.
request, cancel
Subscription 인터페이스에 정의되지만, 실제 사용하는 주체는 Subscriber이기 때문에, Subscriber가 Publisher에게 보내는 Signal.
✅ Demand
Publisher가 아직 Subscriber에게 전달하지 않은 Subscriber가 요청한 데이터.
✅ Emit
Publisher가 Subscriber에게 데이터를 전달하는 것.
+) 그동안 '통지한다'라는 용어를 사용했는데, 이제부터 Publisher가 emit하는 Signal 중에 데이터를 전달하기 위한 onNext Signal을 '데이터를 emit한다'라고 표현하겠다.
✅ Upstream/Downstream
public class Example2_5 {
public static void main(String[] args) {
Flux
.just(1, 2, 3, 4, 5, 6)
.filter(n -> n % 2 == 0)
.map(n -> n * 2)
.subscribe(System.out::println);
}
}
이렇게 리액티브 프로그래밍 코드에서 메서드 체인 방식으로 호출할 수 있는 이유는 호출하는 각각의 메서드들이 모두 같은 타입의 객체를 반환하기 때문이다.
-> 예제에서는 반환 값이 모두 Flux 타입의 객체이다.
junt 메서드를 통해 반환된 Flux 입장에서, 그 다음 라인의 filter 메서드 호출을 통해 반환된 Flux가 자신보다 더 하위에 있기 때문에 Downstream이고,
filter 메서드 호출을 통해 반환된 Flux 입장에서 just 메서드 호출을 통해 반환된 Flux가 자신보다 더 상위에 있기 때문에, Upstream이 된다.
✅ Sequence
위 예제처럼 다양한 Operator로 데이터의 연속적인 흐름을 정의한 것.
✅ Operator
위 예제처럼 just, filter, map 같은 메서드들을 리액티브 프로그래밍에서 연산자라고 부른다.
✅ Source
리액티브 프로그래밍 관련 문서에서, Data source, Source Publisher, Source Flux 등 용어가 보인다면, 이는 '최초의'라는 의미로 해석하면 된다.
리액티브 스트림즈의 구현 규칙
✅ Publisher
번호 | 규칙 | 추가 설명 |
1 | Publisher가 Subscirber에게 보내는 onNext signal의 총 개수는 항상 해당 Subscriber의 구독을 통해 요청된 데이터의 총 개수보다 더 작거나 같아야 한다. | |
2 | Publisher는 요청된 것보다 적은 수의 onNext signal을 보내고 onComplete 또는 onError를 호출하여 구독을 종료할 수 있다. | |
3 | Publisher의 데이터 처리가 실패하면 onError signal을 보내야 한다. | Publisher에서 발생한 실패를 Subscriber가 처리할 수 있는 기회를 가지도록 한다. |
4 | Publisher의 데이터 처리가 성공적으로 종료되면 onComplete signal을 보내야 한다. | |
5 | Publisher가 Subscriber에게 onError 또는 onComplete singal을 보내는 경우 해당 Subscriber의 구독은 취소된 것으로 간주되어야 한다 | onError 또는 onComplete Signal은 구독 취소와 동일한 기능을 한다. |
6 | 일단 종료 상태 signal을 받으면(onError, onComplete) 더 이상 signal이 발생되지 않아야 한다. | |
7 | 구독이 취소되면 Subscirber는 결국 signal을 받는 것을 중지해야 한다. |
✅ Subscriber
번호 | 규칙 | 추가 설명 |
1 | Subscriber는 Publisher로부터 onNext signal을 수신하기 위해 Subscription.request(n)를 통해 Demand signal을 Publisher에게 보내야 한다. | 데이터를 얼마나 수신할 수 있는지 결정하는 책임이 Subscriber에게 있다. 리액티브 스트림즈는 한 번에 하나씩 요청하기 보다 Subscriber가 처리할 수 있는 적절한 상한선만큼의 데이터 개수 요청을 권장한다. |
2 | Subscriber.onComplete() 및 Subscriber.onError(Throwable t)는 Subscription 또는 Publisher의 메서드를 호출해서는 안 된다. | Publisher/Subscription과 Subscriber 간의 순환 및 경쟁 조건을 방지하기 위함. |
3 | Subscriber.onComplete() 및 Subscriber.onError(Throwable t)는 signal을 수신한 후 구독이 취소된 것으로 간주해야 한다. | |
4 | 구독이 더 이상 필요하지 않은 경우 Subscriber는 Subscription.cancel()을 호출해야 한다. | |
5 | Subscriber.onSubscribe()는 지정된 Subscriber에 대해 최대 한 번만 호출되어야 한다. | 동일한 구독자가 최대 한 번만 구독할 수 있다. |
✅ Subscription
번호 | 규칙 | 추가 설명 |
1 | 구독은 Subscriber가 onNext 또는 onSubscribe 내에서 동기적으로 Subscription.request를 호출하도록 허용해야 한다. | Publisher가 데이터를 준비하기 전에, Subscriber가 계속 데이터를 요청하게 되면, 문제를 야기할 수 있다. 따라서, Subscription.request를 onNext나 onSubscribe 내에서 동기적으로 호출하게끔 하여, 데이터 흐름을 적절히 관리할 수 있다. |
2 | 구독이 취소된 후 추가적으로 호출되는 Subscription.request(long n)는 효력이 없어야 한다. | |
3 | 구독이 취소된 후 추가적으로 호출되는 Subscription.cancel()은 효력이 없어야 한다. | |
4 | 구독이 취소되지 않은 동안 Subscription.request(long n)의 매개변수가 0보다 작거나 같으면 java.lang.IllegalArgumentException과 함께 onError signal을 보내야 한다. | |
5 | 구독이 취소되지 않은 동안 Subscription.cancel()은 Publisher가 Subscriber에게 보내는 signal을 결국 중지하도록 요청해야 한다. | 구독 취소의 의미를 넘어 Publisher에게 더 이상 signal을 보내지 말라는 의미이기도 하다. |
6 | 구독이 취소되지 않은 동안 Subscription.cancel()은 Publisher에게 해당 구독자에 대한 참조를 결국 삭제하도록 요청해야 한다. | 이 규칙을 통해 가비지 컬렉터가 더 이상 유효하지 앟은 구독자의 객체를 수집하여 메모리를 확보할 수 있도록 한다. |
7 | Subscription.cancel(), Subscription.request() 호출에 대한 응답으로 예외를 던지는 것을 허용하지 않는다. | 리액티브 스트림즈는 예외가 발생하면 해당 예외를 onError signal과 함께 보내도록 규정한다. |
8 | 구독은 무제한 수의 request 호출을 지원해야 하고 최대 2^63 - 1개의 Demand를 지원해야 한다. | 이렇게 무한히 발생하는 데이터의 흐름을 무한 스트림이라고 한다. |
리액티브 스트림즈 구현체
✅ RxJava
.NET 환경의 리액티브 확장 라이브러리를 넷플릭스에서 Java 언어로 포팅하여 만든 라이브러리.
✅ Project Reactor
Spring 팀에서 개발한 구현체. Spring WebFlux 기반의 리액티브 애플리케이션 도입을 위해 Reactor를 충분히 이해해야 한다.
✅ Akka Streams
JVM상에서의 동시성과 분산 애플리케이션을 단순화해 주는 오픈소스 툴킷
✅ Java Flow API
리액티브 스트림즈의 구현체가 아니라 리액티브 스트림즈의 표준 사양이 SPI로써 Java API에 정의되어 있다.
참고자료
https://velog.io/@korea3611/리액티브-스트림즈Reactive-Streams란
https://github.com/reactive-streams/reactive-streams-jvm
https://doc.akka.io/docs/akka/2.4/intro/what-is-akka.html
'book > 스프링으로 시작하는 리액티브 프로그래밍' 카테고리의 다른 글
[리액티브 프로그래밍] 마블 다이어그램(Marble Diagram) (0) | 2024.03.11 |
---|---|
[리액티브 프로그래밍] Reactor 개요 (0) | 2024.03.11 |
[리액티브 프로그래밍] 리액티브 프로그래밍을 위한 사전 지식 (0) | 2024.03.03 |
[리액티브 프로그래밍] Blocking I/O와 Non-Blocking I/O (2) | 2024.02.26 |
[리액티브 프로그래밍] 리액티브 시스템과 리액티브 프로그래밍 (0) | 2024.02.19 |