Operator란?
✅ Operator
리액티브 스트림즈에서 연산자 역할.
ex) just(), create(), fromArray(), filter(), map()
-> Reactor에서 가장 중요한 구성요소.
+) Operator를 처음 이해할 때는 마블 다이어그램을 보는 것이 좋은데, 마블 다이어그램에 대한 설명은 다음 글을 참고하자.
https://gksdudrb922.tistory.com/317
Sequence 생성을 위한 Operator
✅ justOrEmpty
just()의 확장 Operator.
just()와 달리, emit할 데이터가 null일 경우, NPE가 발생하지 않고, onComplete Signal을 전송한다.
emit할 데이터가 null이 아닐 경우, Mono를 생성한다.
+) Flux는 just()만 있고, justOrEmpty()가 없다.
@Slf4j
public class Example14_1 {
public static void main(String[] args) {
Mono
.justOrEmpty(null)
.subscribe(data -> {},
error -> {},
() -> log.info("# onComplete"));
}
}
justOrEmpty()에 null을 전달했다.
<결과>
07:40:30.310 [main] INFO - # onComplete
NPE 없이 onComplete Signal을 전송했다.
+) Optional.ofNullable(null), Optional.empty()를 전달해도 onComplete Signal을 전송한다.
✅ fromIterable
Iterable에 포함된 데이터를 emit하는 Flux를 생성한다.
public class SampleData {
public static final List<Tuple2<String, Integer>> coins =
Arrays.asList(
Tuples.of("BTC", 52_000_000),
Tuples.of("ETH", 1_720_000),
Tuples.of("XRP", 533),
Tuples.of("ICX", 2_080),
Tuples.of("EOS", 4_020),
Tuples.of("BCH", 558_000));
}
@Slf4j
public class Example14_2 {
public static void main(String[] args) {
Flux
.fromIterable(SampleData.coins)
.subscribe(coin ->
log.info("coin 명: {}, 현재가: {}", coin.getT1(), coin.getT2())
);
}
}
<결과>
07:51:43.200 [main] INFO - coin 명: BTC, 현재가: 52000000
07:51:43.218 [main] INFO - coin 명: ETH, 현재가: 1720000
07:51:43.218 [main] INFO - coin 명: XRP, 현재가: 533
07:51:43.218 [main] INFO - coin 명: ICX, 현재가: 2080
07:51:43.219 [main] INFO - coin 명: EOS, 현재가: 4020
07:51:43.219 [main] INFO - coin 명: BCH, 현재가: 558000
+) 만약 Flux.just(list)를 사용한다면, 하나의 list 전체를 emit하는 Flux를 생성하게 된다.
✅ fromStream
Stream에 포함된 데이터를 emit하는 Flux를 생성한다.
Java Stream 특성상 Stream은 재사용할 수 없으며, cancel, error, complete 시에 자동으로 닫히게 된다.
public class SampleData {
public static final List<String> coinNames =
Arrays.asList("BTC", "ETH", "XRP", "ICX", "EOS", "BCH");
}
@Slf4j
public class Example14_3 {
public static void main(String[] args) {
Flux
.fromStream(() -> SampleData.coinNames.stream())
.filter(coin -> coin.equals("BTC") || coin.equals("ETH"))
.subscribe(data -> log.info("{}", data));
}
}
<결과>
07:57:43.592 [main] INFO - BTC
07:57:43.595 [main] INFO - ETH
✅ range
n부터 1씩 증가한 연속된 수를 m개 emit하는 Flux를 생성한다.
for문처럼 특정 횟수만큼 어떤 작업을 처리하고자 할 경우에 주로 사용된다.
@Slf4j
public class Example14_4 {
public static void main(String[] args) {
Flux
.range(5, 10)
.subscribe(data -> log.info("{}", data));
}
}
<결과>
08:05:53.597 [main] INFO - 5
08:05:53.598 [main] INFO - 6
08:05:53.598 [main] INFO - 7
08:05:53.598 [main] INFO - 8
08:05:53.598 [main] INFO - 9
08:05:53.598 [main] INFO - 10
08:05:53.598 [main] INFO - 11
08:05:53.599 [main] INFO - 12
08:05:53.599 [main] INFO - 13
08:05:53.599 [main] INFO - 14
public class SampleData {
public static final List<Tuple2<Integer, Long>> btcTopPricesPerYear =
Arrays.asList(
Tuples.of(2010, 565L),
Tuples.of(2011, 36_094L),
Tuples.of(2012, 17_425L),
Tuples.of(2013, 1_405_209L),
Tuples.of(2014, 1_237_182L),
Tuples.of(2015, 557_603L),
Tuples.of(2016, 1_111_811L),
Tuples.of(2017, 22_483_583L),
Tuples.of(2018, 19_521_543L),
Tuples.of(2019, 15_761_568L),
Tuples.of(2020, 22_439_002L),
Tuples.of(2021, 63_364_000L)
);
}
@Slf4j
public class Example14_5 {
public static void main(String[] args) {
Flux
.range(7, 5)
.map(idx -> SampleData.btcTopPricesPerYear.get(idx))
.subscribe(tuple -> log.info("{}'s {}", tuple.getT1(), tuple.getT2()));
}
}
<결과>
08:07:41.584 [main] INFO - 2017's 22483583
08:07:41.586 [main] INFO - 2018's 19521543
08:07:41.586 [main] INFO - 2019's 15761568
08:07:41.586 [main] INFO - 2020's 22439002
08:07:41.587 [main] INFO - 2021's 63364000
✅ defer
Operator를 선언한 시점에 데이터를 emit하는 것이 아니라, 구독하는 시점에 데이터를 emit한다.
defer()는 데이터 emit을 지연시키기 때문에 꼭 필요한 시점에 데이터를 emit할 수 있다.
@Slf4j
public class Example14_6 {
public static void main(String[] args) throws InterruptedException {
log.info("# start: {}", LocalDateTime.now());
Mono<LocalDateTime> justMono = Mono.just(LocalDateTime.now());
Mono<LocalDateTime> deferMono = Mono.defer(() ->
Mono.just(LocalDateTime.now()));
Thread.sleep(2000);
justMono.subscribe(data -> log.info("# onNext just1: {}", data));
deferMono.subscribe(data -> log.info("# onNext defer1: {}", data));
Thread.sleep(2000);
justMono.subscribe(data -> log.info("# onNext just2: {}", data));
deferMono.subscribe(data -> log.info("# onNext defer2: {}", data));
}
}
첫 번째 Mono는 just()로 현재 시간을 emit한다.
두 번째 Mono는 defer() & just()로 현재 시간을 emit한다.
2초의 지연 시간을 가진 후에 두 Mono에 대한 첫 번째 구독이 발생한다.
또 2초의 지연 시간을 가진 후에 두 Mono에 대한 두 번째 구독이 발생한다.
<결과>
08:14:56.692 [main] INFO - # onNext just1: 2024-05-13T08:14:54.602106
08:14:56.694 [main] INFO - # onNext defer1: 2024-05-13T08:14:56.694660
08:14:58.704 [main] INFO - # onNext just2: 2024-05-13T08:14:54.602106
08:14:58.707 [main] INFO - # onNext defer2: 2024-05-13T08:14:58.707562
defer() & just()를 사용한 Mono의 경우, 예상했던 것처럼 2초의 간격을 두고 emit되었다.
그런데 just()만 사용한 Mono의 경우 지연 시간가는 상관없이 동일한 시간을 출력한다.
왜 이런 결과가 발생할까?
사실 just()는 Hot Publisher이기 때문에, 구독 여부와 상관없이 데이터를 emit한다.
그리고 구독이 발생하면 emit된 데이터를 다시 reply해서 Subscriber에게 전달하는 것이다.
반대로 defer()는 구독이 발생하기 전까지 emit을 지연시키기 때문에 실제 구독이 발생할 때 데이터를 emit한다.
+) Hot Publisher에 대한 설명은 다음 글을 참고한다.
https://gksdudrb922.tistory.com/318
@Slf4j
public class Example14_7 {
public static void main(String[] args) throws InterruptedException {
log.info("# start: {}", LocalDateTime.now());
Mono
.just("Hello")
.delayElement(Duration.ofSeconds(3))
.switchIfEmpty(sayDefault())
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(3500);
}
private static Mono<String> sayDefault() {
log.info("# Say Hi");
return Mono.just("Hi");
}
}
switchIfEmpty()는 Upstream Publisher가 데이터를 emit하지 않으면 호출된다.
Upstream Mono가 "Hello"라는 데이터를 emit하기 때문에 switchIfEmpty()는 호출되지 않을 것으로 기대된다.
<결과>
21:23:39.803 [main] INFO - # start: 2024-05-13T21:23:39.801993
21:23:39.875 [main] INFO - # Say Hi
21:23:42.900 [parallel-1] INFO - # onNext: Hello
그러나 실제로 sayDefault()는 호출되었다.
그 이유는, Upstream Mono의 데이터 emit 여부와 다르게, sayDefault()라는 Mono를 선언한 순간 이미 메서드는 호출되기 때문이다.
만약, switchIfEmpty()를 정말 대체용 Mono로 사용하고 싶다면, defer()를 사용하면 된다.
.switchIfEmpty(Mono.defer(() -> sayDefault()))
+) 위 예제와 관련된 자세한 설명은 다음 링크를 참고한다.
https://stackoverflow.com/questions/54373920/mono-switchifempty-is-always-called
✅ using
파라미터로 전달받은 resource를 emit하는 Flux를 생성한다.
- 첫 번째 파라미터: 읽어 올 resource
- 두 번째 파라미터: resource를 emit하는 Flux
- 세 번째 파라미터: onComplete 또는 onError Signal이 발생할 경우, 후처리.
@Slf4j
public class Example14_8 {
public static void main(String[] args) {
Path path = Paths.get("D:\\resources\\using_example.txt");
Flux
.using(() -> Files.lines(path), Flux::fromStream, Stream::close)
.subscribe(log::info);
}
}
- using_example.txt 파일을 한 라인씩 읽어온다. 읽어 온 라인 데이터는 Stream<String> 형태다.
- Stream 객체를 fromStream()으로 emit한다.
- onComplete Signal이 발생하면 Stream을 닫는다.
- 파일에서 라인을 다 읽을 때까지 1 ~3 과정을 반복한다.
<결과>
21:05:05.378 [main] INFO - Hello, world!
21:05:05.379 [main] INFO - Nice to mee you!
21:05:05.380 [main] INFO - Good bye~
✅ generate()
프로그래밍 방식으로 Signal을 발생시키며, 특히 동기적으로 데이터를 순차적으로 emit하고자 할 때 사용한다.
@Slf4j
public class Example14_9 {
public static void main(String[] args) {
Flux
.generate(() -> 0, (state, sink) -> {
sink.next(state);
if (state == 10)
sink.complete();
return ++state;
})
.subscribe(data -> log.info("# onNext: {}", data));
}
}
초깃값은 0, 값이 10일 경우 onComplete Signal을 발생시킨다.
<결과>
21:13:22.228 [main] INFO - # onNext: 0
21:13:22.230 [main] INFO - # onNext: 1
21:13:22.230 [main] INFO - # onNext: 2
21:13:22.230 [main] INFO - # onNext: 3
21:13:22.230 [main] INFO - # onNext: 4
21:13:22.230 [main] INFO - # onNext: 5
21:13:22.230 [main] INFO - # onNext: 6
21:13:22.230 [main] INFO - # onNext: 7
21:13:22.231 [main] INFO - # onNext: 8
21:13:22.231 [main] INFO - # onNext: 9
21:13:22.231 [main] INFO - # onNext: 10
@Slf4j
public class Example14_10 {
public static void main(String[] args) {
final int dan = 3;
Flux
.generate(() -> Tuples.of(dan, 1), (state, sink) -> {
sink.next(state.getT1() + " * " +
state.getT2() + " = " + state.getT1() * state.getT2());
if (state.getT2() == 9)
sink.complete();
return Tuples.of(state.getT1(), state.getT2() + 1);
}, state -> log.info("# 구구단 {}단 종료!", state.getT1()))
.subscribe(data -> log.info("# onNext: {}", data));
}
}
구구단 3단을 출력하는 예제이다.
세 번째 파라미터로 Sequence가 종료될 때, 후처리를 할 수 있다.
<결과>
21:14:50.281 [main] INFO - # onNext: 3 * 1 = 3
21:14:50.283 [main] INFO - # onNext: 3 * 2 = 6
21:14:50.283 [main] INFO - # onNext: 3 * 3 = 9
21:14:50.283 [main] INFO - # onNext: 3 * 4 = 12
21:14:50.283 [main] INFO - # onNext: 3 * 5 = 15
21:14:50.283 [main] INFO - # onNext: 3 * 6 = 18
21:14:50.283 [main] INFO - # onNext: 3 * 7 = 21
21:14:50.283 [main] INFO - # onNext: 3 * 8 = 24
21:14:50.284 [main] INFO - # onNext: 3 * 9 = 27
21:14:50.284 [main] INFO - # 구구단 3단 종료!
public class SampleData {
public static final List<Tuple2<Integer, Long>> btcTopPricesPerYear =
Arrays.asList(
Tuples.of(2010, 565L),
Tuples.of(2011, 36_094L),
Tuples.of(2012, 17_425L),
Tuples.of(2013, 1_405_209L),
Tuples.of(2014, 1_237_182L),
Tuples.of(2015, 557_603L),
Tuples.of(2016, 1_111_811L),
Tuples.of(2017, 22_483_583L),
Tuples.of(2018, 19_521_543L),
Tuples.of(2019, 15_761_568L),
Tuples.of(2020, 22_439_002L),
Tuples.of(2021, 63_364_000L)
);
public static Map<Integer, Tuple2<Integer, Long>> getBtcTopPricesPerYearMap() {
return btcTopPricesPerYear
.stream()
.collect(Collectors.toMap(t1 -> t1.getT1(), t2 -> t2));
}
}
@Slf4j
public class Example14_11 {
public static void main(String[] args) {
Map<Integer, Tuple2<Integer, Long>> map =
SampleData.getBtcTopPricesPerYearMap();
Flux
.generate(() -> 2019, (state, sink) -> {
if (state > 2021) {
sink.complete();
} else {
sink.next(map.get(state));
}
return ++state;
})
.subscribe(data -> log.info("# onNext: {}", data));
}
}
연도별 BTC 최고가를 출력하는 예제다.
<결과>
21:18:42.259 [main] INFO - # onNext: [2019,15761568]
21:18:42.265 [main] INFO - # onNext: [2020,22439002]
21:18:42.265 [main] INFO - # onNext: [2021,63364000]
✅ create
프로그래밍 방식으로 Signal을 발생시킨다.
generate()와 다른 점은 여러 데이터를 비동기적으로 emit할 수 있다.
@Slf4j
public class Example14_12 {
static int SIZE = 0;
static int COUNT = -1;
final static List<Integer> DATA_SOURCE = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
public static void main(String[] args) {
log.info("# start");
Flux.create((FluxSink<Integer> sink) -> {
sink.onRequest(n -> {
try {
Thread.sleep(1000L);
for (int i = 0; i < n; i++) {
if (COUNT >= 9) {
sink.complete();
} else {
COUNT++;
sink.next(DATA_SOURCE.get(COUNT));
}
}
} catch (InterruptedException e) {}
});
sink.onDispose(() -> log.info("# clean up"));
}).subscribe(new BaseSubscriber<>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(2);
}
@Override
protected void hookOnNext(Integer value) {
SIZE++;
log.info("# onNext: {}", value);
if (SIZE == 2) {
request(2);
SIZE = 0;
}
}
@Override
protected void hookOnComplete() {
log.info("# onComplete");
}
});
}
}
- 구독이 발생하면 BaseSubscriber의 hookOnSubscribe()에서 request(2)를 호출하여 한 번에 두 개의 데이터를 요청한다.
- create()에서 sink.onReqeust()가 실행된다.
- 요청한 개수만큼 데이터를 emit한다.
- BaseSubscriber의 hookOnNext()에서 emit된 데이터를 처리하고, 다시 requets(2)를 호출한다.
- 2 ~ 4 과정이 반복되다가 onComplete Signal을 발생시킨다.
- BaseSubscriber의 hookOncomplete()를 호출한다.
- sink.onDispose()를 호출한다.
<결과>
21:23:01.329 [main] INFO - # start
21:23:02.448 [main] INFO - # onNext: 1
21:23:02.457 [main] INFO - # onNext: 2
21:23:03.463 [main] INFO - # onNext: 3
21:23:03.465 [main] INFO - # onNext: 4
21:23:04.471 [main] INFO - # onNext: 5
21:23:04.474 [main] INFO - # onNext: 6
21:23:05.481 [main] INFO - # onNext: 7
21:23:05.483 [main] INFO - # onNext: 8
21:23:06.487 [main] INFO - # onNext: 9
21:23:06.489 [main] INFO - # onNext: 10
21:23:07.491 [main] INFO - # onComplete
21:23:07.499 [main] INFO - # clean up
+) BaseSubscriber에 대한 설명은 다음 글을 참고한다.
https://gksdudrb922.tistory.com/319
위 예제는 Subscriber가 request()를 호출하면 Publisher가 요청 개수만큼 데이터를 emit한다.
이제 Subscriber의 요청과 상관없이 비동기적으로 데이터를 emit하는 방식을 살펴보자.
public class SampleData {
public static final List<Integer> btcPrices =
Arrays.asList(50_000_000, 50_100_000, 50_700_000, 51_500_000, 52_000_000);
}
public class CryptoCurrencyPriceEmitter {
private CryptoCurrencyPriceListener listener;
public void setListener(CryptoCurrencyPriceListener listener) {
this.listener = listener;
}
public void flowInto() {
listener.onPrice(SampleData.btcPrices);
}
public void complete() {
listener.onComplete();
}
}
public interface CryptoCurrencyPriceListener {
void onPrice(List<Integer> priceList);
void onComplete();
}
@Slf4j
public class Example14_13 {
public static void main(String[] args) throws InterruptedException {
CryptoCurrencyPriceEmitter priceEmitter = new CryptoCurrencyPriceEmitter();
Flux.create((FluxSink<Integer> sink) ->
priceEmitter.setListener(new CryptoCurrencyPriceListener() {
@Override
public void onPrice(List<Integer> priceList) {
priceList.stream().forEach(price -> {
sink.next(price);
});
}
@Override
public void onComplete() {
sink.complete();
}
}))
.publishOn(Schedulers.parallel())
.subscribe(
data -> log.info("# onNext: {}", data),
error -> {},
() -> log.info("# onComplete"));
Thread.sleep(3000L);
priceEmitter.flowInto();
Thread.sleep(2000L);
priceEmitter.complete();
}
}
Subscriber가 요청을 보내는 것이 아닌, 데이터를 리스닝하고 있다가, flowInto()와 같은 트리거가 발생하면 데이터를 emit한다.
<결과>
21:47:01.849 [parallel-1] INFO - # onNext: 50000000
21:47:01.859 [parallel-1] INFO - # onNext: 50100000
21:47:01.859 [parallel-1] INFO - # onNext: 50700000
21:47:01.859 [parallel-1] INFO - # onNext: 51500000
21:47:01.860 [parallel-1] INFO - # onNext: 52000000
21:47:03.860 [parallel-1] INFO - # onComplete
다음은 Backpressure 전략을 적용해본다.
@Slf4j
public class Example14_14 {
static int start = 1;
static int end = 4;
public static void main(String[] args) throws InterruptedException {
Flux.create((FluxSink<Integer> emitter) -> {
emitter.onRequest(n -> {
log.info("# requested: " + n);
try {
Thread.sleep(500L);
for (int i = start; i <= end; i++) {
emitter.next(i);
}
start += 4;
end += 4;
} catch (InterruptedException e) {}
});
emitter.onDispose(() -> {
log.info("# clean up");
});
}, FluxSink.OverflowStrategy.DROP)
.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.parallel(), 2)
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(3000L);
}
}
Backpressure DROP 전략을 사용했다.
+) Backpressure에 대한 설명은 다음 글을 참고하자.
https://gksdudrb922.tistory.com/319
한 번에 4개의 데이터를 emit하는데, publishOn() 스케줄러의 prefetch 수를 2개로 지정했다. (2개는 DROP될 것으로 예상)
<결과>
21:46:28.921 [boundedElastic-1] INFO - # requested: 2
21:46:29.435 [parallel-1] INFO - # onNext: 1
21:46:29.446 [parallel-1] INFO - # onNext: 2
21:46:29.451 [boundedElastic-1] INFO - # requested: 2
21:46:29.959 [parallel-1] INFO - # onNext: 5
21:46:29.960 [parallel-1] INFO - # onNext: 6
21:46:29.960 [boundedElastic-1] INFO - # requested: 2
21:46:30.467 [parallel-1] INFO - # onNext: 9
21:46:30.468 [parallel-1] INFO - # onNext: 10
21:46:30.469 [boundedElastic-1] INFO - # requested: 2
21:46:30.975 [parallel-1] INFO - # onNext: 13
21:46:30.979 [parallel-1] INFO - # onNext: 14
21:46:30.983 [boundedElastic-1] INFO - # requested: 2
21:46:31.490 [parallel-1] INFO - # onNext: 17
21:46:31.493 [parallel-1] INFO - # onNext: 18
21:46:31.495 [boundedElastic-1] INFO - # requested: 2
Sequence 필터링 Operator
✅ filter
Upstream에서 emit된 데이터 중에서 조건에 일치하는 데이터만 Downstream으로 emit한다.
@Slf4j
public class Example14_15 {
public static void main(String[] args) {
Flux
.range(1, 20)
.filter(num -> num % 2 != 0)
.subscribe(data -> log.info("# onNext: {}", data));
}
}
<결과>
07:29:57.598 [main] INFO - # onNext: 1
07:29:57.599 [main] INFO - # onNext: 3
07:29:57.599 [main] INFO - # onNext: 5
07:29:57.599 [main] INFO - # onNext: 7
07:29:57.600 [main] INFO - # onNext: 9
07:29:57.600 [main] INFO - # onNext: 11
07:29:57.600 [main] INFO - # onNext: 13
07:29:57.600 [main] INFO - # onNext: 15
07:29:57.600 [main] INFO - # onNext: 17
07:29:57.601 [main] INFO - # onNext: 19
public class SampleData {
public static final List<Tuple2<Integer, Long>> btcTopPricesPerYear =
Arrays.asList(
Tuples.of(2010, 565L),
Tuples.of(2011, 36_094L),
Tuples.of(2012, 17_425L),
Tuples.of(2013, 1_405_209L),
Tuples.of(2014, 1_237_182L),
Tuples.of(2015, 557_603L),
Tuples.of(2016, 1_111_811L),
Tuples.of(2017, 22_483_583L),
Tuples.of(2018, 19_521_543L),
Tuples.of(2019, 15_761_568L),
Tuples.of(2020, 22_439_002L),
Tuples.of(2021, 63_364_000L)
);
}
@Slf4j
public class Example14_16 {
public static void main(String[] args) {
Flux
.fromIterable(SampleData.btcTopPricesPerYear)
.filter(tuple -> tuple.getT2() > 20_000_000)
.subscribe(data -> log.info(data.getT1() + ":" + data.getT2()));
}
}
<결과>
07:30:57.132 [main] INFO - 2017:22483583
07:30:57.134 [main] INFO - 2020:22439002
07:30:57.135 [main] INFO - 2021:63364000
public class SampleData {
public enum CovidVaccine {
Pfizer,
AstraZeneca,
Moderna,
Janssen,
Novavax;
public static List<CovidVaccine> toList() {
return Arrays.asList(
CovidVaccine.Pfizer,
CovidVaccine.AstraZeneca,
CovidVaccine.Moderna,
CovidVaccine.Janssen,
CovidVaccine.Novavax
);
}
}
public static final List<Tuple2<CovidVaccine, Integer>> coronaVaccines =
Arrays.asList(
Tuples.of(CovidVaccine.Pfizer, 1_000_000),
Tuples.of(CovidVaccine.AstraZeneca, 3_000_000),
Tuples.of(CovidVaccine.Moderna, 4_000_000),
Tuples.of(CovidVaccine.Janssen, 2_000_000),
Tuples.of(CovidVaccine.Novavax, 2_500_000)
);
public static Map<CovidVaccine, Tuple2<CovidVaccine, Integer>> getCovidVaccines() {
return coronaVaccines
.stream()
.collect(Collectors.toMap(t1 -> t1.getT1(), t2 -> t2));
}
public static final List<CovidVaccine> coronaVaccineNames = CovidVaccine.toList();
}
@Slf4j
public class Example14_17 {
public static void main(String[] args) throws InterruptedException {
Map<CovidVaccine, Tuple2<CovidVaccine, Integer>> vaccineMap =
getCovidVaccines();
Flux
.fromIterable(SampleData.coronaVaccineNames)
.filterWhen(vaccine -> Mono
.just(vaccineMap.get(vaccine).getT2() >= 3_000_000)
.publishOn(Schedulers.parallel()))
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(1000);
}
}
filterWhen()은 비동기적으로 필터링을 수행한다.
<결과>
07:34:54.822 [parallel-2] INFO - # onNext: AstraZeneca
07:34:54.839 [parallel-3] INFO - # onNext: Moderna
✅ skip
Upstream에서 emit된 데이터 중에서 파라미터로 입력받은 숫자만큼 건너뛴 후, 나머지 데이터를 Downstream으로 emit한다.
@Slf4j
public class Example14_18 {
public static void main(String[] args) throws InterruptedException {
Flux
.interval(Duration.ofSeconds(1))
.skip(2)
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(5500L);
}
}
interval()은 0부터 1초에 한 번씩 1 증가한 숫자를 emit한다.
<결과>
07:38:28.115 [parallel-1] INFO - # onNext: 2
07:38:29.114 [parallel-1] INFO - # onNext: 3
07:38:30.111 [parallel-1] INFO - # onNext: 4
skip()은 0과 1을 건너뛰고 2부터 Downstream으로 emit한다.
skip()의 파라미터로 시간을 지정하면, 지정한 시간 내에 emit된 데이터를 건너뛴 후, 나머지 데이터를 emit한다.
@Slf4j
public class Example14_19 {
public static void main(String[] args) throws InterruptedException {
Flux
.interval(Duration.ofMillis(300))
.skip(Duration.ofSeconds(1))
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(2000L);
}
}
<결과>
07:41:26.814 [parallel-2] INFO - # onNext: 3
07:41:27.115 [parallel-2] INFO - # onNext: 4
07:41:27.413 [parallel-2] INFO - # onNext: 5
1초 시간 내에 emit되는 데이터는 건너뛴다.
public class SampleData {
public static final List<Tuple2<Integer, Long>> btcTopPricesPerYear =
Arrays.asList(
Tuples.of(2010, 565L),
Tuples.of(2011, 36_094L),
Tuples.of(2012, 17_425L),
Tuples.of(2013, 1_405_209L),
Tuples.of(2014, 1_237_182L),
Tuples.of(2015, 557_603L),
Tuples.of(2016, 1_111_811L),
Tuples.of(2017, 22_483_583L),
Tuples.of(2018, 19_521_543L),
Tuples.of(2019, 15_761_568L),
Tuples.of(2020, 22_439_002L),
Tuples.of(2021, 63_364_000L)
);
}
@Slf4j
public class Example14_20 {
public static void main(String[] args) {
Flux
.fromIterable(SampleData.btcTopPricesPerYear)
.filter(tuple -> tuple.getT2() >= 20_000_000)
.skip(2)
.subscribe(tuple -> log.info("{}, {}", tuple.getT1(), tuple.getT2()));
}
}
<결과>
07:42:52.577 [main] INFO - 2021, 63364000
✅ take
Upstream에서 emit된 데이터 중에서 파라미터로 입력받은 숫자만큼만, 데이터를 Downstream으로 emit한다. (skip의 반대)
@Slf4j
public class Example14_21 {
public static void main(String[] args) throws InterruptedException {
Flux
.interval(Duration.ofSeconds(1))
.take(3)
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(4000L);
}
}
<결과>
07:44:47.373 [parallel-1] INFO - # onNext: 0
07:44:48.362 [parallel-1] INFO - # onNext: 1
07:44:49.362 [parallel-1] INFO - # onNext: 2
tkae()의 파라미터로 시간을 지정하면, 지정한 시간 내에 emit된 데이터만 emit한다.
@Slf4j
public class Example14_22 {
public static void main(String[] args) throws InterruptedException {
Flux
.interval(Duration.ofSeconds(1))
.take(Duration.ofMillis(2500))
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(3000L);
}
}
<결과>
07:46:33.923 [parallel-2] INFO - # onNext: 0
07:46:34.912 [parallel-2] INFO - # onNext: 1
public class SampleData {
public static final List<Tuple2<Integer, Long>> btcTopPricesPerYear =
Arrays.asList(
Tuples.of(2010, 565L),
Tuples.of(2011, 36_094L),
Tuples.of(2012, 17_425L),
Tuples.of(2013, 1_405_209L),
Tuples.of(2014, 1_237_182L),
Tuples.of(2015, 557_603L),
Tuples.of(2016, 1_111_811L),
Tuples.of(2017, 22_483_583L),
Tuples.of(2018, 19_521_543L),
Tuples.of(2019, 15_761_568L),
Tuples.of(2020, 22_439_002L),
Tuples.of(2021, 63_364_000L)
);
}
@Slf4j
public class Example14_23 {
public static void main(String[] args) {
Flux
.fromIterable(SampleData.btcTopPricesPerYear)
.takeLast(2)
.subscribe(tuple -> log.info("# onNext: {}, {}",
tuple.getT1(), tuple.getT2()));
}
}
takeLast()는 파라미터로 입력한 개수만큼 가장 마지막에 emit된 데이터를 Downstream으로 emit한다.
<결과>
07:47:56.375 [main] INFO - # onNext: 2020, 22439002
07:47:56.378 [main] INFO - # onNext: 2021, 63364000
@Slf4j
public class Example14_24 {
public static void main(String[] args) {
Flux
.fromIterable(SampleData.btcTopPricesPerYear)
.takeUntil(tuple -> tuple.getT2() > 20_000_000)
.subscribe(tuple -> log.info("# onNext: {}, {}",
tuple.getT1(), tuple.getT2()));
}
}
takeUntil()은 파라미터로 입력한 Predicate가 true가 될 때까지 Downstream으로 emit한다.
<결과>
07:49:59.592 [main] INFO - # onNext: 2010, 565
07:49:59.600 [main] INFO - # onNext: 2011, 36094
07:49:59.601 [main] INFO - # onNext: 2012, 17425
07:49:59.601 [main] INFO - # onNext: 2013, 1405209
07:49:59.601 [main] INFO - # onNext: 2014, 1237182
07:49:59.601 [main] INFO - # onNext: 2015, 557603
07:49:59.601 [main] INFO - # onNext: 2016, 1111811
07:49:59.601 [main] INFO - # onNext: 2017, 22483583
Predicate를 평가할 때 사용한 데이터 22463583까지 포함된다.
@Slf4j
public class Example14_25 {
public static void main(String[] args) {
Flux
.fromIterable(SampleData.btcTopPricesPerYear)
.takeWhile(tuple -> tuple.getT2() < 20_000_000)
.subscribe(tuple -> log.info("# onNext: {}, {}",
tuple.getT1(), tuple.getT2()));
}
}
takeWhile()은 파라미터로 입력한 Predicate가 true가 되는 동안에만 Downstream으로 emit한다. (takeUntil의 반대)
<결과>
07:51:25.625 [main] INFO - # onNext: 2010, 565
07:51:25.640 [main] INFO - # onNext: 2011, 36094
07:51:25.640 [main] INFO - # onNext: 2012, 17425
07:51:25.640 [main] INFO - # onNext: 2013, 1405209
07:51:25.640 [main] INFO - # onNext: 2014, 1237182
07:51:25.640 [main] INFO - # onNext: 2015, 557603
07:51:25.640 [main] INFO - # onNext: 2016, 1111811
takeUntil()과 달리, Predicate를 평가할 때 사용한 데이터 22463583까지는 포함되지 않는다.
✅ next
Upstream에서 emit되는 데이터 중에서 첫 번째 데이터만 Downstream으로 emit한다.
만약 Upstream에서 empty를 emit했다면, Downstream으로 empty Mono를 emit한다.
public class SampleData {
public static final List<Tuple2<Integer, Long>> btcTopPricesPerYear =
Arrays.asList(
Tuples.of(2010, 565L),
Tuples.of(2011, 36_094L),
Tuples.of(2012, 17_425L),
Tuples.of(2013, 1_405_209L),
Tuples.of(2014, 1_237_182L),
Tuples.of(2015, 557_603L),
Tuples.of(2016, 1_111_811L),
Tuples.of(2017, 22_483_583L),
Tuples.of(2018, 19_521_543L),
Tuples.of(2019, 15_761_568L),
Tuples.of(2020, 22_439_002L),
Tuples.of(2021, 63_364_000L)
);
}
@Slf4j
public class Example14_26 {
public static void main(String[] args) {
Flux
.fromIterable(SampleData.btcTopPricesPerYear)
.next()
.subscribe(tuple -> log.info("# onNext: {}, {}", tuple.getT1(), tuple.getT2()));
}
}
<결과>
07:54:20.520 [main] INFO - # onNext: 2010, 565
Sequence 변환 Operator
✅ map
Upstream에서 emit되는 데이터를 mapper Fuction을 사용하여 변환한 후, Downstream으로 emit한다.
@Slf4j
public class Example14_27 {
public static void main(String[] args) {
Flux
.just("1-Circle", "3-Circle", "5-Circle")
.map(circle -> circle.replace("Circle", "Rectangle"))
.subscribe(data -> log.info("# onNext: {}", data));
}
}
<결과>
07:31:36.043 [main] INFO - # onNext: 1-Rectangle
07:31:36.045 [main] INFO - # onNext: 3-Rectangle
07:31:36.045 [main] INFO - # onNext: 5-Rectangle
public class SampleData {
public static final List<Tuple2<Integer, Long>> btcTopPricesPerYear =
Arrays.asList(
Tuples.of(2010, 565L),
Tuples.of(2011, 36_094L),
Tuples.of(2012, 17_425L),
Tuples.of(2013, 1_405_209L),
Tuples.of(2014, 1_237_182L),
Tuples.of(2015, 557_603L),
Tuples.of(2016, 1_111_811L),
Tuples.of(2017, 22_483_583L),
Tuples.of(2018, 19_521_543L),
Tuples.of(2019, 15_761_568L),
Tuples.of(2020, 22_439_002L),
Tuples.of(2021, 63_364_000L)
);
}
@Slf4j
public class Example14_28 {
public static void main(String[] args) {
final double buyPrice = 50_000_000;
Flux
.fromIterable(SampleData.btcTopPricesPerYear)
.filter(tuple -> tuple.getT1() == 2021)
.doOnNext(data -> log.info("# doOnNext: {}", data))
.map(tuple -> calculateProfitRate(buyPrice, tuple.getT2()))
.subscribe(data -> log.info("# onNext: {}%", data));
}
private static double calculateProfitRate(final double buyPrice, Long topPrice) {
return (topPrice - buyPrice) / buyPrice * 100;
}
}
<결과>
07:33:07.970 [main] INFO - # doOnNext: [2021,63364000]
07:33:07.973 [main] INFO - # onNext: 26.728%
✅ flatMap
Upstream에서 emit되는 데이터를 Inner Sequence를 통해 여러 건의 데이터로 변환할 수 있다.
@Slf4j
public class Example14_29 {
public static void main(String[] args) {
Flux
.just("Good", "Bad")
.flatMap(feeling -> Flux
.just("Morning", "Afternoon", "Evening")
.map(time -> feeling + " " + time))
.subscribe(log::info);
}
}
<결과>
07:37:09.202 [main] INFO - Good Morning
07:37:09.204 [main] INFO - Good Afternoon
07:37:09.204 [main] INFO - Good Evening
07:37:09.204 [main] INFO - Bad Morning
07:37:09.204 [main] INFO - Bad Afternoon
07:37:09.204 [main] INFO - Bad Evening
@Slf4j
public class Example14_30 {
public static void main(String[] args) throws InterruptedException {
Flux
.range(2, 8)
.flatMap(dan -> Flux
.range(1, 9)
.publishOn(Schedulers.parallel())
.map(n -> dan + " * " + n + " = " + dan * n))
.subscribe(log::info);
Thread.sleep(100L);
}
}
<결과>
07:38:44.190 [parallel-1] INFO - 2 * 1 = 2
07:38:44.216 [parallel-1] INFO - 2 * 2 = 4
...
07:38:44.218 [parallel-1] INFO - 5 * 1 = 5
07:38:44.223 [parallel-1] INFO - 5 * 2 = 10
...
07:38:44.226 [parallel-1] INFO - 6 * 1 = 6
07:38:44.226 [parallel-1] INFO - 6 * 2 = 12
...
07:38:44.231 [parallel-1] INFO - 9 * 1 = 9
07:38:44.231 [parallel-1] INFO - 9 * 2 = 18
...
07:38:44.231 [parallel-1] INFO - 3 * 1 = 3
07:38:44.232 [parallel-1] INFO - 3 * 2 = 6
...
07:38:44.235 [parallel-1] INFO - 4 * 8 = 32
07:38:44.235 [parallel-1] INFO - 4 * 9 = 36
-> flatMap()은 Inner Sequence를 비동기적으로 실행하면 데이터 emit의 순서를 보장하지 않는다.
✅ concat
파라미터로 입력되는 Publisher의 Sequence를 연결해서 데이터를 순차적으로 emit한다.
-> 먼저 입력된 Publisher의 Seuquence가 종료될 때까지 나머지 Publisher의 Sequence는 subscribe되지 않고 대기한다.
@Slf4j
public class Example14_31 {
public static void main(String[] args) {
Flux
.concat(Flux.just(1, 2, 3), Flux.just(4, 5))
.subscribe(data -> log.info("# onNext: {}", data));
}
}
<결과>
07:50:04.461 [main] INFO - # onNext: 1
07:50:04.462 [main] INFO - # onNext: 2
07:50:04.463 [main] INFO - # onNext: 3
07:50:04.463 [main] INFO - # onNext: 4
07:50:04.463 [main] INFO - # onNext: 5
public class SampleData {
public enum CovidVaccine {
Pfizer,
AstraZeneca,
Moderna,
Janssen,
Novavax;
}
public static final List<Tuple2<CovidVaccine, Integer>> viralVectorVaccines =
Arrays.asList(
Tuples.of(CovidVaccine.AstraZeneca, 3_000_000),
Tuples.of(CovidVaccine.Janssen, 2_000_000)
);
public static final List<Tuple2<CovidVaccine, Integer>> mRNAVaccines =
Arrays.asList(
Tuples.of(CovidVaccine.Pfizer, 1_000_000),
Tuples.of(CovidVaccine.Moderna, 4_000_000)
);
public static final List<Tuple2<CovidVaccine, Integer>> subunitVaccines =
Arrays.asList(
Tuples.of(CovidVaccine.Novavax, 2_500_000)
);
}
@Slf4j
public class Example14_32 {
public static void main(String[] args) {
Flux
.concat(
Flux.fromIterable(getViralVector()),
Flux.fromIterable(getMRNA()),
Flux.fromIterable(getSubunit()))
.subscribe(data -> log.info("# onNext: {}", data));
}
private static List<Tuple2<SampleData.CovidVaccine, Integer>> getViralVector() {
return SampleData.viralVectorVaccines;
}
private static List<Tuple2<SampleData.CovidVaccine, Integer>> getMRNA() {
return SampleData.mRNAVaccines;
}
private static List<Tuple2<SampleData.CovidVaccine, Integer>> getSubunit() {
return SampleData.subunitVaccines;
}
}
<결과>
07:51:44.107 [main] INFO - # onNext: [AstraZeneca,3000000]
07:51:44.110 [main] INFO - # onNext: [Janssen,2000000]
07:51:44.110 [main] INFO - # onNext: [Pfizer,1000000]
07:51:44.110 [main] INFO - # onNext: [Moderna,4000000]
07:51:44.110 [main] INFO - # onNext: [Novavax,2500000]
✅ merge
파라미터로 입력되는 Publisher의 Sequence를 인터리빙 방식으로 연결한다.
-> 모든 Publisher의 Sequence가 즉시 subscribe된다.
Q: 인터리빙 방식?
A: 교차로 배치한다는 의미. merge()에서는 emit된 시간 순서대로 merge한다.
@Slf4j
public class Example14_33 {
public static void main(String[] args) throws InterruptedException {
Flux
.merge(
Flux.just(1, 2, 3, 4).delayElements(Duration.ofMillis(300L)),
Flux.just(5, 6, 7).delayElements(Duration.ofMillis(500L))
)
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(2000L);
}
}
<결과>
07:55:45.411 [parallel-1] INFO - # onNext: 1
07:55:45.605 [parallel-2] INFO - # onNext: 5
07:55:45.744 [parallel-3] INFO - # onNext: 2
07:55:46.050 [parallel-5] INFO - # onNext: 3
07:55:46.112 [parallel-4] INFO - # onNext: 6
07:55:46.362 [parallel-6] INFO - # onNext: 4
07:55:46.620 [parallel-7] INFO - # onNext: 7
public class SampleData {
public static Map<String, Mono<String>> nppMap = new HashMap<>();
static {
nppMap.put("Ontario",
Mono.just("Ontario Done").delayElement(Duration.ofMillis(1500L)));
nppMap.put("Vermont",
Mono.just("Vermont Done").delayElement(Duration.ofMillis(400L)));
nppMap.put("New Hampshire",
Mono.just("New Hampshire Done").delayElement(Duration.ofMillis(700L)));
nppMap.put("New Jersey",
Mono.just("New Jersey Done").delayElement(Duration.ofMillis(500L)));
nppMap.put("Ohio",
Mono.just("Ohio Done").delayElement(Duration.ofMillis(1000L)));
nppMap.put("Michigan",
Mono.just("Michigan Done").delayElement(Duration.ofMillis(200L)));
nppMap.put("Illinois",
Mono.just("Illinois Done").delayElement(Duration.ofMillis(300L)));
nppMap.put("Virginia",
Mono.just("Virginia Done").delayElement(Duration.ofMillis(600L)));
nppMap.put("North Carolina",
Mono.just("North Carolina Done").delayElement(Duration.ofMillis(800L)));
nppMap.put("Georgia",
Mono.just("Georgia Done").delayElement(Duration.ofMillis(900L)));
}
}
@Slf4j
public class Example14_34 {
public static void main(String[] args) throws InterruptedException {
String[] usaStates = {
"Ohio", "Michigan", "New Jersey", "Illinois", "New Hampshire",
"Virginia", "Vermont", "North Carolina", "Ontario", "Georgia"
};
Flux
.merge(getMeltDownRecoveryMessage(usaStates))
.subscribe(log::info);
Thread.sleep(2000L);
}
private static List<Mono<String>> getMeltDownRecoveryMessage(String[] usaStates) {
List<Mono<String>> messages = new ArrayList<>();
for (String state : usaStates) {
messages.add(SampleData.nppMap.get(state));
}
return messages;
}
}
<결과>
08:00:02.346 [parallel-2] INFO - Michigan Done
08:00:02.438 [parallel-4] INFO - Illinois Done
08:00:02.539 [parallel-7] INFO - Vermont Done
08:00:02.640 [parallel-3] INFO - New Jersey Done
08:00:02.739 [parallel-6] INFO - Virginia Done
08:00:02.839 [parallel-5] INFO - New Hampshire Done
08:00:02.939 [parallel-8] INFO - North Carolina Done
08:00:03.040 [parallel-10] INFO - Georgia Done
08:00:03.138 [parallel-1] INFO - Ohio Done
08:00:03.640 [parallel-9] INFO - Ontario Done
'book > 스프링으로 시작하는 리액티브 프로그래밍' 카테고리의 다른 글
[리액티브 프로그래밍] Testing (0) | 2024.05.08 |
---|---|
[리액티브 프로그래밍] Debugging (0) | 2024.05.06 |
[리액티브 프로그래밍] Context (0) | 2024.05.06 |
[리액티브 프로그래밍] Scheduler (0) | 2024.04.21 |
[리액티브 프로그래밍] Sinks (0) | 2024.04.21 |