Rx

RxJava 디버깅과 예외처리 -3 ( onErrorReturn, onErrorReturnItem, onErrorResumeNext 함수)

봄석 2018. 12. 31. 16:10

본 내용은 필자가 학습한 내용을 정리하는 내용입니다.

대부분 의 내용이 아래 책의 내용이므로 원서를 구매해서 직접보시는걸 추천드립니다!

RxJava 프로그래밍 리액티브 프로그래밍 기초부터 RxAndroid까지 한 번에

유동환 , 박정준 지음 | 한빛미디어 | 2017년 09월 04일 출간

http://www.kyobobook.co.kr/product/detailViewKor.laf?ejkGb=KOR&mallGb=KOR&barcode=9788968488658&orderClick=LAV&Kc=


저자님의 블로그

https://brunch.co.kr/@yudong#info




예외처리

RxJava 프로그래밍에서 예외처리는 어떻게 할까요 ? 지금까지 onError 이벤트를 전달 받아서 예외처리를 한다고 했었습니다. 원래 자바에서는 정상적인 실행 코드와 예외처리 코드를 분리하기 위해서 try-catch 문을 제공합니다 . try-catch 문으로 예외처리를 한다면 아래코드처럼 작성하게 됩니다.

Observable<String> source = Observable.create(
            (ObservableEmitter<String> emitter) -> { 
                emitter.onNext("1");
                emitter.onError(new Exception("Some Error"));
                emitter.onNext("3");
                emitter.onComplete();
            });
        
        try { 
            source.subscribe(Log::i);
        } catch (Exception e) { 
            Log.e(e.getMessage());
        }




하지만 코드를 실행해보면 다음과 같은 결과가 나옵니다
main | value = 1
io.reactivex.exceptions.OnErrorNotImplementedException: Some Error
    at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
    at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
    at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:74)
    at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.tryOnError(ObservableCreate.java:85)
    at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:73)
    at com.qjatjr1108.reactivejava.chapter07.ExceptionHandling.lambda$0(ExceptionHandling.java:15)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    at io.reactivex.Observable.subscribe(Observable.java:10903)
    at io.reactivex.Observable.subscribe(Observable.java:10889)
    at io.reactivex.Observable.subscribe(Observable.java:10792)
    at com.qjatjr1108.reactivejava.chapter07.ExceptionHandling.cannotCatch(ExceptionHandling.java:21)



발생한 에러인 OnErrorNotImplementedException의 발생원인을 파악하기 위해 RxJava의 내부 소스코드를 살펴보겟습니다 . 

