스프링으로 시작하는 리액티브 프로그래밍 | 황정식 - 교보문고
스프링으로 시작하는 리액티브 프로그래밍 | *리액티브 프로그래밍의 기본기를 확실하게 다진다*리액티브 프로그래밍은 적은 컴퓨팅 파워로 대량의 요청 트래픽을 효과적으로 처리할 수 있는
product.kyobobook.co.kr
Sinks란?
✅ Processor
리액티브 스트림즈의 구성요소 중 하나인, Publisher와 Subscriber의 기능을 모두 지닌 Processor는 Reactor 3.5.0부터 제거되었다.
대신 Processor의 기능을 개선한 Sinks가 3.4.0부터 지원되기 시작했다.
Sinks는 Signal을 프로그래밍 방식으로 전송할 수 있는 구조이다.
-> Reactor에서 프로그래밍 방식으로 Signal을 전송하는 방법은 전통적인 방식과 Sinks를 사용하는 방식이 있는데, 이를 비교해보자.
✅ 전통적인 방식
generate()나 create() Operator를 사용해서 프로그래밍 방식으로 Signal을 전송할 수 있다.
@Slf4j public class Example9_1 { public static void main(String[] args) throws InterruptedException { int tasks = 6; Flux .create((FluxSink<String> sink) -> { IntStream .range(1, tasks) .forEach(n -> sink.next(doTask(n))); }) .subscribeOn(Schedulers.boundedElastic()) .doOnNext(n -> log.info("# create(): {}", n)) .publishOn(Schedulers.parallel()) .map(result -> result + " success!") .doOnNext(n -> log.info("# map(): {}", n)) .publishOn(Schedulers.parallel()) .subscribe(data -> log.info("# onNext: {}", data)); Thread.sleep(500L); } private static String doTask(int taskNumber) { // now tasking. // complete to task. return "task " + taskNumber + " result"; } }
지금은 Sinks에 집중하기 위해, subscribeOn(), publishOn()은 새로운 스레드에서 작업을 수행하게 해준다 정도로 이해하고 넘어가자.
create(): 처리해야 할 작업의 개수만큼 doTask() 메서드를 호출해서 작업을 처리한 후, 결과를 리턴받는다.
이 때, 각 단계별로 별도의 스레드에서 작업하게 된다.
- doTask() 작업을 처리하는 스레드: subscribeOn()에서 지정.
- map() 작업을 처리하는 스레드: 첫 번째 publishOn()에서 지정.
- Subscriber에게 결과를 전달하는 스레드: 두 번째 publishOn()에서 지정.
<결과>
15:20:52.363 [boundedElastic-1] INFO - # create(): task 1 result 15:20:52.370 [boundedElastic-1] INFO - # create(): task 2 result 15:20:52.371 [boundedElastic-1] INFO - # create(): task 3 result 15:20:52.371 [boundedElastic-1] INFO - # create(): task 4 result 15:20:52.371 [boundedElastic-1] INFO - # create(): task 5 result 15:20:52.383 [parallel-2] INFO - # map(): task 1 result success! 15:20:52.383 [parallel-2] INFO - # map(): task 2 result success! 15:20:52.383 [parallel-2] INFO - # map(): task 3 result success! 15:20:52.383 [parallel-1] INFO - # onNext: task 1 result success! 15:20:52.383 [parallel-2] INFO - # map(): task 4 result success! 15:20:52.383 [parallel-1] INFO - # onNext: task 2 result success! 15:20:52.383 [parallel-2] INFO - # map(): task 5 result success! 15:20:52.383 [parallel-1] INFO - # onNext: task 3 result success! 15:20:52.383 [parallel-1] INFO - # onNext: task 4 result success! 15:20:52.383 [parallel-1] INFO - # onNext: task 5 result success!
- doTask() 작업을 처리하는 스레드: boundedElastic-1
- map() 작업을 처리하는 스레드: parallel-2
- Subscriber에게 결과를 전달하는 스레드: parallel-1
-> 이처럼 create Operator를 사용해서 프로그래밍 방식으로 Signal을 전송할 수 있으며, Reactor Sequence를 단계적으로 나누어서 여러 개의 스레드로 처리할 수 있다.
✅ Sinks
만약 doTask()가 호출될 때마다 다른 스레드에서 작업을 처리하고 싶다면?
-> 이 때 적절하게 사용할 수 있는 방식이 Sinks이다.
@Slf4j public class Example9_2 { public static void main(String[] args) throws InterruptedException { int tasks = 6; Sinks.Many<String> unicastSink = Sinks.many().unicast().onBackpressureBuffer(); Flux<String> fluxView = unicastSink.asFlux(); IntStream .range(1, tasks) .forEach(n -> { try { new Thread(() -> { unicastSink.emitNext(doTask(n), Sinks.EmitFailureHandler.FAIL_FAST); log.info("# emitted: {}", n); }).start(); Thread.sleep(100L); } catch (InterruptedException e) { log.error(e.getMessage()); } }); fluxView .publishOn(Schedulers.parallel()) .map(result -> result + " success!") .doOnNext(n -> log.info("# map(): {}", n)) .publishOn(Schedulers.parallel()) .subscribe(data -> log.info("# onNext: {}", data)); Thread.sleep(200L); } private static String doTask(int taskNumber) { // now tasking. // complete to task. return "task " + taskNumber + " result"; } }
Sinks와 관련된 코드는 뒤에서 설명할 예정이니 지금은 코드의 구조를 몰라도 된다.
<결과>
15:38:16.914 [Thread-0] INFO - # emitted: 1 15:38:16.989 [Thread-1] INFO - # emitted: 2 15:38:17.092 [Thread-2] INFO - # emitted: 3 15:38:17.224 [Thread-3] INFO - # emitted: 4 15:38:17.303 [Thread-4] INFO - # emitted: 5 15:38:17.474 [parallel-2] INFO - # map(): task 1 result success! 15:38:17.475 [parallel-2] INFO - # map(): task 2 result success! 15:38:17.475 [parallel-1] INFO - # onNext: task 1 result success! 15:38:17.475 [parallel-2] INFO - # map(): task 3 result success! 15:38:17.475 [parallel-1] INFO - # onNext: task 2 result success! 15:38:17.475 [parallel-2] INFO - # map(): task 4 result success! 15:38:17.475 [parallel-2] INFO - # map(): task 5 result success! 15:38:17.475 [parallel-1] INFO - # onNext: task 3 result success! 15:38:17.476 [parallel-1] INFO - # onNext: task 4 result success! 15:38:17.476 [parallel-1] INFO - # onNext: task 5 result success!
- doTask() 작업을 처리하는 스레드: doTaks()를 호출할 때마다 새로운 스레드에서 수행 (Thread-0 ~ Thread-4)
- map() 작업을 처리하는 스레드: parallel-2
- Subscriber에게 결과를 전달하는 스레드: parallel-1
-> 이처럼 Sinks는 프로그래밍 방식으로 Signal을 전송할 수 있고, 멀티스레드 환경에서 스레드 안전성을 보장받을 수 있다는 장점이 있다.
Q: 스레드 안전성?
A: 함수나 변수 같은 공유 자원에 동시 접근할 경우에도 프로그램의 실행에 문제가 없음을 의미. Sinks는 동시 접근을 감지하고, 동시 접근하는 스레드 중 하나가 빠르게 실패함으로써 스레드 안전성을 보장한다.
Sinks 종류 및 특징
✅ Sinks.One
한 건의 데이터를 전송하는 방법을 정의해 둔 기능 명세.
public static <T> Sinks.One<T> one() { return SinksSpecs.DEFAULT_SINKS.one(); }
한 건의 데이터를 프로그래밍 방식으로 emit하는 역할 + Mono 방식으로 Subscriber가 데이터를 소비할 수 있도록 하는 Sinks의 스펙.
-> 스펙 DefaultSinksSpec을 사용한다.
@Slf4j public class Example9_4 { public static void main(String[] args) throws InterruptedException { Sinks.One<String> sinkOne = Sinks.one(); Mono<String> mono = sinkOne.asMono(); sinkOne.emitValue("Hello Reactor", FAIL_FAST); sinkOne.emitValue("Hi Reactor", FAIL_FAST); sinkOne.emitValue(null, FAIL_FAST); mono.subscribe(data -> log.info("# Subscriber1 {}", data)); mono.subscribe(data -> log.info("# Subscriber2 {}", data)); } }
Sinks.one(): Sinks.One이라는 기능 명세를 리턴.
emitValue(): 데이터를 emit한다. 두 번째 파라미터는 에러 핸들러로 에러가 발생했을 때 재시도를 하지 않고 즉시 실패 처리를 한다.
<결과>
15:57:27.756 [main] DEBUG- onNextDropped: Hi Reactor 15:57:27.758 [main] INFO - # Subscriber1 Hello Reactor 15:57:27.760 [main] INFO - # Subscriber2 Hello Reactor
Sinks.One은 하나의 데이터만 정상적으로 emit하고 나머지 데이터들을 Drop한다.
즉, 처음 emit했던 "Hello Reactor"만 출력되고, Hi Reactor부터는 Drop되었다는 것을 디버그 로그로 출력해 주고 있다.
+) null을 emit하면 별도 조치 없이 무시하는 것 같다.
✅ Sinks.Many
여러 건의 데이터를 여러 가지 방식으로 전송하는 기능을 정의해 둔 기능 명세.
public static ManySpec many() { return SinksSpecs.DEFAULT_SINKS.many(); }
-> ManySpec이라는 명세를 리턴한다.
public interface ManySpec { UnicastSpec unicast(); MulticastSpec multicast(); MulticastReplaySpec replay(); }
ManySpec은 총 세 가지 기능을 정의하는데, 이 세 기능은 별도의 Spec을 정의하고 있다.
1. unicast
하나의 Subscriber에게만 데이터를 emit하는 방식.
@Slf4j public class Example9_8 { public static void main(String[] args) throws InterruptedException { Sinks.Many<Integer> unicastSink = Sinks.many().unicast().onBackpressureBuffer(); Flux<Integer> fluxView = unicastSink.asFlux(); unicastSink.emitNext(1, FAIL_FAST); unicastSink.emitNext(2, FAIL_FAST); fluxView.subscribe(data -> log.info("# Subscriber1: {}", data)); unicastSink.emitNext(3, FAIL_FAST); // fluxView.subscribe(data -> log.info("# Subscriber2: {}", data)); } }
<결과>
16:04:29.946 [main] INFO - # Subscriber1: 1 16:04:29.948 [main] INFO - # Subscriber1: 2 16:04:29.949 [main] INFO - # Subscriber1: 3
-> 여러 데이터를 잘 처리하는 것을 볼 수 있다.
그런데 만약 주석 처리된 마지막 코드를 해제하고 실행한다면?
<결과>
Caused by: java.lang.IllegalStateException: UnicastProcessor allows only a single Subscriber
-> UnicastProcessor는 하나의 Subscriber만 허용한다는 메시지의 예외가 발생한다.
2. muticast
여러 Subscriber에게 데이터를 emit하는 방식.
@Slf4j public class Example9_9 { public static void main(String[] args) { Sinks.Many<Integer> multicastSink = Sinks.many().multicast().onBackpressureBuffer(); Flux<Integer> fluxView = multicastSink.asFlux(); multicastSink.emitNext(1, FAIL_FAST); multicastSink.emitNext(2, FAIL_FAST); fluxView.subscribe(data -> log.info("# Subscriber1: {}", data)); fluxView.subscribe(data -> log.info("# Subscriber2: {}", data)); multicastSink.emitNext(3, FAIL_FAST); } }
<결과>
16:06:58.950 [main] INFO - # Subscriber1: 1 16:06:58.952 [main] INFO - # Subscriber1: 2 16:06:58.953 [main] INFO - # Subscriber1: 3 16:06:58.953 [main] INFO - # Subscriber2: 3
-> 두 Subscriber 모두 데이터를 처리할 수 있다.
Q: 그런데, Subscriber 1은 emit된 세 개의 데이터 모두를 전달받은 반면에, Subscriber 2는 세 번째 데이터만 전달받은 이유?
A: Sinks가 Publisher 역할을 할 경우, 기본적으로 Warm up 특징을 가지는 Hot Sequence로 동작한다. 즉, 첫 번째 구독이 발생한 시점에 처리된 1, 2 데이터를 두 번째 구독이 발생한 시점에는 처리를 하지 않는 것이다.
3. reply
구독 전에 이미 emit된 데이터라도 Subscriber가 전달받을 수 있게 한다.
@Slf4j public class Example9_10 { public static void main(String[] args) { Sinks.Many<Integer> replaySink = Sinks.many().replay().limit(2); Flux<Integer> fluxView = replaySink.asFlux(); replaySink.emitNext(1, FAIL_FAST); replaySink.emitNext(2, FAIL_FAST); replaySink.emitNext(3, FAIL_FAST); fluxView.subscribe(data -> log.info("# Subscriber1: {}", data)); replaySink.emitNext(4, FAIL_FAST); fluxView.subscribe(data -> log.info("# Subscriber2: {}", data)); } }
limit(): emit된 데이터 중에서 파라미터로 입력한 개수만큼 가장 나중에 emit된 데이터부터 Subscriber에게 전달하는 기능.
<결과>
16:10:16.142 [main] INFO - # Subscriber1: 2 16:10:16.144 [main] INFO - # Subscriber1: 3 16:10:16.144 [main] INFO - # Subscriber1: 4 16:10:16.145 [main] INFO - # Subscriber2: 3 16:10:16.146 [main] INFO - # Subscriber2: 4
첫 번째 Subscriber가 구독한 시점에 이미 세 개의 데이터가 emit되었기 때문에 나중에 emit된 2, 3만 처리하게 된다.
두 번째 Subscriber가 구독한 시점에 이미 숫자 4가 emit되었기 때문에 나중에 emit된 3, 4만 처리하게 된다.
참고자료
https://www.baeldung.com/java-thread-safety
'book > 스프링으로 시작하는 리액티브 프로그래밍' 카테고리의 다른 글
[리액티브 프로그래밍] Context (0) | 2024.05.06 |
---|---|
[리액티브 프로그래밍] Scheduler (0) | 2024.04.21 |
[리액티브 프로그래밍] Backpressure (0) | 2024.04.21 |
[리액티브 프로그래밍] Cold Sequence와 Hot Sequence (0) | 2024.03.18 |
[리액티브 프로그래밍] 마블 다이어그램(Marble Diagram) (0) | 2024.03.11 |