Rx

RxJava 흐름제어 -4(window 함수)

봄석 2019. 1. 1. 00:32

본 내용은 필자가 학습한 내용을 정리하는 내용입니다.

대부분 의 내용이 아래 책의 내용이므로 원서를 구매해서 직접보시는걸 추천드립니다!

RxJava 프로그래밍 리액티브 프로그래밍 기초부터 RxAndroid까지 한 번에

유동환 , 박정준 지음 | 한빛미디어 | 2017년 09월 04일 출간

http://www.kyobobook.co.kr/product/detailViewKor.laf?ejkGb=KOR&mallGb=KOR&barcode=9788968488658&orderClick=LAV&Kc=


저자님의 블로그

https://brunch.co.kr/@yudong#info



window() 함수

window() 함수는 groupBy() 함수와 개념적으로 비슷합니다. groupBy() 함수는 특정 조건에 맞는 입력값들을 그룹화해 별도의 Observable을 병렬로 만듭니다. 반면 window() 함수throttleFirst() 나 sample() 함수처럼 내가 처리할 수 있는 일부의 값들만 받아들일 수 있습니다. 흐름 제어 기능에 groupBy() 함수와 비슷한 별도의 Observable 분리 기능을 모두 갖추었다고 생각하면 됩니다. 


window() 함수의 마블 다이어그램


출처 - http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html


window() 함수는 count를 인자로 받습니다. 위 마블다이어 그램은 count 3이라는 값을 받았으며 , 앞으로 데이터 3개가 발행될 때마다 새로운 Observable을 생성하겠다는 뜻입니다.


window() 함수의 원형

@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<Observable<T> window(long count)



->> groupBy()와 window() 함수는 근본 목적이 다릅니다. groupBy() 함수는 Observable<GroupedObservable<K, T>>을 리턴하기 때문에 병렬적으로 여러 Observable을 생성합니다. 반면 window() 함수는 Observable<Observable<T>>를 리턴하여 차례로 여러  Observable을 만듭니다.


window() 함수는 현재 스레드를 그대로 활용합니다. 왜그럴까요? window() 함수의 다른 변형을 비교해보면 조금 힌트가 될 것 같습니다.

@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Observable<Observable<T> window(

    long timespan, long timeskip, TimeUnit unit) {}



앞 함수 원형에는 timespan 이라는 시간 동안 입력된 값 중에서 일부를 무시하는 기능을 포함합니다. 첫 번째 소개한 window(count) 함수 원형은 사실 입력된 값을 그대로 발행하기 때문에 비동기작업이라 보기 어렵습니다. 하지만 두 번재 함수 원형인 window(timespan, timeskip, unit)은 어떤 필터링 작업을 해줘야 하기 때문에 계산 스케줄러를 활용하게 됩니다. buffer() 함수가 현재 스레드에서 실행되는 이유와 동일합니다 .


window() 함수의 활용 예

String[] data={"1","2","3","4","5","6"};
CommonUtils.exampleStart();
        
//앞의 3개는 100ms 간격으로 발행
Observable<String> earlySource= Observable.fromArray(data)
            .take(3)
            .zipWith(Observable.interval(100L,TimeUnit.MILLISECONDS), (a,b)->a);
        
//가운데 1 개는 300ms 후에 발행
Observable<String> middleSource= Observable.just(data[3])
            .zipWith(Observable.timer(300L, TimeUnit.MILLISECONDS),(a,b)->a);
        
//마지막 2개는 100ms 후에 발행
Observable<String> lastSource=Observable.fromArray(data)
            .takeLast(2)
            .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS),(a,b)->a);
        
//데이터는 3개씩 모아서 새로운 Observable을 생성함
Observable<Observable<String>> source= Observable.concat(earlySource, middleSource, lastSource)
            .window(3);
        
        
source.subscribe(observable->{
        Log.dt("new Observable started!!");
        observable.subscribe(Log::it);
});
CommonUtils.sleep(1000);



window() 함수에 count 인자로 3을 넣었습니다. 처음에 Observable을 생성하고 3개의 데이터를 전달받으면 새로운 Observable을 다시 생성하여 값을 발행합니다.


window() 함수의 리턴 타입이 Observable<Observable<T>> 이므로 subscribe() 함수의 람다표현식 인자로 Observable이 들어옵니다.  

새로운 Observable이 생성될 때마다 "new Observable started!!" 라는 문자열을 출력했습니다 .

그 다음 각 Observable에서 발행되는 값을 그대로 출력합니다 .