본문 바로가기
Rx

스케줄러 -5( 싱글 스레드 스케줄러, Executor 변환스케줄러 )

by 봄석 2018. 12. 27.

본 내용은 필자가 학습한 내용을 정리하는 내용입니다.

대부분 의 내용이 아래 책의 내용이므로 원서를 구매해서 직접보시는걸 추천드립니다!

RxJava 프로그래밍 리액티브 프로그래밍 기초부터 RxAndroid까지 한 번에

유동환 , 박정준 지음 | 한빛미디어 | 2017년 09월 04일 출간

http://www.kyobobook.co.kr/product/detailViewKor.laf?ejkGb=KOR&mallGb=KOR&barcode=9788968488658&orderClick=LAV&Kc=


저자님의 블로그

https://brunch.co.kr/@yudong#info




RxJava의 버전별 스케줄러 종류

스케줄러 

RxJava 2.x 

RxJava 1.x 

  뉴 스레드 스케줄러

newThread()

 newThread()

 싱글 스레드 스케줄러

single() 

지원 안 함 

계산 스케줄러 

computation() 

computation() 

IO 스케줄러 

io() 

io() 

트램펄린 스케줄러 

trampoline() 

trampoline() 

메인 스레드 스케줄러 

지원 안 함 

im 

 테스트 스케줄러

지원 안 함 

test() 


스케줄러 동작 방법

1. 스케줄러는 RxJava 코드를 어느 스레드에서 실행할지 지정할 수 있다.

2. subscribeOn() 함수와 observeOn() 함수를 모두 지정하면 Observable에서 데이터 흐름이 

발생하는 스레드와 처리된 결과를 구독자에게 발행하는 스레드를 분리할 수 있다.

3. subscribeOn() 함수만 호출하면 Observable의 모든 흐름이 동일한 스레드에서 실행된다.

4. 스케줄러를 별도로 지정하지 않으면 현재(main)스레드에서 동작을 실행한다



싱글 스레드 스케줄러

싱글 스레드 스케줄러는 RxJava 내부에서 단일 스레드를 별도로 생성하여 구독(Subscription) 작업을 처리합니다. 단, 생성된 스레드는 여러번 구독 요청이와도 공통으로 사용합니다.


리액티브 프로그래밍이 비동기 프로그래밍을 지얗아기 때문에 싱글 스레드 스케줄러를 활용할 확률은 낮습니다. 


싱글 스레드 스케줄러의 활용 예

Observable<Integer> numbers= Observable.range(100,5);
Observable<String> chars=Observable.range(0, 5)
        .map(CommonUtils::numberToAlphabet);
        
numbers.subscribeOn(Schedulers.single())
        .subscribe(Log::i);
        
chars.subscribeOn(Schedulers.single())
        .subscribe(Log::i);

 CommonUtils.sleep(500);


첫 번째 Observable은 100에서 104까지 5개의 숫자를 발행합니다.

이떄 range() 함수는 기본적으로 현재 스레드에서 실행됩니다.


두 번째 Observable은 0에서 4까지 5개의 숫자를 바탕으로 영문자를 발행합니다.

여기서 사용하는 CommonUtils.numberToAlphabet() 메소드의 구현은 아래와 같습니다.

private static final String ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
    
    public static String numberToAlphabet(long x) { 
        return Character.toString(ALPHABET.charAt((int) x % ALPHABET.length()));

    }


싱글 스레드 스케줄러에서 실행하면 비록 여러개 Observable이 있어도 별도 마련해놓은 단일 스레드에서 차례로 실행합니다,

실행결과
RxSingleScheduler-1 | value = 100
RxSingleScheduler-1 | value = 101
RxSingleScheduler-1 | value = 102
RxSingleScheduler-1 | value = 103
RxSingleScheduler-1 | value = 104
RxSingleScheduler-1 | value = A
RxSingleScheduler-1 | value = B
RxSingleScheduler-1 | value = C
RxSingleScheduler-1 | value = D
RxSingleScheduler-1 | value = E


트램펄린 스케줄러 예제와 비교해보면 실행 스레드가 다르다는 사실을 알 수 있습니다.

싱글 스레드 스케줄러는 RxSingleScheduler-1 스레드에서 실행합니다. 

그리고 결국 단일 스레드만을 사용한다는 사실을 알 수 있습니다.




Executor 변환 스케줄러

자바에서는 java.util.current 패키지에서 제공하는 실행자(Executor)를 변환하여 스케줄러를 생성 할 수 있습니다. 하지만 Executor 클래스와 스케줄러의 동작 방식과 다르므로 추천 방법은 아닙니다. 기존에 사용하던 Executor 클래스를 재사용 할 때만 한정적으로 활용합니다.


Executor 클래스를 활용하여 스케줄러를 지정하는 방법은 다음과 같습니다.


Executor를 활용한 스케줄러 지정 예

final int THREAD_NUM=10;
        
String[] data={"1","3","5"};
Observable<String> source=Observable.fromArray(data);
Executor executor=Executors.newFixedThreadPool(THREAD_NUM);
        
source.subscribeOn(Schedulers.from(executor)).subscribe(Log::i);
source.subscribeOn(Schedulers.from(executor)).subscribe(Log::i);
CommonUtils.sleep(500);



executor 변수는 고정 개수(10개) 의 스레드 풀을 생성합니다. 그리고 첫 번째 Observable과 두 번째 Observable에 subscribeOn() 함수를 호출하여 Executor 변환 스케줄러를 지정했습니다.


실행결과는 아래와 같습니다.

pool-1-thread-1 | value = 1
pool-1-thread-1 | value = 3
pool-1-thread-1 | value = 5
pool-1-thread-2 | value = 1
pool-1-thread-2 | value = 3

pool-1-thread-2 | value = 5


만약 Executors.newSingleThreadExecutor()로 Executors를 생성했다면 앞 실행 결과도 2개의 스레드가 아니라 1개의 스레드에서 모두 실행됩니다.

댓글