danuri
오늘의 기록
danuri
전체 방문자
오늘
어제
  • 오늘의 기록 (307)
    • java (150)
      • java (33)
      • spring (63)
      • jpa (36)
      • querydsl (7)
      • intelliJ (9)
    • kotlin (8)
    • python (24)
      • python (10)
      • data analysis (13)
      • crawling (1)
    • ddd (2)
    • chatgpt (2)
    • algorithm (33)
      • theory (9)
      • problems (23)
    • http (8)
    • git (8)
    • database (5)
    • aws (12)
    • devops (10)
      • docker (6)
      • cicd (4)
    • book (44)
      • clean code (9)
      • 도메인 주도 개발 시작하기 (10)
      • 자바 최적화 (11)
      • 마이크로서비스 패턴 (0)
      • 스프링으로 시작하는 리액티브 프로그래밍 (14)
    • tistory (1)

블로그 메뉴

  • 홈
  • 태그
  • 방명록

인기 글

태그

  • Spring
  • 도메인 주도 설계
  • AWS
  • reactive
  • Saving Plans
  • Bitmask
  • nuribank
  • 트랜잭션
  • PostgreSQL
  • DDD
  • 마이크로서비스패턴
  • JPA
  • S3
  • 자바 최적화
  • 등가속도 운동
  • connection
  • docker
  • Security
  • Kotlin
  • POSTGIS
  • SWAGGER
  • Thymeleaf
  • Java
  • ChatGPT
  • RDS
  • CICD
  • Jackson
  • Database
  • gitlab
  • mockito

최근 댓글

최근 글

hELLO · Designed By 정상우.
danuri

오늘의 기록

[리액티브 프로그래밍] 마블 다이어그램(Marble Diagram)
book/스프링으로 시작하는 리액티브 프로그래밍

[리액티브 프로그래밍] 마블 다이어그램(Marble Diagram)

2024. 3. 11. 22:34

스프링으로 시작하는 리액티브 프로그래밍 책 정리

 

스프링으로 시작하는 리액티브 프로그래밍 | 황정식 - 교보문고

스프링으로 시작하는 리액티브 프로그래밍 | *리액티브 프로그래밍의 기본기를 확실하게 다진다*리액티브 프로그래밍은 적은 컴퓨팅 파워로 대량의 요청 트래픽을 효과적으로 처리할 수 있는

product.kyobobook.co.kr

 

마블 다이어그램(Marble Diagram)이란?

✅ 마블 다이어그램

비동기적인 데이터 흐름을 시간의 흐름에 따라 시작적으로 표시한 다이어그램.

 

마블 다이어그램의 구성
마블 다이어그램의 구성

 

그림에 나와있는 번호 순서대로 설명하자면,

  1. Publisher가 데이터를 emit하는 타임라인. 가운데 Operator를 기준으로 Upstream의 Publisher라고 볼 수 있다. (Source Flux)
  2. Publisher가 emit하는 데이터. 타임라인이 왼쪽에서 오른쪽으로 흐르기 때문에, 가장 왼쪽에 있는 1번 구슬이 가장 먼저 emit된다.
  3. 수직으로된 바는 데이터의 emit이 정상적으로 끝났음을 의미한다. (onComplete Signal)
  4. Publisher로부터 emit된 데이터가 Operator 함수의 입력으로 전달된다.
  5. Operator 함수이다. 각각의 Operator는 해당 Operator를 잘 설명하는 마블 다이어그램을 가진다.
  6. Operator 함수에서 리턴하는 새로운 Publisher를 이용해 Downstream에 데이터를 전달한다.
  7. Operator 함수에서 가공 처리되어 출력으로 내보내진 데이터의 타임라인이다. (Output Flux)
  8. Operator 함수에서 가공 처리된 데이터.
  9. X 표시는 에러가 발생해 데이터 처리가 종료되었음을 의미한다. (onError Signal)

-> Operator API 설명을 보기전에 마블 다이어그램부터 먼저 확인하는 습관을 들이기를 권장한다.

 


 

