본문 바로가기
Rx

RxJava 흐름제어 -1( sample )

by 봄석 2018. 12. 31.

본 내용은 필자가 학습한 내용을 정리하는 내용입니다.

대부분 의 내용이 아래 책의 내용이므로 원서를 구매해서 직접보시는걸 추천드립니다!

RxJava 프로그래밍 리액티브 프로그래밍 기초부터 RxAndroid까지 한 번에

유동환 , 박정준 지음 | 한빛미디어 | 2017년 09월 04일 출간

http://www.kyobobook.co.kr/product/detailViewKor.laf?ejkGb=KOR&mallGb=KOR&barcode=9788968488658&orderClick=LAV&Kc=


저자님의 블로그

https://brunch.co.kr/@yudong#info



흐름제어

흐름제어는 Observable이 데이터를 발행하는 속도와 옵서버가 데이터를 받아서 처리하는 속도 사이의 차이가 발생할 때 사용하는 함수입니다. 예를 들어 센서에서 값을 받아서 그래프로 보여주는 기능을 구현하는 경우를 상상해 봅시다. 센서에서 내보내는 데이터의 발행속도와 그래프를 갱신하는 속도에는 차이가 있습니다. 또한 어떤 스위치 UI가 On 하는데 오랜시간이 걸린다거나 사용자가 ON이 완료되기 전에 ON/OFF를 반복하는 경우도 반응할 때 차이가 생가는 상황입니다.


RxJava는 push 방식으로 동작하므로 이러한 문제가 발생할 때 대처할 수 있어야합니다.

RxJava는 다양한 흐름 제어 함수를 제공합니다. 크게 sample(), buffer() , throttle(), window(), debounce() 함수를 제공합니다.


sample() 함수

sample() 함수는 특정한 시간 동안 가장 최근에 발행된 데이터만 걸러줍니다. 해당 시간에는 아무리 많은 데이터가 들어와도 해당 구간의 마지막 데이터만 발행하고 나머지는 무시합니다.


sample() 함수의 마블 다이어 그램

출처 - http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html


sample() 함수의 원형은 다음과 같습니다

@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Observable<T> sample(long period, TimeUnit unit)

public final Observable<T> smaple(long period, TimeUnit unit, boolean emitLast)


emitLast 인자는 sample() 함수의 데이터 발행이 완료되지 않고 마지막에 데이터가 남아 있을 때 해당 데이터를 발행할 것인지 결정합니다. 만약 emitLast를 true로 설정되었다면 해당시간 

가장 마지막 데이터를 발행합니다.


sample() 함수의 활용

String[] data= {"1","7","2","3","6"};
        
//시간측정용
CommonUtils.exampleStart();
        
//앞의 4개는 100ms 간격으로 발행
Observable<String> earlySource=Observable.fromArray(data)
            .take(4)
            .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a,b)->a);
        
//마지막  데이터는 300ms 후에 발행
Observable<String> lastSource=Observable.just(data[4])
            .zipWith(Observable.timer(300L, TimeUnit.MILLISECONDS), (a,b)->a);
        
//2개의 Observable을 결합하고 300ms로 샘플링.
Observable<String> source= Observable.concat(earlySource,lastSource)
            .sample(300L,TimeUnit.MILLISECONDS);

source.subscribe(Log::it);
CommonUtils.sleep(1000);



먼저 100ms 간격으로 data 배열에 있는 4개를 발행합니다 . 마블 다이어그램처럼 빨강 원부터

초록원까지 발행합니다. 그리고 두번째 Observable을 이용해 초록원을 300ms이후에 발행합니다.


또한 내가 원하는 특정 시간 후에 발행하기 위해 concat() 함수를 호출하여 2개의 데이터 흐름을 결합했습니다. 이렇게 전체 데이터 흐름을 세부 데이터 흐름으로 나누면 코드의 가독성이 좋아집니다.


샘플링은 300ms 간격으로 수행합니다. 매 300ms 간격으로 가장 최근에 들어온 값만 최종적으로 발행합니다.

RxComputationThreadPool-1 | 556 | value = 7

RxComputationThreadPool-1 | 856 | value = 3


sample 함수에 의해서 7과 3만 최종적으로 발행되고 6값은 발행되지 않았습니다.


만약 sample() 함수의 실행이 끝나지 않았는데 Observable이 종료되는 경우에 마지막 값을 발행하려면 emitLast 인자를 true로 넣어주면 됩니다.

//2개의 Observable을 결합하고 300ms로 샘플링.
Observable<String> source= Observable.concat(earlySource,lastSource).sample(300L,TimeUnit.MILLISECONDS,true);



실행결과는 다음과 같습니다

RxComputationThreadPool-1 | 540 | value = 7
RxComputationThreadPool-1 | 840 | value = 3

RxComputationThreadPool-3 | 956 | value = 6



댓글