Backpressure란?
✅ Backpressure의 역할
Publisher가 데이터를 emit하는 속도에 비해, Downstream Publisher(Downstream Consumer) 혹은 Subscriber가 데이터를 처리하는 속도가 느리다면 오버플로가 발생하거나 최악의 경우에는 시스템이 다운되는 문제가 발생한다.
-> Backpressure는 Publisher가 끊임없이 emit하는 무수히 많은 데이터를 적절하게 제어하여 데이터 처리에 과부하가 걸리지 않도록 제어하는 역할을 한다.
Reactor에서의 Backpressure 처리 방식
데이터 개수 제어
✅ BaseSubscriber
Subscriber의 구현클래스
-> Subscriber가 적절히 처리할 수 있는 수준의 데이터 개수를 Publisher에게 요청할 수 있다.
@Slf4j
public class Example8_1 {
public static void main(String[] args) {
Flux.range(1, 5)
.doOnRequest(data -> log.info("# doOnRequest: {}", data))
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(1);
}
@SneakyThrows
@Override
protected void hookOnNext(Integer value) {
Thread.sleep(2000L);
log.info("# hookOnNext: {}", value);
request(1);
}
});
}
}
doOnRequest(): Subscriber가 데이터를 요청할 때마다 동작을 수행한다.
hookOnSubscribe(): onSubscribe()를 대신해 구독 시점에 request() 메서드를 호출해서 최초 데이터 요청 개수를 제어한다.
hookOnNext(): onNext()를 대신해 Publisher가 emit한 데이터를 처리한 후에 request() 메서드를 통해 다시 데이터를 요청한다.
<결과>
00:18:05.129 [main] INFO - # doOnRequest: 1
00:18:07.137 [main] INFO - # hookOnNext: 1
00:18:07.138 [main] INFO - # doOnRequest: 1
00:18:09.142 [main] INFO - # hookOnNext: 2
00:18:09.143 [main] INFO - # doOnRequest: 1
00:18:11.148 [main] INFO - # hookOnNext: 3
00:18:11.149 [main] INFO - # doOnRequest: 1
00:18:13.155 [main] INFO - # hookOnNext: 4
00:18:13.156 [main] INFO - # doOnRequest: 1
00:18:15.157 [main] INFO - # hookOnNext: 5
00:18:15.160 [main] INFO - # doOnRequest: 1
첫 번째 doOnRequest는 hookOnSubscribe()에서 request()를 호출함으로써 출력된 결과이고,
나머지 doOnReqeust는 hookOnNext()에서 request()를 호출함으로써 출력된 결과이다.
Backpressure 전략 사용
✅ Backpressure 전략
Reactor에서 제공하는 Backpressure 전략.
종류 | 설명 |
IGNORE 전략 | Backpressure를 사용하지 않는다. |
ERROR 전략 | Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, Exception을 발생시키는 전략 |
DROP 전략 | Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 밖에서 대기하는 먼저 emit된 데이터부터 Drop시키는 전략 |
LATEST 전략 | Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 밖에서 대기하는 가장 최근에(나중에) emit된 데이터부터 버퍼에 채우는 전략 |
BUFFER 전략 | Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 안에 있는 데이터부터 Drop시키는 전략 |
✅ IGNORE 전략
Backpressure를 적용하지 않는 전략.
✅ ERROR 전략
Downstream의 데이터 처리 속도가 느려서 Upstream의 emit 속도를 따라가지 못할 경우 IllegalStateException을 발생시킨다.
-> 이 경우 Publisher는 Error Signal을 Subscriber에게 전송하고 삭제한 데이터는 폐기한다.
@Slf4j
public class Example8_2 {
public static void main(String[] args) throws InterruptedException {
Flux
.interval(Duration.ofMillis(1L))
.onBackpressureError()
.doOnNext(data -> log.info("# doOnNext: {}", data))
.publishOn(Schedulers.parallel())
.subscribe(data -> {
try {
Thread.sleep(5L);
} catch (InterruptedException e) {}
log.info("# onNext: {}", data);
},
error -> log.error("# onError", error));
Thread.sleep(2000L);
}
}
interval(): 0부터 1씩 증가한 숫자를 1ms에 한번씩 emit한다.
onBackpressureError(): ERROR 전략을 사용하기 위함.
doOnNext(): Publisher가 emit한 데이터를 확인하거나 추가적인 동작을 정의하는 용도로 사용된다. 주로 디버깅 용도로 사용할 수 있다.
publishOn(): Reactor Squence의 일부를 별도의 스레드에서 실행할 수 있도록 해준다.
-> Publisher는 1ms마다 데이터를 emit, Subscriber는 5ms마다 데이터를 처리하도록 세팅하여 Backpressure 전략을 테스트한다.
<결과>
00:27:37.117 [parallel-2] INFO - # doOnNext: 0
00:27:37.168 [parallel-2] INFO - # doOnNext: 1
00:27:37.168 [parallel-2] INFO - # doOnNext: 2
00:27:37.169 [parallel-2] INFO - # doOnNext: 3
00:27:37.169 [parallel-2] INFO - # doOnNext: 4
00:27:37.169 [parallel-2] INFO - # doOnNext: 5
...
00:27:37.172 [parallel-2] INFO - # doOnNext: 18
00:27:37.175 [parallel-1] INFO - # onNext: 0
00:27:37.174 [parallel-2] INFO - # doOnNext: 19
00:27:37.176 [parallel-2] INFO - # doOnNext: 20
00:27:37.176 [parallel-2] INFO - # doOnNext: 21
...
00:27:37.182 [parallel-1] INFO - # onNext: 1
00:27:37.183 [parallel-2] INFO - # doOnNext: 67
00:27:37.183 [parallel-2] INFO - # doOnNext: 68
00:27:37.184 [parallel-2] INFO - # doOnNext: 69
00:27:37.185 [parallel-2] INFO - # doOnNext: 70
00:27:37.186 [parallel-2] INFO - # doOnNext: 71
00:27:37.187 [parallel-2] INFO - # doOnNext: 72
00:27:37.188 [parallel-2] INFO - # doOnNext: 73
00:27:37.189 [parallel-1] INFO - # onNext: 2
00:27:37.189 [parallel-2] INFO - # doOnNext: 74
00:27:37.190 [parallel-2] INFO - # doOnNext: 75
00:27:37.191 [parallel-2] INFO - # doOnNext: 76
00:27:37.192 [parallel-2] INFO - # doOnNext: 77
00:27:37.193 [parallel-2] INFO - # doOnNext: 78
00:27:37.194 [parallel-1] INFO - # onNext: 3
00:27:37.195 [parallel-2] INFO - # doOnNext: 79
00:27:37.195 [parallel-2] INFO - # doOnNext: 80
00:27:37.196 [parallel-2] INFO - # doOnNext: 81
00:27:37.197 [parallel-2] INFO - # doOnNext: 82
00:27:37.198 [parallel-2] INFO - # doOnNext: 83
00:27:37.199 [parallel-2] INFO - # doOnNext: 84
00:27:37.200 [parallel-2] INFO - # doOnNext: 85
00:27:37.201 [parallel-1] INFO - # onNext: 4
...
00:27:37.221 [parallel-2] INFO - # doOnNext: 106
00:27:37.222 [parallel-2] INFO - # doOnNext: 107
00:27:37.223 [parallel-2] INFO - # doOnNext: 108
00:27:37.224 [parallel-2] INFO - # doOnNext: 109
00:27:37.225 [parallel-2] INFO - # doOnNext: 110
00:27:37.226 [parallel-2] INFO - # doOnNext: 111
00:27:37.227 [parallel-2] INFO - # doOnNext: 112
00:27:37.227 [parallel-1] INFO - # onNext: 8
...
00:27:37.369 [parallel-2] INFO - # doOnNext: 254
00:27:37.370 [parallel-2] INFO - # doOnNext: 255
00:27:37.375 [parallel-1] INFO - # onNext: 31
00:27:37.383 [parallel-1] INFO - # onNext: 32
00:27:37.390 [parallel-1] INFO - # onNext: 33
00:27:37.396 [parallel-1] INFO - # onNext: 34
00:27:37.403 [parallel-1] INFO - # onNext: 35
...
00:27:38.849 [parallel-1] INFO - # onNext: 249
00:27:38.855 [parallel-1] INFO - # onNext: 250
00:27:38.861 [parallel-1] INFO - # onNext: 251
00:27:38.868 [parallel-1] INFO - # onNext: 252
00:27:38.874 [parallel-1] INFO - # onNext: 253
00:27:38.879 [parallel-1] INFO - # onNext: 254
00:27:38.887 [parallel-1] INFO - # onNext: 255
00:27:38.902 [parallel-1] ERROR- # onError
reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:220)
at reactor.core.publisher.Flux.lambda$onBackpressureError$27(Flux.java:6739)
...
doOnNext()가 출력한 로그에서는 Publisher가 거의 1ms에 한 번씩 데이터를 emit하는 것을 확인할 수 있다.
onNext()가 출력한 로그에서는 5ms에 한 번씩 로그를 출력하다가 255라는 숫자를 출력하고 OverflowException이 발생했다.
-> OverflowException은 IllegalStateException을 상속한 하위 클래스다.
Q: 왜 255까지만 출력할 수 있는가?
A: publishOn()의 내부 구현을 살펴보자.
public final Flux<T> publishOn(Scheduler scheduler) {
return publishOn(scheduler, Queues.SMALL_BUFFER_SIZE);
}
여기서 두 번째 인자로 Queues.SMALL_BUFFER_SIZE를 넣어주는데, 이게 256이다.
두 번째 인자는 prefetch라는 필드로, '처음'에 Publisher에게 얼마나 데이터를 요청할 것인지를 세팅할 수 있다.
이 값이 기본 256이므로, publishOn()이 허용할 수 있는 데이터는 0 ~ 255였고, 그 이상의 데이터가 emit될 때 예외가 발생한 것이다.
✅ DROP 전략
Publisher가 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 밖에서 대기 중인 데이터 중에서 먼저 emit된 데이터부터 Drop시킨다.
@Slf4j
public class Example8_3 {
public static void main(String[] args) throws InterruptedException {
Flux
.interval(Duration.ofMillis(1L))
.onBackpressureDrop(dropped -> log.info("# dropped: {}", dropped))
.publishOn(Schedulers.parallel())
.subscribe(data -> {
try {
Thread.sleep(5L);
} catch (InterruptedException e) {}
log.info("# onNext: {}", data);
},
error -> log.error("# onError", error));
Thread.sleep(2000L);
}
}
onBackpressureDrop(): DROP 전략을 사용하기 위함. Drop된 데이터를 파라미터로 전달받을 수 있다.
<결과>
00:54:02.860 [parallel-1] INFO - # onNext: 0
00:54:02.882 [parallel-1] INFO - # onNext: 1
00:54:02.889 [parallel-1] INFO - # onNext: 2
00:54:02.895 [parallel-1] INFO - # onNext: 3
...
00:54:03.094 [parallel-1] INFO - # onNext: 34
00:54:03.100 [parallel-1] INFO - # onNext: 35
00:54:03.106 [parallel-1] INFO - # onNext: 36
00:54:03.109 [parallel-2] INFO - # dropped: 256 // 첫 번째 Drop 구간 start
00:54:03.110 [parallel-2] INFO - # dropped: 257
...
00:54:04.111 [parallel-2] INFO - # dropped: 1258
00:54:04.112 [parallel-2] INFO - # dropped: 1259
00:54:04.113 [parallel-2] INFO - # dropped: 1260
00:54:04.114 [parallel-2] INFO - # dropped: 1261
00:54:04.115 [parallel-2] INFO - # dropped: 1262
00:54:04.116 [parallel-2] INFO - # dropped: 1263
00:54:04.117 [parallel-1] INFO - # onNext: 191
00:54:04.117 [parallel-2] INFO - # dropped: 1264 // 첫 번째 Drop 구간 end
...
00:54:04.295 [parallel-1] INFO - # onNext: 219
00:54:04.301 [parallel-1] INFO - # onNext: 220
00:54:04.306 [parallel-1] INFO - # onNext: 221
00:54:04.310 [parallel-2] INFO - # dropped: 1457
00:54:04.311 [parallel-2] INFO - # dropped: 1458
00:54:04.312 [parallel-2] INFO - # dropped: 1459
00:54:04.312 [parallel-1] INFO - # onNext: 222
00:54:04.313 [parallel-2] INFO - # dropped: 1460 // 두 번째 Drop 구간 start
00:54:04.314 [parallel-2] INFO - # dropped: 1461
...
00:54:04.522 [parallel-2] INFO - # dropped: 1669
00:54:04.523 [parallel-2] INFO - # dropped: 1670
00:54:04.524 [parallel-2] INFO - # dropped: 1671
00:54:04.525 [parallel-1] INFO - # onNext: 1265 // 1265 부터 처리
...
00:54:04.852 [parallel-2] INFO - # dropped: 1999
00:54:04.853 [parallel-2] INFO - # dropped: 2000
00:54:04.854 [parallel-1] INFO - # onNext: 1318
00:54:04.854 [parallel-2] INFO - # dropped: 2001
00:54:04.855 [parallel-2] INFO - # dropped: 2002
00:54:04.856 [parallel-2] INFO - # dropped: 2003
00:54:04.857 [parallel-2] INFO - # dropped: 2004 // 두 번째 Drop 구간 end
첫 번째 Drop 구간에서 Drop이 시작되는 데이터는 숫자 256이고, Drop이 끝나는 데이터는 숫자 1264이다.
이 구간 동안에는 버퍼가 가득 차 있는 상태임을 알 수 있다.
숫자 1264까지 Drop되기 때문에 Subscriber 쪽에서는 숫자 1265부터 전달받아 처리하는 것을 볼 수 있다.
두 번째 Drop 구간에서 Drop이 시작되는 데이터는 숫자 1460인 것으로 보아 Subscriber 쪽에서는 숫자 1459까지 처리한다고 예상할 수 있다.
-> DROP 전략을 적용하면 버퍼가 가득 찬 상태에서는 '버퍼가 비워질 때까지' 데이터를 Drop한다.
-> 기본 전략은 이렇지만 prefetch를 사용하면 동작 방식이 살짝 다르다. (아래 Q&A 참고)
Q: '버퍼가 비워질 때까지' 데이터를 Drop한다면, Subscriber가 두 번째로 처리하는 데이터의 개수도 256개여야 하는데, 실제로는 1265 ~ 1459 데이터만 처리한다. 왜 256개가 아닌, 195개만 데이터를 처리하는가?
A: prefetch는 Downstream이 prefetch의 75% 정도를 수행했다면, prefetch의 75% 정도를 다시 Upstream에게 미리 요청하는 Replenishing Optimization 전략을 사용한다. 즉, size 256인 prefetch가 어느 정도 작업을 수행했다면, 약 75%인 195만큼의 데이터를 추가로 request하는 것이다.
✅ LATEST 전략
Publisher가 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 밖에서 대기 중인 데이터 중에서 가장 최근에(나중에) emit된 데이터부터 버퍼에 채우는 전략.
Q: DROP 전략과의 차이?
A: 버퍼가 가득 찰 경우, emit된 데이터를 '나'라고 했을 때,
DROP 전략: 버퍼 밖에서 대기 중인 데이터를 하나씩 차례대로 Drop -> 나 자신을 폐기한다.
LATEST 전략: 새로운 데이터가 들어오는 시점에 가장 최근의 데이터만 남겨주고 나머지 데이터를 폐기한다 -> 내 앞에 누군가를 폐기한다.
-> 그림 상으로는 한꺼번에 폐기되는 것처럼 표현했지만, 실제로는 데이터가 들어올 때마다 이전에 유지하고 있던 데이터가 폐기된다.
@Slf4j
public class Example8_4 {
public static void main(String[] args) throws InterruptedException {
Flux
.interval(Duration.ofMillis(1L))
.onBackpressureLatest()
.publishOn(Schedulers.parallel())
.subscribe(data -> {
try {
Thread.sleep(5L);
} catch (InterruptedException e) {}
log.info("# onNext: {}", data);
},
error -> log.error("# onError", error));
Thread.sleep(2000L);
}
}
onBackpressueLatest(): LATEST 전략을 사용하기 위함.
<결과>
01:09:40.819 [parallel-1] INFO - # onNext: 0
01:09:40.838 [parallel-1] INFO - # onNext: 1
01:09:40.844 [parallel-1] INFO - # onNext: 2
01:09:40.850 [parallel-1] INFO - # onNext: 3
01:09:40.857 [parallel-1] INFO - # onNext: 4
01:09:40.863 [parallel-1] INFO - # onNext: 5
01:09:40.869 [parallel-1] INFO - # onNext: 6
01:09:40.876 [parallel-1] INFO - # onNext: 7
01:09:40.882 [parallel-1] INFO - # onNext: 8
...
01:09:42.499 [parallel-1] INFO - # onNext: 249
01:09:42.506 [parallel-1] INFO - # onNext: 250
01:09:42.512 [parallel-1] INFO - # onNext: 251
01:09:42.518 [parallel-1] INFO - # onNext: 252
01:09:42.524 [parallel-1] INFO - # onNext: 253
01:09:42.530 [parallel-1] INFO - # onNext: 254
01:09:42.538 [parallel-1] INFO - # onNext: 255
01:09:42.547 [parallel-1] INFO - # onNext: 1320 // 버퍼가 비워지는 동안 최근 emit된 데이터
01:09:42.554 [parallel-1] INFO - # onNext: 1321
01:09:42.561 [parallel-1] INFO - # onNext: 1322
01:09:42.567 [parallel-1] INFO - # onNext: 1323
01:09:42.574 [parallel-1] INFO - # onNext: 1324
...
01:09:42.795 [parallel-1] INFO - # onNext: 1361
01:09:42.801 [parallel-1] INFO - # onNext: 1362
01:09:42.807 [parallel-1] INFO - # onNext: 1363
01:09:42.814 [parallel-1] INFO - # onNext: 1364
Subscriber가 숫자 255를 출력하고 곧바로 숫자 1320을 출력한다.
이는 버퍼가 가득 찼다가 버퍼가 다 비워졌을 때 가장 최근에 emit된 데이터가 1320이기 때문이다.
✅ BUFFER 전략
버퍼가 가득 찼을 때, 버퍼의 데이터를 폐기하지 않는 전략, 버퍼의 데이터를 폐기하는 전략, 에러를 발생시키는 전략과 같이 다양하다.
이번에는 버퍼 내 데이터를 폐기하는 전략을 알아보겠다.
1. DROP_LATEST 전략
Publisher가 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 가장 최근에(나중에) 버퍼 안에 채워진 데이터를 Drop하여 폐기한 후, 확보된 공간에 emit된 데이터를 채우는 전략.
@Slf4j
public class Example8_5 {
public static void main(String[] args) throws InterruptedException {
Flux
.interval(Duration.ofMillis(300L))
.doOnNext(data -> log.info("# emitted by original Flux: {}", data))
.onBackpressureBuffer(2,
dropped -> log.info("** Overflow & Dropped: {} **", dropped),
BufferOverflowStrategy.DROP_LATEST)
.doOnNext(data -> log.info("[ # emitted by Buffer: {} ]", data))
.publishOn(Schedulers.parallel(), false, 1)
.subscribe(data -> {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {}
log.info("# onNext: {}", data);
},
error -> log.error("# onError", error));
Thread.sleep(2500L);
}
}
onBackpressureBuffer(): BUFFER 전략을 사용하기 위함.
- 첫 번째 파라미터: 버퍼의 최대 용량. 여기서는 2로 설정.
- 두 번째 파라미터: 오버플로가 발생했을 때, Drop되는 데이터를 전달받아 후처리를 할 수 있다.
- 세 번째 파라미터: Backpressure 전략. 여기서는 DROP_LATEST 전략.
publishOn(): prefetch를 1로 설정, 한 번에 1개씩만 데이터를 요청.
<결과>
01:20:21.404 [parallel-2] INFO - # emitted by original Flux: 0 // buffer: [0]
01:20:21.463 [parallel-2] INFO - [ # emitted by Buffer: 0 ] // buffer: []
01:20:21.704 [parallel-2] INFO - # emitted by original Flux: 1 // buffer: [1]
01:20:22.004 [parallel-2] INFO - # emitted by original Flux: 2 // buffer: [2, 1]
01:20:22.301 [parallel-2] INFO - # emitted by original Flux: 3 // buffer: [3, 2, 1]
01:20:22.311 [parallel-2] INFO - ** Overflow & Dropped: 3 ** // buffer: [2, 1]
01:20:22.472 [parallel-1] INFO - # onNext: 0
01:20:22.474 [parallel-1] INFO - [ # emitted by Buffer: 1 ] // buffer: [2]
01:20:22.604 [parallel-2] INFO - # emitted by original Flux: 4 // buffer: [4, 2]
01:20:22.904 [parallel-2] INFO - # emitted by original Flux: 5 // buffer: [5, 4, 2]
01:20:22.905 [parallel-2] INFO - ** Overflow & Dropped: 5 ** // buffer: [4, 2]
01:20:23.203 [parallel-2] INFO - # emitted by original Flux: 6 // buffer: [6, 4, 2]
01:20:23.204 [parallel-2] INFO - ** Overflow & Dropped: 6 ** // buffer: [4, 2]
01:20:23.480 [parallel-1] INFO - # onNext: 1
01:20:23.484 [parallel-1] INFO - [ # emitted by Buffer: 2 ] // buffer: [4]
01:20:23.503 [parallel-2] INFO - # emitted by original Flux: 7 // buffer: [7, 4]
-> 버퍼의 데이터 상태는 주석을 참고.
2. BUFFER DROP_OLDEST 전략
Publisher가 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 안에 채워진 데이터 중에서 가장 오래된 데이터를 Drop하여 폐기한 후, 확보된 공간에 emit된 데이터를 채우는 전략.
@Slf4j
public class Example8_6 {
public static void main(String[] args) throws InterruptedException {
Flux
.interval(Duration.ofMillis(300L))
.doOnNext(data -> log.info("# emitted by original Flux: {}", data))
.onBackpressureBuffer(2,
dropped -> log.info("** Overflow & Dropped: {} **", dropped),
BufferOverflowStrategy.DROP_OLDEST)
.doOnNext(data -> log.info("[ # emitted by Buffer: {} ]", data))
.publishOn(Schedulers.parallel(), false, 1)
.subscribe(data -> {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {}
log.info("# onNext: {}", data);
},
error -> log.error("# onError", error));
Thread.sleep(2500L);
}
}
onBackpressureBuffer(): BUFFER 전략을 사용하기 위함
-> 이번엔 DROP_OLDEST 전략 사용.
<결과>
15:11:27.612 [parallel-2] INFO - # emitted by original Flux: 0 // buffer: [0]
15:11:27.676 [parallel-2] INFO - [ # emitted by Buffer: 0 ] // buffer: []
15:11:27.908 [parallel-2] INFO - # emitted by original Flux: 1 // buffer: [1]
15:11:28.206 [parallel-2] INFO - # emitted by original Flux: 2 // buffer: [2, 1]
15:11:28.510 [parallel-2] INFO - # emitted by original Flux: 3 // buffer: [3, 2, 1]
15:11:28.519 [parallel-2] INFO - ** Overflow & Dropped: 1 ** // buffer: [3, 2]
15:11:28.688 [parallel-1] INFO - # onNext: 0
15:11:28.689 [parallel-1] INFO - [ # emitted by Buffer: 2 ] // buffer: [3]
15:11:28.810 [parallel-2] INFO - # emitted by original Flux: 4 // buffer: [4, 3]
15:11:29.106 [parallel-2] INFO - # emitted by original Flux: 5 // buffer: [5, 4, 3]
15:11:29.108 [parallel-2] INFO - ** Overflow & Dropped: 3 ** // buffer: [5, 4]
15:11:29.406 [parallel-2] INFO - # emitted by original Flux: 6 // buffer: [6, 5, 4]
15:11:29.409 [parallel-2] INFO - ** Overflow & Dropped: 4 ** // buffer: [6, 5]
15:11:29.695 [parallel-1] INFO - # onNext: 2
15:11:29.698 [parallel-1] INFO - [ # emitted by Buffer: 5 ] // buffer: [6]
15:11:29.707 [parallel-2] INFO - # emitted by original Flux: 7 // buffer: [7, 6]
-> 버퍼의 데이터 상태는 주석을 참고.
참고자료
'book > 스프링으로 시작하는 리액티브 프로그래밍' 카테고리의 다른 글
[리액티브 프로그래밍] Scheduler (0) | 2024.04.21 |
---|---|
[리액티브 프로그래밍] Sinks (0) | 2024.04.21 |
[리액티브 프로그래밍] Cold Sequence와 Hot Sequence (0) | 2024.03.18 |
[리액티브 프로그래밍] 마블 다이어그램(Marble Diagram) (0) | 2024.03.11 |
[리액티브 프로그래밍] Reactor 개요 (0) | 2024.03.11 |