danuri
오늘의 기록
danuri
전체 방문자
오늘
어제
  • 오늘의 기록 (307)
    • java (150)
      • java (33)
      • spring (63)
      • jpa (36)
      • querydsl (7)
      • intelliJ (9)
    • kotlin (8)
    • python (24)
      • python (10)
      • data analysis (13)
      • crawling (1)
    • ddd (2)
    • chatgpt (2)
    • algorithm (33)
      • theory (9)
      • problems (23)
    • http (8)
    • git (8)
    • database (5)
    • aws (12)
    • devops (10)
      • docker (6)
      • cicd (4)
    • book (44)
      • clean code (9)
      • 도메인 주도 개발 시작하기 (10)
      • 자바 최적화 (11)
      • 마이크로서비스 패턴 (0)
      • 스프링으로 시작하는 리액티브 프로그래밍 (14)
    • tistory (1)

블로그 메뉴

  • 홈
  • 태그
  • 방명록

인기 글

태그

  • 트랜잭션
  • Thymeleaf
  • AWS
  • POSTGIS
  • docker
  • 자바 최적화
  • Spring
  • mockito
  • CICD
  • Jackson
  • DDD
  • SWAGGER
  • 등가속도 운동
  • S3
  • reactive
  • Saving Plans
  • gitlab
  • 도메인 주도 설계
  • nuribank
  • RDS
  • Java
  • ChatGPT
  • Database
  • Bitmask
  • PostgreSQL
  • Kotlin
  • connection
  • 마이크로서비스패턴
  • JPA
  • Security

최근 댓글

최근 글

hELLO · Designed By 정상우.
danuri

오늘의 기록

book/스프링으로 시작하는 리액티브 프로그래밍

[리액티브 프로그래밍] Cold Sequence와 Hot Sequence

2024. 3. 18. 22:46

스프링으로 시작하는 리액티브 프로그래밍 책 정리

 

스프링으로 시작하는 리액티브 프로그래밍 | 황정식 - 교보문고

스프링으로 시작하는 리액티브 프로그래밍 | *리액티브 프로그래밍의 기본기를 확실하게 다진다*리액티브 프로그래밍은 적은 컴퓨팅 파워로 대량의 요청 트래픽을 효과적으로 처리할 수 있는

product.kyobobook.co.kr

 

Cold와 Hot의 의미

✅ 컴퓨터 시스템에서 Cold와 Hot

  • Cold: 어떤 작업을 위해 다시 시작하고, 같은 작업을 매번 반복함.
  • Hot: 어떤 작업에 대해 다시 시작하지 않고, 같은 작업이 반복되지 않음.

 


 

Cold Sequence

Subscriber의 구독 시점이 달라도, 구독할 대마다 Publisher가 데이터를 emit하는 과정을 처음부터 다시 시작하는 데이터의 흐름.

-> Cold Sequence 흐름으로 동작하는 Publisher를 Cold Publisher라고 한다.

@Slf4j
public class Example7_1 {
    public static void main(String[] args) throws InterruptedException {

        Flux<String> coldFlux =
                Flux
                    .fromIterable(Arrays.asList("KOREA", "JAPAN", "CHINESE"))
                    .map(String::toLowerCase);

        coldFlux.subscribe(country -> log.info("# Subscriber1: {}", country));
        System.out.println("----------------------------------------------------------------------");
        Thread.sleep(2000L);
        coldFlux.subscribe(country -> log.info("# Subscriber2: {}", country));
    }
}

 

결과

21:00:44.187 [main] DEBUG- Using Slf4j logging framework
21:00:44.200 [main] INFO - # Subscriber1: korea
21:00:44.202 [main] INFO - # Subscriber1: japan
21:00:44.202 [main] INFO - # Subscriber1: chinese
----------------------------------------------------------------------
21:00:46.215 [main] INFO - # Subscriber2: korea
21:00:46.216 [main] INFO - # Subscriber2: japan
21:00:46.217 [main] INFO - # Subscriber2: chinese

-> 구독이 발생할 때마다 emit된 데이터를 처음부터 다시 전달받고 있다.

 


 

Hot Sequence

구독이 발생한 시점 이전에 Publisher로부터 emit된 데이터는 Subscriber가 전달받지 못하고, 구독이 발생한 시점 이후에 emit된 데이터만 전달받을 수 있다.

@Slf4j
public class Example7_2 {
    public static void main(String[] args) throws InterruptedException {
        String[] singers = {"Singer A", "Singer B", "Singer C", "Singer D", "Singer E"};

        log.info("# Begin concert:");
        Flux<String> concertFlux =
                Flux
                    .fromArray(singers)
                    .delayElements(Duration.ofSeconds(1))
                    .share();

        concertFlux.subscribe(
                singer -> log.info("# Subscriber1 is watching {}'s song", singer)
        );

        Thread.sleep(2500);

        concertFlux.subscribe(
                singer -> log.info("# Subscriber2 is watching {}'s song", singer)
        );

        Thread.sleep(3000);
    }
}

-> delayElements: 데이터 emit이 1초씩 지연될 수 있도록 한다.

-> share(): 원본 Flux(처음으로 리턴하는 Flux, 여기서는 fromArray로부터 리턴되는 Flux)를 여러 Subscriber가 공유하게끔 한다.

 

결과

