본문 바로가기
Rx

RxJava 테스팅과 Flowable-5( Flowable을 활용한 배압 이슈 대응하기)

by 봄석 2019. 1. 1.

본 내용은 필자가 학습한 내용을 정리하는 내용입니다.

대부분 의 내용이 아래 책의 내용이므로 원서를 구매해서 직접보시는걸 추천드립니다!

RxJava 프로그래밍 리액티브 프로그래밍 기초부터 RxAndroid까지 한 번에

유동환 , 박정준 지음 | 한빛미디어 | 2017년 09월 04일 출간

http://www.kyobobook.co.kr/product/detailViewKor.laf?ejkGb=KOR&mallGb=KOR&barcode=9788968488658&orderClick=LAV&Kc=


저자님의 블로그

https://brunch.co.kr/@yudong#info




Flowable을 활용한 배압 이슈 대응하기

Flowable에서 추가로 제공하는 배압 이슈에 대응하는 함수는 다음과 같습니다 .


- onBackPressureBuffer() : 배압 이슈가 발생했을때 별도의 버퍼에 저장합니다 .

Flowable 클래스는 기본적으로 128개의 버퍼가 있습니다.


- onBackPressureDrop() : 배압 이슈가 발생했을 때 해당 데이터를 무시합니다.

- onBackPressureLatest() : 처리할 수 없어서 쌓이는 데이터를 무시하면서 최신 데이터만 유지합니다.


아래 예는 간단하게 배압 상황을 만든 것입니다

makeBackpressure() 예

CommonUtils.exampleStart();
PublishSubject<Integer> subject =PublishSubject.create();
subject.observeOn(Schedulers.computation())
.subscribe(data->{
            CommonUtils.sleep(100);
            Log.it(data);
},err->Log.e(err.toString()));
        
//뜨거운 Observable로 50,000,000 개의 데이터를 연속으로 발행함
for(int i=0;i<50_000_000;i++){
        subject.onNext(i);
}
subject.onComplete();



PublishSubejct 객체를 생성한 후 결과는 계산 스케줄러로 전달합니다. subscribe() 함수 호출 후 Subject 객체가 발행한 데이터는 100ms 후 로그를 출력합니다.


한편 PublishSubject 객체는 뜨거운 Observable 입니다. 데이터를 발행하는 속도와 데이터를 처리하는 속도의 차이가 발생했을 때 어떠한 보호장치도 제공하지 않습니다.


실행결과

RxComputationThreadPool-1 | 412 | value = 0
RxComputationThreadPool-1 | 1279 | value = 1
RxComputationThreadPool-1 | 3023 | value = 2
RxComputationThreadPool-1 | 3343 | value = 3
RxComputationThreadPool-1 | 6857 | value = 4
RxComputationThreadPool-1 | 7361 | value = 5
RxComputationThreadPool-1 | 8324 | value = 6
RxComputationThreadPool-1 | 13419 | value = 7
RxComputationThreadPool-1 | 13519 | value = 8
RxComputationThreadPool-1 | 14133 | value = 9
RxComputationThreadPool-1 | 14234 | value = 10

RxComputationThreadPool-1 | 20989 | value = 11



observeOn() 함수에서 지정한 것처럼 계산 스케줄러에서 결과 데이터를 출력합니다.

그런데 코드를 작성하면서 의도했던 100ms 간격보다 상당히 느리게 데이터를 처리합니다 .

전체실행시간도 약 20초 가량입니다.


윈도우의 작업관리자를 살펴보면 메모리 사용량이 빠르게 상승하는 것을 알 수 있습니다. 데이터는 PublishSubject 객체에서 매우 빠르게 발행되는데 데이터는 겨우 11개만 처리되었습니다. 만약 발생하는 데이터의 개수가 훨씬 많아지면 JVM은 곳 Out of Memory 예외를 발생하고 실행을 중단할 것입니다.

이러한 배압 이슈를 처리할 때 Flowable을 사용합니다.


