마블 다이어그램(Marble Diagram)이란?
✅ 마블 다이어그램
비동기적인 데이터 흐름을 시간의 흐름에 따라 시작적으로 표시한 다이어그램.
그림에 나와있는 번호 순서대로 설명하자면,
- Publisher가 데이터를 emit하는 타임라인. 가운데 Operator를 기준으로 Upstream의 Publisher라고 볼 수 있다. (Source Flux)
- Publisher가 emit하는 데이터. 타임라인이 왼쪽에서 오른쪽으로 흐르기 때문에, 가장 왼쪽에 있는 1번 구슬이 가장 먼저 emit된다.
- 수직으로된 바는 데이터의 emit이 정상적으로 끝났음을 의미한다. (onComplete Signal)
- Publisher로부터 emit된 데이터가 Operator 함수의 입력으로 전달된다.
- Operator 함수이다. 각각의 Operator는 해당 Operator를 잘 설명하는 마블 다이어그램을 가진다.
- Operator 함수에서 리턴하는 새로운 Publisher를 이용해 Downstream에 데이터를 전달한다.
- Operator 함수에서 가공 처리되어 출력으로 내보내진 데이터의 타임라인이다. (Output Flux)
- Operator 함수에서 가공 처리된 데이터.
- X 표시는 에러가 발생해 데이터 처리가 종료되었음을 의미한다. (onError Signal)
-> Operator API 설명을 보기전에 마블 다이어그램부터 먼저 확인하는 습관을 들이기를 권장한다.
마블 다이어그램으로 Reactor의 Publisher 이해하기
✅ Mono
Mono는 0개 또는 1개의 데이터를 emit하는 Publisher다.
<예제 1>
public class Example6_1 {
public static void main(String[] args) {
Mono.just("Hello Reactor")
.subscribe(System.out::println);
}
}
just()는 한 개 이상의 데이터를 emit하기 위한 Operator다.
여기서는 Mono를 사용하기 때문에, 1개의 데이터만 넣어준다.
결과
Hello Reactor
<예제 2>
public class Example6_2 {
public static void main(String[] args) {
Mono
.empty()
.subscribe(
none -> System.out.println("# emitted onNext signal"),
error -> {},
() -> System.out.println("# emitted onComplete signal")
);
}
}
데이터를 한 건도 emit하지 않는 코드다.
empty() Operator를 사용하면 데이터를 emit하지 않고 onComplete Signal을 전송한다.
결과
# emitted onComplete signal
실행결과를 보면, 6번 라인의 첫 번째 람다 표현식이 실행되는 것이 아니라, 8번 라인의 세 번째 람다 표현식이 실행되는 것을 알 수 있다.
여기서 알아둘 점은, subscribe() 메서드의
- 첫 번째 람다 표현식은 Publisher가 onNext Signal을 전송하면 실행되고,
- 두 번째 람다 표현식은 Publisher가 onError Signal을 전송하면 실행되고,
- 세 번째 람다 표현식은 Publisher가 onComplete Signal을 전송하면 실행된다.
empty() Operator를 사용하면 내부적으로 emit할 데이터가 없는 것으로 간주하고, onComplete Signal을 전송하기 때문에, 세 번째 람다 표현식만 실행된다.
-> empty() Operator는 데이터를 전달 받을 필요는 없고, 주로 작업이 끝났음을 알리고 이에 따른 후처리를 하고 싶을 때 사용할 수 있다.
<예제 3>
public class Example6_3 {
public static void main(String[] args) {
URI worldTimeUri = UriComponentsBuilder.newInstance().scheme("http")
.host("worldtimeapi.org")
.port(80)
.path("/api/timezone/Asia/Seoul")
.build()
.encode()
.toUri();
RestTemplate restTemplate = new RestTemplate();
HttpHeaders headers = new HttpHeaders();
headers.setAccept(Collections.singletonList(MediaType.APPLICATION_JSON));
Mono.just(
restTemplate
.exchange(worldTimeUri,
HttpMethod.GET,
new HttpEntity<String>(headers),
String.class)
)
.map(response -> {
DocumentContext jsonContext = JsonPath.parse(response.getBody());
String dateTime = jsonContext.read("$.datetime");
return dateTime;
})
.subscribe(
data -> System.out.println("# emitted data: " + data),
error -> {
System.out.println(error);
},
() -> System.out.println("# emitted onComplete signal")
);
}
}
여기서는 애플리케이션 외부 시스템 API 호출을 통해 데이터를 요청하도록 했다.
just() Operator에 외부 시스템의 API를 호출해서 응답으로 수신한 데이터를 전달한다.
결과
# emitted data: 2024-03-11T22:18:49.621786+09:00
# emitted onComplete signal
-> Mono는 단 한 건의 데이터를 응답으로 보내는 HTTP 요청/응답에 사용하기 적합한 Publisher 타입이다.
✅ Flux
Flux는 0개부터 N개의 데이터를 emit하는 Publisher다. (Mono의 범위를 포함한다)
<예제 1>
public class Example6_4 {
public static void main(String[] args) {
Flux.just(6, 9, 13)
.map(num -> num % 2)
.subscribe(System.out::println);
}
}
결과
0
1
1
<예제 2>
public class Example6_5 {
public static void main(String[] args) {
Flux.fromArray(new Integer[]{3, 6, 7, 9})
.filter(num -> num > 6)
.map(num -> num * 2)
.subscribe(System.out::println);
}
}
fromArray() Operator는 배열 데이터를 데이터 소스로 사용할 수 있다.
+) just() 메서드가 varargs 타입을 지원하기 때문에, Flux.just(new Integerp[]{...})로 사용해도 된다. (just() 내부에서 fromArray()를 호출한다)
filter() Operator가 6보다 큰 숫자만 필터링하고,
map() Operator가 해당 숫자들에 2를 곱한다.
결과
14
18
<예제 3>
public class Example6_6 {
public static void main(String[] args) {
Flux<String> flux =
Mono.justOrEmpty("Steve")
.concatWith(Mono.justOrEmpty("Jobs"));
flux.subscribe(System.out::println);
}
}
Mono를 연결해서 Flux로 변환할 수 있다.
just() Operator는 null을 허용하지 않지만, justOrEmpty() Operator는 null을 허용한다. (null이면 empty()를 호출)
concatWith() Operator가 데이터 소스를 연결해서 하나의 데이터 소스를 만든다.
결과
Steve
Jobs
-> String의 concat처럼 데이터 자체를 연결하는 것이 아니라 두 개의 데이터 소스를 연결해서 하나의 데이터 소스를 만든다.
<예제 4>
public class Example6_7 {
public static void main(String[] args) {
Flux.concat(
Flux.just("Mercury", "Venus", "Earth"),
Flux.just("Mars", "Jupiter", "Saturn"),
Flux.just("Uranus", "Neptune", "Pluto"))
.collectList()
.subscribe(planets -> System.out.println(planets));
}
}
concatWith() Operator는 두 개의 데이터 소스만 연결할 수 있지만, concat() Operator는 여러 개의 데이터 소스를 연결할 수 있다.
여기서 중요한 점은 각 Operator마다 리턴하는 Publisher 타입이다.
concat() Operator는 Flux를 리턴한다. (아홉 개의 데이터를 하나의 데이터 소스로 연결한다.)
collectList() Operator는 Mono를 리턴한다. (아홉 개의 데이터를 하나의 List로 만든다 -> 하나의 List도 한 개의 데이터로 본다)
따라서 subscribe() 메서드는 아홉 개의 데이터를 반복적으로 출력하는 것이 아닌, 한 개의 List를 그대로 한 번 출력한다.
결과
[Mercury, Venus, Earth, Mars, Jupiter, Saturn, Uranus, Neptune, Pluto]
'book > 스프링으로 시작하는 리액티브 프로그래밍' 카테고리의 다른 글
[리액티브 프로그래밍] Backpressure (0) | 2024.04.21 |
---|---|
[리액티브 프로그래밍] Cold Sequence와 Hot Sequence (0) | 2024.03.18 |
[리액티브 프로그래밍] Reactor 개요 (0) | 2024.03.11 |
[리액티브 프로그래밍] 리액티브 프로그래밍을 위한 사전 지식 (0) | 2024.03.03 |
[리액티브 프로그래밍] Blocking I/O와 Non-Blocking I/O (2) | 2024.02.26 |