본문 바로가기
Rx

리액티브연산자[조건 연산자]- 17(amb함수,takeUntil함수)

by 봄석 2018. 12. 27.

본 내용은 필자가 학습한 내용을 정리하는 내용입니다.

대부분 의 내용이 아래 책의 내용이므로 원서를 구매해서 직접보시는걸 추천드립니다!

RxJava 프로그래밍 리액티브 프로그래밍 기초부터 RxAndroid까지 한 번에

유동환 , 박정준 지음 | 한빛미디어 | 2017년 09월 04일 출간

http://www.kyobobook.co.kr/product/detailViewKor.laf?ejkGb=KOR&mallGb=KOR&barcode=9788968488658&orderClick=LAV&Kc=


저자님의 블로그

https://brunch.co.kr/@yudong#info




리액티브 연산자(함수) 분류 - 카테고리

연산자 종류 

연산자 함수 

  생성 연산자 


 just(), fromXXX(), create(), interval(), ragne(), timer(),   intervalRange(), defer(), repeat() 


  변환 연산자


 map(),flatMap(), concatMap(), switchMap() , groupBy(),

 scan(), buffer(), window()


  필터 연산자


 filter(), take(), skip(), distinct()


  결합 연산자

 

 zip(), combineLatest(), merge(), concat()

 

  조건 연산자


 amb(), takeUtil(), skipUtil(), all()

 

  에러 처리 연산자 


 onErrorReturn(), onErrorResumeNext(), retry(), retryUntil()


  기타 연산자 

 

 subscribe(), subscribeOn(), observeOn(), reduce(), count()

 




조건연산자란?

조건연산자는 Observable의 흐름을 제어하는 역할을 합니다. 필터연산자가 발행된 값을 채택하느냐 기각하느냐 여부에 초점을 맞춘다면, 조건연산자는 지금까지의 흐름을 어떻게 제어할 것인가에 초점을 맞춥니다.


amb() - 둘중 어느것이든 먼저나오는 Observable을 채택합니다.

takeUntil(other) - other Observable에서 데이터가 발행되기 전까지만 현재 Observable을 채택합니다.

skipUntil(other) - takeUntil(other) 함수와는 반대로 other Observable에서 데이터가 발행될 동안 현재 Observable에서 발행하는 값을 무시합니다.

all() - Observable에 입력되는 값이 모두 특정 조건에 맞을때만 true를 발행합니다. 만약 조건이 맞지않는다면 바로 false를 발행합니다.



조건연산자 - amb() 함수

amb는 ambiguous(모호한)라는 영어 단어의 줄임말 입니다. 여러개의 Observable중에서 1개의 Observable을 선택하는데, 선택 기준은 가장 먼저 데이터를 발행하는 Observable입니다. 

이후에 나머지 Observable에서 발행하는 데이터는 모두 무시합니다.


amb() 함수의 마블다이어그램

출처 - http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html


타임라인중 원을 가진 Observable을 먼저 발행했으므로 원만을 발행하고,

사각형을 발행하는 두 번째 Observable에서 발행되는 값은 모두 무시합니다.


amb()함수의 원형

@SchedulerSupport(SchedulerSupprot.NONE)
public static <T> Observable<T> amb(

    Iterable<? extends ObservableSource<? extends T>> sources)


List 인터페이스처럼 Iterable<Observable<T>> 객체를 인자로 넣으면 그중 가장 먼저 데이터를 발행하는 Observable만 선택해서 발행하도록 해줍니다.



amb()함수의 활용 예

String[] data1={"1","3","5"};
String[] data2={"2-R","4-R"};
        
List<Observable<String>> sources=Arrays.asList(
Observable.fromArray(data1).doOnComplete(()->Log.d("Observable #1 : onComplete()")),
Observable.fromArray(data2)
                 .delay(100L,TimeUnit.MILLISECONDS)
                    .doOnComplete(()->Log.d("Observable #2 : onComplete()"))
                );

Observable.amb(sources)
    .doOnComplete(()->Log.d("Result : onComplete()"))
    .subscribe(Log::i);
CommonUtils.sleep(1000);



sources 변수에는 data1에서 값을 발행하는 Observable과 data2에서 100ms 동안 기다렸다가 값을 발행하는 Observable을 넣습니다. 그리고 sources 변수에 amb()함수를 적용합니다.


실행결과

main | value = 1
main | value = 3
main | value = 5
main | debug = Observable #1 : onComplete()

main | debug = Result : onComplete()


첫번째 Observable이 먼저 값을 발행하기 때문에 첫번째 Observable의 값이 출력되고 , 첫번째 Observable에서 onComplete이벤트가 발생하면 Observable도 최종 완료(onComplete이벤트)됩니다.


조건연산자 - takeUntil() 함수

takeUntil() 함수는 take() 함수에 조건을 설정할 수 있습니다. 구체적으로 살펴보면 인자로 받은 Observable에서 어떤 값을 발행하면 현재 Observable의 데이터 발행을 중단하고 즉시완료(onComplete 이벤트 발생)합니다. 즉, take() 함수 처럼 일정 개수만 값을 발행하되 완료기준을 다른 Observable에서 값을 발행하는 지로 판단하는 것입니다.


takeUntil() 함수의 마블다이어그램

출처 - http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html


빨간원부터 차레대로 발행되다가, 팔각형 Observable이 데이터를 발행하기 전까지만 원을 다운스트림으로 전달합니다.


takeUntil() 함수의 원형

@SchedulerSupport(SchedulerSupport.NONE)

public final <U> Observable<T> takeUntil(ObservableSource<U> other)


takeUntil() 함수의 인자로는 값을 발행할 수 있는 other Obsrvable이 필요합니다. otherObservable 에서 값이 발행되면 기존 Observable에서 나오는 값을 더 발행하지 않고 완료(onComplete() )합니다.


takeUntil()함수를 활용한 예

String[] data={"1","2","3","4","5","6"};
        
Observable<String> source=Observable.fromArray(data)
           .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS),(val,notUsed)->val)
           .takeUntil(Observable.timer(500L, TimeUnit.MILLISECONDS));
        
source.subscribe(Log::i);
CommonUtils.sleep(1000);



fromArray() 함수에 데이터를 넣고 zipWith()와 interval() 함수를 활용하여 100ms 간격으로 데이터를 발행합니다. takeUntil() 함수의 인자로는 Observable.timer()를 호출하여 500ms 후에 값을 발행하도록 했습니다. timer() 함수는 값을 1번만 발행하므로 takeUntil()함수와 함께 사용하기에 편리합니다.


실행결과

RxComputationThreadPool-2 | value = 1
RxComputationThreadPool-2 | value = 2
RxComputationThreadPool-2 | value = 3

RxComputationThreadPool-2 | value = 4



비슷한함수로는 skipWhile()이 있습니다. 

댓글