본문 바로가기
Rx

스케줄러 -6(콜백지옥벗어나기)

by 봄석 2018. 12. 28.

본 내용은 필자가 학습한 내용을 정리하는 내용입니다.

대부분 의 내용이 아래 책의 내용이므로 원서를 구매해서 직접보시는걸 추천드립니다!

RxJava 프로그래밍 리액티브 프로그래밍 기초부터 RxAndroid까지 한 번에

유동환 , 박정준 지음 | 한빛미디어 | 2017년 09월 04일 출간

http://www.kyobobook.co.kr/product/detailViewKor.laf?ejkGb=KOR&mallGb=KOR&barcode=9788968488658&orderClick=LAV&Kc=


저자님의 블로그

https://brunch.co.kr/@yudong#info




RxJava의 버전별 스케줄러 종류

스케줄러 

RxJava 2.x 

RxJava 1.x 

  뉴 스레드 스케줄러

newThread()

 newThread()

 싱글 스레드 스케줄러

single() 

지원 안 함 

계산 스케줄러 

computation() 

computation() 

IO 스케줄러 

io() 

io() 

트램펄린 스케줄러 

trampoline() 

trampoline() 

메인 스레드 스케줄러 

지원 안 함 

im 

 테스트 스케줄러

지원 안 함 

test() 


스케줄러 동작 방법

1. 스케줄러는 RxJava 코드를 어느 스레드에서 실행할지 지정할 수 있다.

2. subscribeOn() 함수와 observeOn() 함수를 모두 지정하면 Observable에서 데이터 흐름이 

발생하는 스레드와 처리된 결과를 구독자에게 발행하는 스레드를 분리할 수 있다.

3. subscribeOn() 함수만 호출하면 Observable의 모든 흐름이 동일한 스레드에서 실행된다.

4. 스케줄러를 별도로 지정하지 않으면 현재(main)스레드에서 동작을 실행한다



스케줄러를 활용해 콜백 지옥 벗어나기

서버와 통신하는 네트워크 프로그래밍을 할 때 마주치는 콜백 지옥을 RxJava가 어떻게 해결해주는지 알아보겠습니다.


RxJava의 스케줄러를 활용하면 비동기 프로그래밍 방식이 달라집니다. 앞서 계산 스케줄러나 IO 스케줄러의 예제에서도 살펴봤듯이 스레드를 생성하거나 Callable, Runnable 객체를 실행하는 코드가 사라집니다. 리액티브 프로그래밍은 서버와 연동하는 비동기 프로그래밍을 작성할 때 큰 힘을 발휘합니다.


예제를 통해 간단한 HTTP 기반의 네트워크 프로그램을 작성합니다.

private static final String URL_README=
            "https://raw.githubusercontent.com/yudong80/reactivejava/master/README.md";
