Sinks란?
✅ Processor
리액티브 스트림즈의 구성요소 중 하나인, Publisher와 Subscriber의 기능을 모두 지닌 Processor는 Reactor 3.5.0부터 제거되었다.
대신 Processor의 기능을 개선한 Sinks가 3.4.0부터 지원되기 시작했다.
Sinks는 Signal을 프로그래밍 방식으로 전송할 수 있는 구조이다.
-> Reactor에서 프로그래밍 방식으로 Signal을 전송하는 방법은 전통적인 방식과 Sinks를 사용하는 방식이 있는데, 이를 비교해보자.
✅ 전통적인 방식
generate()나 create() Operator를 사용해서 프로그래밍 방식으로 Signal을 전송할 수 있다.
@Slf4j
public class Example9_1 {
public static void main(String[] args) throws InterruptedException {
int tasks = 6;
Flux
.create((FluxSink<String> sink) -> {
IntStream
.range(1, tasks)
.forEach(n -> sink.next(doTask(n)));
})
.subscribeOn(Schedulers.boundedElastic())
.doOnNext(n -> log.info("# create(): {}", n))
.publishOn(Schedulers.parallel())
.map(result -> result + " success!")
.doOnNext(n -> log.info("# map(): {}", n))
.publishOn(Schedulers.parallel())
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(500L);
}
private static String doTask(int taskNumber) {
// now tasking.
// complete to task.
return "task " + taskNumber + " result";
}
}
지금은 Sinks에 집중하기 위해, subscribeOn(), publishOn()은 새로운 스레드에서 작업을 수행하게 해준다 정도로 이해하고 넘어가자.
create(): 처리해야 할 작업의 개수만큼 doTask() 메서드를 호출해서 작업을 처리한 후, 결과를 리턴받는다.
이 때, 각 단계별로 별도의 스레드에서 작업하게 된다.
- doTask() 작업을 처리하는 스레드: subscribeOn()에서 지정.
- map() 작업을 처리하는 스레드: 첫 번째 publishOn()에서 지정.
- Subscriber에게 결과를 전달하는 스레드: 두 번째 publishOn()에서 지정.
<결과>
15:20:52.363 [boundedElastic-1] INFO - # create(): task 1 result
15:20:52.370 [boundedElastic-1] INFO - # create(): task 2 result
15:20:52.371 [boundedElastic-1] INFO - # create(): task 3 result
15:20:52.371 [boundedElastic-1] INFO - # create(): task 4 result
15:20:52.371 [boundedElastic-1] INFO - # create(): task 5 result
15:20:52.383 [parallel-2] INFO - # map(): task 1 result success!
15:20:52.383 [parallel-2] INFO - # map(): task 2 result success!
15:20:52.383 [parallel-2] INFO - # map(): task 3 result success!
15:20:52.383 [parallel-1] INFO - # onNext: task 1 result success!
15:20:52.383 [parallel-2] INFO - # map(): task 4 result success!
15:20:52.383 [parallel-1] INFO - # onNext: task 2 result success!
15:20:52.383 [parallel-2] INFO - # map(): task 5 result success!
15:20:52.383 [parallel-1] INFO - # onNext: task 3 result success!
15:20:52.383 [parallel-1] INFO - # onNext: task 4 result success!
15:20:52.383 [parallel-1] INFO - # onNext: task 5 result success!
- doTask() 작업을 처리하는 스레드: boundedElastic-1
- map() 작업을 처리하는 스레드: parallel-2
- Subscriber에게 결과를 전달하는 스레드: parallel-1
-> 이처럼 create Operator를 사용해서 프로그래밍 방식으로 Signal을 전송할 수 있으며, Reactor Sequence를 단계적으로 나누어서 여러 개의 스레드로 처리할 수 있다.
✅ Sinks
만약 doTask()가 호출될 때마다 다른 스레드에서 작업을 처리하고 싶다면?
-> 이 때 적절하게 사용할 수 있는 방식이 Sinks이다.
@Slf4j
public class Example9_2 {
public static void main(String[] args) throws InterruptedException {
int tasks = 6;
Sinks.Many<String> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
Flux<String> fluxView = unicastSink.asFlux();
IntStream
.range(1, tasks)
.forEach(n -> {
try {
new Thread(() -> {
unicastSink.emitNext(doTask(n), Sinks.EmitFailureHandler.FAIL_FAST);
log.info("# emitted: {}", n);
}).start();
Thread.sleep(100L);
} catch (InterruptedException e) {
log.error(e.getMessage());
}
});
fluxView
.publishOn(Schedulers.parallel())
.map(result -> result + " success!")
.doOnNext(n -> log.info("# map(): {}", n))
.publishOn(Schedulers.parallel())
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(200L);
}
private static String doTask(int taskNumber) {
// now tasking.
// complete to task.
return "task " + taskNumber + " result";
}
}
Sinks와 관련된 코드는 뒤에서 설명할 예정이니 지금은 코드의 구조를 몰라도 된다.
<결과>
15:38:16.914 [Thread-0] INFO - # emitted: 1
15:38:16.989 [Thread-1] INFO - # emitted: 2
15:38:17.092 [Thread-2] INFO - # emitted: 3
15:38:17.224 [Thread-3] INFO - # emitted: 4
15:38:17.303 [Thread-4] INFO - # emitted: 5
15:38:17.474 [parallel-2] INFO - # map(): task 1 result success!
15:38:17.475 [parallel-2] INFO - # map(): task 2 result success!
15:38:17.475 [parallel-1] INFO - # onNext: task 1 result success!
15:38:17.475 [parallel-2] INFO - # map(): task 3 result success!
15:38:17.475 [parallel-1] INFO - # onNext: task 2 result success!
15:38:17.475 [parallel-2] INFO - # map(): task 4 result success!
15:38:17.475 [parallel-2] INFO - # map(): task 5 result success!
15:38:17.475 [parallel-1] INFO - # onNext: task 3 result success!
15:38:17.476 [parallel-1] INFO - # onNext: task 4 result success!
15:38:17.476 [parallel-1] INFO - # onNext: task 5 result success!
- doTask() 작업을 처리하는 스레드: doTaks()를 호출할 때마다 새로운 스레드에서 수행 (Thread-0 ~ Thread-4)
- map() 작업을 처리하는 스레드: parallel-2
- Subscriber에게 결과를 전달하는 스레드: parallel-1
-> 이처럼 Sinks는 프로그래밍 방식으로 Signal을 전송할 수 있고, 멀티스레드 환경에서 스레드 안전성을 보장받을 수 있다는 장점이 있다.
Q: 스레드 안전성?
A: 함수나 변수 같은 공유 자원에 동시 접근할 경우에도 프로그램의 실행에 문제가 없음을 의미. Sinks는 동시 접근을 감지하고, 동시 접근하는 스레드 중 하나가 빠르게 실패함으로써 스레드 안전성을 보장한다.
Sinks 종류 및 특징
✅ Sinks.One
한 건의 데이터를 전송하는 방법을 정의해 둔 기능 명세.
public static <T> Sinks.One<T> one() {
return SinksSpecs.DEFAULT_SINKS.one();
}
한 건의 데이터를 프로그래밍 방식으로 emit하는 역할 + Mono 방식으로 Subscriber가 데이터를 소비할 수 있도록 하는 Sinks의 스펙.
-> 스펙 DefaultSinksSpec을 사용한다.
@Slf4j
public class Example9_4 {
public static void main(String[] args) throws InterruptedException {
Sinks.One<String> sinkOne = Sinks.one();
Mono<String> mono = sinkOne.asMono();
sinkOne.emitValue("Hello Reactor", FAIL_FAST);
sinkOne.emitValue("Hi Reactor", FAIL_FAST);
sinkOne.emitValue(null, FAIL_FAST);
mono.subscribe(data -> log.info("# Subscriber1 {}", data));
mono.subscribe(data -> log.info("# Subscriber2 {}", data));
}
}
Sinks.one(): Sinks.One이라는 기능 명세를 리턴.
emitValue(): 데이터를 emit한다. 두 번째 파라미터는 에러 핸들러로 에러가 발생했을 때 재시도를 하지 않고 즉시 실패 처리를 한다.
<결과>
15:57:27.756 [main] DEBUG- onNextDropped: Hi Reactor
15:57:27.758 [main] INFO - # Subscriber1 Hello Reactor
15:57:27.760 [main] INFO - # Subscriber2 Hello Reactor
Sinks.One은 하나의 데이터만 정상적으로 emit하고 나머지 데이터들을 Drop한다.
즉, 처음 emit했던 "Hello Reactor"만 출력되고, Hi Reactor부터는 Drop되었다는 것을 디버그 로그로 출력해 주고 있다.
+) null을 emit하면 별도 조치 없이 무시하는 것 같다.
✅ Sinks.Many
여러 건의 데이터를 여러 가지 방식으로 전송하는 기능을 정의해 둔 기능 명세.
public static ManySpec many() {
return SinksSpecs.DEFAULT_SINKS.many();
}
-> ManySpec이라는 명세를 리턴한다.
public interface ManySpec {
UnicastSpec unicast();
MulticastSpec multicast();
MulticastReplaySpec replay();
}
ManySpec은 총 세 가지 기능을 정의하는데, 이 세 기능은 별도의 Spec을 정의하고 있다.
1. unicast
하나의 Subscriber에게만 데이터를 emit하는 방식.
@Slf4j
public class Example9_8 {
public static void main(String[] args) throws InterruptedException {
Sinks.Many<Integer> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
Flux<Integer> fluxView = unicastSink.asFlux();
unicastSink.emitNext(1, FAIL_FAST);
unicastSink.emitNext(2, FAIL_FAST);
fluxView.subscribe(data -> log.info("# Subscriber1: {}", data));
unicastSink.emitNext(3, FAIL_FAST);
// fluxView.subscribe(data -> log.info("# Subscriber2: {}", data));
}
}
<결과>
16:04:29.946 [main] INFO - # Subscriber1: 1
16:04:29.948 [main] INFO - # Subscriber1: 2
16:04:29.949 [main] INFO - # Subscriber1: 3
-> 여러 데이터를 잘 처리하는 것을 볼 수 있다.
그런데 만약 주석 처리된 마지막 코드를 해제하고 실행한다면?
<결과>
Caused by: java.lang.IllegalStateException: UnicastProcessor allows only a single Subscriber
-> UnicastProcessor는 하나의 Subscriber만 허용한다는 메시지의 예외가 발생한다.
2. muticast
여러 Subscriber에게 데이터를 emit하는 방식.
@Slf4j
public class Example9_9 {
public static void main(String[] args) {
Sinks.Many<Integer> multicastSink =
Sinks.many().multicast().onBackpressureBuffer();
Flux<Integer> fluxView = multicastSink.asFlux();
multicastSink.emitNext(1, FAIL_FAST);
multicastSink.emitNext(2, FAIL_FAST);
fluxView.subscribe(data -> log.info("# Subscriber1: {}", data));
fluxView.subscribe(data -> log.info("# Subscriber2: {}", data));
multicastSink.emitNext(3, FAIL_FAST);
}
}
<결과>
16:06:58.950 [main] INFO - # Subscriber1: 1
16:06:58.952 [main] INFO - # Subscriber1: 2
16:06:58.953 [main] INFO - # Subscriber1: 3
16:06:58.953 [main] INFO - # Subscriber2: 3
-> 두 Subscriber 모두 데이터를 처리할 수 있다.
Q: 그런데, Subscriber 1은 emit된 세 개의 데이터 모두를 전달받은 반면에, Subscriber 2는 세 번째 데이터만 전달받은 이유?
A: Sinks가 Publisher 역할을 할 경우, 기본적으로 Warm up 특징을 가지는 Hot Sequence로 동작한다. 즉, 첫 번째 구독이 발생한 시점에 처리된 1, 2 데이터를 두 번째 구독이 발생한 시점에는 처리를 하지 않는 것이다.
3. reply
구독 전에 이미 emit된 데이터라도 Subscriber가 전달받을 수 있게 한다.
@Slf4j
public class Example9_10 {
public static void main(String[] args) {
Sinks.Many<Integer> replaySink = Sinks.many().replay().limit(2);
Flux<Integer> fluxView = replaySink.asFlux();
replaySink.emitNext(1, FAIL_FAST);
replaySink.emitNext(2, FAIL_FAST);
replaySink.emitNext(3, FAIL_FAST);
fluxView.subscribe(data -> log.info("# Subscriber1: {}", data));
replaySink.emitNext(4, FAIL_FAST);
fluxView.subscribe(data -> log.info("# Subscriber2: {}", data));
}
}
limit(): emit된 데이터 중에서 파라미터로 입력한 개수만큼 가장 나중에 emit된 데이터부터 Subscriber에게 전달하는 기능.
<결과>
16:10:16.142 [main] INFO - # Subscriber1: 2
16:10:16.144 [main] INFO - # Subscriber1: 3
16:10:16.144 [main] INFO - # Subscriber1: 4
16:10:16.145 [main] INFO - # Subscriber2: 3
16:10:16.146 [main] INFO - # Subscriber2: 4
첫 번째 Subscriber가 구독한 시점에 이미 세 개의 데이터가 emit되었기 때문에 나중에 emit된 2, 3만 처리하게 된다.
두 번째 Subscriber가 구독한 시점에 이미 숫자 4가 emit되었기 때문에 나중에 emit된 3, 4만 처리하게 된다.
참고자료
https://www.baeldung.com/java-thread-safety
'book > 스프링으로 시작하는 리액티브 프로그래밍' 카테고리의 다른 글
[리액티브 프로그래밍] Context (0) | 2024.05.06 |
---|---|
[리액티브 프로그래밍] Scheduler (0) | 2024.04.21 |
[리액티브 프로그래밍] Backpressure (0) | 2024.04.21 |
[리액티브 프로그래밍] Cold Sequence와 Hot Sequence (0) | 2024.03.18 |
[리액티브 프로그래밍] 마블 다이어그램(Marble Diagram) (0) | 2024.03.11 |