Spring Framework/WebFlux

Mono, Flux 이해하기

Junuuu 2024. 1. 5. 00:01

개요

<S extends T> Mono<S> save(S entity);
<S extends T> Flux<S> saveAll(Iterable<S> entities);
fun <S : T> saveAll(entities: Iterable<S>): Flow<S>

ReactiveCrudRepository, CoroutineCrudRepository를 보다 보면 반환객체에 Mono<T>, Flux<T>, Flow<T>등이 등장합니다.

처음 보는 개념인 Mono, Flux에 대해 알아보는 시간을 가지려고 합니다.

Flow는 다음시간에..

 

Mono와 Flux는 무엇인가?

public abstract class Mono<T> implements CorePublisher<T>{...}
public abstract class Flux<T> implements CorePublisher<T>{...}
public interface CorePublisher<T> extends Publisher<T>{...}

Mono와 Flux는 모두 CorePublisher 인터페이스를 구현하고 있습니다.

CorePublisher는 Publisher 인터페이스를 상속받습니다.

 

Mono<T> 와 Flux<T>는 Reactive Stream의 Publisher에 해당하는 객체입니다. 

Flux는 0개부터 N개까지의 T타입의 원소를 방출하게 됩니다.

Mono는 0개부터 1개의 T타입 원소를 방출하게 됩니다.

 

 

Mono

 

빈 Mono 만들기

val mono = Mono.empty<String>()

 

 

아무것도 방출하지 않는 Mono 만들기

val mono = Mono.never<String>()

 

값을 포함하는 Mono 만들기

val mono = Mono.just("A")

 

 

Mono에서 예외 발생시키기

val mono = Mono.error<String>(IllegalStateException())

 

 

Flux

하나의 데이터를 전달할 때 마다 onNext 이벤트가 발생합니다.

Flux 내의 모든 데이터의 전달이 완료되면 onComplete 이벤트가 발생하고, 중간에 오류가 발생하면 onError 이벤트가 발생합니다.

 

빈 Flux 만들어보기

val flux = empty<String>()

 

List로 Flux 만들기

val list = listOf("A","B","C")
val flux = fromIterable(list)
flux.subscribe { x -> println(x) }

 

값을 포함하는 Flux 만들기

val flux = just("A", "B", "C")
flux.subscribe { x -> println(x) }

 

Flux에서 예외를 발생시키기

val flux: Flux<String> = Flux.error(IllegalArgumentException("Flux 예외 발생"))
flux.subscribe { x -> println(x) }

 

 

Mono & Flux 데이터 변환

Reactor는 데이터를 변환하는데 사용할 수 있는 여러 연산자를 제공합니다.

지연 시간이 예상되지 않는 간단한 변환에는 map 연산자를 활용할 수 있습니다.

 

map 연산자 활용예시

//username, firstname, lastname을 대문자화하기
Mono<User> capitalizeOne(Mono<User> mono) {
	return mono.map(u -> new User(u.getUsername().toUpperCase(), u.getFirstname().toUpperCase(),
	u.getLastname().toUpperCase()));
}

flux도 동일하게 map 연산자를 활용할 수 있습니다.

 

 

만약 문자열을 대문자로 변환하기 위해 웹서비스를 호출해야 한다고 가정해 보겠습니다.

해당 호출에는 지연 시간이 예상되기 때문에 더 이상 map 연산자를 활용할 수 없습니다.

대신 비동기 호출을 Flux 또는 Mono를 활용하고 flatMap 연산자를 활용할 수 있습니다.

 

flatMap 연산자 활용예시

Flux<User> asyncCapitalizeMany(Flux<User> flux) {
	return flux.flatMap(user -> asyncCapitalizeUser(user));
}

Mono<User> asyncCapitalizeUser(User u) {
	return Mono.just(new User(u.getUsername().toUpperCase(), u.getFirstname().toUpperCase(), u.getLastname().toUpperCase()));
}

 

 

 

map vs flatMap

public final <R> Mono<R> flatMap(Function<? super T,? extends Mono<? extends R>> transformer)
public final <R> Mono<R> map(Function<? super T,? extends R> mapper)

map의 반환 값은 Object이고, flatMap의 반환 값은 Publisher(Mono / Flux)입니다.

따라서 내부가 동기적인 동작이라면 map으로 체이닝 하고 비동기 작업이라면 flatmap으로 체이닝 합니다.

 

 

 

 

 

 

참고자료

https://projectreactor.io/

https://tech.io/playgrounds/929/reactive-programming-with-reactor-3/Intro

https://d2.naver.com/helloworld/2771091

https://techblog.woowahan.com/12903/

https://pearlluck.tistory.com/730

https://velog.io/@zenon8485/Reactor-Java-1.-Mono%EC%99%80-Flux%EB%A5%BC-%EC%83%9D%EC%84%B1%ED%95%98%EB%8A%94-%EB%B0%A9%EB%B2%95

https://github.com/bradclarke2/Reactor-example-completed/blob/master/src/main/java/io/pivotal/literx/Part01Flux.java