[Spring-Reactive] Reactor (Sinks)
in Spring on Spring
Sinks
Reactor의
Sinks
는 리액티브 스트림의 Signal을 명시적으로 프로그래밍 방식으로 전송할 수 있는 도구.generate
,create
등의 Operator 방식과 달리, 멀티스레드 환경에서도 스레드 안정성을 보장할 수 있어 고급 상황에서 유용하다.
Sinks는 Reactor 3.4 버전에서 도입된 새로운 기능으로, Publisher의 일종으로 데이터를 발행하고 구독할 수 있는 유연한 방법을 제공 한다.
Sinks는 Thread-Safe하게 signal을 발생 시킨다.
create()
Operator 방식
개념
Flux.create()
내에서sink.next(...)
호출로 Signal 전송- 작업 수행 → 결과 처리 → Subscriber 전달
예시
Flux.create(sink -> {
IntStream.range(1, 6).forEach(n -> sink.next(doTask(n)));
})
.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.parallel())
.map(...).subscribe(...);
특징
- 단계별로 실행되는 스레드가 다름
subscribeOn
: 생성map
,subscribe
: publishOn 별도 스레드
- 결과적으로 3개의 스레드 사용
Sinks
방식
개념
Sinks.many().unicast()
등으로 Sink 생성emitNext(...)
로 Signal 명시적 전송
예시
Sinks.Many<String> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
IntStream.range(1, 6).forEach(n -> {
new Thread(() -> {
unicastSink.emitNext(doTask(n), FAIL_FAST);
}).start();
});
특징
emitNext()
는 멀티스레드에서도 안전- 총 7개의 스레드 사용 (Thread-0~4 + parallel-2 + parallel-1)
- 스레드 안전성 보장으로 병렬 환경에 적합
결론 요약
| 항목 | create() 방식 | Sinks 방식 | |——|—————-|————-| | 사용 용도 | 간단한 Signal 처리 | 복잡한 병렬 환경 | | 스레드 안전성 | 낮음 | 높음 | | 사용 방식 | sink를 통해 emit | emitNext()
명시적 호출 | | 스레드 구성 | 단계별로 2~3개 | 동시성 많아지면 5개 이상 |
참고
- 단순 흐름 제어에는
create()
사용 - 멀티스레드 환경, 외부 이벤트 처리에는
Sinks
사용 권장
유형
- Sinks.One: 단일 값을 발행하고 완료하거나 에러를 발행할 수 있다.
- Sinks.Many: 여러 값을 발행할 수 있다.
- Sinks.Empty: 완료 신호만 발행할 수 있다.
Sinks.One
/**
* Sinks.One 예제
* - 한 건의 데이터만 emit 하는 예제
*/
public class SinkOneExample01 {
public static void main(String[] args) {
// emit 된 데이터 중에서 단 하나의 데이터만 Subscriber에게 전달한다. 나머지 데이터는 Drop 됨.
Sinks.One<String> sinkOne = Sinks.one();
Mono<String> mono = sinkOne.asMono();
sinkOne.emitValue("Hello Reactor", FAIL_FAST);
mono.subscribe(data -> Logger.onNext("Subscriber1 ", data));
mono.subscribe(data -> Logger.onNext("Subscriber2 ", data));
}
}
/**
* Sinks.One 예제
* - 두 건의 데이터만 emit 하는 예제
*/
public class SinkOneExample02 {
public static void main(String[] args) {
// emit 된 데이터 중에서 단 하나의 데이터만 Subscriber에게 전달한다. 나머지 데이터는 Drop 됨.
Sinks.One<String> sinkOne = Sinks.one();
Mono<String> mono = sinkOne.asMono();
sinkOne.emitValue("Hello Reactor", FAIL_FAST);
// Sink.One 은 단 한개의 데이터를 emit 할 수 있기때문에 두번째 emit한 데이터는 drop 된다.
sinkOne.emitValue("Hi Reactor", FAIL_FAST);
mono.subscribe(data -> Logger.onNext("Subscriber1 ", data));
mono.subscribe(data -> Logger.onNext("Subscriber2 ", data));
}
}
import com.itvillage.utils.Logger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST;
/**
* Sinks.Many 예제
* - unicast()를 사용해서 단 하나의 Subscriber에게만 데이터를 emit하는 예제
*/
public class SinkManyExample01 {
public static void main(String[] args) {
// 단 하나의 Subscriber에게만 데이터를 emit할 수 있다.
Sinks.Many<Integer> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
Flux<Integer> fluxView = unicastSink.asFlux();
unicastSink.emitNext(1, FAIL_FAST);
unicastSink.emitNext(2, FAIL_FAST);
fluxView.subscribe(data -> Logger.onNext("Subscriber1", data));
unicastSink.emitNext(3, FAIL_FAST);
fluxView.subscribe(data -> Logger.onNext("Subscriber2", data));
}
}
- Subscriber2부분을 주석처리하면 Subscriber1에서 데이터가 잘 나오지만 둘 이상 subscribe 있을 경우 아래와 같은 에러가 난다.
import com.itvillage.utils.Logger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST;
/**
* Sinks.Many 예제
* - multicast()를 사용해서 하나 이상의 Subscriber에게 데이터를 emit하는 예제
*/
public class SinkManyExample02 {
public static void main(String[] args) {
// 하나 이상의 Subscriber에게 데이터를 emit할 수 있다.
Sinks.Many<Integer> multicastSink = Sinks.many().multicast().onBackpressureBuffer();
Flux<Integer> fluxView = multicastSink.asFlux();
multicastSink.emitNext(1, FAIL_FAST);
multicastSink.emitNext(2, FAIL_FAST);
fluxView.subscribe(data -> Logger.onNext("Subscriber1", data));
fluxView.subscribe(data -> Logger.onNext("Subscriber2", data));
multicastSink.emitNext(3, FAIL_FAST);
}
}
import com.itvillage.utils.Logger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST;
/**
* Sinks.Many 예제
* - replay()를 사용하여 이미 emit된 데이터 중에서 특정 개수의 최신 데이터만 전달하는 예제
*/
public class SinkManyExample03 {
public static void main(String[] args) {
// 구독 이후, emit 된 데이터 중에서 최신 데이터 2개만 replay 한다.
Sinks.Many<Integer> replaySink = Sinks.many().replay().limit(2);
Flux<Integer> fluxView = replaySink.asFlux();
replaySink.emitNext(1, FAIL_FAST);
replaySink.emitNext(2, FAIL_FAST);
replaySink.emitNext(3, FAIL_FAST);
fluxView.subscribe(data -> Logger.onNext("Subscriber1", data));
fluxView.subscribe(data -> Logger.onNext("Subscriber2", data));
}
}
import com.itvillage.utils.Logger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST;
/**
* Sinks.Many 예제
* - replay()를 사용하여 이미 emit된 데이터 중에서 특정 개수의 최신 데이터만 전달하는 예제
*/
public class SinkManyExample04 {
public static void main(String[] args) {
// 구독 이후, emit된 데이터 중에서 최신 데이터 2개만 replay 한다.
Sinks.Many<Integer> replaySink = Sinks.many().replay().limit(2);
Flux<Integer> fluxView = replaySink.asFlux();
replaySink.emitNext(1, FAIL_FAST);
replaySink.emitNext(2, FAIL_FAST);
replaySink.emitNext(3, FAIL_FAST);
fluxView.subscribe(data -> Logger.onNext("Subscriber1", data));
replaySink.emitNext(4, FAIL_FAST);
fluxView.subscribe(data -> Logger.onNext("Subscriber2", data));
}
}
import com.itvillage.utils.Logger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST;
public class SinkManyExample05 {
public static void main(String[] args) {
// 구독 시점과 상관없이 emit된 모든 데이터를 replay 한다.
Sinks.Many<Integer> replaySink = Sinks.many().replay().all();
Flux<Integer> fluxView = replaySink.asFlux();
replaySink.emitNext(1, FAIL_FAST);
replaySink.emitNext(2, FAIL_FAST);
replaySink.emitNext(3, FAIL_FAST);
fluxView.subscribe(data -> Logger.onNext("Subscriber1", data));
fluxView.subscribe(data -> Logger.onNext("Subscriber2", data));
}
}