본문 바로가기
Rx

리액티브연산자[변환 연산자]-12(concatMap함수,switchMap함수)

by 봄석 2018. 12. 26.

본 내용은 필자가 학습한 내용을 정리하는 내용입니다.

대부분 의 내용이 아래 책의 내용이므로 원서를 구매해서 직접보시는걸 추천드립니다!

RxJava 프로그래밍 리액티브 프로그래밍 기초부터 RxAndroid까지 한 번에

유동환 , 박정준 지음 | 한빛미디어 | 2017년 09월 04일 출간

http://www.kyobobook.co.kr/product/detailViewKor.laf?ejkGb=KOR&mallGb=KOR&barcode=9788968488658&orderClick=LAV&Kc=


저자님의 블로그

https://brunch.co.kr/@yudong#info




리액티브 연산자(함수) 분류 - 카테고리

연산자 종류 

연산자 함수 

  생성 연산자 


 just(), fromXXX(), create(), interval(), ragne(), timer(),   intervalRange(), defer(), repeat() 


  변환 연산자


 map(),flatMap(), concatMap(), switchMap() , groupBy(),

 scan(), buffer(), window()


  필터 연산자


 filter(), take(), skip(), distinct()


  결합 연산자

 

 zip(), combineLatest(), merge(), concat()

 

  조건 연산자


 amb(), takeUtil(), skipUtil(), all()

 

  에러 처리 연산자 


 onErrorReturn(), onErrorResumeNext(), retry(), retryUntil()


  기타 연산자 

 

 subscribe(), subscribeOn(), observeOn(), reduce(), count()

 




변환 연산자- concatMap()함수

concatMap() 함수는 flatMap() 함수와 매우 비슷합니다. flatMap()는 먼저 들어온 데이터를 처리하는 도중에 새로운 데이터가 들어오면 나중에 들어온 데이터의 처리결과가 먼저 출력 될 수도있습니다. 이를 인터리빙(끼어들기)라고 합니다.


하지만 concatMap() 함수는 먼저 들어온 데이터 순서대로 처리해서 결과를 낼 수 있도록 보장해줍니다.

출처 - http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html


concatMap() 함수의 활용 예

CommonUtils.exampleStart();
        
String[] balls ={"1","3","5"};
Observable<String> source= Observable.interval(100L,TimeUnit.MILLISECONDS)
            .map(Long::intValue)
            .map(idx->balls[idx])
            .take(balls.length)
            .concatMap(ball->Observable.interval(200L,TimeUnit.MILLISECONDS)
               .map(notUsed->ball+"<>")
               .take(2)
            );

source.subscribe(Log::it);
CommonUtils.sleep(2000);



먼저 시간을 측정하기 위해 CommonUtils.exmapleStart() 를 호출합니다


그리고 마블 다이어 그램에 등장하는 빨간색(1),초록색(3),파란색(5)값을 배열에 입력합니다.

이제 Observable 생성을 시작합니다. 100ms 간격으로 interval() 함수를 호출한 후 0부터 발생하는 Long 객체값을 Integer객체 값으로 변경합니다. (Long::intValue)

그리고 map() 함수를 이용해 숫자(int)를 '1', '3', '5' 문자열로 변경합니다.

take(balls.length)으로 3개만 가져오기 때문에 배열의 크기를 초과하지 않습니다.


입력인 원은 100ms 간격으로 발생하지만 출력인 다이아몬드(<>)는 200ms 간격으로 발생하기 떄문에 입력과 출력의 순서가 역전될 수 있습니다. 그것을 concatMap() 함수로 잡아줍니다.

concatMap() 함수의 인자로 포함된 람다 표현식에서는 다이아몬드를 2개 발행하기 떄문에 take(2) 호출했습니다.


if ,for ,while 등의 제어문이 없어져  책 읽듯이 코드를 쭉 설명할 수 있는 형태로 코드의 간결성이 높아졌습니다.


실행결과는 아래와 같습니다.

RxComputationThreadPool-2 | 604 | value = 1<>
RxComputationThreadPool-2 | 804 | value = 1<>
RxComputationThreadPool-3 | 1005 | value = 3<>
RxComputationThreadPool-3 | 1206 | value = 3<>
RxComputationThreadPool-4 | 1407 | value = 5<>

RxComputationThreadPool-4 | 1608 | value = 5<>


예상한대로 '1', '3', '5' 원이 100ms 간격으로 발생하고 각 원과 연결된 다이아몬드가 200ms 간격으로 발생하면서도 순서가 보장됩니다. 실행시간은 총 1608ms가 소요됐습니다.


concatMap()을 faltMap() 으로 변경한다면 어떤 결과가 나올까요 ? 


concatMap() 함수를 flatMap() 함수로 변경하기