먼저 at io.reactivex.Observable.subscribe(Observable.java:10792) (https://goo.gl/FvCYdQ) 의 

 public final Disposable subscribe() {
        return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }



부분을 보게되면 앞서 subscribe() 함수를 호출할 때는 onNext 인자만 넘겼지만 내부에서는 두 번째 인자로 Functions.ON_ERROR_MISSING을 입력했습니다.


아래코드는 Functions.java(https://goo.gl/SQUtpa) 의 

public static final Consumer<Throwable> ON_ERROR_MISSING = new OnErrorMissingConsumer();
 
 static final class OnErrorMissingConsumer implements Consumer<Throwable> {
        @Override
        public void accept(Throwable error) {
            RxJavaPlugins.onError(new OnErrorNotImplementedException(error));
        }
    }

Colored by Color Scripter


굵은 글씨를 보게되면 단지 OnErrorNotImpletementdException 예외를 던지라고 알려주는 역할을 합니다. 


마지막으로 LamdaObserver.java(https://goo.gl/zxh2dC) 에있는 LamdaObserver.OnError()의 소스코드를 살펴보겠습니다 여기를 보면 왜 try-catch부분이 동작하지 않는지 알 수 있습니다.

 @Override
    public void onError(Throwable t) {
        if (!isDisposed()) {
            lazySet(DisposableHelper.DISPOSED);
            try {
                onError.accept(t);
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                RxJavaPlugins.onError(new CompositeException(t, e));
            }
        }
    }




위에서 onError 변수는 맨 처음에 subscribe() 함수에서 전달했던 ON_ERROR_MISSING입니다. 우리가 observable을 생성하여 onError(new Exception("Some Error"))를 호출하면 결국 subscribe(onNext, onError) 함수의 인자로 넘긴 onError 인자로 전달합니다. 


따라서 try-catch 문은 RxJava에서 활용할 수 없습니다. 추가로 함수 체인이나 Observable내부에서 예외가 발생해도 onError 이벤트가 발생하고 try-catch 문으로는 해결할 수없습니다 .

즉, RxJava 에서 다른 방식의 예외처리 방법을 제공한다는것을 알 수 있습니다.



onErrorReturn() 함수

RxJava 에서는 에러도 어떠한 데이터로 보는것이 적절합니다. 따라서 예외 처리를하는 첫 번째 방식은 예외가 발생했을 때 에러를 의미하는 다른 데이터로 대체하는 것입니다. 

onError 이벤트는 흐름이 바로 중단되므로 subscribe() 함수를 호출할 때 onError 이벤트를 처리하는 것은 Out Of Memory 같은 중대한 에러가 발생했을 때만 활용합니다.


onErrorReturn () 는 에러가 발생했을 때 내가 원하는 데이터로 대체하는 함수입니다.

onErrorReturn() 함수의 마블다이어그램

출처 - http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html


앞의 3개의 데이터가 정상적으로 발행되고 마지막 데이터에서 어떤 에러가 발생하는 경우 onErrorReturn() 함수는 인자로 넘겼던 기본값을 대신 발행하고 onComplete 이벤트가 발생합니다. onError 이벤트는 발생하지 않습니다.



onErrorReturn() 함수의 활용 예

String[] grades={"70","88","&100","93","83"}; //&100이 에러데이터
        
        Observable<Integer>source= Observable.fromArray(grades)
                .map(data->Integer.parseInt(data))
                .onErrorReturn(e->{
                    if(e instanceof NumberFormatException){
                        e.printStackTrace();
                    }
                    return -1;
                });
        source.subscribe(data->{
            if(data<0){Log.e("Wrong Data found");
            return;
            }
            Log.i("Grade is : " +data);

        });


먼저  Integer.parseInt() 메소드는 NumberFormatException 라고 하는 검증된 예외가 있습니다.

따라서 이를 처리해 주어야 합니다. RxJava 에서는 try-catch 문이 동작하지 않기 때문에 onErrorReturn() 함수에서 처리하며 NumberFormatException 발생시 -1을 리턴합니다.

subscribe() 함수는 성적 데이터를 처리하므로 0 보다 커야합니다.

onErrorReturn() 함수에서 예외 발생 시 음수값을 리턴했으므로 data가 0보다 작으면 에러 발생으로 판단하고 에러로그를 출력합니다.


위 코드를 보고 'onError 이벤트에서 예외처리를하는것과 무었이 다른가? '라고 생각할 수 있습니다. subscribe () 함수를 호출할 때 onError이벤트를 처리하는 것이 아닌 onErrorReturn()등의 함수를 활용하여 예외처리하는 것은 몇가지 장점이 있습니다.


1) 예외 발생이 예상되는 부분을 선언적으로 처리할 수 있습니다.

2) Observable을 생성하는 측과 구독하는 측이 서로 다를 수 있다는 점입니다.

구체적으로 말하자면 Observable에서 에러 가능성을 명시하지 않았는데 구독자가 필요한 예외처리를 빠짐없이 하는 것은 어렵다는 뜻입니다. 이럴 떄 Observable을 생성하는 측에서 발생할 수 있는 예외처리를 미리 해 두면 구독자는 선언된 예외상황을 보고 그에 맞는 처리를 할 수 있습니다. 예를들어 위 코드에서는 map(data->Integer.parseInt(data)) 부분에서 예외가 발생할 수 있음을 명시하고 있습니다.


아래 코드는 위의 코드를 onError 이벤트에서 예외처리하도록 변경한 것입니다.


구독할때 onError 이벤트에서의 예외처리

 String[] grades={"70","88","&100","93","83"}; //&100이 에러데이터
        
        Observable<Integer>source= Observable.fromArray(grades)
                .map(data->Integer.parseInt(data));
        source.subscribe(data->
            Log.i("Grade is : " +data),
            e->{if(e instanceof NumberFormatException){
                e.printStackTrace();
            }
            Log.e("Wrong Data found");
        });
        
    }



실행결과는 위의 코드와 동일합니다

