본문 바로가기
Rx

리액티브연산자[결합 연산자]-15(combineLatest함수)

by 봄석 2018. 12. 27.

본 내용은 필자가 학습한 내용을 정리하는 내용입니다.

대부분 의 내용이 아래 책의 내용이므로 원서를 구매해서 직접보시는걸 추천드립니다!

RxJava 프로그래밍 리액티브 프로그래밍 기초부터 RxAndroid까지 한 번에

유동환 , 박정준 지음 | 한빛미디어 | 2017년 09월 04일 출간

http://www.kyobobook.co.kr/product/detailViewKor.laf?ejkGb=KOR&mallGb=KOR&barcode=9788968488658&orderClick=LAV&Kc=


저자님의 블로그

https://brunch.co.kr/@yudong#info




리액티브 연산자(함수) 분류 - 카테고리

연산자 종류 

연산자 함수 

  생성 연산자 


 just(), fromXXX(), create(), interval(), ragne(), timer(),   intervalRange(), defer(), repeat() 


  변환 연산자


 map(),flatMap(), concatMap(), switchMap() , groupBy(),

 scan(), buffer(), window()


  필터 연산자


 filter(), take(), skip(), distinct()


  결합 연산자

 

 zip(), combineLatest(), merge(), concat()

 

  조건 연산자


 amb(), takeUtil(), skipUtil(), all()

 

  에러 처리 연산자 


 onErrorReturn(), onErrorResumeNext(), retry(), retryUntil()


  기타 연산자 

 

 subscribe(), subscribeOn(), observeOn(), reduce(), count()

 





결합 연산자란?

결합 연산자는 다수의 Observable을 하나로 합하는 방법을 제공합니다.

앞서 flatMap() 함수나 groupBy() 함수등은 1개의 Observable을 여러개로 확장해주는 반면,

결합 연산자들은 여러개의 Observable을 내가 원하는 Observable로 결합해 줍니다.


결합연산자 종류

zip() - 입력 Observable에서 데이터를 모두 새로 발행했을때 그것을 합해줍니다.

combineLatest() - 처음에 각 Observable에서 데이터를 발행한 후에는 어디에서 값을 발행하든 최신값으로 갱신합니다.

marge() - 최신 데이터 여부와 상관없이 각 Observable 에서 발행하는 데이터를 그대로 출력합니다.

concat() - 입력된 Observable을 Observable 단위로 이어 붙여줍니다.




결합연산자 - combineLatest()함수

combineLatest() 함수는 2개 이상의 Observable을 기반으로 Observable 각각의 값이 변경 되었을 때 갱신해주는 함수입니다. 마지막 인자로 combiner가 들어가는데 그것이 각 Observable을 결합하여 어던 결과를 만들어주는 역할을 합니다. zip() 함수의 zipper 인자와 동일합니다.


예를들어 첫 번째 Observable과 두 번째 Observable을 결합하는 기능을 만든다고 하면 첫 번째 Observable의 값 혹은 두 번째 Observable의 값이 변경되었을 때 그 값을 자동으로 갱신해 줍니다.


combineLatest() 함수 마블다이어그램

출처 - http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html


위의 마블 다이어그램에서 첫 번째 Observable에서만 데이터를 발행하거나 두 번째 Observable 의 데이터 흐름만 있으면 구독자에게 어떤 데이터도 발행하지 않습니다. 하지만 두 Observable 모두 값을 발행하면 그때는 결괏값이 나옵니다. 그 다음부터는 둘 중 어떤 것이 갱신되던지 최신 결괏값을 보여줍니다. (이 부분이 zip()함수와 다른점 입니다)


combineLatest() 함수의 원형

@Scheduler(SchedulerSupport.NONE)
public static <T1,T2,R> Observable<R> combineLatest(
    ObservableSource<? extends T1> source1,
    ObservableSource<? extends T1> source2,

    BiFunction<? super T1,? super T2,? extends R> combiner)


zip() 함수처럼 결합하고자 하는 첫 번째와 두 번째 Observable을 넣고 마지막으로 그것을 결합하는 combiner() 함수를 넣어주면 됩니다. 입력할 수 있는 Observable 인자의 개수는 최대 9개 입니다.


combineLatest() 함수의 활용 예

String[] data1={"6","7","4","2"};
String[] data2={"DIAMOND","STAR","PENTAGON"};
        
Observable<String> source=Observable.combineLatest(
Observable.fromArray(data1).zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS),
               (shape,notUsed)->Shape.getColor(shape)),
    Observable.fromArray(data2).zipWith(Observable.interval(150L,200L,TimeUnit.MILLISECONDS),
               (shape,notUsed)->Shape.getSuffix(shape)),
                (v1,v2)->v1+v2);

source.subscribe(Log::i);
CommonUtils.sleep(1000);



