본문 바로가기
Rx

스케줄러란?

by 봄석 2018. 12. 27.

본 내용은 필자가 학습한 내용을 정리하는 내용입니다.

대부분 의 내용이 아래 책의 내용이므로 원서를 구매해서 직접보시는걸 추천드립니다!

RxJava 프로그래밍 리액티브 프로그래밍 기초부터 RxAndroid까지 한 번에

유동환 , 박정준 지음 | 한빛미디어 | 2017년 09월 04일 출간

http://www.kyobobook.co.kr/product/detailViewKor.laf?ejkGb=KOR&mallGb=KOR&barcode=9788968488658&orderClick=LAV&Kc=


저자님의 블로그

https://brunch.co.kr/@yudong#info




스케줄러란?

어떤 프로그램의 세부 일정을 주관하는 관리자로 생각하면 이해하기 조금 쉽습니다.

RxJava의 기본 개념을 비동기 프로그래밍에 적용하는 마지막 퍼즐입니다.


실무에서는 요구사항에 맞게 비동기적으로 동작할수 있어야하며,

 이때 스케줄러를 이용합니다.


스케줄러는 스레드를 지정할 수 있게 해줍니다.  단순히 새로운 스레드를 생성하거나 기존의 Executors를 활용하는것을 넘어 새로운 방식으로 우리를 맞이합니다.

지금까지 자바로 비동기프로그래밍을 할 때 자바 스레드를 만들면서 경쟁 조건이나 synchronized 키워드를 생각하셨다면 스케줄러의 코드를 보고 놀랄 것입니다. 그동안은 어렵게

다루어야 했던 비동기 프로그래밍이 간결한 코드로 다시 태어납니다.



filp()함수의 마블 다이어그램

flip() 함수의 활용 예

String[] objs={"1-S","2-T","3-P"};
Observable<String> source= Observable.fromArray(objs)
            .doOnNext(data->Log.v("Originla Data ="+ data))
            .subscribeOn(Schedulers.newThread())
            .observeOn(Schedulers.newThread())
            .map(Shape::flip);
source.subscribe(Log::i);

 CommonUtils.sleep(500);



objs에는 마블 다이어그램의 도형이 담겨져 있습니다. 이를 이용해 Observable을 생성합니다.

map() 함수에 Shape.filp() 함수를 호출해 도형을 뒤집습니다.


주목해야 할 코드는 첫 번째 doOnNext() 함수입니다. Observable에서 onNext 이벤트가 발생하면 실행되며 여기에서는 원래 데이터 값을 확인합니다.


두 번째는 subscribeOn() 함수입니다. 구독자가 Observable에 subscribe() 함수를 호출하여 구독할 때 실행되는 스레드를 지정합니다. 


마지막은 observeOn() 함수입니다. Observable에서 생성한 데이터 흐름이 여기저기 함수를 거치며 처리될 때 동작이 어느 스레드에서 일어나는지 지정할 수 있습니다. 


subscribeOn() 함수와 observeOn() 함수 모두에 인자로 Schedulers.newThread()를 넘겼습니다.

이는 새로운 스레드를 생성한다는 의미로 뉴 스레드 스케줄러라고 합니다.


실행결과

RxNewThreadScheduler-1 | Originla Data =1-S
RxNewThreadScheduler-1 | Originla Data =2-T
RxNewThreadScheduler-1 | Originla Data =3-P
RxNewThreadScheduler-2 | value = (flipped)1-S
RxNewThreadScheduler-2 | value = (flipped)2-T

RxNewThreadScheduler-2 | value = (flipped)3-P



최초의 데이터 흐름이 발생하는 스레드와 filp() 함수를 거쳐서 구독자에게 전달되는 스레드가 다릅니다.  우리는 새로운 스레드를 생성하거나, Runnable 혹은 Callable 객체를 전달한 적이 없습니다. 단지 subscribeOn() 과 observeOn() 함수에 어떤 스케줄러(뉴 스레드 스케줄러)를 지정했을 뿐 입니다.


이처럼 스케줄러를 활용하는 비동기 프로그래밍의 핵심은 바로 데이터 흐름이 발생하는 스레드와 처리된 결과를 구독자에게 전달하는 스레드를 분리할 수 있다는 것입니다.



만약 observeOn 함수 호출부분을 호출하면 어떻게될까요 ?

String[] objs={"1-S","2-T","3-P"};
Observable<String> source= Observable.fromArray(objs)
            .doOnNext(data->Log.v("Originla Data ="+ data))
            .subscribeOn(Schedulers.newThread())
            //.observeOn(Schedulers.newThread())
            .map(Shape::flip);
source.subscribe(Log::i);

 CommonUtils.sleep(500);



실행결과는 다음과 같습니다.

RxNewThreadScheduler-3 | Originla Data =1-S
RxNewThreadScheduler-3 | value = (flipped)1-S
RxNewThreadScheduler-3 | Originla Data =2-T
RxNewThreadScheduler-3 | value = (flipped)2-T
RxNewThreadScheduler-3 | Originla Data =3-P

RxNewThreadScheduler-3 | value = (flipped)3-P


 observeOn() 함수를 지정하지 않으면 subscribeOn() 함수로 지정한 스레드에서 모든 로직을 실행합니다.




정리


1. 스케줄러는 RxJava 코드를 어느 스레드에서 실행할지 지정할 수 있다.

2. subscribeOn() 함수와 observeOn() 함수를 모두 지정하면 Observable에서 데이터 흐름이 

발생하는 스레드와 처리된 결과를 구독자에게 발행하는 스레드를 분리할 수 있다.

3. subscribeOn() 함수만 호출하면 Observable의 모든 흐름이 동일한 스레드에서 실행된다.

4. 스케줄러를 별도로 지정하지 않으면 현재(main)스레드에서 동작을 실행한다

댓글