본문 바로가기
Rx

스케줄러 -2 (계산스케줄러)

by 봄석 2018. 12. 27.

본 내용은 필자가 학습한 내용을 정리하는 내용입니다.

대부분 의 내용이 아래 책의 내용이므로 원서를 구매해서 직접보시는걸 추천드립니다!

RxJava 프로그래밍 리액티브 프로그래밍 기초부터 RxAndroid까지 한 번에

유동환 , 박정준 지음 | 한빛미디어 | 2017년 09월 04일 출간

http://www.kyobobook.co.kr/product/detailViewKor.laf?ejkGb=KOR&mallGb=KOR&barcode=9788968488658&orderClick=LAV&Kc=


저자님의 블로그

https://brunch.co.kr/@yudong#info




RxJava의 버전별 스케줄러 종류

스케줄러 

RxJava 2.x 

RxJava 1.x 

  뉴 스레드 스케줄러

newThread()

 newThread()

 싱글 스레드 스케줄러

single() 

지원 안 함 

계산 스케줄러 

computation() 

computation() 

IO 스케줄러 

io() 

io() 

트램펄린 스케줄러 

trampoline() 

trampoline() 

메인 스레드 스케줄러 

지원 안 함 

im 

 테스트 스케줄러

지원 안 함 

test() 


스케줄러 동작 방법

1. 스케줄러는 RxJava 코드를 어느 스레드에서 실행할지 지정할 수 있다.

2. subscribeOn() 함수와 observeOn() 함수를 모두 지정하면 Observable에서 데이터 흐름이 

발생하는 스레드와 처리된 결과를 구독자에게 발행하는 스레드를 분리할 수 있다.

3. subscribeOn() 함수만 호출하면 Observable의 모든 흐름이 동일한 스레드에서 실행된다.

4. 스케줄러를 별도로 지정하지 않으면 현재(main)스레드에서 동작을 실행한다



계산스케줄러

RxJava에서 추천하는 스케줄러는 크게 3가지 입니다 .

첫 번쨰는 계산(Computation) 스케줄러

두 번째는 IO 스케줄러

세 번째는 트램펄린 스케줄러 입니다.

앞서배웠던 뉴스레드 스케줄러나 다른 스케줄러는 특수한 상황에서 적용하길 권장합니다.


사실 계산 스케줄러는 수차례 예제에 등장했습니다

interval()함수의 원형을 보면

@SchedulerSupport(SchedulerSupport.COMPUTATION)

public static Observable<Long> interval(long period, TimeUnit unit)


interval() 함수는 기본적으로 계산 스케줄러에서 동작합니다. 물론 내가 원하는 스케줄러에서 동작하도록 변경할 수도 있습니다.


스케줄러를 지정할 수 있는 interval() 함수의 원형은 다음과 같습니다.

@SchedulerSupport(SchedulerSupport.CUSTOM)
public static Observable<Long> interval(

    long period, TimeUnit unit, Scheduler scheduler)


CUSTOM은 개발자가 원하는 스케줄러를 지정할 수 있다는 의미입니다. 리액티브 함수 대부분은 마지막 인자로 스케줄러를 지정할 수 있습니다. flatMap()이나 scan() 함수 등은 대표적인 연산자지만 스케줄러를 인자로 받지 않는 경우도 있습니다.

interval( period, unit) 함수의 내부 구현을 보면 실제로 Schedulers.computation() 객체를 인자로 전달합니다.

계산스케줄러는 CPU에 대응하는 계산용 스케줄러입니다. '계산' 작업을 할 때는 대기 시간없이 바르게 결과를 도출하는 것이 중요합니다. 계산 작업이라고 하니까 어렵게 느껴진다면 
입출력(I/O) 작업을 하지 않는 스케줄러라고 생각하면 됩니다.
내부적으로 스레드 풀을 생성하며 스레드 개수는 기본적으로 프로세서 개수와 동일합니다.



계산스케줄러의 활용 예
String[] orgs ={"1","3","5"};
Observable<String> source=Observable.fromArray(orgs)
            .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS),(a,b)->a);
        
//구독 (Subscription) #1
source.map(item->"<<"+item+">>")
.subscribeOn(Schedulers.computation())
.subscribe(Log::i);
        
//구독 (Subscription) #2
source.map(item->"##"+item+"##")
    .subscribeOn(Schedulers.computation())
    .subscribe(Log::i);
        
CommonUtils.sleep(1000);

데이터 흐름은 '1','3','5'로 동일합니다. 앞서 zipWith()함수로 데이터와 시간을 합성할 수 있다고 했었습니다. 여기서도 배열에 들어있는 데이터와 interval() 함수를 합성하여 시간간격으로 데이터를 발행했습니다. zipWith() 함수는 쓰임이 많으므로 잘 익혀두면 좋습니다.


람다 표현식으로 표현한 (a,b) -> a 는 시간은 그대로 두고 데이터만 취했습니다.


동시 실행을 위해서 첫 번재 구독과 두 번째 구독 사이에 sleep()을 주지않았고,

subscribeOn()을 호출하여 Schedulers.computation()으로 계산스케줄러로 설정했습니다.

사실 interval() 함수는 기본적으로 계산스케줄러를 사용하기 때문에 .subscribeOn()부분을 삭제

하여도 똑같이 동작합니다.


실행결과

RxComputationThreadPool-4 | value = ##1##
RxComputationThreadPool-3 | value = <<1>>
RxComputationThreadPool-4 | value = ##3##
RxComputationThreadPool-3 | value = <<3>>
RxComputationThreadPool-4 | value = ##5##

RxComputationThreadPool-3 | value = <<5>>


같은예제를 여러번 실행하다보면 아래와같은 결과가얻어지기도 하는데
RxComputationThreadPool-3 | value = <<1>>
RxComputationThreadPool-3 | value = ##1##
RxComputationThreadPool-3 | value = <<3>>
RxComputationThreadPool-3 | value = ##3##
RxComputationThreadPool-3 | value = <<5>>
RxComputationThreadPool-3 | value = ##5##


첫번째 구독과 두번째 구독이 거의 동시에 이루어지기 때문에 RxJava 내부에서 동일한 스레드에 작업을 할당했기 때문입니다. 반면에 뉴스레드 스케줄러로 한다면 매번 새로운 스레드가 할당될것입니다.

댓글