첫번째 Observable 에서는 색상(Color)를 얻어오고 두 번째 Observable 에서는 도형 모양에 대한 접미사(suffix)를 얻어옵니다. 첫 번째 Observable은 100ms 간격으로 값을 발행하고 두 번째 Observable은 최초에 150ms를 쉬고 200ms 간격으로 값을 발행합니다.

zip() 함수와는 다르게 어느 1개의 값만 변경되어도 결과가 발행 됩니다.

시간간격을 변경해보면 재미있는 결과를 얻을 수 있습니다.



실습예제 : 리액티브 연산자로 합계 구하기

conbineLatest() 함수의 대표적인 활용 예는 마이크로소프트 엑셀의 셀 시뮬레이션입니다.

예를 들어 어떤 셀에 '=A+B'라는 수식을 넣었다면 A셀과 B셀의 어떤 값이 변경되든 즉시 새로운 합의 결과를 표시합니다.


리액티브연산자로 합계 구하기 예

public class ReactiveSum {

    public static void main(String args[]){
        new ReactiveSum().run();
    }
    public void run(){
        ConnectableObservable<String> source= userInput();
        Observable<Integer> a= source
                .filter(str->str.startsWith("a:"))
                .map(str->str.replace("a:", ""))
                .map(Integer::parseInt);
        Observable<Integer> b= source
                .filter(str->str.startsWith("b:"))
                .map(str->str.replace("b:", ""))
                .map(Integer::parseInt);
        Observable.combineLatest(
                a.startWith(0),
                b.startWith(0),
                (x,y)->x+y)
                    .subscribe(res->System.out.println("Result : "+res));
        source.connect();
    }
    public ConnectableObservable<String> userInput(){
        return Observable.create((ObservableEmitter<String> emitter)->{
            Scanner in=new Scanner(System.in);
            while(true){
                System.out.println("Input : ");
                String line=in.nextLine();
                emitter.onNext(line);
                
                if(line.indexOf("exit")>=0){
                    in.close();
                    break;
                }
            }
        }).publish();
    }
}


먼저 userInput()메소드에서 시작합니다.

Observable.create()를 활용하여 사용자 콘솔에서 값을 받아옵니다. ' Input: ' 프롬프트를 출력하고 사용자가 입력한 값을 그대로 발행합니다. 'exit'를 입력하면 프로그램을 종료합니다.


메인 함수인 run()의 source 변수는 ConnectableObservable 클래스로 userInput() 함수에서 Observable을 생성합니다. 그다음 첫 번째 Observable인 a는 source Observable에서 'a:'로 입력한 경우에만 값을 추출하여 Integer로 변환합니다. 두 번째 observable인 b도 a와 동일하게 동작하지만 변수를 구별학 ㅣ위해 'b:'로 입력한 경우에만 값을 추출하여 Integer로 변환합니다.


이제 2개의 값을 combineLatest() 함수로 결합합니다. a Observable과 b Observable모두 startWith(0)함수를 추가했습니다. startWith(0) 함수를 추가하지 않고 코드를 실행해보면 값을 입력해도 결과를 출력하지 않습니다. 그 이유는 combineLatest() 함수에서 처음 값을 발행하려면 입력 a와 b 모두에서 값을 발행해야 하기 때문입니다.


우리는 값을 입력했을때 바로 출력하기를 원하므로 startWith(0) 함수를 호출하여 0으로 초기화 했습니다. 이처럼 사용자 입력을 받을 때는 startWith() 함수를 유용하게 활용할 수 있습니다.


마지막으로 connect() 함수를 호출하여 데이터 흐름을 시작합니다.


실행결과

a:300
Result : 800
Input : 
b:200
Result : 500
Input : 
a:200
Result : 400
Input : 
b:200
Result : 400
Input : 
b:1000
Result : 1200
Input : 

exit










--------------------------------------------------------------------------------------------------------

startWith 알아보기


연속해서 서로 다른 Observable를 수행해야 한다고 가정해보겠습니다. 가장 심플한 방법은 먼저 수행되어야 할 Observable을 수행 후 onSuccess(또는 onNext)에서 결과를 받은 후 연속해서 그 다음 Observable를 수행 하는 것입니다. 이런 방식으로 코드를 만들면 코드라인이 길어지고 복잡도 또한 증가하게됩니다. startWith 연산자를 사용하면 보다 깔끔한 코드로 원하는 작업을 수행 할 수 있습니다.


@Test
public void observableStartWithTest() throws Exception {
    final List<String> items = Lists.newArrayList("one", "two", "three", "four");
    Observable<String> observableRet = Observable.fromIterable(items);
    observableRet.startWith("First Item.")
          .subscribe(ret -> TestLog.log(ret));
}

 


로그

> 1537536610914 [main] First Item.

> 1537536610914 [main] one

> 1537536610914 [main] two

> 1537536610914 [main] three

> 1537536610914 [main] four


첫 번째 아이템을 먼저 방출하고 그 다음에 연속해서 items의 배열에 있는 아이템들을 순차적으로 방출하여 처리하게 됩니다.



출처: http://softwaree.tistory.com/32 [Owl Life]

댓글