Rx

RxJava 흐름제어 -2( Buffer)

봄석 2018. 12. 31. 23:46

본 내용은 필자가 학습한 내용을 정리하는 내용입니다.

대부분 의 내용이 아래 책의 내용이므로 원서를 구매해서 직접보시는걸 추천드립니다!

RxJava 프로그래밍 리액티브 프로그래밍 기초부터 RxAndroid까지 한 번에

유동환 , 박정준 지음 | 한빛미디어 | 2017년 09월 04일 출간

http://www.kyobobook.co.kr/product/detailViewKor.laf?ejkGb=KOR&mallGb=KOR&barcode=9788968488658&orderClick=LAV&Kc=


저자님의 블로그

https://brunch.co.kr/@yudong#info




흐름제어

흐름제어는 Observable이 데이터를 발행하는 속도와 옵서버가 데이터를 받아서 처리하는 속도 사이의 차이가 발생할 때 사용하는 함수입니다. 예를 들어 센서에서 값을 받아서 그래프로 보여주는 기능을 구현하는 경우를 상상해 봅시다. 센서에서 내보내는 데이터의 발행속도와 그래프를 갱신하는 속도에는 차이가 있습니다. 또한 어떤 스위치 UI가 On 하는데 오랜시간이 걸린다거나 사용자가 ON이 완료되기 전에 ON/OFF를 반복하는 경우도 반응할 때 차이가 생가는 상황입니다.


RxJava는 push 방식으로 동작하므로 이러한 문제가 발생할 때 대처할 수 있어야합니다.

RxJava는 다양한 흐름 제어 함수를 제공합니다. 크게 sample(), buffer() , throttle(), window(), debounce() 함수를 제공합니다.



buffer()함수

buffer() 함수는 sample() 함수와는 조금 다릅니다.  sample() 함수는 특정 시간 간격을 기준으로 가장 최근에 발행된 데이터만 넘겨주고 나머지는 무시하는 반면 buffer() 함수는 일정 시간 동안 데이터를 모아두었다가 한꺼번에 발행합니다. 따라서 넘치는 데이터 흐름을 제어할 필요가 있을 때 활용합니다.


buffer(count) 함수의 마블 다이어그램

출처 - http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html


처음 빨간원 노란원 초록원을 발행하면 그것을 모아서 List 객체에 전달해 줍니다. 그 다음 다시 하늘 파랑 자주 원이 생기면 그것을 모아서 한번에 발행해 줍니다 .


buffer() 함수의 원형

1
2
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<List<T>> buffer(int count)
cs


기본적으로 스케줄러 없이 현재스레드에서 동작합니다. 입력되는 값을 count에 저장된 수만큼 모아서 List<T>에 한꺼번에 발행합니다.


RxJava의 buffer() 함수는 정말 다양한 오버로딩을 제공합니다. 가장 간단한 것부터 몇 가지를 살펴보도록 하겠습니다. 첫 번째는 위의 마블 다이어그램을 코드로 count 인자에 데이터를 모을 개수를 입력합니다 .


buffer() 함수의 활용 예1

String[] data={"1","2","3","4","5","6"};
CommonUtils.exampleStart();
        
//앞의 3개는 100ms 간격으로 발행
Observable<String> earlySource= Observable.fromArray(data)
            .take(3)
            .zipWith(Observable.interval(100L,TimeUnit.MILLISECONDS), (a,b)->a);
        
//가운데 1 개는 300ms 후에 발행
Observable<String> middleSource= Observable.just(data[3])
            .zipWith(Observable.timer(300L, TimeUnit.MILLISECONDS),(a,b)->a);
        
//마지막 2개는 100ms 후에 발행
Observable<String> lastSource=Observable.fromArray(data)
            .takeLast(2)
            .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS),(a,b)->a);
        
//3개씩 모아서 1번에 발행함
Observable<List<String>> source= Observable.concat(earlySource,middleSource,lastSource)
            .buffer(3);
        
        
source.subscribe(Log::it);
CommonUtils.sleep(1000);




마블 다이어그램 처럼 데이터를 발행하기 위해서 interval(), timer() 함수와 concat() 함수를 호출했습니다. buffer(3)는 데이터를 3개씩 모았다가 List<String> 에 채운후 한꺼번에 발행해 줍니다.


실행결과

RxComputationThreadPool-1 | 573 | value = [1, 2, 3]

RxComputationThreadPool-3 | 1080 | value = [4, 5, 6]




두번쨰 buffer() 함수에는 모으거나(count) 무시(skip)할 데이터 개수를 입력합니다.


buffer(count, skip) 함수의 마블 다이어 그램

출처 - http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html


skip 변수는 count 보다 값이 커야합니다. 만약에 count가 2이고 skip이 3이면 2개 데이터를 모으고 1개는 스킵합니다.


buffer() 함수의 활용 예 2

//3개씩 모아서 1번에 발행함
Observable<List<String>> source= Observable.concat(earlySource,middleSource,lastSource).buffer(2,3);



바뀐점은 count 값을 2로 줄이고 skip을 3으로 설정한 것입니다. 2개의 데이터가 발행되면 List<String>에 채워 발행하고 발행되는 데이터는 1개는 건너뜁니다.


실행결과

RxComputationThreadPool-1 | 481 | value = [1, 2]

RxComputationThreadPool-3 | 990 | value = [4, 5]



Observable에서 onNext 이벤트가 발생하면 내부데이터는 3개가 아니라 2개의 값을 모아 바로 List<String>에 채운 후 구독자에게 발행합니다 . buffer() 함수에는 시간 간격으로 데이터를 모으는 오버로딩도 제공합니다 .