[Spring-Reactive] Reactor (Mono and Flux)

Reactor란?

리액티브 스트림즈의 구현체인 Reactor는 리액티브 프로그래밍을 위한 라이브러리라고 정의 할 수 있다.
Reactor Core 라이브러리는 Spring WebFlux 프레임워크에 라이브러리로 포함되어 있어 실습시 dependency 설정을 해줘야 한다.

틀징

번호특징 이름설명
1Reactive Streams리액티브 스트림 표준을 구현한 리액터 라이브러리. 비동기 스트림 처리 방식.
2Non-BlockingJVM 위에서 비동기 논블로킹으로 애플리케이션을 만들기 위한 핵심 기술.
3Java’s functional APIJava8 이상의 함수형 API를 통해 Publisher와 Subscriber 간 상호작용을 구현.
4Flux(N)여러 개의 데이터를 emit하는 타입. 0개 이상 다수의 데이터 처리.
5Mono(1)하나의 데이터를 emit하는 타입. 단일 데이터 처리 전용.
6Well-suited for microservices마이크로서비스에 적합. Non-Blocking I/O 방식으로 높은 처리량을 지원.
7Backpressure-ready networkSubscriber가 소비할 수 있는 만큼 데이터 흐름을 조절(Backpressure 지원).

Hello Reactor 코드로 보는 Reactor 구성 요소

Hello Reactor 예제는 “Publisher가 데이터 제공 → Operator로 데이터 가공 → Subscriber가 소비”하는 Reactor의 기본 구조를 단순하게 보여준다.

public class Example1 {
    public static void main(String[] args) {
        Flux<String> sequence = Flux.just("Hello", "Reactor");
        sequence.map(data -> data.toLowerCase())
                .subscribe(data -> System.out.println(data));
    }
}
구성 요소설명
Flux.just("Hello", "Reactor")데이터 소스를 생성하는 Publisher. 여러 데이터를 방출할 수 있다.
.map(data -> data.toLowerCase())데이터 변환 처리 (Operator). 데이터를 소문자로 변환.
.subscribe(System.out::println)데이터 소비(Subscriber). 변환된 데이터를 출력.

Reactor 흐름 3단계

  1. Publisher 생성 (데이터 소스 제공)
    Flux.just()
  2. Operator 적용 (데이터 변환/처리)
    .map()
  3. Subscriber 등록 (데이터 소비)
    .subscribe()

Reactor 용어 정의

  • Publisher : 발행자, 게시자, 생산자, 방출자(Emitter)
  • Subscriber : 구독자, 소비자
  • Emit : Publisher가 데이터를 내보내는 것(방출하다. 내보내다. 통지하다.)
  • Sequence : Publisher가 emit하는 데이터의 연속적인 흐름을 정의 해 놓은 것. Operator 체인형태로 정의 된다.
  • Subscribe : Subscriber가 Sequence를 구독하는 것
  • Dispose : Subscriber가 Sequence 구독을 해지 하는 것

img.png

Publisher

Mono와 Flux에 대해 알아본다.

Mono

  • 0개 또는 1개의 데이터를 emit하는 Publisher이다.(Compare with RxJava Maybe)
  • 데이터 emit 과정에서 에러가 발생하면 onError signal을 emit한다.
package com.itvillage.section03.class01;

import com.itvillage.utils.Logger;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponentsBuilder;
import reactor.core.publisher.Mono;

import java.net.URI;
import java.util.Collections;

/**
 * Mono 활용 예제
 *  - worldtimeapi.org Open API를 이용해서 서울의 현재 시간을 조회한다.
 */