CommonUtils.exampleStart();
String[] balls={"1","3","5"};
Observable<String> source=Observable.interval(100L,TimeUnit.MILLISECONDS)
           .map(Long::intValue)
           .map(idx->balls[idx])
           .take(balls.length)
           .flatMap(ball->Observable.interval(200L,TimeUnit.MILLISECONDS)
                .map(notUsed->ball+"<>")
                .take(2)
           );

source.subscribe(Log::it);

 CommonUtils.sleep(2000);


실행결과

RxComputationThreadPool-2 | 386 | value = 1<>
RxComputationThreadPool-3 | 487 | value = 3<>
RxComputationThreadPool-4 | 585 | value = 5<>
RxComputationThreadPool-4 | 585 | value = 1<>
RxComputationThreadPool-3 | 687 | value = 3<>

RxComputationThreadPool-4 | 786 | value = 5<>


총 실행시간은 786ms 입니다. 앞 concatMap()함수의 1608ms와 비교했을때 훨씬 빠릅니다.

그 이유는 인터리빙을 허용하기 때문입니다. 즉, concatMap() 함수의 순서를 보장해 주려면

추가 시간이 필요하다는 사실을 알 수 있습니다.




변환 연산자- switchMap()함수

만약 실무에서 switchMap() 함수를 일반 스레드로 구현하고자 한다면 상당한 도전과제가 될 것입니다. swtichMap() 함수는 concatMap() 함수와는 또 다릅니다. concatMap() 함수가 인터리빙이 발생할 수 있는 상황에서 동작의 순서를 보장해 준다면 switchMap() 함수는 순서를 보장하기 위해 기존에 진행중이던 작업을 바로 중단합니다. 그리고 여러개의 값이 발행되었을 때 마지막에 들어온 값만 처리하고 싶을때 사용합니다. 중간에 끊기더라도 마지막 데이터의 처리는 보장하기 때문입니다.


switchMap()함수의 마블다이어그램

출처 - http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html


마블 다이어 그램이 복잡합니다. 하지만 시간이 겹치지 않는다는 것만 기억하면 내용을 이해하는 것이 어렵지 않을 것입니다.  발간색 도형의 경우 정상 처리 했지만, 초록색 원을 처리하는 도중에 파란원이 들어왔으므로 초록원의 처리를 중단하고 파란원을 처리합니다.



concatMap()을 switchMap() 으로 변경해보는 예제

CommonUtils.exampleStart();
        
String[] balls={"1","3","5"};
Observable<String> source =Observable.interval(100L, TimeUnit.MILLISECONDS)
            .map(Long::intValue)
            .map(idx->balls[idx])
            .take(balls.length)
            .doOnNext(Log::dt)//중간결과 확인용 함수
            .switchMap(ball-> Observable.interval(200L,TimeUnit.MILLISECONDS)
                    .map(notUsed->ball+"<>")
                    .take(2)
            );

source.subscribe(Log::it);
CommonUtils.sleep(2000);



실행결과

RxComputationThreadPool-1 | 460 | debug = 1
RxComputationThreadPool-1 | 562 | debug = 3
RxComputationThreadPool-1 | 659 | debug = 5
RxComputationThreadPool-4 | 861 | value = 5<>

RxComputationThreadPool-4 | 1060 | value = 5<>


실행결과에서 알수있는점은

첫 번째로 Observable은 데이터를 발행하는 스레드와 그 값을 전달하는 스레드를 다르게 사용한다는 점입니다. 실제로 concatMap(), flatMap(), switchMap() 함수 활용 예의 실행 결과에서 봤던 스레드(RxComputationThreadPool)는 1번 스레드없이 2번,3번,4번 만 발생했습니다.
이유는 1번 스레드가 값을 발행하는데 사용했기 때문입니다. 1번스레드에서는 값을 발행하고, 2번, 3번, 4번 스레드를 통해서 구독자에게 그 값을 전달한 것입니다.

두 번째로 파란색(다이아몬드5)만 출력했다는 점입니다. 원은 100ms 간격으로 발생하고 다이아몬드(<>)는 200ms 간격으로 발행하기 때문에  빨간원(1)의 다이아몬드가 발행되기 전에 파란원(5)이 발행되어버렸습니다. 그러므로 중간에 있는 초록색(3) 다이아몬드의 발행이 취소되고 마지막 파란원(5)를 이용한 처리결과인 다이아몬드5<>만 두번 출력합니다.

여전히 어려울 것으로 생각합니다. 하지만 interval() 함수의 호출 시간 간격과 switchMap() 함수의 호출 시간 간격을 여러 가지 조건으로 바꿔서 테스트 하면 이해하는데 도움이 될 것입니다.

swtichMap() 함수는 센서 등의 값을 얻어와서 동적으로 처리하는데 경우에 매우 유용합니다.
센서값은 중간값 보다는 최종적인 값으로 처리하는  경우가 많기 때문입니다 이럴 때는 
flatMap() 함수로 매번 새로운 결과가 나왔는지 검사하지 말고, 손쉽게 switchMap() 함수를 사용하는것을 추천합니다.



댓글