스프링으로 시작하는 리액티브 프로그래밍 | 황정식 - 교보문고
스프링으로 시작하는 리액티브 프로그래밍 | *리액티브 프로그래밍의 기본기를 확실하게 다진다*리액티브 프로그래밍은 적은 컴퓨팅 파워로 대량의 요청 트래픽을 효과적으로 처리할 수 있는
product.kyobobook.co.kr
Backpressure란?
✅ Backpressure의 역할
Publisher가 데이터를 emit하는 속도에 비해, Downstream Publisher(Downstream Consumer) 혹은 Subscriber가 데이터를 처리하는 속도가 느리다면 오버플로가 발생하거나 최악의 경우에는 시스템이 다운되는 문제가 발생한다.
-> Backpressure는 Publisher가 끊임없이 emit하는 무수히 많은 데이터를 적절하게 제어하여 데이터 처리에 과부하가 걸리지 않도록 제어하는 역할을 한다.
Reactor에서의 Backpressure 처리 방식
데이터 개수 제어
✅ BaseSubscriber
Subscriber의 구현클래스
-> Subscriber가 적절히 처리할 수 있는 수준의 데이터 개수를 Publisher에게 요청할 수 있다.
@Slf4j public class Example8_1 { public static void main(String[] args) { Flux.range(1, 5) .doOnRequest(data -> log.info("# doOnRequest: {}", data)) .subscribe(new BaseSubscriber<Integer>() { @Override protected void hookOnSubscribe(Subscription subscription) { request(1); } @SneakyThrows @Override protected void hookOnNext(Integer value) { Thread.sleep(2000L); log.info("# hookOnNext: {}", value); request(1); } }); } }
doOnRequest(): Subscriber가 데이터를 요청할 때마다 동작을 수행한다.
hookOnSubscribe(): onSubscribe()를 대신해 구독 시점에 request() 메서드를 호출해서 최초 데이터 요청 개수를 제어한다.
hookOnNext(): onNext()를 대신해 Publisher가 emit한 데이터를 처리한 후에 request() 메서드를 통해 다시 데이터를 요청한다.
<결과>
00:18:05.129 [main] INFO - # doOnRequest: 1 00:18:07.137 [main] INFO - # hookOnNext: 1 00:18:07.138 [main] INFO - # doOnRequest: 1 00:18:09.142 [main] INFO - # hookOnNext: 2 00:18:09.143 [main] INFO - # doOnRequest: 1 00:18:11.148 [main] INFO - # hookOnNext: 3 00:18:11.149 [main] INFO - # doOnRequest: 1 00:18:13.155 [main] INFO - # hookOnNext: 4 00:18:13.156 [main] INFO - # doOnRequest: 1 00:18:15.157 [main] INFO - # hookOnNext: 5 00:18:15.160 [main] INFO - # doOnRequest: 1
첫 번째 doOnRequest는 hookOnSubscribe()에서 request()를 호출함으로써 출력된 결과이고,
나머지 doOnReqeust는 hookOnNext()에서 request()를 호출함으로써 출력된 결과이다.
Backpressure 전략 사용
✅ Backpressure 전략
Reactor에서 제공하는 Backpressure 전략.
종류 | 설명 |
IGNORE 전략 | Backpressure를 사용하지 않는다. |
ERROR 전략 | Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, Exception을 발생시키는 전략 |
DROP 전략 | Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 밖에서 대기하는 먼저 emit된 데이터부터 Drop시키는 전략 |
LATEST 전략 | Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 밖에서 대기하는 가장 최근에(나중에) emit된 데이터부터 버퍼에 채우는 전략 |
BUFFER 전략 | Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 안에 있는 데이터부터 Drop시키는 전략 |
✅ IGNORE 전략
Backpressure를 적용하지 않는 전략.
✅ ERROR 전략
Downstream의 데이터 처리 속도가 느려서 Upstream의 emit 속도를 따라가지 못할 경우 IllegalStateException을 발생시킨다.
-> 이 경우 Publisher는 Error Signal을 Subscriber에게 전송하고 삭제한 데이터는 폐기한다.
@Slf4j public class Example8_2 { public static void main(String[] args) throws InterruptedException { Flux .interval(Duration.ofMillis(1L)) .onBackpressureError() .doOnNext(data -> log.info("# doOnNext: {}", data)) .publishOn(Schedulers.parallel()) .subscribe(data -> { try { Thread.sleep(5L); } catch (InterruptedException e) {} log.info("# onNext: {}", data); }, error -> log.error("# onError", error)); Thread.sleep(2000L); } }
interval(): 0부터 1씩 증가한 숫자를 1ms에 한번씩 emit한다.
onBackpressureError(): ERROR 전략을 사용하기 위함.
doOnNext(): Publisher가 emit한 데이터를 확인하거나 추가적인 동작을 정의하는 용도로 사용된다. 주로 디버깅 용도로 사용할 수 있다.
publishOn(): Reactor Squence의 일부를 별도의 스레드에서 실행할 수 있도록 해준다.
-> Publisher는 1ms마다 데이터를 emit, Subscriber는 5ms마다 데이터를 처리하도록 세팅하여 Backpressure 전략을 테스트한다.
<결과>
00:27:37.117 [parallel-2] INFO - # doOnNext: 0 00:27:37.168 [parallel-2] INFO - # doOnNext: 1 00:27:37.168 [parallel-2] INFO - # doOnNext: 2 00:27:37.169 [parallel-2] INFO - # doOnNext: 3 00:27:37.169 [parallel-2] INFO - # doOnNext: 4 00:27:37.169 [parallel-2] INFO - # doOnNext: 5 ... 00:27:37.172 [parallel-2] INFO - # doOnNext: 18 00:27:37.175 [parallel-1] INFO - # onNext: 0 00:27:37.174 [parallel-2] INFO - # doOnNext: 19 00:27:37.176 [parallel-2] INFO - # doOnNext: 20 00:27:37.176 [parallel-2] INFO - # doOnNext: 21 ... 00:27:37.182 [parallel-1] INFO - # onNext: 1 00:27:37.183 [parallel-2] INFO - # doOnNext: 67 00:27:37.183 [parallel-2] INFO - # doOnNext: 68 00:27:37.184 [parallel-2] INFO - # doOnNext: 69 00:27:37.185 [parallel-2] INFO - # doOnNext: 70 00:27:37.186 [parallel-2] INFO - # doOnNext: 71 00:27:37.187 [parallel-2] INFO - # doOnNext: 72 00:27:37.188 [parallel-2] INFO - # doOnNext: 73 00:27:37.189 [parallel-1] INFO - # onNext: 2 00:27:37.189 [parallel-2] INFO - # doOnNext: 74 00:27:37.190 [parallel-2] INFO - # doOnNext: 75 00:27:37.191 [parallel-2] INFO - # doOnNext: 76 00:27:37.192 [parallel-2] INFO - # doOnNext: 77 00:27:37.193 [parallel-2] INFO - # doOnNext: 78 00:27:37.194 [parallel-1] INFO - # onNext: 3 00:27:37.195 [parallel-2] INFO - # doOnNext: 79 00:27:37.195 [parallel-2] INFO - # doOnNext: 80 00:27:37.196 [parallel-2] INFO - # doOnNext: 81 00:27:37.197 [parallel-2] INFO - # doOnNext: 82 00:27:37.198 [parallel-2] INFO - # doOnNext: 83 00:27:37.199 [parallel-2] INFO - # doOnNext: 84 00:27:37.200 [parallel-2] INFO - # doOnNext: 85 00:27:37.201 [parallel-1] INFO - # onNext: 4 ... 00:27:37.221 [parallel-2] INFO - # doOnNext: 106 00:27:37.222 [parallel-2] INFO - # doOnNext: 107 00:27:37.223 [parallel-2] INFO - # doOnNext: 108 00:27:37.224 [parallel-2] INFO - # doOnNext: 109 00:27:37.225 [parallel-2] INFO - # doOnNext: 110 00:27:37.226 [parallel-2] INFO - # doOnNext: 111 00:27:37.227 [parallel-2] INFO - # doOnNext: 112 00:27:37.227 [parallel-1] INFO - # onNext: 8 ... 00:27:37.369 [parallel-2] INFO - # doOnNext: 254 00:27:37.370 [parallel-2] INFO - # doOnNext: 255 00:27:37.375 [parallel-1] INFO - # onNext: 31 00:27:37.383 [parallel-1] INFO - # onNext: 32 00:27:37.390 [parallel-1] INFO - # onNext: 33 00:27:37.396 [parallel-1] INFO - # onNext: 34 00:27:37.403 [parallel-1] INFO - # onNext: 35 ... 00:27:38.849 [parallel-1] INFO - # onNext: 249 00:27:38.855 [parallel-1] INFO - # onNext: 250 00:27:38.861 [parallel-1] INFO - # onNext: 251 00:27:38.868 [parallel-1] INFO - # onNext: 252 00:27:38.874 [parallel-1] INFO - # onNext: 253 00:27:38.879 [parallel-1] INFO - # onNext: 254 00:27:38.887 [parallel-1] INFO - # onNext: 255 00:27:38.902 [parallel-1] ERROR- # onError reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...) at reactor.core.Exceptions.failWithOverflow(Exceptions.java:220) at reactor.core.publisher.Flux.lambda$onBackpressureError$27(Flux.java:6739) ...
doOnNext()가 출력한 로그에서는 Publisher가 거의 1ms에 한 번씩 데이터를 emit하는 것을 확인할 수 있다.
onNext()가 출력한 로그에서는 5ms에 한 번씩 로그를 출력하다가 255라는 숫자를 출력하고 OverflowException이 발생했다.
-> OverflowException은 IllegalStateException을 상속한 하위 클래스다.
Q: 왜 255까지만 출력할 수 있는가?
A: publishOn()의 내부 구현을 살펴보자.
public final Flux<T> publishOn(Scheduler scheduler) { return publishOn(scheduler, Queues.SMALL_BUFFER_SIZE); }
여기서 두 번째 인자로 Queues.SMALL_BUFFER_SIZE를 넣어주는데, 이게 256이다.
두 번째 인자는 prefetch라는 필드로, '처음'에 Publisher에게 얼마나 데이터를 요청할 것인지를 세팅할 수 있다.
이 값이 기본 256이므로, publishOn()이 허용할 수 있는 데이터는 0 ~ 255였고, 그 이상의 데이터가 emit될 때 예외가 발생한 것이다.
✅ DROP 전략
Publisher가 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 밖에서 대기 중인 데이터 중에서 먼저 emit된 데이터부터 Drop시킨다.