private final OkHttpClient client = new OkHttpClient();
public void run(){
        Request request=new Request.Builder()
                .url(URL_README)
                .build();
        client.newCall(request).enqueue(new Callback(){
            @Override
            public void onFailure(Call call, IOException e){
                e.printStackTrace();
            }
            
            @Override
            public void onResponse(Call call,Response response) throws IOException{
                Log.i(response.body().string());
            }
});



HTTP GET 명령으로 URL_README에 저장된 URL 내용을 가져옵니다 . 성공하면 가져온 내용을 출력하고 실패면 printStackTrace() 메소드로 호출 스택을 출력합니다 .


실행결과는 아래와같습니다

1
OkHttp https://raw.githubuserc... | value = Welcome to Java Reactive Programming!!
cs

좀더 발전시켜 첫번째 URL을 호출한 다음 성공했을 때 다른 서버의 두번째 URL을 호출하게 만드는 예제를 만들어보겠습니다.
즉 성공했다는 콜백을 받았을 때만 두번째 URL을 호출해야합니다.

CallBackHell.java
public class CallbackHell {
    private static final String FIRST_URL ="https://api.github.com/zen";
    private static final String SECOND_URL=GITHUB_ROOT+"/samples/callback_hell";
    
    private final OkHttpClient client=new OkHttpClient();
    
    private Callback onSuccess=new Callback(){
 
        @Override
        public void onFailure(Call call, IOException e) {
            e.printStackTrace();
        }
        @Override
        public void onResponse(Call call, Response response) throws IOException {
            Log.i(response.body().string());
        }
        
    };
    public void run(){
        Request request=new Request.Builder()
                .url(FIRST_URL)
                .build();
        client.newCall(request).enqueue(new Callback(){
 
            @Override
            public void onFailure(Call call, IOException e) {
                e.printStackTrace();
                
            }
 
            @Override
            public void onResponse(Call call, Response response) throws IOException {
                Log.i(response.body().string());
                
                //콜백을 다시 추가
                Request requset=new Request.Builder()
                        .url(SECOND_URL)
                        .build();
                client.newCall(request).enqueue(onSuccess);
            }
        });
    }
    public static void main(String args[]){
        CallbackHell demo=new CallbackHell();
        demo.run();
    }
}



코드가 복잡해집니다. 첫 번째 HTTP GET 호출은 run() 메소드 실행 코드 내부에서 포함할 수 있습니다. 하지만 실행 결과를 얻은 후 두번째 URL을 호출할 때는 지역 변수를 사용할 수 없으므로 객체의 멤버변수로 선언해야 합니다. 즉 , 이미 첫 번째 호출의 성고과 실패가 있고 그것을 기준으로 해서 두 번째 호출의 성공과 실패가 있기 때문에 이를 모두 고려해 코드를 작성해야 합니다.


이 결과 작성자가 호출한 내용을 한 눈에 보기 어려워서 작성자의 의도가 코드를 통해 분명하게 드러나지않습니다


이러한 어려움이 있지만 실행결과는 매우 단순합니다.

OkHttp https://api.github.com/... | value = It's not fully shipped until it's fast.

OkHttp https://api.github.com/... | value = Approachable is better than simple.




이제 RxJava의 스케줄러를 이용해 간결하게 비동기 네트워크 코드를 작성해 보겠습니다

subscribeOn() 함수와 스케줄러를 활용하면 됩니다. 호출하는 URL은 동일합니다.


RxJava의 스케줄러를 활용한 URL호출

public class CallBackHeaven {
    private static final String FIRST_URL = "https://api.github.com/zen";
    private static final String SECOND_URL = GITHUB_ROOT + "/samples/callback_hell";
    public void usingConcat(){
        CommonUtils.exampleStart();
        Observable<String> source=Observable.just(FIRST_URL)
                .subscribeOn(Schedulers.io())
                .map(OkHttpHelper::get)
                .concatWith(Observable.just(SECOND_URL)
                        .map(OkHttpHelper::get));
        source.subscribe(Log::it);
        CommonUtils.sleep(5000);
    }
    public static void main(String args[]){
        CallBackHeaven demo=new CallBackHeaven();
        demo.usingConcat();
    }
}




곧바로 내가 호출하는 첫 번째 URL과 두 번째 URL에 대한 코드가 한눈에 보일 겁니다.

concatWith() 함수는 concat()함수와 기능이 동일합니다. 단, concat()함수는 첫 번째 Observable과 두 번째 Observable을 함께 인자로 넣어야하지만, concatWith()함수는 현재의 Observable에 새로운 Observable을 결합할 수 있다는 차이가 있습니다.


앞서 CallBackHell 코드에서는 OkHttpClient의 enqueue() 메소드를 호출하여 콜백을 전달받았지만, 이 코드에서는  OkHttpHelper.get() 메소드 안에서 OkHttpClient의 execute()메소드를 호출합니다. 또한 IO 스케줄러로 별도의 스레드에서 네트워크를 호출합니다.


실행결과

RxCachedThreadScheduler-1 | 2817 | value = Anything added dilutes everything else.

RxCachedThreadScheduler-1 | 3294 | value = Welcome to Callback Hell!!


이 코드에 장점

첫 번째는 선언적 동시성입니다. 순수한 비즈니스 로직과 비동기 동작을 위한 스레드 부분을 구별할 수 있도록 해줍니다.


두 번째는 가독성입니다. 정상적인 로직과 향후 예외처리 부분을 말끔하게 분리할 수 있도록 해줍니다.




실습예제: 동시성 네트워크 호출

CallBackHeaven 코드에서 좀더 욕심을 내보겠습니다. concat() 함수는 결국 첫 번째 Observable에서 데이터 발행이 끝날 때까지 기다려야 합니다.

만약 첫 번째 URL과 두 번째 URL 요청을 동시에 수행하고 결과만 결합하면 어떨까요 ?

첫 번째 URL의 응답을 기다리지 않고 두 번째 URL 호출을 시작할 수 있기 때문에 성능 향상을 기대할 수 있습니다. 이때 zip() 함수를 사용합니다.


zip() 함수를 활용한 CallbackHeaven 예

CommonUtils.exampleStart();
        
Observable<String> first=Observable.just(FIRST_URL)
            .subscribeOn(Schedulers.io())
            .map(OkHttpHelper::get);

Observable<String> second=Observable.just(SECOND_URL)
            .subscribeOn(Schedulers.io())
            .map(OkHttpHelper::get);
        
Observable.zip(first,second,(a,b)->("\n>>"+a+"\n>>"+b))
        .subscribe(Log::it);
        

CommonUtils.sleep(50000);



Observable을 2개로 나눈다고 생각하는 것이 중요합니다. 첫 번째 URL 호출은 first 변수, 두 번째 URL 호출은 second 변수에 Observable 을 할당했습니다. 마지막으로 zip() 함수를 활용하여 두 Observable을 실행합니다.


실행결과

RxCachedThreadScheduler-1 | 2687 | value = 
>>Responsive is better than fast.

>>Welcome to Callback Hell!!



앞서 concatWith() 함수를 활용한 예제는 3294ms 의 실행시간이 소요되었는데 zip() 함수는 2687ms 밖에 소요되지 않았습니다 


이처럼 RxJava의 스케줄러를 활용하면 비즈니스 로직과 비동기 프로그래밍을 분리할 수있기 때문에 프로그램의 효율을 향상시킬 수 있습니다.

댓글