마블 다이어그램으로 Reactor의 Publisher 이해하기

✅ Mono

 

Mono 마블 다이어그램
Mono 마블 다이어그램

 

Mono는 0개 또는 1개의 데이터를 emit하는 Publisher다.

<예제 1>

public class Example6_1 {
    public static void main(String[] args) {
        Mono.just("Hello Reactor")
                .subscribe(System.out::println);
    }
}

just()는 한 개 이상의 데이터를 emit하기 위한 Operator다.

여기서는 Mono를 사용하기 때문에, 1개의 데이터만 넣어준다.

 

결과

Hello Reactor

 

<예제 2>

public class Example6_2 {
    public static void main(String[] args) {
        Mono
            .empty()
            .subscribe(
                    none -> System.out.println("# emitted onNext signal"),
                    error -> {},
                    () -> System.out.println("# emitted onComplete signal")
            );
    }
}

데이터를 한 건도 emit하지 않는 코드다.

empty() Operator를 사용하면 데이터를 emit하지 않고 onComplete Signal을 전송한다.

 

결과

# emitted onComplete signal

실행결과를 보면, 6번 라인의 첫 번째 람다 표현식이 실행되는 것이 아니라, 8번 라인의 세 번째 람다 표현식이 실행되는 것을 알 수 있다.

여기서 알아둘 점은, subscribe() 메서드의

  1. 첫 번째 람다 표현식은 Publisher가 onNext Signal을 전송하면 실행되고,
  2. 두 번째 람다 표현식은 Publisher가 onError Signal을 전송하면 실행되고,
  3. 세 번째 람다 표현식은 Publisher가 onComplete Signal을 전송하면 실행된다.

empty() Operator를 사용하면 내부적으로 emit할 데이터가 없는 것으로 간주하고, onComplete Signal을 전송하기 때문에, 세 번째 람다 표현식만 실행된다.

-> empty() Operator는 데이터를 전달 받을 필요는 없고, 주로 작업이 끝났음을 알리고 이에 따른 후처리를 하고 싶을 때 사용할 수 있다.

 

<예제 3>

public class Example6_3 {
    public static void main(String[] args) {
        URI worldTimeUri = UriComponentsBuilder.newInstance().scheme("http")
                .host("worldtimeapi.org")
                .port(80)
                .path("/api/timezone/Asia/Seoul")
                .build()
                .encode()
                .toUri();

        RestTemplate restTemplate = new RestTemplate();
        HttpHeaders headers = new HttpHeaders();
        headers.setAccept(Collections.singletonList(MediaType.APPLICATION_JSON));


        Mono.just(
                    restTemplate
                            .exchange(worldTimeUri,
                                    HttpMethod.GET,
                                    new HttpEntity<String>(headers),
                                    String.class)
                )
                .map(response -> {
                    DocumentContext jsonContext = JsonPath.parse(response.getBody());
                    String dateTime = jsonContext.read("$.datetime");
                    return dateTime;
                })
                .subscribe(
                        data -> System.out.println("# emitted data: " + data),
                        error -> {
                            System.out.println(error);
                        },
                        () -> System.out.println("# emitted onComplete signal")
                );
    }
}

여기서는 애플리케이션 외부 시스템 API 호출을 통해 데이터를 요청하도록 했다.

just() Operator에 외부 시스템의 API를 호출해서 응답으로 수신한 데이터를 전달한다.

 

결과

# emitted data: 2024-03-11T22:18:49.621786+09:00
# emitted onComplete signal

-> Mono는 단 한 건의 데이터를 응답으로 보내는 HTTP 요청/응답에 사용하기 적합한 Publisher 타입이다.

 

✅ Flux

 

Flux 마블 다이어그램
Flux 마블 다이어그램

 

Flux는 0개부터 N개의 데이터를 emit하는 Publisher다. (Mono의 범위를 포함한다)

 

<예제 1>

public class Example6_4 {
    public static void main(String[] args) {
        Flux.just(6, 9, 13)
                .map(num -> num % 2)
                .subscribe(System.out::println);
    }
}

 

