Cold와 Hot의 의미
✅ 컴퓨터 시스템에서 Cold와 Hot
- Cold: 어떤 작업을 위해 다시 시작하고, 같은 작업을 매번 반복함.
- Hot: 어떤 작업에 대해 다시 시작하지 않고, 같은 작업이 반복되지 않음.
Cold Sequence
Subscriber의 구독 시점이 달라도, 구독할 대마다 Publisher가 데이터를 emit하는 과정을 처음부터 다시 시작하는 데이터의 흐름.
-> Cold Sequence 흐름으로 동작하는 Publisher를 Cold Publisher라고 한다.
@Slf4j
public class Example7_1 {
public static void main(String[] args) throws InterruptedException {
Flux<String> coldFlux =
Flux
.fromIterable(Arrays.asList("KOREA", "JAPAN", "CHINESE"))
.map(String::toLowerCase);
coldFlux.subscribe(country -> log.info("# Subscriber1: {}", country));
System.out.println("----------------------------------------------------------------------");
Thread.sleep(2000L);
coldFlux.subscribe(country -> log.info("# Subscriber2: {}", country));
}
}
결과
21:00:44.187 [main] DEBUG- Using Slf4j logging framework
21:00:44.200 [main] INFO - # Subscriber1: korea
21:00:44.202 [main] INFO - # Subscriber1: japan
21:00:44.202 [main] INFO - # Subscriber1: chinese
----------------------------------------------------------------------
21:00:46.215 [main] INFO - # Subscriber2: korea
21:00:46.216 [main] INFO - # Subscriber2: japan
21:00:46.217 [main] INFO - # Subscriber2: chinese
-> 구독이 발생할 때마다 emit된 데이터를 처음부터 다시 전달받고 있다.
Hot Sequence
구독이 발생한 시점 이전에 Publisher로부터 emit된 데이터는 Subscriber가 전달받지 못하고, 구독이 발생한 시점 이후에 emit된 데이터만 전달받을 수 있다.
@Slf4j
public class Example7_2 {
public static void main(String[] args) throws InterruptedException {
String[] singers = {"Singer A", "Singer B", "Singer C", "Singer D", "Singer E"};
log.info("# Begin concert:");
Flux<String> concertFlux =
Flux
.fromArray(singers)
.delayElements(Duration.ofSeconds(1))
.share();
concertFlux.subscribe(
singer -> log.info("# Subscriber1 is watching {}'s song", singer)
);
Thread.sleep(2500);
concertFlux.subscribe(
singer -> log.info("# Subscriber2 is watching {}'s song", singer)
);
Thread.sleep(3000);
}
}
-> delayElements: 데이터 emit이 1초씩 지연될 수 있도록 한다.
-> share(): 원본 Flux(처음으로 리턴하는 Flux, 여기서는 fromArray로부터 리턴되는 Flux)를 여러 Subscriber가 공유하게끔 한다.
결과
21:08:09.874 [main] INFO - # Begin concert:
21:08:11.076 [parallel-1] INFO - # Subscriber1 is watching Singer A's song
21:08:12.092 [parallel-2] INFO - # Subscriber1 is watching Singer B's song
21:08:13.100 [parallel-3] INFO - # Subscriber1 is watching Singer C's song
21:08:13.102 [parallel-3] INFO - # Subscriber2 is watching Singer C's song
21:08:14.111 [parallel-4] INFO - # Subscriber1 is watching Singer D's song
21:08:14.113 [parallel-4] INFO - # Subscriber2 is watching Singer D's song
21:08:15.120 [parallel-5] INFO - # Subscriber1 is watching Singer E's song
21:08:15.123 [parallel-5] INFO - # Subscriber2 is watching Singer E's song
-> 두 번째 구독의 경우, 원본 Flux가 emit한 데이터 중에서 Singer A, Singer B는 전달받지 못했다.
HTTP 요청과 응답에서 Cold Sequence와 Hot Sequence의 동작 흐름
✅ Cold Sequence
@Slf4j
public class Example7_3 {
public static void main(String[] args) throws InterruptedException {
URI worldTimeUri = UriComponentsBuilder.newInstance().scheme("http")
.host("worldtimeapi.org")
.port(80)
.path("/api/timezone/Asia/Seoul")
.build()
.encode()
.toUri();
Mono<String> mono = getWorldTime(worldTimeUri);
mono.subscribe(dateTime -> log.info("# dateTime 1: {}", dateTime));
Thread.sleep(2000);
mono.subscribe(dateTime -> log.info("# dateTime 2: {}", dateTime));
Thread.sleep(2000);
}
private static Mono<String> getWorldTime(URI worldTimeUri) {
return WebClient.create()
.get()
.uri(worldTimeUri)
.retrieve()
.bodyToMono(String.class)
.map(response -> {
DocumentContext jsonContext = JsonPath.parse(response);
String dateTime = jsonContext.read("$.datetime");
return dateTime;
});
}
}
-> 구독이 두 번 발생했으므로, 두 번의 새로운 HTTP 요청이 발생한다.
결과
# dateTime 1: 2024-03-18T22:37:16.922861+09:00
# dateTime 2: 2024-03-18T22:37:18.653781+09:00
-> 두 결과의 시간이 2초 정도 차이 난다.
✅ Hot Sequence
@Slf4j
public class Example7_4 {
public static void main(String[] args) throws InterruptedException {
URI worldTimeUri = UriComponentsBuilder.newInstance().scheme("http")
.host("worldtimeapi.org")
.port(80)
.path("/api/timezone/Asia/Seoul")
.build()
.encode()
.toUri();
Mono<String> mono = getWorldTime(worldTimeUri).cache();
mono.subscribe(dateTime -> log.info("# dateTime 1: {}", dateTime));
Thread.sleep(2000);
mono.subscribe(dateTime -> log.info("# dateTime 2: {}", dateTime));
Thread.sleep(2000);
}
private static Mono<String> getWorldTime(URI worldTimeUri) {
return WebClient.create()
.get()
.uri(worldTimeUri)
.retrieve()
.bodyToMono(String.class)
.map(response -> {
DocumentContext jsonContext = JsonPath.parse(response);
String dateTime = jsonContext.read("$.datetime");
return dateTime;
});
}
}
cache() Operator는 Mono를 Hot Sequence로 변경해 주고, emit된 데이터를 캐시한 뒤, 구독이 발생할 때마다 캐시된 데이터를 전달한다.
-> 결과적으로 캐시된 데이터를 전달하기 때문에 구독이 발생할 때마다 Subscriber는 동일한 데이터를 전달받게 된다.
결과
# dateTime 1: 2024-03-18T22:42:24.691041+09:00
# dateTime 2: 2024-03-18T22:42:24.691041+09:00
-> 첫 번째 구독을 통해 발생한 응답 데이터를 캐시한 후에 두 번째 구독에서는 캐시된 데이터를 전달하기 때문에 출력된 시간이 동일하다.
'book > 스프링으로 시작하는 리액티브 프로그래밍' 카테고리의 다른 글
[리액티브 프로그래밍] Sinks (0) | 2024.04.21 |
---|---|
[리액티브 프로그래밍] Backpressure (0) | 2024.04.21 |
[리액티브 프로그래밍] 마블 다이어그램(Marble Diagram) (0) | 2024.03.11 |
[리액티브 프로그래밍] Reactor 개요 (0) | 2024.03.11 |
[리액티브 프로그래밍] 리액티브 프로그래밍을 위한 사전 지식 (0) | 2024.03.03 |