public class MonoExample03 {
    public static void main(String[] args) {
        URI worldTimeUri = UriComponentsBuilder.newInstance().scheme("http")
                .host("worldtimeapi.org")
                .port(80)
                .path("/api/timezone/Asia/Seoul")
                .build()
                .encode()
                .toUri();

        RestTemplate restTemplate = new RestTemplate();
        HttpHeaders headers = new HttpHeaders();
        headers.setAccept(Collections.singletonList(MediaType.APPLICATION_JSON));


        Mono.just(
                restTemplate.exchange(worldTimeUri, HttpMethod.GET, new HttpEntity<String>(headers), String.class)
        )
                .map(response -> {
                    DocumentContext jsonContext = JsonPath.parse(response.getBody());
                    String dateTime = jsonContext.read("$.datetime");
                    return dateTime;
                })
                .subscribe(
                        data -> Logger.info("# emitted data: " + data),
                        error -> {
                            Logger.onError(error);
                        },
                        () -> Logger.info("# emitted onComplete signal")
                );

    }
}

Flux

  • 0 개 ~ N 개의 데이터를 emit하는 Publisher이다.
  • 데이터 emit 과정에서 에러가 발생하면 onError signal을 emit한다.
// ex 1
public class FluxExample01 {
    public static void main(String[] args) {
        Flux.just(6, 9, 13)
                .map(num -> num % 2)
                .subscribe(remainder -> Logger.info("# remainder: {}", remainder));
    }
}

// ex 2
public class FluxExample02 {
    public static void main(String[] args) {
        Flux.fromArray(new Integer[]{3, 6, 7, 9})
                .filter(num -> num > 6)
                .map(num -> num * 2)
                .subscribe(multiply -> Logger.info("# multiply: {}", multiply));
    }
}

// ex 3
public class FluxExample03 {
    public static void main(String[] args) {
        Flux<Object> flux =
                Mono.justOrEmpty(null)
                        .concatWith(Mono.justOrEmpty("Jobs"));
        flux.subscribe(data -> Logger.info("# result: {}", data));
    }
}

// ex 4
public class FluxExample04 {
    public static void main(String[] args) {
        Flux.concat(
                Flux.just("Venus"),
                Flux.just("Earth"),
                Flux.just("Mars"))
            .collectList()
            .subscribe(planetList -> Logger.info("# Solar System: {}", planetList));
    }
}

마블 다이어그램 (Marble Diagram)

리액티브 프로그래밍에서 데이터 흐름과 Operator 동작을 이해할 때 사용하는 시각적 표현 방법이다.
구슬 모양 도형들이 시간에 따라 이동하면서 데이터를 나타낸다.

마블 다이어그램 구성

img_1.png

  • : Publisher가 데이터를 emit하는 타임라인
  • : emit된 데이터
  • : 데이터 스트림 완료 신호 (onComplete)
  • : Operator로 데이터가 입력되는 지점
  • : 데이터를 처리하는 Operator 함수 (예: map(x -> x + 1))
  • : Operator에서 가공 후 내보내는 출력 지점
  • : 처리 완료된 데이터 타임라인
  • : 처리된 데이터
  • : 오류 발생 시 나타나는 X (onError)

핵심 개념

  • Publisher: 데이터를 생성하고 emit하는 쪽
  • Operator: 입력받은 데이터를 가공하거나 변환하는 함수
  • Subscriber: 최종적으로 데이터를 소비하는 쪽

왜 마블 다이어그램을 쓰는가?

  • Operator의 동작을 직관적으로 이해할 수 있음
  • 복잡한 API 문서를 읽기 전에 마블 다이어그램을 보면 이해가 빠름
  • Reactor에서는 각 Operator마다 마블 다이어그램을 제공

마블 다이어그램 읽는 팁

  • 왼쪽 → 오른쪽: 시간이 흐르는 방향
  • : 데이터 아이템
  • : 시간의 흐름
  • X: 에러 발생을 의미 (onError Signal)

마블 다이어그램으로 Reactor의 Publisher 이해하기

마블 다이어그램을 통해 Mono와 Flux를 시각적으로 이해한다.

Mono 마블 다이어그램

img_4.png

  • Mono는 단 하나의 데이터를 emit하거나 완료(onComplete)만 전달하는 Publisher이다.
  • RxJava의 Maybe, Single과 비교할 수 있다.
  • Mono는 0개 또는 1개의 데이터만 emit한다.