배압 이슈의 첫 번째 대응 방법은 버퍼 만들기 입니다. Flowable의 onBackPressureBuffer() 함수에는 아래와 같은 오버로딩이 있습니다.

public final Flowable<T> onBackpreesureBuffer()
public final Flowable<T> onBackpreesureBuffer(boolean delayError)
public final Flowable<T> onBackpreesureBuffer(int capacity, Action onOverflow)
public final Flowable<T> onBackpreesureBuffer(long capacity, Action onOverflow,
     BackpressureOberflowStrategy overflowStrategy)



첫 번째 오버로딩은 기본값(128개)의 버퍼 개수가 있습니다.


두 번째 오버로딩은 delayError 여부를 지정할 수 있습니다. true 면 예외가 발생했을 때 버퍼에 쌓인 데이터를 모두 처리할 때까지 예외를 던지지 않고,false면 예외가 발생했을 때 바로 다운스트림에 예외를 던집니다. delayError의 기본값은 false입니다.


세 번째 오버로딩은 capacity 인자로 버퍼의 개수를 지정하고 onOverflow 인자에 버퍼가 넘쳤을 때 실행할 동작을 지정합니다.


마지막 오버로딩은 버퍼가 가득 찼을때 추가로 실행하는 전략을 지정할 수 있습니다.RxJava에서 지정할 수 있는 전략은 아래와 같습니다.

 - ERROR : MissingBackpressureException 예외를 던지고 데이터 흐름을 중단합니다.

 - DROP_LATEST : 버퍼에 쌓여 있는 최근 값을 제거합니다.

 - DROP_OLDEST : 버퍼에 쌓여 있는 가장 오래된 값을 제거합니다.



onBackpressureBuffer() 함수의 활용 예

CommonUtils.exampleStart();
Flowable.range(1,  50_000_000)
     .onBackpressureBuffer(128,()->{},BackpressureOverflowStrategy.DROP_OLDEST)
        .observeOn(Schedulers.computation())
        .subscribe(data->{
            CommonUtils.sleep(100);
            Log.it(data);
        },err->Log.e(err.toString()));


Flowable.range() 함수를 활용하여 동일한 개수의 데이터를 발행합니다. 그리고 128개의 버퍼를 생성한 후 버퍼의 넘침(overflow)이 발생하면 버퍼의 가장 오래된 데이터를 버리도록 전략을 설정합니다 .


실행결과

RxComputationThreadPool-1 | 329 | value = 1
RxComputationThreadPool-1 | 430 | value = 2
RxComputationThreadPool-1 | 531 | value = 3
RxComputationThreadPool-1 | 631 | value = 4
RxComputationThreadPool-1 | 732 | value = 5
RxComputationThreadPool-1 | 833 | value = 6
RxComputationThreadPool-1 | 934 | value = 7
RxComputationThreadPool-1 | 1034 | value = 8
RxComputationThreadPool-1 | 1135 | value = 9
RxComputationThreadPool-1 | 1236 | value = 10
RxComputationThreadPool-1 | 1337 | value = 11
RxComputationThreadPool-1 | 1437 | value = 12
RxComputationThreadPool-1 | 1538 | value = 13

RxComputationThreadPool-1 | 1639 | value = 14


버퍼 활용하여 데이터를 훨씬 바로게 다운 스트림으로 발행하는 것을 알 수 있습니다 .

거의 10배의 속도입니다. 데이터의 발행 속도가 워낙 빠르기 때문에 128개의 버퍼로 모두 대응하는 것은 무리입니다 .



배압 이슈에 대응하는 두 번째 방법은 onBackpressureDrop() 라는 함수를 활용하는 것입니다

onBackpreesureBuffer() 함수가 버퍼를 만들어 쌓아 두었다가 처리하는 방식이라면, onBackpressureDrop() 함수는 버퍼가 가득 찼을 때 이후 데이터를 그냥 무시합니다.


onBackpressureDrop () 예

