시작하기에 앞서, reactor-test 모듈의 기능을 사용하기 위해, build.gradle 파일에 다음 의존성을 추가한다.
dependencies {
testImplementation 'io.projectreactor:reactor-test'
}
StepVerifier를 사용한 테스팅
Reactor에서 가장 일반적인 테스팅 방식은,
구독 시점에 Operator 체인이 시나리오 대로 동작하는지를 테스트하는 것이다.
ex)
- 다음에 발생할 Signal이 무엇인지
- 기대하던 데이터들이 emit되었는지
- 특정 시간 동안 emit된 데이터가 있는지
Reactor에서는 Operator 체인의 다양한 동작 방식을 테스트하기 위해 StepVerifier라는 API를 제공한다.
✅ Signal 이벤트 테스트
public class ExampleTest13_1 {
@Test
public void sayHelloReactorTest() {
StepVerifier
.create(Mono.just("Hello Reactor")) // 테스트 대상 Sequence 생성
.expectNext("Hello Reactor") // emit 된 데이터 검증
.expectComplete() // onComplete Signal 검증
.verify(); // 검증 실행.
}
}
- create()을 통해 테스트 대상 Sequence를 생성한다.
- expectXXXX()를 통해 Sequence에서 예상되는 Signal의 기댓값을 평가한다.
- verify()를 호출함으로써 전체 Operator 체인의 테스트를 트리거한다.
<expectXXXX() 메서드>
메서드 | 설명 |
expectSubscription() | 구독이 이루어짐을 기대한다. |
expectNext(T t) | onNext Signal을 통해 전달되는 값이 파라미터로 전달된 값과 같음을 기대한다. |
expectComplete() | onComplete Signal이 전송되기를 기대한다. |
expectError() | onError Signal이 전송되기를 기대한다. |
expectNextCount(long count) | 구독 시점 또는 이전(previous) expectNext()를 통해 기댓값이 평가된 데이터 이후부터 emit된 수를 기대한다. |
expectNoEvent(Duration duration) | 주어진 시간 동안 Signal 이벤트가 발생하지 않았음을 기대한다. |
expectAccessibleContext() | 구독 시점 이후에 Context가 전파되었음을 기대한다. |
expectNextSequence(Iterable<? extends T> iterable) | emit된 데이터들이 파라미터로 전달된 iterable의 요소와 매치됨을 기대한다. |
<verifyXXXX() 메서드>
메서드 | 설명 |
verify() | 검증을 트리거한다. |
verifyComplete() | 검증을 트리거하고, onComplete Signal을 기대한다. |
verifyError() | 검증을 트리거하고, onError Signal을 기대한다. |
verifyTimeout(Duration duration) | 검증을 트리거하고, 주어진 시간이 초과되어도 Publisher가 종료되지 않음을 기대한다. |
다음은 테스트가 필요한 대상 클래스이다.
public class GeneralTestExample {
public static Flux<String> sayHello() {
return Flux
.just("Hello", "Reactor");
}
public static Flux<Integer> divideByTwo(Flux<Integer> source) {
return source
.zipWith(Flux.just(2, 2, 2, 2, 0), (x, y) -> x/y);
}
public static Flux<Integer> takeNumber(Flux<Integer> source, long n) {
return source
.take(n);
}
}
public class ExampleTest13_3 {
@Test
public void sayHelloTest() {
StepVerifier
.create(GeneralTestExample.sayHello())
.expectSubscription()
.as("# expect subscription")
.expectNext("Hi")
.as("# expect Hi")
.expectNext("Reactor")
.as("# expect Reactor")
.verifyComplete();
}
}
as() 메서드는 이전 기댓값 평가 단계에 대한 설명을 추가할 수 있다.
-> 만약 테스트에 실패하게 되면 실패한 단계에 해당하는 설명이 로그로 출력된다.
<결과>
java.lang.AssertionError: expectation "# expect Hi" failed (expected value: Hi; actual value: Hello)
첫 번째로 emit된 데이터가 'Hi'라고 기대했는데, 실제 emit된 값은 'Hello'이기 때문에 테스트 결과는 fail이다.
public class ExampleTest13_4 {
@Test
public void divideByTwoTest() {
Flux<Integer> source = Flux.just(2, 4, 6, 8, 10);
StepVerifier
.create(GeneralTestExample.divideByTwo(source))
.expectSubscription()
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectNext(4)
.expectError()
.verify();
}
}
divideByTwo는 5번째 emit되는 데이터에서 0으로 나누는 ArithmeticException이 발생하기 때문에, 위 테스트는 통과한다.
+) expectNext(1) ~ expectNext(4)를 expectNext(1, 2, 3, 4)로 변경할 수 있다.
public class ExampleTest13_5 {
@Test
public void takeNumberTest() {
Flux<Integer> source = Flux.range(0, 1000);
StepVerifier
.create(GeneralTestExample.takeNumber(source, 500),
StepVerifierOptions.create().scenarioName("Verify from 0 to 499"))
.expectSubscription()
.expectNext(0)
.expectNextCount(498)
.expectNext(500)
.expectComplete()
.verify();
}
}
takeNumber() 메서드는 Source Flux에서 파라미터로 전달된 숫자의 개수만큼만 데이터를 emit한다.
StepVerifierOptions는 StepVerifier에 옵션을 추가하는 클래스인데, 예제에서는 테스트에 실패할 경우 출력할 시나리오명을 입력했다.
<결과>
java.lang.AssertionError: [Verify from 0 to 499] expectation "expectNext(500)" failed (expected value: 500; actual value: 499)
처음 expcetNext()로 0이 emit되었음을 기대한다 -> 성공.
expectNextCount()로 498개의 데이터가 emit되었음을 기대한다 -> 성공.
마지막 expectNext()로 500이 emit되었음을 기대한다 -> 실패.
Q: 왜 실패하지?
A: 총 500개 데이터가 emit되는데, 처음에 0이 emit되고, 이후에 498개가 emit되면 총 0~498까지 emit된 것이다. 나머지 하나의 데이터는 500이 아닌, 499이다.
✅ 시간 기반(Time-based) 테스트
가상의 시간을 이용해 미래에 실행되는 Reactor Sequence의 시간을 앞당겨 테스트할 수 있는 기능을 지원한다.
다음은 테스트 대상 클래스이다.
public class TimeBasedTestExample {
public static Flux<Tuple2<String, Integer>> getCOVID19Count(Flux<Long> source) {
return source
.flatMap(notUse -> Flux.just(
Tuples.of("서울", 10),
Tuples.of("경기도", 5),
Tuples.of("강원도", 3),
Tuples.of("충청도", 6),
Tuples.of("경상도", 5),
Tuples.of("전라도", 8),
Tuples.of("인천", 2),
Tuples.of("대전", 1),
Tuples.of("대구", 2),
Tuples.of("부산", 3),
Tuples.of("제주도", 0)
)
);
}
public static Flux<Tuple2<String, Integer>> getVoteCount(Flux<Long> source) {
return source
.zipWith(Flux.just(
Tuples.of("중구", 15400),
Tuples.of("서초구", 20020),
Tuples.of("강서구", 32040),
Tuples.of("강동구", 14506),
Tuples.of("서대문구", 35650)
)
)
.map(Tuple2::getT2);
}
}
Q: Tuples가 뭐지?
A: 서로 다른 타입의 데이터를 저장할 수 있는 Reactor에서 제공하는 Collection으로서 총 8개의 데이터를 저장할 수 있다. 참고로 Tuple2는 두 개의 데이터를 저장할 수 있다.
public class ExampleTest13_7 {
@Test
public void getCOVID19CountTest() {
StepVerifier
.withVirtualTime(() -> TimeBasedTestExample.getCOVID19Count(
Flux.interval(Duration.ofHours(1)).take(1)
)
)
.expectSubscription()
.then(() -> VirtualTimeScheduler
.get()
.advanceTimeBy(Duration.ofHours(1)))
.expectNextCount(11)
.expectComplete()
.verify();
}
}
withVirtualTime() 메서드는 VirtualTimeScheduler라는 가상 스케줄러의 제어를 받도록 해준다.
먼저 getCOVID19Count() 메서드에 1시간 뒤에 동작하는 Flux를 인자로 받는다.
실제로는 1시간을 기다려야 정상 동작하는지 확인할 수 있지만,
VirtualTimeScheduler의 advanceTimeBy()를 이용해 시간을 1시간 뒤의 미래로 이동할 수 있다.
따라서 테스트는 바로 수행되고, expectNextCount(11)을 통해 총 11개의 데이터가 emit된 것을 검증할 수 있다.
public class ExampleTest13_8 {
@Test
public void getCOVID19CountTest() {
StepVerifier
.create(TimeBasedTestExample.getCOVID19Count(
Flux.interval(Duration.ofMinutes(1)).take(1)
)
)
.expectSubscription()
.expectNextCount(11)
.expectComplete()
.verify(Duration.ofSeconds(3));
}
}
지정한 시간 내에 테스트 대상 메서드의 작업이 종료되는지를 확인하낟.
-> verify() 메서드에 3초의 시간을 지정했다. 이는 3초 내에 테스트가 끝나지 않으면 시간 초과로 간주하겠다는 의미다.
기댓값은 3초 이내지만, 1분 뒤에 데이터를 emit하기로 설정했기 때문에, 테스트는 실패한다.
<결과>
java.lang.AssertionError: VerifySubscriber timed out on reactor.core.publisher.FluxFlatMap$FlatMapMain@5039eae4
public class ExampleTest13_9 {
@Test
public void getVoteCountTest() {
StepVerifier
.withVirtualTime(() -> TimeBasedTestExample.getVoteCount(
Flux.interval(Duration.ofMinutes(1))
)
)
.expectSubscription()
.expectNoEvent(Duration.ofMinutes(1))
.expectNoEvent(Duration.ofMinutes(1))
.expectNoEvent(Duration.ofMinutes(1))
.expectNoEvent(Duration.ofMinutes(1))
.expectNoEvent(Duration.ofMinutes(1))
.expectNextCount(5)
.expectComplete()
.verify();
}
}
코드는 getVoteCount()를 통해 1분에 한 번씩 한 개 구의 투표 현황을 확인하는 테스트이다.
expectNoEvent()는 1분 동안 Signal이 발생하지 않을 것이라고 기대한다.
얼핏 보면 테스트는 성공할 것 같지만, 테스트를 한 번 하기 위해 5분을 기다려야 되나 싶다.
그런, withVirtualTime() 메서드를 사용하면, expectNoEvent()의 시간동안 어떤 이벤트도 발생하지 않을 것이라고 기대하는 동시에 지정한 시간 만큼 시간을 이동한다.
결과적으로 다섯 번의 expectNoEvent()를 호출함으로써 총 5분의 시간을 이동하고, 테스트를 바로 종료할 수 있다.
✅ Backpressure 테스트
Backpressure를 테스트하기 위한 대상 클래스이다.
public class BackpressureTestExample {
public static Flux<Integer> generateNumber() {
return Flux
.create(emitter -> {
for (int i = 1; i <= 100; i++) {
emitter.next(i);
}
emitter.complete();
}, FluxSink.OverflowStrategy.ERROR);
}
}
프로그래밍 방식으로 100개의 숫자를 emit하고 있으며, Backpressure 전략으로 ERROR 전략을 지정했기 때문에 오버플로가 발생하면 OverflowException이 발생할 것이다.
public class ExampleTest13_11 {
@Test
public void generateNumberTest() {
StepVerifier
.create(BackpressureTestExample.generateNumber(), 1L)
.thenConsumeWhile(num -> num >= 1)
.verifyComplete();
}
}
create() 메서드에서 두 번째 파라미터는 1개의 데이터를 요청한다는 뜻이다.
그러나 generateNumber()에서는 100개의 데이터를 emit하기 때문에 오버플로가 발생한다.
+) thenConsumeWhile()은 해당 조건을 만족하는 동안 데이터를 계속 소비한다. 즉, 1부터 100까지 전부 소비할 것이다.
<결과>
21:14:46.911 [Test worker] DEBUG- onNextDropped: 3
21:14:46.911 [Test worker] DEBUG- onNextDropped: 4
21:14:46.912 [Test worker] DEBUG- onNextDropped: 5
21:14:46.912 [Test worker] DEBUG- onNextDropped: 6
21:14:46.913 [Test worker] DEBUG- onNextDropped: 7
...
21:14:46.943 [Test worker] DEBUG- onNextDropped: 97
21:14:46.943 [Test worker] DEBUG- onNextDropped: 98
21:14:46.943 [Test worker] DEBUG- onNextDropped: 99
21:14:46.943 [Test worker] DEBUG- onNextDropped: 100
java.lang.AssertionError: expectation "expectComplete" failed (expected: onComplete(); actual: onError(reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)))
Q: 왜 데이터가 드롭됐다는 onNextDropped가 3부터 찍힐까?
A: 검색해도 잘 안나와서, 내가 추측을 해보면, 테스트에서 한 개의 데이터만 처리할 수 있기 떄문에, 1은 onNext가 처리되고, 2에서 오버플로가 발생했고, 3부터는 전부 drop된다는 뜻으로 저렇게 로그가 찍힌것 아닐까 싶다.
public class ExampleTest13_12 {
@Test
public void generateNumberTest() {
StepVerifier
.create(BackpressureTestExample.generateNumber(), 1L)
.thenConsumeWhile(num -> num >= 1)
.expectError()
.verifyThenAssertThat()
.hasDroppedElements();
}
}
오버플로로 인해 에러가 발생함을 기대하는 예제이다.
expectError(): 에러를 기대한다.
verifyThenAssertThat(): 검증을 트리거하고 추가적인 Assertion을 할 수 있다.
hasDroppedElements(): Drop된 데이터가 있음을 Assertion한다.
✅ Context 테스트
다음은 Context 테스트 대상 클래스이다.
public class ContextTestExample {
public static Mono<String> getSecretMessage(Mono<String> keySource) {
return keySource
.zipWith(Mono.deferContextual(ctx ->
Mono.just((String)ctx.get("secretKey"))))
.filter(tp ->
tp.getT1().equals(
new String(Base64Utils.decodeFromString(tp.getT2())))
)
.transformDeferredContextual(
(mono, ctx) -> mono.map(notUse -> ctx.get("secretMessage"))
);
}
}
Context에는 secretKey와 secretMessage가 저장되어 있다.
getSecretMessage()는 파라미터로 입력받은 keySource와 Conext에 저장된 secretKey가 일치하면 Context에 저장된 secretMessage를 리턴한다.
public class ExampleTest13_14 {
@Test
public void getSecretMessageTest() {
Mono<String> source = Mono.just("hello");
StepVerifier
.create(
ContextTestExample
.getSecretMessage(source)
.contextWrite(context ->
context.put("secretMessage", "Hello, Reactor"))
.contextWrite(context -> context.put("secretKey", "aGVsbG8="))
)
.expectSubscription()
.expectAccessibleContext()
.hasKey("secretKey")
.hasKey("secretMessage")
.then()
.expectNext("Hello, Reactor")
.expectComplete()
.verify();
}
}
expectAccessibleSubscription(): 구독 이후, Context가 전파됨을 기대한다.
hasKey(): Context에 파라미터에 해당하는 값이 있음을 기대한다.
+) Context에 저장된 secretKey 값은 "hello"를 Base64 인코딩한 문자열이다.
✅ Record 기반 테스트
다음은 Record 기반 테스트 대상 클래스이다.
public class RecordTestExample {
public static Flux<String> getCapitalizedCountry(Flux<String> source) {
return source
.map(country -> country.substring(0, 1).toUpperCase() +
country.substring(1));
}
}
public class ExampleTest13_16 {
@Test
public void getCountryTest() {
StepVerifier
.create(RecordTestExample.getCapitalizedCountry(
Flux.just("korea", "england", "canada", "india")))
.expectSubscription()
.recordWith(ArrayList::new)
.thenConsumeWhile(country -> !country.isEmpty())
.consumeRecordedWith(countries -> {
assertThat(
countries
.stream()
.allMatch(country ->
Character.isUpperCase(country.charAt(0))),
is(true)
);
})
.expectComplete()
.verify();
}
}
recordWith()는 파라미터로 전달한 컬렉션에 emit된 데이터를 추가하는 세션을 시작한다.
thenConsumeWhtile()로 파라미터로 전달한 Predicate와 일치하는 데이터에 대해 소비할 수 있도록 한다.
consumeRecordedWith()로 컬렉션에 기록된 데이터를 소비한다.여기서는 모든 데이터의 첫 글자가 대문자인지 여부를 확인한다.
-> 이처럼 구체적인 조건으로 Assertion해야 하는 경우 Record 기반 테스트를 사용한다.
public class ExampleTest13_17 {
@Test
public void getCountryTest() {
StepVerifier
.create(RecordTestExample.getCapitalizedCountry(
Flux.just("korea", "england", "canada", "india")))
.expectSubscription()
.recordWith(ArrayList::new)
.thenConsumeWhile(country -> !country.isEmpty())
.expectRecordedMatches(countries ->
countries
.stream()
.allMatch(country ->
Character.isUpperCase(country.charAt(0))))
.expectComplete()
.verify();
}
}
이번에는 테스트 시나리오는 같지만, expectRecordedMatched() 메서드 내에 Predicate를 사용했다.
-> 테스트 결과는 같지만 코드가 조금 더 간결해졌다.
TestPublisher를 사용한 테스팅
✅ TestPublisher
record-test 모듈에서 지원하는 테스트 전용 Publisher.
TestPublisher를 사용하면, 개발자가 프로그래밍 방식으로 Signal을 발생시키면서 원하는 상황을 미세하게 재연하며 테스트할 수 있다.
✅ 정상 동작하는(Well-behaved) TestPublisher
emit하는 데이터가 Null인지, 요청하는 개수보다 더 많은 데이터를 emit하는지 등의 리액티브 스트림즈 사양 위반 여부를 사전에 체크.
public class ExampleTest13_18 {
@Test
public void divideByTwoTest() {
TestPublisher<Integer> source = TestPublisher.create();
StepVerifier
.create(GeneralTestExample.divideByTwo(source.flux()))
.expectSubscription()
.then(() -> source.emit(2, 4, 6, 8, 10))
.expectNext(1, 2, 3, 4)
.expectError()
.verify();
}
}
TestPublisher를 통해 테스트에 필요한 데이터를 emit할 수 있다.
이처럼 간단한 예제에서는 일반 Flux를 사용해도 별 차이 없어보이는데, 만약 조건에 따라서 Signal을 변경해야 되는 등 복잡한 상황이라면 테스트하기가 용이할 것이다.
✅ TestPublisher가 발생시키는 Signal 유형
- next(T...): 1개 이상의 onNext Signal을 발생시킨다.
- emit(T...): 1개 이상의 onNext Signal을 발생시킨 후, onComplete Signal을 발생시킨다.
- complete(): onComplete Signal을 발생시킨다.
- error(Throwable): onError Signal을 발생시킨다.
✅ 오동작하는(Misbehaving) TestPublisher
리액티브 스트림즈 사양 위반 여부를 사전에 체크하지 않는다. 위반되더라도 TestPublisher는 데이터를 emit할 수 있다.
public class ExampleTest13_19 {
@Test
public void divideByTwoTest() {
TestPublisher<Integer> source =
TestPublisher.createNoncompliant(TestPublisher.Violation.ALLOW_NULL);
StepVerifier
.create(GeneralTestExample.divideByTwo(source.flux()))
.expectSubscription()
.then(() -> {
getDataSource().stream()
.forEach(data -> source.next(data));
source.complete();
})
.expectNext(1, 2, 3, 4, 5)
.expectComplete()
.verify();
}
private static List<Integer> getDataSource() {
return Arrays.asList(2, 4, 6, 8, null);
}
}
오동작하는 TestPublisher를 ALLOW_NULL로 위반 조건을 지정하여, 데이터의 값이 null이라도 정상 동작하도록 한다.
그리고 null을 포함하는 데이터 소스를 사용했다.
<결과>
java.lang.NullPointerException: e
at java.base/java.util.Objects.requireNonNull(Objects.java:246)
at reactor.util.concurrent.SpscArrayQueue.offer(SpscArrayQueue.java:51)
at reactor.core.publisher.FluxZip$ZipInner.onNext(FluxZip.java:909)
at reactor.test.publisher.DefaultTestPublisher$TestPublisherSubscription.onNext(DefaultTestPublisher.java:233)
...
TestPublisher가 onNext Signal을 전송하는 과정에서 NullPointerException이 발생했다.
만약 TestPublisher를 정상동작하는 Publisher로 변경하면 다음과 같은 결과를 볼 수 있다.
java.lang.NullPointerException: emitted values must be non-null
onNext Signal을 전송하기 전에 Validation을 거쳐 전송할 데이터가 null이면 NullPointerException을 던진다.
오동작하는 경우와 아닌 경우의 차이점은,
오동작하는 경우는 TestPublisher가 데이터가 null인 경우에도 onNext()까지 정상적으로 호출하고, onNext() 내부에서 null 데이터가 돌아다니면서 NPE가 발생한 것이고,
정상동작하는 경우는 TestPublisher가 데이터가 null인 경우 미리 Validation을 통해 onNext()를 호출하지 않고 NPE를 발생시킨다.
Q: 리액티브 스트림즈가 일반적으로 null을 허용하지 않는데, ALLOW_NULL을 통한 null을 허용하는 테스트가 무슨 의미가 있지?
A: 특정 상황에서는 null을 허용하지 않는 제약 조건을 무시하고 싶을 수 있다. 예를 들어, 어떤 메서드가 null 값을 처리하는데 이에 대한 예외 처리가 제대로 이루어졌는지 확인하려면 null 값을 전달하여 해당 예외가 발생하는지 확인할 수 있다.
✅ 오동작하는 TestPublisher를 생성하기 위한 위반 조건
ALLOW_NULL: 전송할 데이터가 null이어도 NullpointerException을 발생시키지 않고 다음 호출을 진행한다.
CLEANUP_ON_TERMINATE: onComplete, onError, emit과 같은 Terminal Signal을 연달아 여러 번 보낼 수 있도록 한다.
DEFER_CANCELLATION: cancel Signal을 무시하고 계속해서 Signal을 emit할 수 있도록 한다.
REQUEST_OVERFLOW: 요청 개수보다 더 많은 Signal이 발생하더라도 IllegalStateException을 발생시키지 않고 다음 호출을 진행할 수 있도록 한다.
PublisherProbe를 사용한 테스팅
✅ PublisherProbe
PublisherProbe를 통해 Sequence의 실행 경로를 테스트할 수 있다.
-> 주로 조건에 따라 Sequence가 분기되는 경우, Sequence의 실행 경로를 추적해서 정상적으로 실행되었는지 테스트한다.
다음은 PublisherProbe 테스트를 위한 기반 클래스다.
public class PublisherProbeTestExample {
public static Mono<String> processTask(Mono<String> main, Mono<String> standby) {
return main
.flatMap(massage -> Mono.just(massage))
.switchIfEmpty(standby);
}
public static Mono<String> supplyMainPower() {
return Mono.empty();
}
public static Mono supplyStandbyPower() {
return Mono.just("# supply Standby Power");
}
}
processTask() 메서드는 평소에는 주전력을 사용하다가 전력이 끊겼을 경우 예비 전력을 사용하는 상황을 시뮬레이션한다.
switchIfEmpty() Operator는 Upstream Publisher가 데이터 emit 없이 종료되는 경우, 대체 Publisher가 데이터를 emit하도록 한다.
public class ExampleTest13_21 {
@Test
public void publisherProbeTest() {
PublisherProbe<String> probe =
PublisherProbe.of(PublisherProbeTestExample.supplyStandbyPower());
StepVerifier
.create(PublisherProbeTestExample
.processTask(
PublisherProbeTestExample.supplyMainPower(),
probe.mono())
)
.expectNextCount(1)
.verifyComplete();
probe.assertWasSubscribed();
probe.assertWasRequested();
probe.assertWasNotCancelled();
}
}
테스트는 Sequence가 분기되는 상황에서 실제로 어느 Publisher가 동작하는지 해당 Publisher의 실행 경로를 텟흐트한다.
PublisherProbe 클래스의 assertWas... 메서드를 통해 기대하는 Publihser가 구독을 했는지, 요청을 했는지, 중간에 취소가 되지 않았는지를 Assertion한다.
'book > 스프링으로 시작하는 리액티브 프로그래밍' 카테고리의 다른 글
[리액티브 프로그래밍] Operator (0) | 2024.05.13 |
---|---|
[리액티브 프로그래밍] Debugging (0) | 2024.05.06 |
[리액티브 프로그래밍] Context (0) | 2024.05.06 |
[리액티브 프로그래밍] Scheduler (0) | 2024.04.21 |
[리액티브 프로그래밍] Sinks (0) | 2024.04.21 |