[Spring-Reactive] Reactor (Mono and Flux)
in Spring on Spring
- Reactor란?
- Hello Reactor 코드로 보는 Reactor 구성 요소
- Reactor 용어 정의
- Publisher
- 마블 다이어그램 (Marble Diagram)
- 마블 다이어그램으로 Reactor의 Publisher 이해하기
- Flux 마블 다이어그램
Reactor란?
리액티브 스트림즈의 구현체인 Reactor는 리액티브 프로그래밍을 위한 라이브러리라고 정의 할 수 있다.
Reactor Core 라이브러리는 Spring WebFlux 프레임워크에 라이브러리로 포함되어 있어 실습시 dependency 설정을 해줘야 한다.
틀징
번호 | 특징 이름 | 설명 |
---|---|---|
1 | Reactive Streams | 리액티브 스트림 표준을 구현한 리액터 라이브러리. 비동기 스트림 처리 방식. |
2 | Non-Blocking | JVM 위에서 비동기 논블로킹으로 애플리케이션을 만들기 위한 핵심 기술. |
3 | Java’s functional API | Java8 이상의 함수형 API를 통해 Publisher와 Subscriber 간 상호작용을 구현. |
4 | Flux(N) | 여러 개의 데이터를 emit하는 타입. 0개 이상 다수의 데이터 처리. |
5 | Mono(1) | 하나의 데이터를 emit하는 타입. 단일 데이터 처리 전용. |
6 | Well-suited for microservices | 마이크로서비스에 적합. Non-Blocking I/O 방식으로 높은 처리량을 지원. |
7 | Backpressure-ready network | Subscriber가 소비할 수 있는 만큼 데이터 흐름을 조절(Backpressure 지원). |
Hello Reactor 코드로 보는 Reactor 구성 요소
Hello Reactor 예제는 “Publisher가 데이터 제공 → Operator로 데이터 가공 → Subscriber가 소비”하는 Reactor의 기본 구조를 단순하게 보여준다.
public class Example1 {
public static void main(String[] args) {
Flux<String> sequence = Flux.just("Hello", "Reactor");
sequence.map(data -> data.toLowerCase())
.subscribe(data -> System.out.println(data));
}
}
구성 요소 | 설명 |
---|---|
Flux.just("Hello", "Reactor") | 데이터 소스를 생성하는 Publisher. 여러 데이터를 방출할 수 있다. |
.map(data -> data.toLowerCase()) | 데이터 변환 처리 (Operator). 데이터를 소문자로 변환. |
.subscribe(System.out::println) | 데이터 소비(Subscriber). 변환된 데이터를 출력. |
Reactor 흐름 3단계
- Publisher 생성 (데이터 소스 제공)
→Flux.just()
- Operator 적용 (데이터 변환/처리)
→.map()
- Subscriber 등록 (데이터 소비)
→.subscribe()
Reactor 용어 정의
- Publisher : 발행자, 게시자, 생산자, 방출자(Emitter)
- Subscriber : 구독자, 소비자
- Emit : Publisher가 데이터를 내보내는 것(방출하다. 내보내다. 통지하다.)
- Sequence : Publisher가 emit하는 데이터의 연속적인 흐름을 정의 해 놓은 것. Operator 체인형태로 정의 된다.
- Subscribe : Subscriber가 Sequence를 구독하는 것
- Dispose : Subscriber가 Sequence 구독을 해지 하는 것
Publisher
Mono와 Flux에 대해 알아본다.
Mono
- 0개 또는 1개의 데이터를 emit하는 Publisher이다.(Compare with RxJava Maybe)
- 데이터 emit 과정에서 에러가 발생하면 onError signal을 emit한다.
package com.itvillage.section03.class01;
import com.itvillage.utils.Logger;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponentsBuilder;
import reactor.core.publisher.Mono;
import java.net.URI;
import java.util.Collections;
/**
* Mono 활용 예제
* - worldtimeapi.org Open API를 이용해서 서울의 현재 시간을 조회한다.
*/
public class MonoExample03 {
public static void main(String[] args) {
URI worldTimeUri = UriComponentsBuilder.newInstance().scheme("http")
.host("worldtimeapi.org")
.port(80)
.path("/api/timezone/Asia/Seoul")
.build()
.encode()
.toUri();
RestTemplate restTemplate = new RestTemplate();
HttpHeaders headers = new HttpHeaders();
headers.setAccept(Collections.singletonList(MediaType.APPLICATION_JSON));
Mono.just(
restTemplate.exchange(worldTimeUri, HttpMethod.GET, new HttpEntity<String>(headers), String.class)
)
.map(response -> {
DocumentContext jsonContext = JsonPath.parse(response.getBody());
String dateTime = jsonContext.read("$.datetime");
return dateTime;
})
.subscribe(
data -> Logger.info("# emitted data: " + data),
error -> {
Logger.onError(error);
},
() -> Logger.info("# emitted onComplete signal")
);
}
}
Flux
- 0 개 ~ N 개의 데이터를 emit하는 Publisher이다.
- 데이터 emit 과정에서 에러가 발생하면 onError signal을 emit한다.
// ex 1
public class FluxExample01 {
public static void main(String[] args) {
Flux.just(6, 9, 13)
.map(num -> num % 2)
.subscribe(remainder -> Logger.info("# remainder: {}", remainder));
}
}
// ex 2
public class FluxExample02 {
public static void main(String[] args) {
Flux.fromArray(new Integer[]{3, 6, 7, 9})
.filter(num -> num > 6)
.map(num -> num * 2)
.subscribe(multiply -> Logger.info("# multiply: {}", multiply));
}
}
// ex 3
public class FluxExample03 {
public static void main(String[] args) {
Flux<Object> flux =
Mono.justOrEmpty(null)
.concatWith(Mono.justOrEmpty("Jobs"));
flux.subscribe(data -> Logger.info("# result: {}", data));
}
}
// ex 4
public class FluxExample04 {
public static void main(String[] args) {
Flux.concat(
Flux.just("Venus"),
Flux.just("Earth"),
Flux.just("Mars"))
.collectList()
.subscribe(planetList -> Logger.info("# Solar System: {}", planetList));
}
}
마블 다이어그램 (Marble Diagram)
리액티브 프로그래밍에서 데이터 흐름과 Operator 동작을 이해할 때 사용하는 시각적 표현 방법이다.
구슬 모양 도형들이 시간에 따라 이동하면서 데이터를 나타낸다.
마블 다이어그램 구성
- ①: Publisher가 데이터를 emit하는 타임라인
- ②: emit된 데이터
- ③: 데이터 스트림 완료 신호 (
onComplete
) - ④: Operator로 데이터가 입력되는 지점
- ⑤: 데이터를 처리하는 Operator 함수 (예:
map(x -> x + 1)
) - ⑥: Operator에서 가공 후 내보내는 출력 지점
- ⑦: 처리 완료된 데이터 타임라인
- ⑧: 처리된 데이터
- ⑨: 오류 발생 시 나타나는 X (
onError
)
핵심 개념
- Publisher: 데이터를 생성하고 emit하는 쪽
- Operator: 입력받은 데이터를 가공하거나 변환하는 함수
- Subscriber: 최종적으로 데이터를 소비하는 쪽
왜 마블 다이어그램을 쓰는가?
- Operator의 동작을 직관적으로 이해할 수 있음
- 복잡한 API 문서를 읽기 전에 마블 다이어그램을 보면 이해가 빠름
- Reactor에서는 각 Operator마다 마블 다이어그램을 제공
마블 다이어그램 읽는 팁
- 왼쪽 → 오른쪽: 시간이 흐르는 방향
- 원: 데이터 아이템
- 선: 시간의 흐름
- X: 에러 발생을 의미 (onError Signal)
마블 다이어그램으로 Reactor의 Publisher 이해하기
마블 다이어그램을 통해 Mono와 Flux를 시각적으로 이해한다.
Mono 마블 다이어그램
- Mono는 단 하나의 데이터를 emit하거나 완료(onComplete)만 전달하는 Publisher이다.
- RxJava의 Maybe, Single과 비교할 수 있다.
- Mono는 0개 또는 1개의 데이터만 emit한다.
Mono 예제 코드 1
public class Example6_1 {
public static void main(String[] args) {
Mono.just("Hello Reactor")
.subscribe(System.out::println);
}
}
출력 결과
Hello Reactor
Mono 예제 코드 2 (empty)
public class Example6_2 {
public static void main(String[] args) {
Mono
.empty()
.subscribe(
none -> System.out.println("# emitted onNext signal"),
error -> {},
() -> System.out.println("# emitted onComplete signal")
);
}
}
출력 결과
# emitted onComplete signal
- empty()를 사용하면 데이터 emit 없이 완료 신호만 보낸다.
Mono 활용 예제 (HTTP 호출)
public class Example6_3 {
public static void main(String[] args) {
URI worldTimeUri = UriComponentsBuilder.newInstance().scheme("http")
.host("worldtimeapi.org")
.port(80)
.path("/api/timezone/Asia/Seoul")
.build()
.encode()
.toUri();
RestTemplate restTemplate = new RestTemplate();
HttpHeaders headers = new HttpHeaders();
headers.setAccept(Collections.singletonList(MediaType.APPLICATION_JSON));
Mono.just(restTemplate)
.exchange(worldTimeUri, HttpMethod.GET, new HttpEntity<>(headers), String.class)
.map(response -> {
DocumentContext jsonContext = JsonPath.parse(response.getBody());
String dateTime = jsonContext.read("$.datetime");
return dateTime;
})
.subscribe(
data -> System.out.println("# emitted data: " + data),
error -> System.out.println(error),
() -> System.out.println("# emitted onComplete signal")
);
}
}
출력 결과
# emitted data: 2022-02-08T16:15:15.859465+09:00
# emitted onComplete signal
Flux 마블 다이어그램
- Flux는 여러 개의 데이터를 emit할 수 있는 Publisher이다.
- Mono와 달리 0개 또는 N개의 데이터를 emit할 수 있다.
Flux 예제 코드 1
public class Example6_4 {
public static void main(String[] args) {
Flux.just(6, 9, 13)
.map(num -> num % 2)
.subscribe(System.out::println);
}
}
출력 결과
0
1
1
Flux 예제 코드 2 (배열 기반)
public class Example6_5 {
public static void main(String[] args) {
Flux.fromArray(new Integer[]{3, 6, 7, 9})
.filter(num -> num > 6)
.map(num -> num * 2)
.subscribe(System.out::println);
}
}
출력 결과
14
18
Flux 활용 예제 (Mono를 연결해서 Flux로)
public class Example6_6 {
public static void main(String[] args) {
Flux<String> flux = Mono.justOrEmpty("Steve")
.concatWith(Mono.justOrEmpty("Jobs"));
flux.subscribe(System.out::println);
}
}
출력 결과
Steve
Jobs
concatWith
를 통해 두 개의 Mono를 Flux로 연결할 수 있다.
Flux 여러 데이터 연결 예제
public class Example6_7 {
public static void main(String[] args) {
Flux.concat(
Flux.just("Mercury", "Venus", "Earth"),
Flux.just("Mars", "Jupiter", "Saturn"),
Flux.just("Uranus", "Neptune", "Pluto")
)
.collectList()
.subscribe(planets -> System.out.println(planets));
}
}
출력 결과
[Mercury, Venus, Earth, Mars, Jupiter, Saturn, Uranus, Neptune, Pluto]
collectList()
를 통해 Flux 데이터를 하나의 List로 묶을 수 있다.
요약
- Mono: 최대 1개의 데이터 emit
- Flux: 0개 이상 여러 개 데이터 emit 가능
- concatWith(): 여러 Mono를 이어붙여 Flux로 변환
- collectList(): Flux 스트림을 List로 수집