main | value = Grade is : 70
main | value = Grade is : 88
java.lang.NumberFormatException: For input string: "&100"
    at java.lang.NumberFormatException.forInputString(Unknown Source)
    at java.lang.Integer.parseInt(Unknown Source)
    at java.lang.Integer.parseInt(Unknown Source)
    at com.qjatjr1108.reactivejava.chapter07.ExceptionHandling.lambda$4(ExceptionHandling.java:50)
    at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:59)
    at io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable.run(ObservableFromArray.java:107)
    at io.reactivex.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java:36)
    at io.reactivex.Observable.subscribe(Observable.java:10903)
    at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:33)
    at io.reactivex.Observable.subscribe(Observable.java:10903)
    at io.reactivex.Observable.subscribe(Observable.java:10889)
    at io.reactivex.Observable.subscribe(Observable.java:10818)
    at com.qjatjr1108.reactivejava.chapter07.ExceptionHandling.onError(ExceptionHandling.java:51)
    at com.qjatjr1108.reactivejava.chapter07.ExceptionHandling.main(ExceptionHandling.java:64)

main | error = Wrong Data found




onErrorReturnItem() 함수


한편 onErrorReturn() 함수와 비슷한 함수로 onErrorReturnItem() 함수가 있습니다.

onErrorReturn()함수와 동일하지만 Throwable 객체를 인자로 전달하지 않기 때문에 코드는 좀 더 간결해집니다. 단, 예외의 종류는 확인할 수 없습니다.


onErrorReturnItem() 함수의 활용 예

String[] grades={"70","88","&100","93","83"}; //&100이 에러데이터
        
        Observable<Integer>source= Observable.fromArray(grades)
                .map(data->Integer.parseInt(data))
                .onErrorReturnItem(-1);
                
        source.subscribe(data->{
            if(data<0){Log.e("Wrong Data found");
            return;
            }
            Log.i("Grade is : " +data);
        });



Throwable( e)객체를 사용하지 않았기 떄문에 코드의 가독성이 좀 더 높아집니다. 


실행결과

main | value = Grade is : 70
main | value = Grade is : 88

main | error = Wrong Data found




onErrorResumeNext() 함수

onErrorReturn()과 onErrorReturnItem() 함수는 에러가 발생한 시점에 특정 값으로 대체하는 것이였습니다. onErrorResumeNext() 는 에러가 발생했을 때 내가 원하는 Observable로 대체하는 방법입니다. Observable로 대체한다는 것은 에러 발생 시 데이터를 교체하는 것뿐만 아니라 관리자에게 이메일을 보낸다던가 자원을 해제하는 등의 추가 작업을 해야할 때 유용합니다.


onErrorResumeNext() 함수의 마블 다이어 그램

출처 - http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html


에러가 발생했을 때 특정 값을 발행한다는 점은 onErrorReturn() 함수와 크게 다르지 않습니다.

특정 값을 원하는 Observable로 설정할 수 있다는 점이 다릅니다.


onErrorResumeNext() 함수의 활용 예

String[] salesData={"100","200","A300"}; //A300은 에러데이터
        Observable<Integer> onParseError=Observable.defer(()->{
            Log.d("Send email to administrator");
            return Observable.just(-1);
        }).subscribeOn(Schedulers.io()); // IO 스케줄러에서 실행됨
        
        Observable<Integer> source=Observable.fromArray(salesData)
                .map(Integer::parseInt)
                .onErrorResumeNext(onParseError);
        
        source.subscribe(data->{
            if(data<0){
                Log.e("Wrong Data found");
                return;
            }
            Log.i("Sales data :"+ data);
        });
        CommonUtils.sleep(1000);



에러가 발생했을 때 관리자에게 이메일을 보내고 '-1' 이라는 데이터를 발행하는 Obsevable로 대체합니다. onParseError 변수는 subscribeOn() 함수를 호출하여 IO 스케줄러에서 실행합니다.

RxJava는 이처럼 내가 원하는 코드를 실행하는 스케줄러를 선언적으로 지정할 수 있어

활용 범위가 넓습니다


실행결과

main | value = Sales data :100
main | value = Sales data :200
RxCachedThreadScheduler-1 | debug = Send email to administrator

RxCachedThreadScheduler-1 | error = Wrong Data found



참고로 onErrorResumeNext() 함수는 onErrorReturn () 함수처럼 Throwable을 받아오는 오버로딩도 제공합니다.