@Slf4j public class Example8_3 { public static void main(String[] args) throws InterruptedException { Flux .interval(Duration.ofMillis(1L)) .onBackpressureDrop(dropped -> log.info("# dropped: {}", dropped)) .publishOn(Schedulers.parallel()) .subscribe(data -> { try { Thread.sleep(5L); } catch (InterruptedException e) {} log.info("# onNext: {}", data); }, error -> log.error("# onError", error)); Thread.sleep(2000L); } }
onBackpressureDrop(): DROP 전략을 사용하기 위함. Drop된 데이터를 파라미터로 전달받을 수 있다.
<결과>
00:54:02.860 [parallel-1] INFO - # onNext: 0 00:54:02.882 [parallel-1] INFO - # onNext: 1 00:54:02.889 [parallel-1] INFO - # onNext: 2 00:54:02.895 [parallel-1] INFO - # onNext: 3 ... 00:54:03.094 [parallel-1] INFO - # onNext: 34 00:54:03.100 [parallel-1] INFO - # onNext: 35 00:54:03.106 [parallel-1] INFO - # onNext: 36 00:54:03.109 [parallel-2] INFO - # dropped: 256 // 첫 번째 Drop 구간 start 00:54:03.110 [parallel-2] INFO - # dropped: 257 ... 00:54:04.111 [parallel-2] INFO - # dropped: 1258 00:54:04.112 [parallel-2] INFO - # dropped: 1259 00:54:04.113 [parallel-2] INFO - # dropped: 1260 00:54:04.114 [parallel-2] INFO - # dropped: 1261 00:54:04.115 [parallel-2] INFO - # dropped: 1262 00:54:04.116 [parallel-2] INFO - # dropped: 1263 00:54:04.117 [parallel-1] INFO - # onNext: 191 00:54:04.117 [parallel-2] INFO - # dropped: 1264 // 첫 번째 Drop 구간 end ... 00:54:04.295 [parallel-1] INFO - # onNext: 219 00:54:04.301 [parallel-1] INFO - # onNext: 220 00:54:04.306 [parallel-1] INFO - # onNext: 221 00:54:04.310 [parallel-2] INFO - # dropped: 1457 00:54:04.311 [parallel-2] INFO - # dropped: 1458 00:54:04.312 [parallel-2] INFO - # dropped: 1459 00:54:04.312 [parallel-1] INFO - # onNext: 222 00:54:04.313 [parallel-2] INFO - # dropped: 1460 // 두 번째 Drop 구간 start 00:54:04.314 [parallel-2] INFO - # dropped: 1461 ... 00:54:04.522 [parallel-2] INFO - # dropped: 1669 00:54:04.523 [parallel-2] INFO - # dropped: 1670 00:54:04.524 [parallel-2] INFO - # dropped: 1671 00:54:04.525 [parallel-1] INFO - # onNext: 1265 // 1265 부터 처리 ... 00:54:04.852 [parallel-2] INFO - # dropped: 1999 00:54:04.853 [parallel-2] INFO - # dropped: 2000 00:54:04.854 [parallel-1] INFO - # onNext: 1318 00:54:04.854 [parallel-2] INFO - # dropped: 2001 00:54:04.855 [parallel-2] INFO - # dropped: 2002 00:54:04.856 [parallel-2] INFO - # dropped: 2003 00:54:04.857 [parallel-2] INFO - # dropped: 2004 // 두 번째 Drop 구간 end
첫 번째 Drop 구간에서 Drop이 시작되는 데이터는 숫자 256이고, Drop이 끝나는 데이터는 숫자 1264이다.
이 구간 동안에는 버퍼가 가득 차 있는 상태임을 알 수 있다.
숫자 1264까지 Drop되기 때문에 Subscriber 쪽에서는 숫자 1265부터 전달받아 처리하는 것을 볼 수 있다.
두 번째 Drop 구간에서 Drop이 시작되는 데이터는 숫자 1460인 것으로 보아 Subscriber 쪽에서는 숫자 1459까지 처리한다고 예상할 수 있다.
-> DROP 전략을 적용하면 버퍼가 가득 찬 상태에서는 '버퍼가 비워질 때까지' 데이터를 Drop한다.
-> 기본 전략은 이렇지만 prefetch를 사용하면 동작 방식이 살짝 다르다. (아래 Q&A 참고)
Q: '버퍼가 비워질 때까지' 데이터를 Drop한다면, Subscriber가 두 번째로 처리하는 데이터의 개수도 256개여야 하는데, 실제로는 1265 ~ 1459 데이터만 처리한다. 왜 256개가 아닌, 195개만 데이터를 처리하는가?
A: prefetch는 Downstream이 prefetch의 75% 정도를 수행했다면, prefetch의 75% 정도를 다시 Upstream에게 미리 요청하는 Replenishing Optimization 전략을 사용한다. 즉, size 256인 prefetch가 어느 정도 작업을 수행했다면, 약 75%인 195만큼의 데이터를 추가로 request하는 것이다.
✅ LATEST 전략
Publisher가 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 밖에서 대기 중인 데이터 중에서 가장 최근에(나중에) emit된 데이터부터 버퍼에 채우는 전략.
Q: DROP 전략과의 차이?
A: 버퍼가 가득 찰 경우, emit된 데이터를 '나'라고 했을 때,
DROP 전략: 버퍼 밖에서 대기 중인 데이터를 하나씩 차례대로 Drop -> 나 자신을 폐기한다.
LATEST 전략: 새로운 데이터가 들어오는 시점에 가장 최근의 데이터만 남겨주고 나머지 데이터를 폐기한다 -> 내 앞에 누군가를 폐기한다.

