Mono, Flux 이해하기
개요
<S extends T> Mono<S> save(S entity);
<S extends T> Flux<S> saveAll(Iterable<S> entities);
fun <S : T> saveAll(entities: Iterable<S>): Flow<S>
ReactiveCrudRepository, CoroutineCrudRepository를 보다 보면 반환객체에 Mono<T>, Flux<T>, Flow<T>등이 등장합니다.
처음 보는 개념인 Mono, Flux에 대해 알아보는 시간을 가지려고 합니다.
Flow는 다음시간에..
Mono와 Flux는 무엇인가?
public abstract class Mono<T> implements CorePublisher<T>{...}
public abstract class Flux<T> implements CorePublisher<T>{...}
public interface CorePublisher<T> extends Publisher<T>{...}
Mono와 Flux는 모두 CorePublisher 인터페이스를 구현하고 있습니다.
CorePublisher는 Publisher 인터페이스를 상속받습니다.
Mono<T> 와 Flux<T>는 Reactive Stream의 Publisher에 해당하는 객체입니다.
Flux는 0개부터 N개까지의 T타입의 원소를 방출하게 됩니다.
Mono는 0개부터 1개의 T타입 원소를 방출하게 됩니다.
Mono
빈 Mono 만들기
val mono = Mono.empty<String>()
아무것도 방출하지 않는 Mono 만들기
val mono = Mono.never<String>()
값을 포함하는 Mono 만들기
val mono = Mono.just("A")
Mono에서 예외 발생시키기
val mono = Mono.error<String>(IllegalStateException())
Flux
하나의 데이터를 전달할 때 마다 onNext 이벤트가 발생합니다.
Flux 내의 모든 데이터의 전달이 완료되면 onComplete 이벤트가 발생하고, 중간에 오류가 발생하면 onError 이벤트가 발생합니다.
빈 Flux 만들어보기
val flux = empty<String>()
List로 Flux 만들기
val list = listOf("A","B","C")
val flux = fromIterable(list)
flux.subscribe { x -> println(x) }
값을 포함하는 Flux 만들기
val flux = just("A", "B", "C")
flux.subscribe { x -> println(x) }
Flux에서 예외를 발생시키기
val flux: Flux<String> = Flux.error(IllegalArgumentException("Flux 예외 발생"))
flux.subscribe { x -> println(x) }
Mono & Flux 데이터 변환
Reactor는 데이터를 변환하는데 사용할 수 있는 여러 연산자를 제공합니다.
지연 시간이 예상되지 않는 간단한 변환에는 map 연산자를 활용할 수 있습니다.
map 연산자 활용예시
//username, firstname, lastname을 대문자화하기
Mono<User> capitalizeOne(Mono<User> mono) {
return mono.map(u -> new User(u.getUsername().toUpperCase(), u.getFirstname().toUpperCase(),
u.getLastname().toUpperCase()));
}
flux도 동일하게 map 연산자를 활용할 수 있습니다.
만약 문자열을 대문자로 변환하기 위해 웹서비스를 호출해야 한다고 가정해 보겠습니다.
해당 호출에는 지연 시간이 예상되기 때문에 더 이상 map 연산자를 활용할 수 없습니다.
대신 비동기 호출을 Flux 또는 Mono를 활용하고 flatMap 연산자를 활용할 수 있습니다.
flatMap 연산자 활용예시
Flux<User> asyncCapitalizeMany(Flux<User> flux) {
return flux.flatMap(user -> asyncCapitalizeUser(user));
}
Mono<User> asyncCapitalizeUser(User u) {
return Mono.just(new User(u.getUsername().toUpperCase(), u.getFirstname().toUpperCase(), u.getLastname().toUpperCase()));
}
map vs flatMap
public final <R> Mono<R> flatMap(Function<? super T,? extends Mono<? extends R>> transformer)
public final <R> Mono<R> map(Function<? super T,? extends R> mapper)
map의 반환 값은 Object이고, flatMap의 반환 값은 Publisher(Mono / Flux)입니다.
따라서 내부가 동기적인 동작이라면 map으로 체이닝 하고 비동기 작업이라면 flatmap으로 체이닝 합니다.
참고자료
https://tech.io/playgrounds/929/reactive-programming-with-reactor-3/Intro
https://d2.naver.com/helloworld/2771091
https://techblog.woowahan.com/12903/
https://pearlluck.tistory.com/730