[Spring-Reactive] Reactor (Sinks)

Sinks

Reactor의 Sinks리액티브 스트림의 Signal을 명시적으로 프로그래밍 방식으로 전송할 수 있는 도구. generate, create 등의 Operator 방식과 달리, 멀티스레드 환경에서도 스레드 안정성을 보장할 수 있어 고급 상황에서 유용하다.
Sinks는 Reactor 3.4 버전에서 도입된 새로운 기능으로, Publisher의 일종으로 데이터를 발행하고 구독할 수 있는 유연한 방법을 제공 한다.
Sinks는 Thread-Safe하게 signal을 발생 시킨다.

create() Operator 방식

개념

  • Flux.create() 내에서 sink.next(...) 호출로 Signal 전송
  • 작업 수행 → 결과 처리 → Subscriber 전달

예시

Flux.create(sink -> {
    IntStream.range(1, 6).forEach(n -> sink.next(doTask(n)));
})
.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.parallel())
.map(...).subscribe(...);

특징

  • 단계별로 실행되는 스레드가 다름
    • subscribeOn: 생성
    • map, subscribe: publishOn 별도 스레드
  • 결과적으로 3개의 스레드 사용

Sinks 방식

개념

  • Sinks.many().unicast() 등으로 Sink 생성
  • emitNext(...)로 Signal 명시적 전송

예시

Sinks.Many<String> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
IntStream.range(1, 6).forEach(n -> {
    new Thread(() -> {
        unicastSink.emitNext(doTask(n), FAIL_FAST);
    }).start();
});

특징

  • emitNext()멀티스레드에서도 안전
  • 7개의 스레드 사용 (Thread-0~4 + parallel-2 + parallel-1)
  • 스레드 안전성 보장으로 병렬 환경에 적합

결론 요약

| 항목 | create() 방식 | Sinks 방식 | |——|—————-|————-| | 사용 용도 | 간단한 Signal 처리 | 복잡한 병렬 환경 | | 스레드 안전성 | 낮음 | 높음 | | 사용 방식 | sink를 통해 emit | emitNext() 명시적 호출 | | 스레드 구성 | 단계별로 2~3개 | 동시성 많아지면 5개 이상 |

참고

  • 단순 흐름 제어에는 create() 사용
  • 멀티스레드 환경, 외부 이벤트 처리에는 Sinks 사용 권장

유형

  • Sinks.One: 단일 값을 발행하고 완료하거나 에러를 발행할 수 있다.
  • Sinks.Many: 여러 값을 발행할 수 있다.
  • Sinks.Empty: 완료 신호만 발행할 수 있다.

Sinks.One

/**
 * Sinks.One 예제
 *  - 한 건의 데이터만 emit 하는 예제
 */
public class SinkOneExample01 {
    public static void main(String[] args) {
        // emit 된 데이터 중에서 단 하나의 데이터만 Subscriber에게 전달한다. 나머지 데이터는 Drop 됨.
        Sinks.One<String> sinkOne = Sinks.one();
        Mono<String> mono = sinkOne.asMono();

        sinkOne.emitValue("Hello Reactor", FAIL_FAST);

        mono.subscribe(data -> Logger.onNext("Subscriber1 ", data));
        mono.subscribe(data -> Logger.onNext("Subscriber2 ", data));
    }
}
/**
 * Sinks.One 예제
 *  - 두 건의 데이터만 emit 하는 예제
 */
public class SinkOneExample02 {
    public static void main(String[] args) {
        // emit 된 데이터 중에서 단 하나의 데이터만 Subscriber에게 전달한다. 나머지 데이터는 Drop 됨.
        Sinks.One<String> sinkOne = Sinks.one();
        Mono<String> mono = sinkOne.asMono();

        sinkOne.emitValue("Hello Reactor", FAIL_FAST);

        // Sink.One 은 단 한개의 데이터를 emit 할 수 있기때문에 두번째 emit한 데이터는 drop 된다.
        sinkOne.emitValue("Hi Reactor", FAIL_FAST);


        mono.subscribe(data -> Logger.onNext("Subscriber1 ", data));
        mono.subscribe(data -> Logger.onNext("Subscriber2 ", data));
    }
}

img.png

import com.itvillage.utils.Logger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST;

/**
 * Sinks.Many 예제
 *  - unicast()를 사용해서 단 하나의 Subscriber에게만 데이터를 emit하는 예제
 */
