스레드(Thread)의 개념 이해
Reactor에서 사용되는 Scheduler는 Reactor Sequence에서 사용되는 스레드를 관리해 주는 관리자 역할을 한다.
-> 그럼 먼저 스레드에 대해 알아보자.
✅ 물리적인 스레드
실제 하드웨어와 관련된 스레드.
그림을 보면 하나의 코어는 두 개의 스레드를 포함하고 있다.
-> 물리적인 스레드는 논리적인 코어라고도 부른다.
✅ 논리적인 스레드
소프트웨어적으로 생성되는 스레드. (ex. Java에서 사용되는 스레드)
논리적인 스레드는 물리적인 스레드의 가용 범위 내에서 실행될 수 있다.
물리적인 스레드의 병렬성: 실제로 작업을 동시에 실행함.
논리적인 스레드의 동시성: 동시에 실행되는 것처럼 보이는 것.
-> 실제로는 무수히 많은 논리적인 스레드가 네 개의 물리적인 스레드를 아주 빠른 속도로 번걸아 사용하면서 마치 동시에 실행되는 것처럼 보이게 한다.
Scheduler란?
✅ Scheduler
비동기 프로그래밍을 위해 사용되는 스레드를 관리해 주는 역할. 어떤 스레드에서 무엇을 처리할지 제어한다.
-> Scheduler가 복잡한 스레드의 제어를 대신해 주기 때문에 개발자가 스레드를 직접 제어하는 부담에서 벗어날 수 있다.
Scheduler를 위한 전용 Operator
✅ subscribeOn()
구독이 발생한 직후 실행될 스레드를 지정하는 Operator.
원본 Publisher의 동작을 수행하기 위한 스레드라고 볼 수 있다.
@Slf4j
public class Example10_1 {
public static void main(String[] args) throws InterruptedException {
Flux.fromArray(new Integer[] {1, 3, 5, 7})
.subscribeOn(Schedulers.boundedElastic())
.doOnNext(data -> log.info("# doOnNext: {}", data))
.doOnSubscribe(subscription -> log.info("# doOnSubscribe"))
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(500L);
}
}
subscribeOn(): 원본 Flux가 데이터 소스를 emit할 스레드를 지정한다.
doOnNext(): 원본 Flux에서 emit되는 데이터를 로그로 출력한다.
doOnSubscribe(): 구독이 발생한 시점에 추가적인 동작을 처리한다. 구독에 발생한 시점에 실행되는 스레드가 무엇인지 확인할 수 있다.
<결과>
17:36:17.446 [main] INFO - # doOnSubscribe
17:36:17.465 [boundedElastic-1] INFO - # doOnNext: 1
17:36:17.502 [boundedElastic-1] INFO - # onNext: 1
17:36:17.503 [boundedElastic-1] INFO - # doOnNext: 3
17:36:17.504 [boundedElastic-1] INFO - # onNext: 3
17:36:17.504 [boundedElastic-1] INFO - # doOnNext: 5
17:36:17.504 [boundedElastic-1] INFO - # onNext: 5
17:36:17.504 [boundedElastic-1] INFO - # doOnNext: 7
17:36:17.504 [boundedElastic-1] INFO - # onNext: 7
main 스레드에서 구독을 시작하므로, 구독이 발생한 시점인 doOnSubscribe는 main 스레드에서 출력된다.
doOnNext는 subscribeOn()으로 인해 boundedElastic-1이라는 새로운 스레드에서 실행된다.
✅ publishOn()
Publisher가 Downstream으로 Signal을 전송할 때 실행되는 스레드를 제어하는 Operator.
publishOn()을 기준으로 Downstream의 실행 스레드를 변경한다.
@Slf4j
public class Example10_2 {
public static void main(String[] args) throws InterruptedException {
Flux.fromArray(new Integer[] {1, 3, 5, 7})
.doOnNext(data -> log.info("# doOnNext: {}", data))
.doOnSubscribe(subscription -> log.info("# doOnSubscribe"))
.publishOn(Schedulers.parallel())
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(500L);
}
}
<결과>
17:40:08.784 [main] INFO - # doOnSubscribe
17:40:08.794 [main] INFO - # doOnNext: 1
17:40:08.798 [main] INFO - # doOnNext: 3
17:40:08.798 [main] INFO - # doOnNext: 5
17:40:08.798 [parallel-1] INFO - # onNext: 1
17:40:08.799 [main] INFO - # doOnNext: 7
17:40:08.799 [parallel-1] INFO - # onNext: 3
17:40:08.800 [parallel-1] INFO - # onNext: 5
17:40:08.800 [parallel-1] INFO - # onNext: 7
subscribeOn()이 없기 때문에, doOnNext는 main 스레드에서 실행되고,
publishOn() 기준 Downstream인 onNext가 parallel-1 이라는 새로운 스레드에서 실행된다.
✅ parallel()
subscribeOn()과 publishOn()은 동시성을 가지는 논리적인 스레드에 해당되지만,
parallel()은 병렬성을 가지는 물리적인 스레드에 해당된다.
-> 라운드 로빈 방식으로 CPU의 물리적인 스레드(논리적인 코어) 개수만큼의 스레드를 병렬로 실행한다.
+) Mac에서는 System Information > Total Number of Cores를 통해 논리적인 코어 개수를 알 수 있다.
@Slf4j
public class Example10_3 {
public static void main(String[] args) throws InterruptedException {
Flux.fromArray(new Integer[]{1, 3, 5, 7, 9, 11, 13, 15, 17, 19})
.parallel()
.runOn(Schedulers.parallel())
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(100L);
}
}
사실 parallel()은 emit되는 데이터를 CPU의 논리적인 코어 수에 맞게 골고루 분배하는 역할만 하며, 실제로 병렬 작업을 수행할 스레드의 할당은 runOn() Operator가 담당한다.
<결과>
17:46:10.923 [parallel-4] INFO - # onNext: 7
17:46:10.925 [parallel-5] INFO - # onNext: 9
17:46:10.925 [parallel-8] INFO - # onNext: 15
17:46:10.925 [parallel-10] INFO - # onNext: 19
17:46:10.916 [parallel-1] INFO - # onNext: 1
17:46:10.925 [parallel-6] INFO - # onNext: 11
17:46:10.923 [parallel-3] INFO - # onNext: 5
17:46:10.920 [parallel-2] INFO - # onNext: 3
17:46:10.925 [parallel-7] INFO - # onNext: 13
17:46:10.925 [parallel-9] INFO - # onNext: 17
나의 컴퓨터는 10개의 논리적인 코어를 갖고 있어서, 총 10개의 숫자를 10개의 스레드에서 병렬로 실행할 수 있다.
만약 물리적인 스레드를 전부 사용할 필요가 없다면 스레드의 개수를 지정해 주면 된다.
@Slf4j
public class Example10_4 {
public static void main(String[] args) throws InterruptedException {
Flux.fromArray(new Integer[]{1, 3, 5, 7, 9, 11, 13, 15, 17, 19})
.parallel(4)
.runOn(Schedulers.parallel())
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(100L);
}
}
<결과>
17:47:11.707 [parallel-2] INFO - # onNext: 3
17:47:11.709 [parallel-4] INFO - # onNext: 7
17:47:11.707 [parallel-3] INFO - # onNext: 5
17:47:11.707 [parallel-1] INFO - # onNext: 1
17:47:11.741 [parallel-2] INFO - # onNext: 11
17:47:11.741 [parallel-1] INFO - # onNext: 9
17:47:11.741 [parallel-4] INFO - # onNext: 15
17:47:11.741 [parallel-3] INFO - # onNext: 13
17:47:11.741 [parallel-2] INFO - # onNext: 19
17:47:11.741 [parallel-1] INFO - # onNext: 17
4개의 스레드가 병렬로 실행된다.
publishOn()과 subscribeOn()의 동작 이해
✅ publishOn()과 subscribeOn()을 사용하지 않을 경우
@Slf4j
public class Example10_5 {
public static void main(String[] args) {
Flux
.fromArray(new Integer[] {1, 3, 5, 7})
.doOnNext(data -> log.info("# doOnNext fromArray: {}", data))
.filter(data -> data > 3)
.doOnNext(data -> log.info("# doOnNext filter: {}", data))
.map(data -> data * 10)
.doOnNext(data -> log.info("# doOnNext map: {}", data))
.subscribe(data -> log.info("# onNext: {}", data));
}
}
<결과>
17:54:44.765 [main] INFO - # doOnNext fromArray: 1
17:54:44.767 [main] INFO - # doOnNext fromArray: 3
17:54:44.767 [main] INFO - # doOnNext fromArray: 5
17:54:44.767 [main] INFO - # doOnNext filter: 5
17:54:44.767 [main] INFO - # doOnNext map: 50
17:54:44.768 [main] INFO - # onNext: 50
17:54:44.768 [main] INFO - # doOnNext fromArray: 7
17:54:44.768 [main] INFO - # doOnNext filter: 7
17:54:44.768 [main] INFO - # doOnNext map: 70
17:54:44.768 [main] INFO - # onNext: 70
모든 과정이 main 스레드에서 실행된다.
✅ publishOn()을 하나만 사용할 경우
@Slf4j
public class Example10_6 {
public static void main(String[] args) throws InterruptedException {
Flux
.fromArray(new Integer[] {1, 3, 5, 7})
.doOnNext(data -> log.info("# doOnNext fromArray: {}", data))
.publishOn(Schedulers.parallel())
.filter(data -> data > 3)
.doOnNext(data -> log.info("# doOnNext filter: {}", data))
.map(data -> data * 10)
.doOnNext(data -> log.info("# doOnNext map: {}", data))
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(500L);
}
}
<결과>
17:55:15.369 [main] INFO - # doOnNext fromArray: 1
17:55:15.375 [main] INFO - # doOnNext fromArray: 3
17:55:15.375 [main] INFO - # doOnNext fromArray: 5
17:55:15.376 [main] INFO - # doOnNext fromArray: 7
17:55:15.376 [parallel-1] INFO - # doOnNext filter: 5
17:55:15.377 [parallel-1] INFO - # doOnNext map: 50
17:55:15.377 [parallel-1] INFO - # onNext: 50
17:55:15.377 [parallel-1] INFO - # doOnNext filter: 7
17:55:15.378 [parallel-1] INFO - # doOnNext map: 70
17:55:15.378 [parallel-1] INFO - # onNext: 70
publishOn() 기준 Downstream인 filter()부터 parallel-1 스레드에서 실행된다.
✅ publishOn()을 두 번 사용할 경우
publishOn()은 Operator 체인상에서 한 개 이상을 사용할 수 있다.
@Slf4j
public class Example10_7 {
public static void main(String[] args) throws InterruptedException {
Flux
.fromArray(new Integer[] {1, 3, 5, 7})
.doOnNext(data -> log.info("# doOnNext fromArray: {}", data))
.publishOn(Schedulers.parallel())
.filter(data -> data > 3)
.doOnNext(data -> log.info("# doOnNext filter: {}", data))
.publishOn(Schedulers.parallel())
.map(data -> data * 10)
.doOnNext(data -> log.info("# doOnNext map: {}", data))
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(500L);
}
}
<결과>
17:56:45.415 [main] INFO - # doOnNext fromArray: 1
17:56:45.421 [main] INFO - # doOnNext fromArray: 3
17:56:45.421 [main] INFO - # doOnNext fromArray: 5
17:56:45.421 [main] INFO - # doOnNext fromArray: 7
17:56:45.421 [parallel-2] INFO - # doOnNext filter: 5
17:56:45.422 [parallel-1] INFO - # doOnNext map: 50
17:56:45.422 [parallel-2] INFO - # doOnNext filter: 7
17:56:45.423 [parallel-1] INFO - # onNext: 50
17:56:45.423 [parallel-1] INFO - # doOnNext map: 70
17:56:45.423 [parallel-1] INFO - # onNext: 70
publishOn()을 기준으로 스레드가 변경되는 것을 알 수 있다.
✅ subscribeOn()과 publishOn()을 함께 사용할 경우
@Slf4j
public class Example10_8 {
public static void main(String[] args) throws InterruptedException {
Flux
.fromArray(new Integer[] {1, 3, 5, 7})
.subscribeOn(Schedulers.boundedElastic())
.doOnNext(data -> log.info("# doOnNext fromArray: {}", data))
.filter(data -> data > 3)
.doOnNext(data -> log.info("# doOnNext filter: {}", data))
.publishOn(Schedulers.parallel())
.map(data -> data * 10)
.doOnNext(data -> log.info("# doOnNext map: {}", data))
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(500L);
}
}
<결과>
17:57:46.648 [boundedElastic-1] INFO - # doOnNext fromArray: 1
17:57:46.656 [boundedElastic-1] INFO - # doOnNext fromArray: 3
17:57:46.656 [boundedElastic-1] INFO - # doOnNext fromArray: 5
17:57:46.656 [boundedElastic-1] INFO - # doOnNext filter: 5
17:57:46.657 [boundedElastic-1] INFO - # doOnNext fromArray: 7
17:57:46.657 [boundedElastic-1] INFO - # doOnNext filter: 7
17:57:46.657 [parallel-1] INFO - # doOnNext map: 50
17:57:46.657 [parallel-1] INFO - # onNext: 50
17:57:46.657 [parallel-1] INFO - # doOnNext map: 70
17:57:46.658 [parallel-1] INFO - # onNext: 70
구독이 발생한 직후, Publisher가 데이터를 emit하는 과정이 boundedElastic-1 스레드에서 실행된다.
그리고 별도의 publishOn()이 추가되지 않았기 때문에, filter()는 여전히 boundedElastic-1 스레드에서 실행된다.
publishOn()이 추가된 이후는 parallel-1 스레드에서 실행된다.
subscribeOn()은 Operator 체인상에서 어떤 위치에 있든, 구독 시점 직후, 즉 Publisher가 데이터를 emit하기 전에 스레드를 변경한다.
Scheduler의 종류
✅ Schedulers.immediate()
별도의 스레드를 추가적으로 생성하지 않고, 현재 스레드에서 작업을 처리하고자 할 때 사용한다.
@Slf4j
public class Example10_9 {
public static void main(String[] args) throws InterruptedException {
Flux
.fromArray(new Integer[] {1, 3, 5, 7})
.publishOn(Schedulers.parallel())
.filter(data -> data > 3)
.doOnNext(data -> log.info("# doOnNext filter: {}", data))
.publishOn(Schedulers.immediate())
.map(data -> data * 10)
.doOnNext(data -> log.info("# doOnNext map: {}", data))
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(200L);
}
}
첫 번째 publishOn()에서 Schedulers.parallel() 스케줄러를 사용했고, 두 번째 publishOn()에서 Schedulers.immediate() 스케줄러를 사용했다.
<결과>
23:14:53.955 [parallel-1] INFO - # doOnNext filter: 5
23:14:53.969 [parallel-1] INFO - # doOnNext map: 50
23:14:53.969 [parallel-1] INFO - # onNext: 50
23:14:53.969 [parallel-1] INFO - # doOnNext filter: 7
23:14:53.969 [parallel-1] INFO - # doOnNext map: 70
23:14:53.970 [parallel-1] INFO - # onNext: 70
첫 번째 publishOn()에서 생성된 parallel-1을 그대로 끝까지 사용한다.
Q: 스레드를 그대로 유지하고 싶다면 굳이 publishOn(Schedulers.immediate())를 사용할 필요가 있을까?
A: 만약 위 예시 코드가 어떤 API이고, 파라미터로 Scheduler를 전달할 수 있다고 가정한다면, 이 API를 사용하는 입장에서 map() 이후의 Operator 체인 작업은 원래 실행되던 스레드에서 실행하게 하고 싶을 때도 있을 것이다. 즉, Scheduler가 필요한 API가 있는데 별도의 스레드를 추가로 할당하고 싶지 않을 경우 Schedulers.immediate()를 사용할 수 있다.
✅ Schedulers.single()
스레드 하나만 생성해서 Scheduler가 제거되기 전까지 재사용한다.
@Slf4j
public class Example10_10 {
public static void main(String[] args) throws InterruptedException {
doTask("task1")
.subscribe(data -> log.info("# onNext: {}", data));
doTask("task2")
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(200L);
}
private static Flux<Integer> doTask(String taskName) {
return Flux.fromArray(new Integer[] {1, 3, 5, 7})
.publishOn(Schedulers.single())
.filter(data -> data > 3)
.doOnNext(data -> log.info("# {} doOnNext filter: {}", taskName, data))
.map(data -> data * 10)
.doOnNext(data -> log.info("# {} doOnNext map: {}", taskName, data));
}
}
<결과>
23:19:46.432 [single-1] INFO - # task1 doOnNext filter: 5
23:19:46.445 [single-1] INFO - # task1 doOnNext map: 50
23:19:46.445 [single-1] INFO - # onNext: 50
23:19:46.445 [single-1] INFO - # task1 doOnNext filter: 7
23:19:46.446 [single-1] INFO - # task1 doOnNext map: 70
23:19:46.446 [single-1] INFO - # onNext: 70
23:19:46.449 [single-1] INFO - # task2 doOnNext filter: 5
23:19:46.450 [single-1] INFO - # task2 doOnNext map: 50
23:19:46.450 [single-1] INFO - # onNext: 50
23:19:46.450 [single-1] INFO - # task2 doOnNext filter: 7
23:19:46.450 [single-1] INFO - # task2 doOnNext map: 70
23:19:46.450 [single-1] INFO - # onNext: 70
두 번의 구독을 하지만 스레드는 첫 번째 구독에서 이미 생성된 스레드를 재사용하게 된다.
이처럼 Schedulers.single()은 하나의 스레드를 재사용하면서 다수의 작업을 처리할 수 있고, 지연 시간이 짧은 작업을 처리하는 것이 효 과적이다.
✅ Schedulers.newSingle()
Schedulers.single()이 하나의 스레드를 재사용하는 반면, Schedulers.newSingle()은 호출할 때마다 매번 새로운 스레드를 생성한다.
@Slf4j
public class Example10_11 {
public static void main(String[] args) throws InterruptedException {
doTask("task1")
.subscribe(data -> log.info("# onNext: {}", data));
doTask("task2")
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(200L);
}
private static Flux<Integer> doTask(String taskName) {
return Flux.fromArray(new Integer[] {1, 3, 5, 7})
.publishOn(Schedulers.newSingle("new-single", true))
.filter(data -> data > 3)
.doOnNext(data -> log.info("# {} doOnNext filter: {}", taskName, data))
.map(data -> data * 10)
.doOnNext(data -> log.info("# {} doOnNext map: {}", taskName, data));
}
}
Schedulers.newSingle()의 첫 번째 파라미터는 생성할 스레드의 이름을, 두 번째 파라미터는 이 스레드를 데몬 스레드로 동작하게 할지 여부를 설정한다.
Q: 데몬 스레드?
A: 보조 스레드라고도 불리는데, 주 스레드가 종료되면 자동으로 종료되는 특성이 있다. 위 코드에서는 데몬 스레드를 true로 설정해서 main 스레드가 종료되면 자동으로 종료되도록 설정했다.
<결과>
23:23:16.942 [new-single-2] INFO - # task2 doOnNext filter: 5
23:23:16.941 [new-single-1] INFO - # task1 doOnNext filter: 5
23:23:16.966 [new-single-2] INFO - # task2 doOnNext map: 50
23:23:16.966 [new-single-2] INFO - # onNext: 50
23:23:16.966 [new-single-1] INFO - # task1 doOnNext map: 50
23:23:16.966 [new-single-2] INFO - # task2 doOnNext filter: 7
23:23:16.966 [new-single-1] INFO - # onNext: 50
23:23:16.966 [new-single-2] INFO - # task2 doOnNext map: 70
23:23:16.966 [new-single-2] INFO - # onNext: 70
23:23:16.966 [new-single-1] INFO - # task1 doOnNext filter: 7
23:23:16.967 [new-single-1] INFO - # task1 doOnNext map: 70
23:23:16.967 [new-single-1] INFO - # onNext: 70
✅ Schedulers.boundedElastic()
ExecutorService 기반의 스레드 풀을 생성한 후, 그 안에서 정해진 수만큼의 스레드를 사용하여 작업을 처리하고 작업이 종료된 스레드는 반납하여 재사용하는 방식이다.
기본적으로 CPU 코어 수 * 10만큼의 스레드를 생성하며, 풀에 있는 모든 스레드가 작업을 처리하고 있다면 이용 가능한 스레드가 생길 때까지 최대 100,000개의 작업이 큐에서 대기할 수 있다.
스레드의 수가 Schedulers.parallel()에 비해 상대적으로 많고, 큐도 갖고있는만큼 Blocking I/O 작업을 효과적으로 처리하기 좋다.
✅ Schedulers.parallel()
Non-Blocking I/O에 최적화되어 있는 Scheduler다.
-> CPU 코어 수만큼의 스레드를 생성한다.
✅ Schedulers.fromExecutorService()
기존에 이미 사용하고 있는 ExecutorService가 있다면 이 ExecutorService로부터 Scheduler를 생성하는 방식이다.
-> 그러나 Reactor에서는 이 방식을 권장하지 않는다.
✅ Schedulers.newXXXX()
Schedulers.single(), Schedulers.boundedElastic(), Schedulers.parallel()은 Reactor에서 제공하는 default Scheduler 인스턴스를 사용한다.
하지만 필요하다면 Schedulers.newSingle(), Schedulers.newBoundedElastic(), Schedulers.newParallel()를 사용해서 새로운 Scheduler 인스턴스를 생성할 수 있다.
즉, 스레드 이름, 생성 가능한 default 스레드 개수, 스레드 유휴 시간, 데몬 스레드 동작 여부 등을 직접 지정해서 커스텀 스레드 풀을 생성할 수 있다.
참고자료
https://en.wikipedia.org/wiki/Round-robin_scheduling
'book > 스프링으로 시작하는 리액티브 프로그래밍' 카테고리의 다른 글
[리액티브 프로그래밍] Debugging (0) | 2024.05.06 |
---|---|
[리액티브 프로그래밍] Context (0) | 2024.05.06 |
[리액티브 프로그래밍] Sinks (0) | 2024.04.21 |
[리액티브 프로그래밍] Backpressure (0) | 2024.04.21 |
[리액티브 프로그래밍] Cold Sequence와 Hot Sequence (0) | 2024.03.18 |