Mono 예제 코드 1

public class Example6_1 {
    public static void main(String[] args) {
        Mono.just("Hello Reactor")
            .subscribe(System.out::println);
    }
}

출력 결과

Hello Reactor

Mono 예제 코드 2 (empty)

public class Example6_2 {
    public static void main(String[] args) {
        Mono
            .empty()
            .subscribe(
                none -> System.out.println("# emitted onNext signal"),
                error -> {},
                () -> System.out.println("# emitted onComplete signal")
            );
    }
}

출력 결과

# emitted onComplete signal
  • empty()를 사용하면 데이터 emit 없이 완료 신호만 보낸다.

Mono 활용 예제 (HTTP 호출)

img_5.png

public class Example6_3 {
    public static void main(String[] args) {
        URI worldTimeUri = UriComponentsBuilder.newInstance().scheme("http")
            .host("worldtimeapi.org")
            .port(80)
            .path("/api/timezone/Asia/Seoul")
            .build()
            .encode()
            .toUri();

        RestTemplate restTemplate = new RestTemplate();
        HttpHeaders headers = new HttpHeaders();
        headers.setAccept(Collections.singletonList(MediaType.APPLICATION_JSON));

        Mono.just(restTemplate)
            .exchange(worldTimeUri, HttpMethod.GET, new HttpEntity<>(headers), String.class)
            .map(response -> {
                DocumentContext jsonContext = JsonPath.parse(response.getBody());
                String dateTime = jsonContext.read("$.datetime");
                return dateTime;
            })
            .subscribe(
                data -> System.out.println("# emitted data: " + data),
                error -> System.out.println(error),
                () -> System.out.println("# emitted onComplete signal")
            );
    }
}

출력 결과

# emitted data: 2022-02-08T16:15:15.859465+09:00
# emitted onComplete signal

Flux 마블 다이어그램

  • Flux는 여러 개의 데이터를 emit할 수 있는 Publisher이다.
  • Mono와 달리 0개 또는 N개의 데이터를 emit할 수 있다.

Flux 예제 코드 1

public class Example6_4 {
    public static void main(String[] args) {
        Flux.just(6, 9, 13)
            .map(num -> num % 2)
            .subscribe(System.out::println);
    }
}

출력 결과

0
1
1

Flux 예제 코드 2 (배열 기반)

public class Example6_5 {
    public static void main(String[] args) {
        Flux.fromArray(new Integer[]{3, 6, 7, 9})
            .filter(num -> num > 6)
            .map(num -> num * 2)
            .subscribe(System.out::println);
    }
}

출력 결과

14
18

Flux 활용 예제 (Mono를 연결해서 Flux로)

img_2.png

public class Example6_6 {
    public static void main(String[] args) {
        Flux<String> flux = Mono.justOrEmpty("Steve")
            .concatWith(Mono.justOrEmpty("Jobs"));

        flux.subscribe(System.out::println);
    }
}

출력 결과

Steve
Jobs
  • concatWith를 통해 두 개의 Mono를 Flux로 연결할 수 있다.

Flux 여러 데이터 연결 예제

public class Example6_7 {
    public static void main(String[] args) {
        Flux.concat(
            Flux.just("Mercury", "Venus", "Earth"),
            Flux.just("Mars", "Jupiter", "Saturn"),
            Flux.just("Uranus", "Neptune", "Pluto")
        )
        .collectList()
        .subscribe(planets -> System.out.println(planets));
    }
}

출력 결과

[Mercury, Venus, Earth, Mars, Jupiter, Saturn, Uranus, Neptune, Pluto]
  • collectList()를 통해 Flux 데이터를 하나의 List로 묶을 수 있다.

요약

  • Mono: 최대 1개의 데이터 emit
  • Flux: 0개 이상 여러 개 데이터 emit 가능
  • concatWith(): 여러 Mono를 이어붙여 Flux로 변환
  • collectList(): Flux 스트림을 List로 수집

© 2023 Lee. All rights reserved.