-> 그림 상으로는 한꺼번에 폐기되는 것처럼 표현했지만, 실제로는 데이터가 들어올 때마다 이전에 유지하고 있던 데이터가 폐기된다.
@Slf4j public class Example8_4 { public static void main(String[] args) throws InterruptedException { Flux .interval(Duration.ofMillis(1L)) .onBackpressureLatest() .publishOn(Schedulers.parallel()) .subscribe(data -> { try { Thread.sleep(5L); } catch (InterruptedException e) {} log.info("# onNext: {}", data); }, error -> log.error("# onError", error)); Thread.sleep(2000L); } }
onBackpressueLatest(): LATEST 전략을 사용하기 위함.
<결과>
01:09:40.819 [parallel-1] INFO - # onNext: 0 01:09:40.838 [parallel-1] INFO - # onNext: 1 01:09:40.844 [parallel-1] INFO - # onNext: 2 01:09:40.850 [parallel-1] INFO - # onNext: 3 01:09:40.857 [parallel-1] INFO - # onNext: 4 01:09:40.863 [parallel-1] INFO - # onNext: 5 01:09:40.869 [parallel-1] INFO - # onNext: 6 01:09:40.876 [parallel-1] INFO - # onNext: 7 01:09:40.882 [parallel-1] INFO - # onNext: 8 ... 01:09:42.499 [parallel-1] INFO - # onNext: 249 01:09:42.506 [parallel-1] INFO - # onNext: 250 01:09:42.512 [parallel-1] INFO - # onNext: 251 01:09:42.518 [parallel-1] INFO - # onNext: 252 01:09:42.524 [parallel-1] INFO - # onNext: 253 01:09:42.530 [parallel-1] INFO - # onNext: 254 01:09:42.538 [parallel-1] INFO - # onNext: 255 01:09:42.547 [parallel-1] INFO - # onNext: 1320 // 버퍼가 비워지는 동안 최근 emit된 데이터 01:09:42.554 [parallel-1] INFO - # onNext: 1321 01:09:42.561 [parallel-1] INFO - # onNext: 1322 01:09:42.567 [parallel-1] INFO - # onNext: 1323 01:09:42.574 [parallel-1] INFO - # onNext: 1324 ... 01:09:42.795 [parallel-1] INFO - # onNext: 1361 01:09:42.801 [parallel-1] INFO - # onNext: 1362 01:09:42.807 [parallel-1] INFO - # onNext: 1363 01:09:42.814 [parallel-1] INFO - # onNext: 1364
Subscriber가 숫자 255를 출력하고 곧바로 숫자 1320을 출력한다.
이는 버퍼가 가득 찼다가 버퍼가 다 비워졌을 때 가장 최근에 emit된 데이터가 1320이기 때문이다.
✅ BUFFER 전략
버퍼가 가득 찼을 때, 버퍼의 데이터를 폐기하지 않는 전략, 버퍼의 데이터를 폐기하는 전략, 에러를 발생시키는 전략과 같이 다양하다.
이번에는 버퍼 내 데이터를 폐기하는 전략을 알아보겠다.
1. DROP_LATEST 전략
Publisher가 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 가장 최근에(나중에) 버퍼 안에 채워진 데이터를 Drop하여 폐기한 후, 확보된 공간에 emit된 데이터를 채우는 전략.