public class SinkManyExample01 {
    public static void main(String[] args) {
        // 단 하나의 Subscriber에게만 데이터를 emit할 수 있다.
        Sinks.Many<Integer> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
        Flux<Integer> fluxView = unicastSink.asFlux();

        unicastSink.emitNext(1, FAIL_FAST);
        unicastSink.emitNext(2, FAIL_FAST);

        fluxView.subscribe(data -> Logger.onNext("Subscriber1", data));

        unicastSink.emitNext(3, FAIL_FAST);

        fluxView.subscribe(data -> Logger.onNext("Subscriber2", data));
    }
}
  • Subscriber2부분을 주석처리하면 Subscriber1에서 데이터가 잘 나오지만 둘 이상 subscribe 있을 경우 아래와 같은 에러가 난다.

img_1.png

import com.itvillage.utils.Logger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST;

/**
 * Sinks.Many 예제
 *  - multicast()를 사용해서 하나 이상의 Subscriber에게 데이터를 emit하는 예제
 */
public class SinkManyExample02 {
    public static void main(String[] args) {
        // 하나 이상의 Subscriber에게 데이터를 emit할 수 있다.
        Sinks.Many<Integer> multicastSink = Sinks.many().multicast().onBackpressureBuffer();
        Flux<Integer> fluxView = multicastSink.asFlux();

        multicastSink.emitNext(1, FAIL_FAST);
        multicastSink.emitNext(2, FAIL_FAST);


        fluxView.subscribe(data -> Logger.onNext("Subscriber1", data));

        fluxView.subscribe(data -> Logger.onNext("Subscriber2", data));

        multicastSink.emitNext(3, FAIL_FAST);
    }
}
import com.itvillage.utils.Logger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST;

/**
 * Sinks.Many 예제
 *  - replay()를 사용하여 이미 emit된 데이터 중에서 특정 개수의 최신 데이터만 전달하는 예제
 */
public class SinkManyExample03 {
    public static void main(String[] args) {
        // 구독 이후, emit 된 데이터 중에서 최신 데이터 2개만 replay 한다.
        Sinks.Many<Integer> replaySink = Sinks.many().replay().limit(2);
        Flux<Integer> fluxView = replaySink.asFlux();

        replaySink.emitNext(1, FAIL_FAST);
        replaySink.emitNext(2, FAIL_FAST);
        replaySink.emitNext(3, FAIL_FAST);

        fluxView.subscribe(data -> Logger.onNext("Subscriber1", data));
        fluxView.subscribe(data -> Logger.onNext("Subscriber2", data));
    }
}

img_2.png

import com.itvillage.utils.Logger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST;

/**
 * Sinks.Many 예제
 *  - replay()를 사용하여 이미 emit된 데이터 중에서 특정 개수의 최신 데이터만 전달하는 예제
 */
public class SinkManyExample04 {
    public static void main(String[] args) {
        // 구독 이후, emit된 데이터 중에서 최신 데이터 2개만 replay 한다.
        Sinks.Many<Integer> replaySink = Sinks.many().replay().limit(2);
        Flux<Integer> fluxView = replaySink.asFlux();

        replaySink.emitNext(1, FAIL_FAST);
        replaySink.emitNext(2, FAIL_FAST);
        replaySink.emitNext(3, FAIL_FAST);

        fluxView.subscribe(data -> Logger.onNext("Subscriber1", data));

        replaySink.emitNext(4, FAIL_FAST);

        fluxView.subscribe(data -> Logger.onNext("Subscriber2", data));
    }
}

img_3.png

import com.itvillage.utils.Logger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST;

public class SinkManyExample05 {
    public static void main(String[] args) {
        // 구독 시점과 상관없이 emit된 모든 데이터를 replay 한다.
        Sinks.Many<Integer> replaySink = Sinks.many().replay().all();
        Flux<Integer> fluxView = replaySink.asFlux();

        replaySink.emitNext(1, FAIL_FAST);
        replaySink.emitNext(2, FAIL_FAST);
        replaySink.emitNext(3, FAIL_FAST);


        fluxView.subscribe(data -> Logger.onNext("Subscriber1", data));
        fluxView.subscribe(data -> Logger.onNext("Subscriber2", data));
    }
}

img_4.png


© 2023 Lee. All rights reserved.