CommonUtils.exampleStart();
        Flowable.range(1,  50_000_000)
        .onBackpressureDrop()
        .observeOn(Schedulers.computation())
        .subscribe(data->{
            CommonUtils.sleep(100);
            Log.it(data);
        },err->Log.e(err.toString()));
        

        CommonUtils.sleep(20_000);


살펴볼 특이 사항은 마지막 20초 간 sleep() 함수를 실행했던 점입니다.
onBackpressureDrop() 함수를 사용하면 버퍼에 128개의 데이터가 가득 찼을 때 데이터
계산 스케줄러에서 출력하기도 전에 예제가 끝납니다. 따라서 계산 스케줄러에서 데이터를 다운스트림으로 발행할 수 있도록 충분한 시간(여기서는 20초) 을 기다려주어야 합니다.
UI프로그래밍에서는 이와같은 기다림이 필요하지 않습니다.


RxComputationThreadPool-1 | 12137 | value = 118
RxComputationThreadPool-1 | 12238 | value = 119
RxComputationThreadPool-1 | 12339 | value = 120
RxComputationThreadPool-1 | 12441 | value = 121
RxComputationThreadPool-1 | 12542 | value = 122
RxComputationThreadPool-1 | 12642 | value = 123
RxComputationThreadPool-1 | 12743 | value = 124
RxComputationThreadPool-1 | 12844 | value = 125
RxComputationThreadPool-1 | 12944 | value = 126
RxComputationThreadPool-1 | 13045 | value = 127

RxComputationThreadPool-1 | 13146 | value = 128

기본 버퍼 개수만큼만 버퍼에 저장하고 나머지는 모두 무시했기 때문에 128을 출력합니다.


배압 이슈에 대응하는 마지막 방법은 onBackpressureLatest() 라는 함수를 활용하는 것입니다

onBackpressureBuffer()와 onBackpressureDrop() 함수의 기능을 섞은 것으로 마지막 값을 발행할 수 있도록 해줍니다


onBackpressrueLatest() 활용 예

CommonUtils.exampleStart();
        Flowable.range(1,  50_000_000)
        .onBackpressureLatest()
        .observeOn(Schedulers.computation())
        .subscribe(data->{
            CommonUtils.sleep(100);
            Log.it(data);
        },err->Log.e(err.toString()));
        

CommonUtils.sleep(20_000);


마지막 데이터를 다운 스트림으로 발행하여 확인할 수 있습니다

RxComputationThreadPool-1 | 10570 | value = 102
RxComputationThreadPool-1 | 10671 | value = 103
RxComputationThreadPool-1 | 10772 | value = 104
RxComputationThreadPool-1 | 10873 | value = 105
RxComputationThreadPool-1 | 10974 | value = 106
RxComputationThreadPool-1 | 11074 | value = 107
RxComputationThreadPool-1 | 11175 | value = 108
RxComputationThreadPool-1 | 11276 | value = 109
RxComputationThreadPool-1 | 11376 | value = 110
RxComputationThreadPool-1 | 11477 | value = 111
RxComputationThreadPool-1 | 11578 | value = 112
RxComputationThreadPool-1 | 11678 | value = 113
RxComputationThreadPool-1 | 11779 | value = 114
RxComputationThreadPool-1 | 11880 | value = 115
RxComputationThreadPool-1 | 11982 | value = 116
RxComputationThreadPool-1 | 12082 | value = 117
RxComputationThreadPool-1 | 12183 | value = 118
RxComputationThreadPool-1 | 12286 | value = 119
RxComputationThreadPool-1 | 12387 | value = 120
RxComputationThreadPool-1 | 12487 | value = 121
RxComputationThreadPool-1 | 12588 | value = 122
RxComputationThreadPool-1 | 12689 | value = 123
RxComputationThreadPool-1 | 12790 | value = 124
RxComputationThreadPool-1 | 12890 | value = 125
RxComputationThreadPool-1 | 12991 | value = 126
RxComputationThreadPool-1 | 13093 | value = 127
RxComputationThreadPool-1 | 13193 | value = 128

RxComputationThreadPool-1 | 13295 | value = 50000000



댓글