본문 바로가기
Rx

리액티브연산자[결합 연산자]- 16(merge함수,concat함수)

by 봄석 2018. 12. 27.

본 내용은 필자가 학습한 내용을 정리하는 내용입니다.

대부분 의 내용이 아래 책의 내용이므로 원서를 구매해서 직접보시는걸 추천드립니다!

RxJava 프로그래밍 리액티브 프로그래밍 기초부터 RxAndroid까지 한 번에

유동환 , 박정준 지음 | 한빛미디어 | 2017년 09월 04일 출간

http://www.kyobobook.co.kr/product/detailViewKor.laf?ejkGb=KOR&mallGb=KOR&barcode=9788968488658&orderClick=LAV&Kc=


저자님의 블로그

https://brunch.co.kr/@yudong#info




리액티브 연산자(함수) 분류 - 카테고리

연산자 종류 

연산자 함수 

  생성 연산자 


 just(), fromXXX(), create(), interval(), ragne(), timer(),   intervalRange(), defer(), repeat() 


  변환 연산자


 map(),flatMap(), concatMap(), switchMap() , groupBy(),

 scan(), buffer(), window()


  필터 연산자


 filter(), take(), skip(), distinct()


  결합 연산자

 

 zip(), combineLatest(), merge(), concat()

 

  조건 연산자


 amb(), takeUtil(), skipUtil(), all()

 

  에러 처리 연산자 


 onErrorReturn(), onErrorResumeNext(), retry(), retryUntil()


  기타 연산자 

 

 subscribe(), subscribeOn(), observeOn(), reduce(), count()

 





결합 연산자란?

결합 연산자는 다수의 Observable을 하나로 합하는 방법을 제공합니다.

앞서 flatMap() 함수나 groupBy() 함수등은 1개의 Observable을 여러개로 확장해주는 반면,

결합 연산자들은 여러개의 Observable을 내가 원하는 Observable로 결합해 줍니다.


결합연산자 종류

zip() - 입력 Observable에서 데이터를 모두 새로 발행했을때 그것을 합해줍니다.

combineLatest() - 처음에 각 Observable에서 데이터를 발행한 후에는 어디에서 값을 발행하든 최신값으로 갱신합니다.

marge() - 최신 데이터 여부와 상관없이 각 Observable 에서 발행하는 데이터를 그대로 출력합니다.

concat() - 입력된 Observable을 Observable 단위로 이어 붙여줍니다.




결합연산자 - merge() 함수

merge() 함수는 zip() 함수나 combineLatest() 함수와 비교하면 가장 단순한 결합 함수입니다.

입력 Observable의 순서와 모드 Observable이 데이터를 발행하는지에 대해 관여하지 않고 어느 것이든 업스트림에서 먼저 입력되는 데이터를 그대로 발행합니다.


merge() 함수의 마블다이어그램

출처 - http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html


merge() 함수의 활용 예

String[] data1={"1","3"};
String[] data2 ={"2","4","6"};
Observable<String> source1= Observable.interval(0L,100L,TimeUnit.MILLISECONDS)
            .map(Long::intValue)
            .map(idx->data1[idx])
            .take(data1.length);

Observable<String> source2= Observable.interval(50L,TimeUnit.MILLISECONDS)
            .map(Long::intValue)
            .map(idx->data2[idx])
            .take(data2.length);
        
Observable<String> source= Observable.merge(source1,source2);
source.subscribe(Log::i);
CommonUtils.sleep(1000);



첫 번째 Observable은 대기시간 없이 100ms 간격으로 값을 발행하고 두 번째 Observable은 50ms 간격으로 값을 발행하므로 2개 값이 결과 Observable에서 섞입니다.


실행결과

RxComputationThreadPool-1 | value = 1
RxComputationThreadPool-2 | value = 2
RxComputationThreadPool-1 | value = 3
RxComputationThreadPool-2 | value = 4

RxComputationThreadPool-2 | value = 6


주목해서 봐야할 점은 첫 번쨰와 두 번째 Observable의 데이터 발행이 모두 개별의 스레드에서 이뤄진다는 점입니다.


data1 배열에 해당하는 '1'과 '3'은 RxComputationTheeadPool-1 스레드에서 발행하고

data2 배열에 해당하는 데이터는  RxComputationTheeadPool-2 스레드에서 발행합니다.



결합연산자 - concat() 함수

concat()은 2개 이상의 Observable을 이어 붙여주는 함수 입니다. 첫 번째 Observable에 onComplete 이벤트가 발생해야 두 번째 Observable을 구독합니다

스레드를 활용해 일반적인 코드로 이와 같은 내용을 구현하려면 만만치 않을 것입니다.


concat()함수의 마블다이어그램

출처 - http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html


첫 번째 Observable에 onComplete 이벤트가 발생하지 않게 하면 두 번째 Observable은 영원히 대기합니다. 이는 잠재적인 메모리 누수의 위험을 내포합니다. 따라서 입력 Observable이 반드시 완료 (onComplete 이벤트)될 수 있게 해야 합니다.


concat()함수의 원형

@SchedulerSupoort(SchedulerSupport.NONE)
public static <T> Observable<T> concat(
    Observable<? extends T> source1,
    Observable<? extends T> source2
)



concat() 함수의 활용 예

Action onCompleteAction =()->Log.d("onComplete()");
        
String[] data1={"1","3","5"};
String[] data2={"2","4","6"};
        
Observable<String> source1=Observable.fromArray(data1)
.doOnComplete(onCompleteAction);

Observable<String> source2=Observable.interval(100L,TimeUnit.MILLISECONDS)
.map(Long::intValue)
.map(idx->data2[idx])
.take(data2.length)
.doOnComplete(onCompleteAction);
        
Observable<String> source=Observable.concat(source1,source2)
    .doOnComplete(onCompleteAction);

source.subscribe(Log::i);
CommonUtils.sleep(1000);



concat() 함수를 활용할 때는 onComplete 이벤트 발생 여부가 중요합니다.따라서 Action 객체를 생성했습니다.  Action 클래스는 io.reactivexfunctions 패키지에 속해 있으며 doOnComplete() 함수처럼 인자가 없는 람다 표현식을 사용할때 사용합니다. 코드에서는 단지 로그로 onComplete() 라는 문자열만 출력했습니다.


다음은 Observable(source1), Observable(source2), 결과인 Observable(source)에 각각 doOnComplete(coCompleteAction)을 추가했습니다. 마지막으로 concat() 함수를 이용해

source1과 source2를 결합했습니다. 결합할수 있는 Observable은 최대 4개입니다.



Observable의 중간상태를 확인하는방법

리액티브 프로그래밍 할때는 중간에 로그를 출력하는 것이 낯설게 느껴집니다. 특히 함수형 프로그래밍 패러다임을 배우면서 "로그나 화면 출력하는 등은 부수효과를 발생시킨다" 라는 내용을 접하면 부수효과를 최소화 하려는 경향이 생깁니다.


하지만 부수효과를 감내하고서라도 적절한 로그 는 유지 보수성을 확보하기 위해 꼭 필요합니다.  RxJava에서는 Observable의 중간 결과를 간편하게 확인할 수 있는 함수들을 제공합니다.

확실하지 않은 코드나 예제코드를 실행할때 찜찜한 부분이 있다면 doOnNext(), doOnComplete(),doOnError()함수를 추가해보면 좋습니다.

댓글