본문 바로가기
Rx

RxJava 테스팅과 Flowable-4( Flowable 클래스)

by 봄석 2019. 1. 1.

본 내용은 필자가 학습한 내용을 정리하는 내용입니다.

대부분 의 내용이 아래 책의 내용이므로 원서를 구매해서 직접보시는걸 추천드립니다!

RxJava 프로그래밍 리액티브 프로그래밍 기초부터 RxAndroid까지 한 번에

유동환 , 박정준 지음 | 한빛미디어 | 2017년 09월 04일 출간

http://www.kyobobook.co.kr/product/detailViewKor.laf?ejkGb=KOR&mallGb=KOR&barcode=9788968488658&orderClick=LAV&Kc=


저자님의 블로그

https://brunch.co.kr/@yudong#info




Flowable 클래스

고급 주제인 배압과 Flowable에 대해서 알아보겠습니다.


Flowable은 RxJava 2.x에 새로 도입된 클래스입니다. RxJava에는 Observable 클래스의 수많은 변형이 존재하는데 Flowable은 배압backpressure 이슈를 위해 별도 분리한 클래스 입니다.

참고로 기존의 RxJava 1.x 에는 Observable이 배압 관련된 함수를 포함했었습니다.


Flowable 클래스를 도입한 이유는 Observable 클래스의 성능을 향상시키기 위해서 입니다.

기존의 Observable 클래스는 배압에 관한 처리가 불필요한 경우에는 초기 로딩 때문에 약간의 오버헤드가 있었습니다. 하지만 RxJava 2 의 Observable 클래스에는 배압으로 인한 성능 오버헤드가 사라졌습니다.


Flowable 클래스의 활용은 기본적으로 Observable과 동일합니다. 또한 Flowable에서 Observable 로 변환하는 것이나 Observable에서 Flowable로 변환하는 것도 어렵지 않습니다.

변환을 위해 toObservable() 과 toFlowable() 함수를 제공합니다 

지금까지 배웠던 Observable과 동일하게 사용하면 됩니다.


다음은 RxJava 2.x 의 GitHub에 공개된 기본 예제입니다. 

Observable처럼 Flowable에 입력된 'Hello world ' 라는 문자열을 출력합니다.

import io.reactivex.functions.Consumer;

Flowable.just("Hello world")
  .subscribe(new Consumer<String>() {
      @Override public void accept(String s) {
          System.out.println(s);
      }
  });

Flowable 클래스를 Observable클래스로 바꿔도 똑같이 동작합니다 .subscribe 부분을 람다식으로 표현하면 아래처럼 단순화 할 수 있습니다.

package rxjava.examples;
 
import io.reactivex.*;
 
public class HelloWorld {
    public static void main(String[] args) {
        Flowable.just("Hello world").subscribe(System.out::println);
    }
}


다음은 같은 곳에 있는 비동기 프로그래밍의 예입니다. subscribeOn() 과 observeOn() 함수도 동일하게 사용할 수 있습니다.



import io.reactivex.schedulers.Schedulers;
 
Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
})
  .subscribeOn(Schedulers.io())
  .observeOn(Schedulers.single())
  .subscribe(System.out::println, Throwable::printStackTrace);
 

Thread.sleep(2000); // <--- wait for the flow to finish

지금까지는 Observable과 모든것이 동일합니다 그렇다면 다른점은 무었일까요 ?? 

바로 배압입니다.


Observable과 Flowable 선택 기준

RxJava 2.x 위키에는 Observable Flowable의 선택 기준이 공개되어 있습니다. 

먼저 Observable 을 사용해야 할 때는 다음과 같습니다


- 최대 1,000개 미만의 데이터 흐름. 예를들어 응용프로그램에서 OOME(Out of Memory Exception)이 발생할 확률 이 거의 없는경우입니다.

- 마우스 이벤트나 터치 이벤트를 다루는 GUI 프로그래밍. 이 경우에는 배압의 이슈가 거의 발생하지 않습니다. Observable로는 초당 1,000회 이하의 이벤트를 다루는데 이때 smaple() 이나 debounce() 같은 흐름제어 함수를 사용하면 됩니다.

