본문 바로가기
Rx

리액티브연산자[수학 및 기타연산자]- 19(수학, delay, timeInterval 함수)

by 봄석 2018. 12. 27.

본 내용은 필자가 학습한 내용을 정리하는 내용입니다.

대부분 의 내용이 아래 책의 내용이므로 원서를 구매해서 직접보시는걸 추천드립니다!

RxJava 프로그래밍 리액티브 프로그래밍 기초부터 RxAndroid까지 한 번에

유동환 , 박정준 지음 | 한빛미디어 | 2017년 09월 04일 출간

http://www.kyobobook.co.kr/product/detailViewKor.laf?ejkGb=KOR&mallGb=KOR&barcode=9788968488658&orderClick=LAV&Kc=


저자님의 블로그

https://brunch.co.kr/@yudong#info




수학함수

RxJava에는 여러 가지 확장 모듈이 존재합니다. 안드로이드에서 사용하는 RxAndroid, 자바네트워킹 라이브러리인 네티와 연동하는 RxNetty, 아파치 HTTP 모듈과 연동하는 RxApacheHttp등이 있습니다. RxJava 1.x 에는 수학 함수들을 모은 RxJavaMath가 있습니다.

단, RxJavaMath는 RxJava 2.x를 지원하지 않으므로 다른 라이브러리를 사용해야 합니다.

이번에는 RxJava 2의 핵심 커미터인 데이빗 카르녹이 만든 RxJava2Extensions 라이브러리를 활용하여 간단한 수학 함수 및 집합 함수의 활용법을 알아보겠습니다.


2017년 8월 현재 RxJava2Extensions의 최신 버전은 0.17.6입니다. 

따라서 build.gradle 파일의 dependencies 부분에 다음 코드를 추가합니다.

1
    implementation "com.github.akarnokd:rxjava2-extensions:0.17.6"
cs


라이브러리를 적용하려면 [프로젝트 이름]을 오른쪽 마우스 버튼으로 선택한 후[Gradle]->[Refresh Gradle Project]를 선택합니다



이제 다룰 count(), max(), sum(), average()의 원형을 살펴보겠습니다



count() 함수는 Observable 에서 발행한 데이터의 개수를 발행합니다. 

결과가 1개값이므로 SIngle<Long>을 발행합니다.


max()와 min()함수는 원형이 같은데 익숙하지 않은 클래스를 볼 수 있습니다.

Publisher클래스는 ObservableEmitter처럼 데이터를 발행해주는 인터페이스입니다.

Flowable은 뒤에서 다룰 것입니다.


sum()과 average() 함수는 각각 sumInt()와 averageDouble() 함수처럼 인자 타입이 함수 이름에

그대로 반영되어 있습니다.



수학함수 활용 예제

Integer[] data={1,2,3,4};
        
//1. count
Single<Long> source = Observable.fromArray(data).count();
source.subscribe(count->Log.i("count is "+count));
        
//2. max() & min()
Flowable.fromArray(data)
.to(MathFlowable::max)
.subscribe(max->Log.i("max is "+ max));
        
Flowable.fromArray(data)
        .to(MathFlowable::min)
        .subscribe(min->Log.i("min is "+ min));
        
//3. sum() & average
Flowable<Integer> flowable=Flowable.fromArray(data)
.to(MathFlowable::sumInt);

flowable.subscribe(sum->Log.i("sum is "+ sum));
        
Flowable<Double> flowable2 =Observable.fromArray(data)
         .toFlowable(BackpressureStrategy.BUFFER)
         .to(MathFlowable::averageDouble);

 flowable2.subscribe(avg->Log.i("avg is "+avg));


count() 함수는 Observable에 바로 count()를 붙여주면 됩니다.

 max()와 min() 함수는 to()함수 안에 MathFlowable 클래스의 max() 함수를 호출해 구현했습니다. 참고로 to()함수는 다른 타입으로 변환해주기 위한 함수라고 생각하면 됩니다.

max()와 min() 함수 모두 Flowable<T>를 리턴하기 때문에 크게 의식하지 않고 코딩하면 됩니다.

sum()과 average() 함수도 동일한 패턴으로 활용합니다.


toFlowable()은 Observable을 Flowable로 변환하는 함수입니다.인자로 전달하는

BackPressureStrategy.BUFFER는 Flowable을 생성할 때 활용하는 배압 전략의 기본 값입니다. 


실행결과

main | value = count is 4
main | value = max is 4
main | value = min is 1
main | value = sum is 10

main | value = avg is 2.5




delay() 함수

