Rx

RxJava 디버깅과 예외처리 -2 ( 디버깅, doOnEach ,doOnsubscribe, doOnDispose, doOnLifeCycle, doOnTerminate ,doFinally 함수)

봄석 2018. 12. 31. 15:04

본 내용은 필자가 학습한 내용을 정리하는 내용입니다.

대부분 의 내용이 아래 책의 내용이므로 원서를 구매해서 직접보시는걸 추천드립니다!

RxJava 프로그래밍 리액티브 프로그래밍 기초부터 RxAndroid까지 한 번에

유동환 , 박정준 지음 | 한빛미디어 | 2017년 09월 04일 출간

http://www.kyobobook.co.kr/product/detailViewKor.laf?ejkGb=KOR&mallGb=KOR&barcode=9788968488658&orderClick=LAV&Kc=


저자님의 블로그

https://brunch.co.kr/@yudong#info




doOnEach() 함수

doOnEach() 함수입니다 . onNext, onComplete, onError 이벤트를 각각 처리하는 것이 아니라 한번에 처리할 수 있기 때문입니다.


doOnEach() 함수의 원형은 다음과 같습니다.

public final Observable<T> doOnEach(
    final Consumer<? super Notification<T>> onNotification)

public final Observable<T> doOnEach(final Observable<? super T> observer)



첫 번째 Notification<T> 객체를 전달받아서 이벤트별로 구별하여 처리하는 방법입니다.

String[] data={"ONE","TWO","THREE"};
Observable<String> source=Observable.fromArray(data);
        
source.doOnEach(noti->{
        if(noti.isOnNext()) Log.d("onNext():", noti.getValue());
        if(noti.isOnComplete()) Log.d("onComplete");
        if(noti.isOnError()) Log.e("onError",noti.getError().getMessage());

 }).subscribe(System.out::println);


Notification<T> 객체는 발생한 이벤트의 종류를 알 수 있는 boolean 타입의 isOnNext(),

isOnComplete(), isOnError() 함수를 제공합니다. onNext() 함수의 경우 getValue() 함수를 호출하면 발행된 값을 알 수 있고 onError() 함수의 경우 getError() 함수를 호출하면 Throwable 객체를 얻어올수 있습니다.

main | onNext(): | debug = ONE
ONE
main | onNext(): | debug = TWO
TWO
main | onNext(): | debug = THREE
THREE

main | debug = onComplete


Notification<T> 객체를 사용하지 않고 Observer인터페이스를 사용하는 방법도 있습니다.

Observer 인터페이스는 Observable의 subscribe() 함수를 호출할 때 인자로 전달하는 인터페이스 입니다.


Observer 인터페이스 활용 예

public void doOnEachObserver() { 
        String[] orgs = {"1", "2", "3"};
        Observable<String> source = Observable.fromArray(orgs);
        
        source.doOnEach(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                //doOnEach()에서는 onSubscribe()가 호출되지 않습니다. 
            }
 
            @Override
            public void onNext(String value) {
                Log.d("onNext()", value);
            }
 
            @Override
            public void onError(Throwable e) {
                Log.e("onError()", e.getMessage());
            }
 
            @Override
            public void onComplete() {
                Log.d("onComplete()");
            }})
            .subscribe(Log::i);
        CommonUtils.exampleComplete();
    }




실행결과

main | onNext() | debug = 1
main | value = 1
main | onNext() | debug = 2
main | value = 2
main | onNext() | debug = 3
main | value = 3

main | debug = onComplete()


한가지 특이한 접은 Observer 객체를 인자로 받았으나 onSubscribe() 함수는 호출되지 않습니다. doOnEach() 함수는 오직 onNext, onComplete, onError 이벤트만 처리하기 때문입니다. 

유의하여 사용해야합니다. 하지만 위코드는 잘사용하지 않으므로 Notification을 잘 활용합니사.



doOnSubscribe() , doOnDispose(), 기타함수

Observable의 알림 이벤트 중에는 onSubscribe 와 onDispose 이벤트도 있습니다. 각각

Observable을 구독했을 때와 구독해지했을때 이벤트를 처리할 수 있습니다.


doOnSubscribe()와 doOnDispose() 함수의 원형

public final Observable<T> doOnSubscribe(Consumer<? super Disposable> onUsbscribe)

public final Observable<T> doOnDispose(Action onDispose)



doOnSubscribe() 함수는 Observable을 구독 했을 때 어던 작업을 할 수 있습니다. 람다 표현식의 인자로는 구독의 결과로 나오는 Disposable이 객체가 제공됩니다.


doOnDispose() 함수는 Observable의 구독을 해지했을때 호출되며 인자는 Action 객체입니다.

스레드 다수에서 Observable을 참조할 수 있기 때문에 Action 객체는 '스레드 안전'하게 동작해야 합니다. 두 함수를 활용하는 코드는 아래와 같습니다.


doOnDispose() 와 doOnSubscribe() 의 활용 예

String[] orgs={"1","3","5","2","6"};
Observable<String> source=Observable.fromArray(orgs)
            .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a,b)->a)
            .doOnSubscribe(d->Log.d("onSubscribe()"))
            .doOnDispose(()->Log.d("dispose"));

Disposable d= source.subscribe(Log::i);
CommonUtils.sleep(400);
d.dispose();
CommonUtils.sleep(300);




실행결과

main | debug = onSubscribe()
RxComputationThreadPool-1 | value = 1
RxComputationThreadPool-1 | value = 3
RxComputationThreadPool-1 | value = 5
RxComputationThreadPool-1 | value = 2

main | debug = dispose


100ms 간격으로 orgs 배열의 데이터를 발행한 후 doOnSubscribe() 및 doOnDispose() 함수를 호출하여 로그를 출력합니다. Observable은 zipWith() 함수를 활용하여 interval() 함수와 합성했기 때문에 main 스레드가 아니라 꼐산 스케줄러에서 동작합니다. main스레드는 200ms 후에 Observable을 구독 해지합니다. 


한편  doOnSubscribe()와 doOnDispose() 를 각각 호출하지않고 한번에 호출하는 

doOnLifeCycle() 도 있습니다.


doOnLifeCycle() 함수 예

String[] orgs={"1","3","5","2","6"};
Observable<String> source=Observable.fromArray(orgs)
            .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a,b)->a)
            .doOnLifecycle(d->Log.d("onSubscribe()"),()->Log.d("onDispose()"));

Disposable d=source.subscribe(Log::i);
        
CommonUtils.sleep(400);
d.dispose();
CommonUtils.sleep(300);




실행결과는 doOnSubscribe() 와 doOnDispose() 함수와 같습니다

각각 넣은 인자를 한번에 넣을수 있는것만 다릅니다.


또한 doOnTerminate() 함수는 Observable이 끝나는 조건인 onCoplete 혹은 onError 이벤트가 발생했을때 실행하는 함수입니다. 정확하게는 onComplete () 혹은 onError 이벤트 발생 직전에 호출합니다. 


doOnTerminate() 함수의 활용 예

String[] orgs={"1","3","5"};
Observable<String> source= Observable.fromArray(orgs);
        
source.doOnTerminate(()->Log.d("onTerminate()"))
            .doOnComplete(()->Log.d("onComplete()"))
            .doOnError(e->Log.e("onError()",e.getMessage()))
        .subscribe(Log::i);



실행결과

main | value = 1
main | value = 3
main | value = 5
main | debug = onTerminate()

main | debug = onComplete()




마지막으로 doFinally() 함수는 onError. onComplete혹은 onDispose 이벤트 발생시에 호출됩니다.