Rx

RxJava 흐름제어 -3(throttleFirst 함수 ,throttleLast함수)

봄석 2019. 1. 1. 00:14

본 내용은 필자가 학습한 내용을 정리하는 내용입니다.

대부분 의 내용이 아래 책의 내용이므로 원서를 구매해서 직접보시는걸 추천드립니다!

RxJava 프로그래밍 리액티브 프로그래밍 기초부터 RxAndroid까지 한 번에

유동환 , 박정준 지음 | 한빛미디어 | 2017년 09월 04일 출간

http://www.kyobobook.co.kr/product/detailViewKor.laf?ejkGb=KOR&mallGb=KOR&barcode=9788968488658&orderClick=LAV&Kc=


저자님의 블로그

https://brunch.co.kr/@yudong#info




throttleFirst() 와 throttleLast() 함수

throttle은 영어로 '조절판'이라는 뜻입니다. 그것에 맞게 throttleFirst() 는 주어진 조건에서 가장 먼저 입력된 값을 발행합니다. throttleLast()는 주어진 조건에서 가장 마지막에 입력된 값을 발행합니다.


->throttleFirst() 함수와 throttleLast() 함수는 정반대의 의미가 아닙니다. throttleFrist() 함수는 어떤 데이터가 발행되지 못하도록 방지하지만, throttleLast() 함수는 smaple() 함수처럼 고정된 시간 간격(scheduled interval) 안에서 마지막 데이터만 발행합니다 .


throttleFirst() 함수는 sample() 함수와 비슷하지만 다릅니다. sample() 함수가 주어진 시간 동안 입력된 마지막 값을 발행한다면, throttleFirst() 함수는 어떤 데이터를 발행하면 지정된 시간 동안 다른 데이터를 발행하지 않도록 막습니다. 


throttleFirst() 함수의 마블다이어그램

출처 - http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html


throttleFirst() 함수의 원형

@SchedulerSupport(SchedulerSupport.COMPUTATION)

public final Observable<T> throttleFirst(long windowDuration, TimeUnit unit)



throttleFirst() 함수는 계산 스케줄러에서 실행합니다 . 즉 비동기로 동작하도록 설계된 함수입니다. windowDuration 인자는 시간 간격을 지정하며, unit은 시간단위 입니다.


throttleFirst() 함수의 활용 예

String[] data={"1","2","3","4","5","6"};
CommonUtils.exampleStart();
        
//앞의 1개는 100ms 간격으로 발행
Observable<String> earlySource= Observable.just(data[0])
            .zipWith(Observable.interval(100L,TimeUnit.MILLISECONDS), (a,b)->a);
        
//다음  1 개는 300ms 후에 발행
Observable<String> middleSource= Observable.just(data[1])
            .zipWith(Observable.timer(300L, TimeUnit.MILLISECONDS),(a,b)->a);
        
//마지막 4개는 100ms 후에 발행
Observable<String> lastSource=Observable.fromArray(data[2],data[3],data[4],data[5])
            .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS),(a,b)->a)
.doOnNext(Log::dt);
        
 //200ms 간격으로 throttleFirst() 실행함
 Observable<String> source= Observable.concat(earlySource,middleSource,lastSource)
            .throttleFirst(200L,TimeUnit.MILLISECONDS);
        
        
 source.subscribe(Log::it);
 CommonUtils.sleep(1000);




처음 100ms가 지난 후에 빨간원을 발행한 후 300ms 동안 기다린 다음 노란원을 발행합니다.

그리고 100ms 간격으로 나머지 값들을 발행합니다. 마지막으로 throttleFirst() 함수를 호출하여 200ms 간격으로 타임 윈도에 맨 먼저 입력된 값을 발행합니다.


실행결과

RxComputationThreadPool-1 | 499 | value = 1
RxComputationThreadPool-3 | 805 | value = 2
RxComputationThreadPool-4 | 908 | debug = 3
RxComputationThreadPool-4 | 1006 | debug = 4
RxComputationThreadPool-4 | 1006 | value = 4
RxComputationThreadPool-4 | 1106 | debug = 5
RxComputationThreadPool-4 | 1208 | debug = 6

RxComputationThreadPool-4 | 1208 | value = 6



마블다이어그램 같이 1,2,4,6 (빨강,노랑,하늘,자주)원이 다운스트림으로 발행되었습니다.

throttleLast() 함수는 sample 함수와 기본개념은 동일합니다. throttleLast() 함수가 주어진 시간동안 입력된 값 중 마지막 값을 발행하는 기본 개념에 충실하다면 sample() 함수는 다양한 오버로딩을 제공합니다.