RxJava에는 유독 시간을 다루는 함수들이 많습니다. 주기적으로 Observable에서 값을 발행해주는 interval() 함수, 일정 시간이 지난 후 값을 발행해주는 timer() 함수, Callable을 등록해두고 실행을 지연하는 defer() 함수도 있습니다. 뒤에서 또 공부할 buffer(), debounce(), sample(), window() 함수 등도 있습니다.


dealy() 함수도 시간을 인자로 전달받습니다. 앞서 소개한 세 가지 함수가 Observable을 생성하는 역할이라면 delay() 함수는 유틸리티 연산자로서 보조 역할을 합니다.


delay() 함수의 마블 다이어그램

출처 - http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html


단순하게 인자로 전달받는 time과 시간단위 (ms, m등) 입력받은 Observable의 데이터 발행을 지연시켜주는 역할을 합니다. 


delay()함수의 원형은 다음과 같습니다.

1
2
@SchedulerSupport(SchedulerSupportCOMPUTATION)
public final Observable<T> delay(long delay, TimeUnit unit)
cs

delay() 함수는 인자로 delay 변수와 시간단위(ms)등을 받습니다, 그리고 interval()함수와 마찬가지로 계산 스케줄러에서 실행합니다.


delay()함수의 활용 예

String[] data={"1","7","2","3","4"};
CommonUtils.exampleStart();
Observable<String> source= Observable.fromArray(data).delay(100L,TimeUnit.MILLISECONDS);
source.subscribe(Log::it);

 CommonUtils.sleep(1000);


source Observable은 data 배열에서 값을 발행합니다. 그런데 delay() 함수를 적용하므로 100ms 후부터 값을 발행합니다.


실행결과는 다음과 같습니다.

RxComputationThreadPool-1 | 370 | value = 1
RxComputationThreadPool-1 | 370 | value = 7
RxComputationThreadPool-1 | 370 | value = 2
RxComputationThreadPool-1 | 370 | value = 3

RxComputationThreadPool-1 | 370 | value = 4


delay를 지우면 100ms 만큼 빨라집니다.

main | 288 | value = 1
main | 289 | value = 7
main | 289 | value = 2
main | 289 | value = 3
main | 289 | value = 4

 

delay() 함수를 호출하지 않았기 때문에 메인스레드에서 실행되고,100ms만큼 빨라집니다.


timeInterval() 함수

timeInterval() 함수는 조금 특이한 함수입니다. 어떤 값을 발행했을 때 이전 값을 발행한 이후 얼마나 시간이 흘렀는지를 알려줍니다.  


timeInterval() 함수의 마블 다이어그램

출처 - http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html



timeInterval() 함수의 원형은 다음과 같습니다

@ShcedulerSupport(SchedulerSupprot.NONE)

public final Observable<Timed<T>> timeInterval()


Timed<T> 객체에는 다음처럼 시간을 얻어오거나 Observable의 데이터를 얻을 수 있는 메소드를 제공할 수 있습니다.

public T value()
public TimeUnit unit()
public long time()
public long time(TimeUnit unit)




timeInterval() 함수의 활용 예

String[] data={"1","3","7"};
        
CommonUtils.exampleStart();
Observable<Timed<String>> source=Observable.fromArray(data)
            .delay(item->{
                CommonUtils.doSomething();
                return Observable.just(item);
            })
            .timeInterval();

source.subscribe(Log::it);

 CommonUtils.sleep(1000);


발행되는 값 사이의 시간 간격을 알기 위해 무작위로 스레드에 sleep() 메소드를 실행했습니다.

CommonUtils.doSomething()은 100ms 미만의 무작위로 스레드에 sleep()메소드를 실행합니다.


doSomething() 함수는 common 패키지의 CommonUtils 클래스에 구현되어있고 아래와같습니다.

public static void doSomething() { 
        try { 
            Thread.sleep(new Random().nextInt(100));
        } catch (InterruptedException e) { 
            e.printStackTrace();
        }        

    }



실행결과는 아래와 같습니다. 무작위 수만큼 sleep() 메소드를 실행하기 위해 실제실행한 시간을 같이표기했습니다

main | 384 | value = Timed[time=17, unit=MILLISECONDS, value=1]
main | 472 | value = Timed[time=88, unit=MILLISECONDS, value=3]

main | 530 | value = Timed[time=58, unit=MILLISECONDS, value=7]


첫 번째 값인 '1'과 두번째 값이 '3'의 발행시간의 차이는 88로 472-384=88과 일치합니다.

참고로 Timed 객체의 메소드를 호출하여 time과 unit의 출력양식을 조절할 수 있습니다.


Timed[time=]은 timeInterval로 얻은 값입니다

댓글