결과

0
1
1

 

<예제 2>

public class Example6_5 {
    public static void main(String[] args) {
        Flux.fromArray(new Integer[]{3, 6, 7, 9})
                .filter(num -> num > 6)
                .map(num -> num * 2)
                .subscribe(System.out::println);
    }
}

fromArray() Operator는 배열 데이터를 데이터 소스로 사용할 수 있다.

+) just() 메서드가 varargs 타입을 지원하기 때문에, Flux.just(new Integerp[]{...})로 사용해도 된다. (just() 내부에서 fromArray()를 호출한다)

filter() Operator가 6보다 큰 숫자만 필터링하고,

map() Operator가 해당 숫자들에 2를 곱한다.

 

결과

14
18

 

<예제 3>

public class Example6_6 {
    public static void main(String[] args) {
        Flux<String> flux =
                Mono.justOrEmpty("Steve")
                        .concatWith(Mono.justOrEmpty("Jobs"));
        flux.subscribe(System.out::println);
    }
}

Mono를 연결해서 Flux로 변환할 수 있다.

just() Operator는 null을 허용하지 않지만, justOrEmpty() Operator는 null을 허용한다. (null이면 empty()를 호출)

concatWith() Operator가 데이터 소스를 연결해서 하나의 데이터 소스를 만든다.

 

결과

Steve
Jobs

-> String의 concat처럼 데이터 자체를 연결하는 것이 아니라 두 개의 데이터 소스를 연결해서 하나의 데이터 소스를 만든다.

 

<예제 4>

public class Example6_7 {
    public static void main(String[] args) {
        Flux.concat(
                        Flux.just("Mercury", "Venus", "Earth"),
                        Flux.just("Mars", "Jupiter", "Saturn"),
                        Flux.just("Uranus", "Neptune", "Pluto"))
                .collectList()
                .subscribe(planets -> System.out.println(planets));
    }
}

concatWith() Operator는 두 개의 데이터 소스만 연결할 수 있지만, concat() Operator는 여러 개의 데이터 소스를 연결할 수 있다.

여기서 중요한 점은 각 Operator마다 리턴하는 Publisher 타입이다.

concat() Operator는 Flux를 리턴한다. (아홉 개의 데이터를 하나의 데이터 소스로 연결한다.)

collectList() Operator는 Mono를 리턴한다. (아홉 개의 데이터를 하나의 List로 만든다 -> 하나의 List도 한 개의 데이터로 본다)

따라서 subscribe() 메서드는 아홉 개의 데이터를 반복적으로 출력하는 것이 아닌, 한 개의 List를 그대로 한 번 출력한다.

 

결과

[Mercury, Venus, Earth, Mars, Jupiter, Saturn, Uranus, Neptune, Pluto]

 

 

저작자표시 비영리 동일조건

'book > 스프링으로 시작하는 리액티브 프로그래밍' 카테고리의 다른 글

[리액티브 프로그래밍] Backpressure  (0) 2024.04.21
[리액티브 프로그래밍] Cold Sequence와 Hot Sequence  (0) 2024.03.18
[리액티브 프로그래밍] Reactor 개요  (0) 2024.03.11
[리액티브 프로그래밍] 리액티브 프로그래밍을 위한 사전 지식  (0) 2024.03.03
[리액티브 프로그래밍] Blocking I/O와 Non-Blocking I/O  (2) 2024.02.26
    'book/스프링으로 시작하는 리액티브 프로그래밍' 카테고리의 다른 글
    • [리액티브 프로그래밍] Backpressure
    • [리액티브 프로그래밍] Cold Sequence와 Hot Sequence
    • [리액티브 프로그래밍] Reactor 개요
    • [리액티브 프로그래밍] 리액티브 프로그래밍을 위한 사전 지식
    danuri
    danuri
    IT 관련 정보(컴퓨터 지식, 개발)를 꾸준히 기록하는 블로그입니다.

    티스토리툴바