- 데이터 흐름이 본질적으로 동기 방식이지만 프로젝트에서 사용하는 플랫폼이 자바 Stream API나 그에 준하는 기능을 제공하지 않을 때. Observable은 보통 Flowable과 비교했을 때 성능 오버헤드가 낮습니다.



한편 Observable 보다 Flowable을 선택해야 할 때는 다음과 같습니다.


- 특정 방식으로 생성된 10,000개 이상의 데이터를 처리하는 경우 . 이때 메소드 체인에서 데이터 소스에 데이터 개수 제한을 요청해야 합니다.

-디스크에서 파일을 읽어 들일 경우. 본직적으로 블로킹I/O 방식을 활용하고 내가 원하는 만큼 가져오는 방식(pull-based) 으로 처리해야하기 때문입니다. 예를들면 특정 단위로 잘라 몇 행씩 가져오도록 제어할 수 있습니다 .

- JDBC를 활용해 데이터 베이스의 쿼리 결과를 가져오는 경우. 블로킹 방식을 이용하므로 ResultSet.next() 를 호출하는 방식으로 쿼리의 결과를 읽어오도록 할 수 있습니다.

- 네트워크 I/O를 실행하는 경우. 네트워크나 프로토콜을 통해 서버에서 가져오길 원하는 만큼의 데이터양을 요청할 수 있을 때입니다.

-다수의 블로킹 방식을 사용하거나 가져오는 방식(pull-based)의 데이터 소스가 미래에는 논 블로킹(non-blocking) 방식의 리액티브 API나 드라이버를 제공할 수도 있는 경우입니다.




디스크에서 파일 읽기, JDBC를 활용한 데이터베이스 쿼리하기, 네트워크 I/O 등은 차가운 Observable에 해당합니다. 보통 차가운 Observable 은 결과 데이터를 처리할 수 있는 만큼 조금씩 가져오는 것이 아니라 한번에 모두 가져옵니다. 따라서  이 경우 반드시 Flowable을 활용해야 하는 것은 아닙니다. 업스트림에서 발행하는 데이터의 속도와 다운스트림에서 처리하는 속도의 차이가 작다면 Observable을 활용해도 됩니다.


즉, 데이터 발행과 처리속도가 차이 나더라도, sample(), throttle(), debounce() 같은 흐름 제어 함수를 활용하여 해결하는 것이 좋습니다. 이러한 함수로 해결하기 어려운때 Flowable 클래스로 전환하면 됩니다.




backpressure?

우리말로 번역하면 ‘등 뒤에서 떠밀리는 압박’ 정도가 될 듯 하다.

이런 상황을 가정해보자. 콘서트장을 사람들이 가득 메웠다. 콘서트장에 들어오려는 사람들은 저글링 개떼처럼 밀려드는데 나가는 사람은 별로 없다. 콘서트장 출입구를 통제하는 요원이 없다면? 콘서트장이 터지던지 안에 있던 사람들이 짜부러지던지 아무튼 대형 사고가 발생할거다.

publish / subscribe 모델에서도 이런 비극적인 시나리오가 발생할 수 있다. 생산자는 미친듯이 element 를 생산해 내는데 소비자가 처리하는 속도가 이를 따라가지 못한다면

  1. busy waiting 또는
  2. out of memory exception 이 발생할 것이다.

‘등 뒤에서 떠밀리는 압박’ 에 대한 흐름제어를 위한 버퍼가 바로 backpressure buffer 다. 버퍼가 가득 차면 어차피 소비자는 element 를 처리할 여유가 없는 상태이므로 더 이상 publish 를 하지 않는다.

기존에 없던 개념이 새로 추가된 것은 아니다. 기존 rxJava 1.xx 의 경우 Observable 에 backpressure buffer 를 직접 생성해 주면 사용이 가능하다. 허나 rxJava 개발자는 초보자들이 미처 알아채지 못하는 영역에서 기대하지 않는 동작이 일어날 가능성이 있다며 Flowable 을 추가하였다.

출처 - https://01010011.blog/2017/03/29/rxjava-flowable-%EA%B3%BC-observable-%EC%9D%98-%EC%B0%A8%EC%9D%B4/

댓글