Context란?
✅ 프로그래밍에서 Context
어떠한 상황에서 그 상황을 처리하기 위해 필요한 정보
ex)
- ServletContext: Servlet이 Servlet Container와 통신하기 위해 필요한 정보를 제공하는 인터페이스.
- ApplicationContext: SpringFramwork에서 애플리케이션 정보를 제공하는 인터페이스. (ex. Spring Bean)
- SecurityContext: Spring Security에서 애플리케이션 사용자의 인증 정보를 제공하는 인터페이스.
✅ Reactor에서 Context
Operator 같은 Reactor 구성요소 간에 전파되는 key/value 형태의 저장소.
-> Operator가 Context의 정보를 동일하게 이용할 수 있다.
-> 각각의 실행 스레드와 매핑되는 ThreadLocal과 달리, Subscriber와 매핑된다.
-> 구독이 발생할 때마다 구독과 연결된 하나의 Context가 생긴다.
✅ 컨텍스트에 데이터 쓰기
예제를 통해 알아보자.
@Slf4j
public class Example11_1 {
public static void main(String[] args) throws InterruptedException {
Mono
.deferContextual(ctx ->
Mono
.just("Hello" + " " + ctx.get("firstName"))
.doOnNext(data -> log.info("# just doOnNext : {}", data))
)
.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.parallel())
.transformDeferredContextual(
(mono, ctx) -> mono.map(data -> data + " " + ctx.get("lastName"))
)
.contextWrite(context -> context.put("lastName", "Jobs"))
.contextWrite(context -> context.put("firstName", "Steve"))
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(100L);
}
}
contextWrite() Operator를 통해서 Context를 쓰고 있다.
실제 데이터를 쓰는 동작은 Context API중 하나인 put()을 통해서 쓸 수 있다.
+) context.put()을 통해 Context에 데이터를 쓰면, 불변 객체를 다음 contextWrite() Operator로 전달해 스레드 안전성을 보장한다.
이제 구독이 발생하면 Context에 "Steve"와 "Jobs"라는 두 개의 데이터가 저장될 것이다.
public abstract class Mono<T> implements CorePublisher<T> {
...
public final Mono<T> contextWrite(Function<Context, Context> contextModifier) {
return onAssembly(new MonoContextWrite<>(this, contextModifier));
}
...
}
contextWrite() Operator의 파라미터는 Function<Context, Context>인 것을 알 수 있다.
✅ Context에 쓰인 데이터 읽기
1. 원본 데이터 소스 레벨에서 읽는 방식
위 예제에서 deferContextual() Operator를 통해 읽을 수 있다.
+) 파라미터인 "ctx"는 Context 타입이 아닌 ContextView 타입이라는 사실을 기억하자. (쓸 때는 Context, 읽을 때는 ContextView)
+) contextWrite() Operator가 deferContextual() Operator의 Downstream에 위치하지만, Context는 Operator 체인의 아래에서 위로 전파되기 떄문에 deferContextual() Operator는 Downstream에서 쓰여진 데이터를 읽을 수 있다.
2. Operator 체인의 중간에서 읽는 방식
transformDeferredContextual() Operator를 사용한다.
<결과>
11:56:35.521 [boundedElastic-1] INFO - # just doOnNext : Hello Steve
11:56:35.552 [parallel-1] INFO - # onNext: Hello Steve Jobs
subscribeOn()과 publishOn()을 사용해서 데이터를 emit하는 스레드와 데이터를 처리하는 스레드를 분리했기 때문에,
Context에서 데이터를 읽어오는 작업을 각각 다른 스레드에서 수행했음을 알 수 있다.
자주 사용되는 Context 관련 API
✅ Context API
Context API | 설명 |
put(key, value) | key/value 형태로 Context에 값을 쓴다. |
of(key1, value1, key2, value2, ...) // 최대 5개 까지 가능 | key/value 형태로 Context에 여러 개의 값을 쓴다. |
putAll(ContextView) | 현재 Context와 파라미터로 입력된 ContextView를 merge한다. |
delete(key) | Context에서 key에 해당하는 value를 삭제한다. |
Context에 데이터를 쓰기 위해서는 Context API를 사용해야 한다.
@Slf4j
public class Example11_3 {
public static void main(String[] args) throws InterruptedException {
final String key1 = "company";
final String key2 = "firstName";
final String key3 = "lastName";
Mono
.deferContextual(ctx ->
Mono.just(ctx.get(key1) + ", " + ctx.get(key2) + " " + ctx.get(key3))
)
.publishOn(Schedulers.parallel())
.contextWrite(context ->
context.putAll(Context.of(key2, "Steve", key3, "Jobs").readOnly())
)
.contextWrite(context -> context.put(key1, "Apple"))
.subscribe(data -> log.info("# onNext: {}" , data));
Thread.sleep(100L);
}
}
Context.of()는 Context를 리턴하는데, putAll()은 ContextView를 파라미터로 받기 때문에,
Context를 ContextView로 변환해 주어야 하는데, 이 작업을 readOnly() API를 통해 수행한다.
<결과>
12:18:15.930 [parallel-1] INFO - # onNext: Apple, Steve Jobs
✅ ContextView API
ContextView API | 설명 |
get(key) | ContextView에서 key에 해당하는 value를 반환한다. |
getOrEmpty(key) | ContextView에서 key에 해당되는 value를 Optional로 래핑해서 반환한다. |
getOrDefualt(key, dafault value) | ContextView에서 key에 해당하는 value를 가져온다. key에 해당하는 value가 없으면 default value를 가져온다. |
hashKey(key) | ContextView에서 특정 key가 존재하는지를 확인한다. |
isEmpty() | Context가 비어 있는지 확인한다. |
size() | Context 내에 있는 key/value의 개수를 반환한다. |
Context에서 데이터를 읽기 위해서는 ContextView API를 사용해야 한다.
@Slf4j
public class Example11_4 {
public static void main(String[] args) throws InterruptedException {
final String key1 = "company";
final String key2 = "firstName";
final String key3 = "lastName";
Mono
.deferContextual(ctx ->
Mono.just(ctx.get(key1) + ", " +
ctx.getOrEmpty(key2).orElse("no firstName") + " " +
ctx.getOrDefault(key3, "no lastName"))
)
.publishOn(Schedulers.parallel())
.contextWrite(context -> context.put(key1, "Apple"))
.subscribe(data -> log.info("# onNext: {}" , data));
Thread.sleep(100L);
}
}
contextWrite에서 key1에 대한 value만 저장하기 때문에,
key2, key3에 대한 value를 읽을 때는 getOrEmpty(), getOrDefault()를 사용했다.
<결과>
12:24:46.622 [parallel-1] INFO - # onNext: Apple, no firstName no lastName
Context의 특징
✅ Context는 구독이 발생할 때마다 하나의 Context가 해당 구독에 연결된다
@Slf4j
public class Example11_5 {
public static void main(String[] args) throws InterruptedException {
final String key1 = "company";
Mono<String> mono = Mono.deferContextual(ctx ->
Mono.just("Company: " + " " + ctx.get(key1))
)
.publishOn(Schedulers.parallel());
mono.contextWrite(context -> context.put(key1, "Apple"))
.subscribe(data -> log.info("# subscribe1 onNext: {}", data));
mono.contextWrite(context -> context.put(key1, "Microsoft"))
.subscribe(data -> log.info("# subscribe2 onNext: {}", data));
Thread.sleep(100L);
}
}
얼핏 보면 두 개의 데이터가 하나의 Context에 저장될 것 같지만, 각 구독마다 다른 Context가 생성된다.
<결과>
12:28:04.953 [parallel-1] INFO - # subscribe1 onNext: Company: Apple
12:28:04.953 [parallel-2] INFO - # subscribe2 onNext: Company: Microsoft
✅ Context는 Operator 체인의 아래에서 위로 전파된다
@Slf4j
public class Example11_6 {
public static void main(String[] args) throws InterruptedException {
String key1 = "company";
String key2 = "name";
Mono
.deferContextual(ctx ->
Mono.just(ctx.get(key1)) // key1 read
)
.publishOn(Schedulers.parallel())
.contextWrite(context -> context.put(key2, "Bill")) // key2 write
.transformDeferredContextual((mono, ctx) ->
mono.map(data -> data + ", " + ctx.getOrDefault(key2, "Steve")) // key2 read
)
.contextWrite(context -> context.put(key1, "Apple")) // key1 write
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(100L);
}
}
<결과>
12:32:07.037 [parallel-1] INFO - # onNext: Apple, Steve
key1의 read 시점은 write 시점보다 Upstream에 있기 때문에, 정상적으로 Subscriber에 전달되는 것을 볼 수 있다.
그러나, key2의 read 시점은 write 시점보다 DownStream에 있기 때문에, key2로 read할 시점에 value가 Context에 없어 default value인 "Steve"가 출력되는 것이다.
+) 일반적으로 모든 Operator에서 저장된 데이터를 읽을 수 있도록 contextWrite()를 Operator 체인의 맨 마지막에 둔다.
✅ 동일한 키에 대한 값을 중복해서 저장하면 Operator 체인에서 가장 위쪽에 위치한 contextWrite()이 저장한 값으로 덮어쓴다
Context가 아래에서 위로 전파되는 특성 때문에, Operator 체인 상에서 가장 위쪽에 위치한 contextWrite()이 저장한 값으로 덮어쓴다.
✅ Inner Sequence 내부에서는 외부 Context에 저장된 데이터를 읽을 수 있다
@Slf4j
public class Example11_7 {
public static void main(String[] args) throws InterruptedException {
String key1 = "company";
Mono
.just("Steve")
.flatMap(name ->
Mono.deferContextual(ctx ->
Mono
.just(ctx.get(key1) + ", " + name)
.transformDeferredContextual((mono, innerCtx) ->
mono.map(data -> data + ", " + innerCtx.get("role"))
)
.contextWrite(context -> context.put("role", "CEO"))
)
)
.publishOn(Schedulers.parallel())
.contextWrite(context -> context.put(key1, "Apple"))
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(100L);
}
}
여기서는 데이터를 두 번 쓰고 있는데,
한 번은 Operator 체인의 제일 마지막에 쓰고 있고,
한 번은 flatMap() Operator 내부에 존재하는 Operator 체인에서 쓰고 있다.
flatMap() Operator 내부에 있는 Sequence를 Inner Sequence라고 하는데,
Inner Sequence에서는 외부 Context의 값을 읽을 수 있다. (get(key1) 부분)
<결과>
12:44:22.662 [parallel-1] INFO - # onNext: Apple, Steve, CEO
✅ Inner Sequence 외부에서는 Inner Sequence 내부 Context에 저장된 데이터를 읽을 수 없다
위 예제에서 일부 코드가 추가된 예제를 보자.
@Slf4j
public class Example11_7 {
public static void main(String[] args) throws InterruptedException {
String key1 = "company";
Mono
.just("Steve")
.transformDeferredContextual((stringMono, ctx) -> // 추가
ctx.get("role")) // 추가
.flatMap(name ->
Mono.deferContextual(ctx ->
Mono
.just(ctx.get(key1) + ", " + name)
.transformDeferredContextual((mono, innerCtx) ->
mono.map(data -> data + ", " + innerCtx.get("role"))
)
.contextWrite(context -> context.put("role", "CEO"))
)
)
.publishOn(Schedulers.parallel())
.contextWrite(context -> context.put(key1, "Apple"))
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(100L);
}
}
<결과>
Caused by: java.util.NoSuchElementException: Context does not contain key: role
Context에 'role'이라는 key가 없기 때문에 NoSuchElementException이 발생한다.
Inner Sequence 외부에서는 Inner Sequence 내부에서 저장한 'role' 데이터를 읽을 수 없음을 알 수 있다.
✅ Context는 직교성(독립성)을 가지는 정보를 전송하는 데 적합하다
@Slf4j
public class Example11_8 {
public static final String HEADER_AUTH_TOKEN = "authToken";
public static void main(String[] args) {
Mono<String> mono =
postBook(Mono.just(
new Book("abcd-1111-3533-2809"
, "Reactor's Bible"
,"Kevin"))
)
.contextWrite(Context.of(HEADER_AUTH_TOKEN, "eyJhbGciOi"));
mono.subscribe(data -> log.info("# onNext: {}", data));
}
private static Mono<String> postBook(Mono<Book> book) {
return Mono
.zip(book,
Mono
.deferContextual(ctx ->
Mono.just(ctx.get(HEADER_AUTH_TOKEN)))
)
.flatMap(tuple -> {
String response = "POST the book(" + tuple.getT1().getBookName() +
"," + tuple.getT1().getAuthor() + ") with token: " +
tuple.getT2();
return Mono.just(response); // HTTP POST 전송을 했다고 가정
});
}
}
@AllArgsConstructor
@Data
class Book {
private String isbn;
private String bookName;
private String author;
}
예제는 Context를 실제로 어떻게 활용할 수 있을지 보여준다.
인증된 도서 관리자가 신규 도서를 등록하기 위해 도서 정보와 인증 토큰을 서버로 전송한다고 가정하자.
이 예제의 핵심은 구독 직전에 contextWrite()로 저장한 인증 데이터를 다시 zip()에서 Context로부터 읽어 와서 사용한다는 것이다.
-> 이처럼 Context는 인증 정보같은 직교성(독립성)을 가지는 정보를 전송하는 데 적합하다
<결과>
12:56:04.653 [main] INFO - # onNext: POST the book(Reactor's Bible,Kevin) with token: eyJhbGciOi
참고자료
https://projectreactor.io/docs/core/release/api/
'book > 스프링으로 시작하는 리액티브 프로그래밍' 카테고리의 다른 글
[리액티브 프로그래밍] Testing (0) | 2024.05.08 |
---|---|
[리액티브 프로그래밍] Debugging (0) | 2024.05.06 |
[리액티브 프로그래밍] Scheduler (0) | 2024.04.21 |
[리액티브 프로그래밍] Sinks (0) | 2024.04.21 |
[리액티브 프로그래밍] Backpressure (0) | 2024.04.21 |