21:08:09.874 [main] INFO - # Begin concert:
21:08:11.076 [parallel-1] INFO - # Subscriber1 is watching Singer A's song
21:08:12.092 [parallel-2] INFO - # Subscriber1 is watching Singer B's song
21:08:13.100 [parallel-3] INFO - # Subscriber1 is watching Singer C's song
21:08:13.102 [parallel-3] INFO - # Subscriber2 is watching Singer C's song
21:08:14.111 [parallel-4] INFO - # Subscriber1 is watching Singer D's song
21:08:14.113 [parallel-4] INFO - # Subscriber2 is watching Singer D's song
21:08:15.120 [parallel-5] INFO - # Subscriber1 is watching Singer E's song
21:08:15.123 [parallel-5] INFO - # Subscriber2 is watching Singer E's song

-> 두 번째 구독의 경우, 원본 Flux가 emit한 데이터 중에서 Singer A, Singer B는 전달받지 못했다.

 


 

HTTP 요청과 응답에서 Cold Sequence와 Hot Sequence의 동작 흐름

✅ Cold Sequence

@Slf4j
public class Example7_3 {
    public static void main(String[] args) throws InterruptedException {
        URI worldTimeUri = UriComponentsBuilder.newInstance().scheme("http")
                .host("worldtimeapi.org")
                .port(80)
                .path("/api/timezone/Asia/Seoul")
                .build()
                .encode()
                .toUri();

        Mono<String> mono = getWorldTime(worldTimeUri);
        mono.subscribe(dateTime -> log.info("# dateTime 1: {}", dateTime));
        Thread.sleep(2000);
        mono.subscribe(dateTime -> log.info("# dateTime 2: {}", dateTime));

        Thread.sleep(2000);
    }

    private static Mono<String> getWorldTime(URI worldTimeUri) {
        return WebClient.create()
                .get()
                .uri(worldTimeUri)
                .retrieve()
                .bodyToMono(String.class)
                .map(response -> {
                    DocumentContext jsonContext = JsonPath.parse(response);
                    String dateTime = jsonContext.read("$.datetime");
                    return dateTime;
                });
    }
}

-> 구독이 두 번 발생했으므로, 두 번의 새로운 HTTP 요청이 발생한다.

 

결과

# dateTime 1: 2024-03-18T22:37:16.922861+09:00
# dateTime 2: 2024-03-18T22:37:18.653781+09:00

-> 두 결과의 시간이 2초 정도 차이 난다.

 

✅ Hot Sequence

@Slf4j
public class Example7_4 {
    public static void main(String[] args) throws InterruptedException {
        URI worldTimeUri = UriComponentsBuilder.newInstance().scheme("http")
                .host("worldtimeapi.org")
                .port(80)
                .path("/api/timezone/Asia/Seoul")
                .build()
                .encode()
                .toUri();

        Mono<String> mono = getWorldTime(worldTimeUri).cache();
        mono.subscribe(dateTime -> log.info("# dateTime 1: {}", dateTime));
        Thread.sleep(2000);
        mono.subscribe(dateTime -> log.info("# dateTime 2: {}", dateTime));

        Thread.sleep(2000);
    }

    private static Mono<String> getWorldTime(URI worldTimeUri) {
        return WebClient.create()
                .get()
                .uri(worldTimeUri)
                .retrieve()
                .bodyToMono(String.class)
                .map(response -> {
                    DocumentContext jsonContext = JsonPath.parse(response);
                    String dateTime = jsonContext.read("$.datetime");
                    return dateTime;
                });
    }
}

cache() Operator는 Mono를 Hot Sequence로 변경해 주고, emit된 데이터를 캐시한 뒤, 구독이 발생할 때마다 캐시된 데이터를 전달한다.

-> 결과적으로 캐시된 데이터를 전달하기 때문에 구독이 발생할 때마다 Subscriber는 동일한 데이터를 전달받게 된다.

 

결과

# dateTime 1: 2024-03-18T22:42:24.691041+09:00
# dateTime 2: 2024-03-18T22:42:24.691041+09:00

-> 첫 번째 구독을 통해 발생한 응답 데이터를 캐시한 후에 두 번째 구독에서는 캐시된 데이터를 전달하기 때문에 출력된 시간이 동일하다.

저작자표시 비영리 동일조건 (새창열림)

'book > 스프링으로 시작하는 리액티브 프로그래밍' 카테고리의 다른 글

[리액티브 프로그래밍] Sinks  (0) 2024.04.21
[리액티브 프로그래밍] Backpressure  (0) 2024.04.21
[리액티브 프로그래밍] 마블 다이어그램(Marble Diagram)  (0) 2024.03.11
[리액티브 프로그래밍] Reactor 개요  (0) 2024.03.11
[리액티브 프로그래밍] 리액티브 프로그래밍을 위한 사전 지식  (0) 2024.03.03
    'book/스프링으로 시작하는 리액티브 프로그래밍' 카테고리의 다른 글
    • [리액티브 프로그래밍] Sinks
    • [리액티브 프로그래밍] Backpressure
    • [리액티브 프로그래밍] 마블 다이어그램(Marble Diagram)
    • [리액티브 프로그래밍] Reactor 개요
    danuri
    danuri
    IT 관련 정보(컴퓨터 지식, 개발)를 꾸준히 기록하는 블로그입니다.

    티스토리툴바