@Slf4j public class Example8_5 { public static void main(String[] args) throws InterruptedException { Flux .interval(Duration.ofMillis(300L)) .doOnNext(data -> log.info("# emitted by original Flux: {}", data)) .onBackpressureBuffer(2, dropped -> log.info("** Overflow & Dropped: {} **", dropped), BufferOverflowStrategy.DROP_LATEST) .doOnNext(data -> log.info("[ # emitted by Buffer: {} ]", data)) .publishOn(Schedulers.parallel(), false, 1) .subscribe(data -> { try { Thread.sleep(1000L); } catch (InterruptedException e) {} log.info("# onNext: {}", data); }, error -> log.error("# onError", error)); Thread.sleep(2500L); } }
onBackpressureBuffer(): BUFFER 전략을 사용하기 위함.
- 첫 번째 파라미터: 버퍼의 최대 용량. 여기서는 2로 설정.
- 두 번째 파라미터: 오버플로가 발생했을 때, Drop되는 데이터를 전달받아 후처리를 할 수 있다.
- 세 번째 파라미터: Backpressure 전략. 여기서는 DROP_LATEST 전략.
publishOn(): prefetch를 1로 설정, 한 번에 1개씩만 데이터를 요청.
<결과>
01:20:21.404 [parallel-2] INFO - # emitted by original Flux: 0 // buffer: [0] 01:20:21.463 [parallel-2] INFO - [ # emitted by Buffer: 0 ] // buffer: [] 01:20:21.704 [parallel-2] INFO - # emitted by original Flux: 1 // buffer: [1] 01:20:22.004 [parallel-2] INFO - # emitted by original Flux: 2 // buffer: [2, 1] 01:20:22.301 [parallel-2] INFO - # emitted by original Flux: 3 // buffer: [3, 2, 1] 01:20:22.311 [parallel-2] INFO - ** Overflow & Dropped: 3 ** // buffer: [2, 1] 01:20:22.472 [parallel-1] INFO - # onNext: 0 01:20:22.474 [parallel-1] INFO - [ # emitted by Buffer: 1 ] // buffer: [2] 01:20:22.604 [parallel-2] INFO - # emitted by original Flux: 4 // buffer: [4, 2] 01:20:22.904 [parallel-2] INFO - # emitted by original Flux: 5 // buffer: [5, 4, 2] 01:20:22.905 [parallel-2] INFO - ** Overflow & Dropped: 5 ** // buffer: [4, 2] 01:20:23.203 [parallel-2] INFO - # emitted by original Flux: 6 // buffer: [6, 4, 2] 01:20:23.204 [parallel-2] INFO - ** Overflow & Dropped: 6 ** // buffer: [4, 2] 01:20:23.480 [parallel-1] INFO - # onNext: 1 01:20:23.484 [parallel-1] INFO - [ # emitted by Buffer: 2 ] // buffer: [4] 01:20:23.503 [parallel-2] INFO - # emitted by original Flux: 7 // buffer: [7, 4]
-> 버퍼의 데이터 상태는 주석을 참고.
2. BUFFER DROP_OLDEST 전략
Publisher가 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 안에 채워진 데이터 중에서 가장 오래된 데이터를 Drop하여 폐기한 후, 확보된 공간에 emit된 데이터를 채우는 전략.

@Slf4j public class Example8_6 { public static void main(String[] args) throws InterruptedException { Flux .interval(Duration.ofMillis(300L)) .doOnNext(data -> log.info("# emitted by original Flux: {}", data)) .onBackpressureBuffer(2, dropped -> log.info("** Overflow & Dropped: {} **", dropped), BufferOverflowStrategy.DROP_OLDEST) .doOnNext(data -> log.info("[ # emitted by Buffer: {} ]", data)) .publishOn(Schedulers.parallel(), false, 1) .subscribe(data -> { try { Thread.sleep(1000L); } catch (InterruptedException e) {} log.info("# onNext: {}", data); }, error -> log.error("# onError", error)); Thread.sleep(2500L); } }
onBackpressureBuffer(): BUFFER 전략을 사용하기 위함
-> 이번엔 DROP_OLDEST 전략 사용.
<결과>
15:11:27.612 [parallel-2] INFO - # emitted by original Flux: 0 // buffer: [0] 15:11:27.676 [parallel-2] INFO - [ # emitted by Buffer: 0 ] // buffer: [] 15:11:27.908 [parallel-2] INFO - # emitted by original Flux: 1 // buffer: [1] 15:11:28.206 [parallel-2] INFO - # emitted by original Flux: 2 // buffer: [2, 1] 15:11:28.510 [parallel-2] INFO - # emitted by original Flux: 3 // buffer: [3, 2, 1] 15:11:28.519 [parallel-2] INFO - ** Overflow & Dropped: 1 ** // buffer: [3, 2] 15:11:28.688 [parallel-1] INFO - # onNext: 0 15:11:28.689 [parallel-1] INFO - [ # emitted by Buffer: 2 ] // buffer: [3] 15:11:28.810 [parallel-2] INFO - # emitted by original Flux: 4 // buffer: [4, 3] 15:11:29.106 [parallel-2] INFO - # emitted by original Flux: 5 // buffer: [5, 4, 3] 15:11:29.108 [parallel-2] INFO - ** Overflow & Dropped: 3 ** // buffer: [5, 4] 15:11:29.406 [parallel-2] INFO - # emitted by original Flux: 6 // buffer: [6, 5, 4] 15:11:29.409 [parallel-2] INFO - ** Overflow & Dropped: 4 ** // buffer: [6, 5] 15:11:29.695 [parallel-1] INFO - # onNext: 2 15:11:29.698 [parallel-1] INFO - [ # emitted by Buffer: 5 ] // buffer: [6] 15:11:29.707 [parallel-2] INFO - # emitted by original Flux: 7 // buffer: [7, 6]
-> 버퍼의 데이터 상태는 주석을 참고.
참고자료
'book > 스프링으로 시작하는 리액티브 프로그래밍' 카테고리의 다른 글
[리액티브 프로그래밍] Scheduler (0) | 2024.04.21 |
---|---|
[리액티브 프로그래밍] Sinks (0) | 2024.04.21 |
[리액티브 프로그래밍] Cold Sequence와 Hot Sequence (0) | 2024.03.18 |
[리액티브 프로그래밍] 마블 다이어그램(Marble Diagram) (0) | 2024.03.11 |
[리액티브 프로그래밍] Reactor 개요 (0) | 2024.03.11 |