본문 바로가기
Rx

리액티브연산자[결합 연산자]-14(zip함수)

by 봄석 2018. 12. 26.

본 내용은 필자가 학습한 내용을 정리하는 내용입니다.

대부분 의 내용이 아래 책의 내용이므로 원서를 구매해서 직접보시는걸 추천드립니다!

RxJava 프로그래밍 리액티브 프로그래밍 기초부터 RxAndroid까지 한 번에

유동환 , 박정준 지음 | 한빛미디어 | 2017년 09월 04일 출간

http://www.kyobobook.co.kr/product/detailViewKor.laf?ejkGb=KOR&mallGb=KOR&barcode=9788968488658&orderClick=LAV&Kc=


저자님의 블로그

https://brunch.co.kr/@yudong#info




리액티브 연산자(함수) 분류 - 카테고리

연산자 종류 

연산자 함수 

  생성 연산자 


 just(), fromXXX(), create(), interval(), ragne(), timer(),   intervalRange(), defer(), repeat() 


  변환 연산자


 map(),flatMap(), concatMap(), switchMap() , groupBy(),

 scan(), buffer(), window()


  필터 연산자


 filter(), take(), skip(), distinct()


  결합 연산자

 

 zip(), combineLatest(), merge(), concat()

 

  조건 연산자


 amb(), takeUtil(), skipUtil(), all()

 

  에러 처리 연산자 


 onErrorReturn(), onErrorResumeNext(), retry(), retryUntil()


  기타 연산자 

 

 subscribe(), subscribeOn(), observeOn(), reduce(), count()

 




결합 연산자란?

결합 연산자는 다수의 Observable을 하나로 합하는 방법을 제공합니다.

앞서 flatMap() 함수나 groupBy() 함수등은 1개의 Observable을 여러개로 확장해주는 반면,

결합 연산자들은 여러개의 Observable을 내가 원하는 Observable로 결합해 줍니다.


결합연산자 종류

zip() - 입력 Observable에서 데이터를 모두 새로 발행했을때 그것을 합해줍니다.

combineLatest() - 처음에 각 Observable에서 데이터를 발행한 후에는 어디에서 값을 발행하든 최신값으로 갱신합니다.

marge() - 최신 데이터 여부와 상관없이 각 Observable 에서 발행하는 데이터를 그대로 출력합니다.

concat() - 입력된 Observable을 Observable 단위로 이어 붙여줍니다.




결합 연산자 - zip() 함수

결합연산자에서 가장 먼저 알아야 할 함수는 zip() 입니다. zip() 함수의 특징은 각각의 Observable을 모두 활용해 2개 혹은 그 이상의 Observable을 결합한다는 데 있습니다.

예를 들어 A,B 두개의 Observable을 결합한다면 2개의 Observable 에서 모두 데이터를 발행해야 결합할 수 있습니다. 그전까지느 발행을 기다립니다.


zip() 함수의 마블 다이어그램

출처http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html


zip()함수의 원형은 아래와 같습니다

@SchedulerSupport(SchedulerSupport.NONE)
public static <T1, T2, R> Observable<R> zip(
    ObservableSource<? extends T1> source1,
    ObservableSource<? extends T1> source2,

    BiFunction<? super T1, ? super T2, ? extends R> zipper)



제네릭 때문에 다소 어려워 보이지만 결국 첫번째 Observable 은 source1에 넣고 두번째 Observable은 source2에 넣은 후 그것을 결합해줄 zipper 변수에 원하는 함수를 넣으면 된다는 뜻입니다. zip() 함수는 최대 9개의 Observable을 결합할 수 있지만 보통 2개 혹은 3개면 충분합니다.


zip() 함수의 활용 예

String[] shapes ={"BALL","PENTAGON","STAR"};
String[] coloredTriangles={"2-T","6-T","4-T"};
        
Observable<String> source= Observable.zip(
Observable.fromArray(shapes).map(Shape::getSuffix),
Observable.fromArray(coloredTriangles).map(Shape::getColor),
                (suffix, color)->color + suffix);

source.subscribe(Log::i);




첫 번째 Observable은 shapes에서 값을 받아온 다음 도형의 모양 접미사(suffix)를 가져옵니다. 

예를 들어 BALL의 경우 접미사는 없고 PENTAGON은 "-P"이고 STAR는 "-S"입니다

map() 함수를 활용하여 Shape.getSuffix() 메소드를 호출합니다.


두번째 Observable은 coloredTirangles에서 값을 받아와서 모양의 색상(color)값으로 변경합니다. 2-T는 2로 변환하고, 6-T와 4-T는 각각 6, 4로 변환합니다.

역시 map() 함수를 활용하여 Shape.getColor() 메소드를 호출합니다.


두정보 모두 String으로 변환하였으므로 스트링연산 (+)를 사용할수 있습니다.

실행결과는 아래와 같습니다

main | value = 2
main | value = 6-P

main | value = 4-S



두 번째 Observable 에서 색상을, 첫 번째 Observable에서 모양을 가져와 성공적으로 결합했습니다.


Shape클래스의 getSuffix() 와 getColor() 메소드

public static String getColor(String shape) { 
        if (shape.endsWith("<>")) //diamond 
            return shape.replace("<>", "").trim();
        
        int hyphen = shape.indexOf("-");
        if (hyphen > 0) {
            return shape.substring(0, hyphen);
        }
        
        return shape; //for ball 
    }
 
    public static String getSuffix(String shape) { 
        if (HEXAGON.equals(shape)) return "-H";
        if (OCTAGON.equals(shape)) return "-O"; 
        if (RECTANGLE.equals(shape)) return "-R";
        if (TRIANGLE.equals(shape)) return "-T";
        if (DIAMOND.equals(shape)) return "<>";
        if (PENTAGON.equals(shape)) return "-P";
        if (STAR.equals(shape)) return "-S";
        return ""; //for BALL        

    }





심화예제 1: 숫자결합

zip() 함수로는 다양한 결합을 할 수 있습니다. 가장 간단한예는 숫자를 계산하는것입니다.

ZipExample demo=new ZipExample();
demo.marbleDiagram();

 demo.zipNumbers();


이번에는 3개의 Observable을 결합했습니다. 첫 번째 Observable은 100단위, 두번째 Observable은 10단위, 세번째 Observable은 1단위 값을 넣었습니다.


실행결과는 다음과 같습니다.

main | value = 111
main | value = 222

main | value = 333




심화예제 2: 숫자결합

두 번째는 interval() 함수와 결합하는 예제입니다. 얼핏 생각하면 같은 타입의 데이터만 결합 할 수있을 것으로 생각되지만, zip() 함수의 묘미는 데이터뿐만 아니라 시간과도 결합할 수 있다는 점입니다. 이를 zipInterval 기법이라고도 합니다.

시간과 결합하면 데이터를 발행하는 시간을 조절할 수 있습니다.

Observable<String> source=Observable.zip(
                Observable.just("RED","GREEN","BLUE"),
                Observable.interval(200L, TimeUnit.MILLISECONDS),
                (value,i)->value
                );
CommonUtils.exampleStart();
source.subscribe(Log::it);

 CommonUtils.sleep(1000);


source 변수는 RED,GREEN,BLUE를 발행하는 첫 번째 Observable 과 200ms 간격으로 0,1,2 값을 발행하는 

두 번째 Observable을 결합했습니다.


결합함수(zipper)로는 (value,i) -> value를 넣었습니다. 즉, 두 Observable를 결합하지만 결괏값은 value가 되는 것입니다.

덕분에 200ms를 지연시킬 수 있습니다.


실행결과는 아래와 같습니다

RxComputationThreadPool-1 | 205 | value = RED
RxComputationThreadPool-1 | 403 | value = GREEN

RxComputationThreadPool-1 | 605 | value = BLUE


200ms 간격으로 RED, GREEN, BLUE 값이 출력되었습니다


심화예제 3: 전기요금 계산예제

이번 예제는 주택용 저압 부분의 전기 요금 계산기를 만들어 보겠습니다.

기본요금(원/호)

전력량 요금(원/kWh) 

 200kWh 이하 사용

910 

 처음 200kWh 까지

93.3 

 201~400kWh 사용

1600 

 다음 200kWh 까지

187.9 

 400kWh 초과 사용

 7,300

  400kWh 초과 

280.6 


전기요금계산 예 (V1)

private int index = 0; //FIXME don't use it 
 
String[] data={
                "100", // 910*93.3*100 =10.240원
                "300" //  1600*93.3*200 + 187.9* 100 =39,050원
        };
        
        Observable<Integer> basePrice= Observable.fromArray(data)
                .map(Integer::parseInt)
                .map(val->{
                    if(val<=200) return 910;
                    if(val>200) return 1600;
                    return 7300;
                });
        
        Observable<Integer> usagePrice =Observable.fromArray(data)
                .map(Integer::parseInt)
                .map(val->{
                    double series1=min(200, val)*93.3;
                    double series2=min(200, max(val-200,0))*187.9;
                    double series3=min(200, max(val-400,0))*280.64;
                    return (int)(series1+series2+series3);
                    
                });
        Observable<Integer> source= Observable.zip(
                basePrice,
                usagePrice,
                (v1,v2)->v1+v2
                );
        source.map(val->new DecimalFormat("#,###").format(val))
        .subscribe(val->{
            StringBuilder sb=new StringBuilder();
            sb.append("Usage: "+ data[index]+" kWh =>");
            sb.append("Price: "+val+"원");
            Log.i(sb.toString());
            index++; // 부수효과
        });



RxJava로 코딩할 때 해경하고자 하는 문제를 여러개의 Observable로 나누어서 생각하는것이 중요합니다. 이 예제에서는 기본요금은 basePrice Observable로, 전력량 요금은 usagePrice Observable로 분리했습니다.


전기요금을 출력하기 위해서는 천 원 단위로 콤마( ,)를 붙여줘야 하는데 source 변수에 map() 함수를 호출하여 간단하게 처리했습니다. 이때 source 변수를 선언할 때가 아니라 마지막 subscribe() 함수 호출 전에 DecimalFormat.fomat() 메소드를 인자로 넣은 후 map()함수를 호출

해서 데이터를 발행하는 곳과 그것을 활용하여 결과를 표시하는 로직을 분리했음을 눈여겨 보기 바랍니다.


*min(a,b)함수는 둘중 작은수를 , max(a,b)는 둘중 더 큰수를 리턴한다.

DecimalFormat참고 -> http://hyeonstorage.tistory.com/163

실행결과

main | value = Usage: 100 kWh =>Price: 10,240원

main | value = Usage: 300 kWh =>Price: 39,050원


이 예제의 커다란 문제는 바로 전력 사용량(100, 300kWh)을 출력하기 위해 멤버 변수 index를 

참조한 것입니다.


이는 부수효과가 발생할 수 있기 때문에 함수형 프로그래밍 기본 원칙에 어긋납니다.


심화예제 4: 부수효과를 없앤 전기요금 계산 예 (V2)

부수효과 index를 없애는 방법

1) data를 추가로 넘겨주는 방법을 고민해봐야합니다.

2) zip() 함수는 2개뿐만 아니라 3개의 Observable도 결합할 수 있습니다.

3) 새로운 클래스를 사용하는 것 보다 앞서서 사용했었던 아파치 커먼즈 Pari클래스를 사용하는것이 좋습니다.

String[] data={
                "100", // 910*93.3*100 =10.240원
                "300" //  1600*93.3*200 + 187.9* 100 =39,050원
        };
        
        Observable<Integer> basePrice= Observable.fromArray(data)
                .map(Integer::parseInt)
                .map(val->{
                    if(val<=200) return 910;
                    if(val>200) return 1600;
                    return 7300;
                });
        
        Observable<Integer> usagePrice =Observable.fromArray(data)
                .map(Integer::parseInt)
                .map(val->{
                    double series1=min(200, val)*93.3;
                    double series2=min(200, max(val-200,0))*187.9;
                    double series3=min(200, max(val-400,0))*280.64;
                    return (int)(series1+series2+series3);
                    
                });
        
        //V1과 다른부분
        Observable<Pair<String,Integer>> source= Observable.zip(
                basePrice,
                usagePrice,
                Observable.fromArray(data),
                (v1,v2,i)->Pair.of(i,v1+v2));
        
        source.map(val->Pair.of(val.getLeft(),
            new DecimalFormat("#,###").format(val.getValue())))
                .subscribe(val->{
                    StringBuilder sb=new StringBuilder();
                    sb.append("Usage: "+ val.getLeft()+" kWh =>");
                    sb.append("Price: "+val.getRight()+"원");
                    Log.i(sb.toString());
                });



위의 예제는 앞 세가지 힌트를 그대로 활용한 것입니다. 

zip() 함수에 세 번째 인자로 Observable.fromArray(data)를 넣어서 원래 데이터를 그대로 넣었습니다. 여기에 Pair.of(i, v1+v2)를 호출하여 Pair 객체를 생성했습니다. 함수형 프로그래밍에서는 임시 자료구조를 활용할 때 새로운 클래스를 만드는 것보다 Pair 같은 범용적으로 활용할 수 있는 자료구조 사용을 선호합니다.


결과를 출력할 때는 Pair 객체의 getLeft()와 getRight() 메소드를 호출했습니다. 실행 결과

는 이전과 동일하지만 부수효과를 제거할 수 있었습니다.





zipWith()함수

zipWith()함수는 앞서서 실습했던 심화예제 1에서 숫자들을 zip() 함수로 결합했던 코드를

zipWith() 함수로 다시 작성해보겠습니다.

Observable<Integer> source= Observable.zip(
                Observable.just(100,200,300),
                Observable.just(10,20,30),
                (a,b)->a+b).zipWith(Observable.just(1, 2, 3),(ab,c)->ab+c);
source.subscribe(Log::i);




심화예제1에서 3개의 Observable을 결합했다면, 이 예제에서는 두 Observable을 zip으로 묶고 세번째 Observable을 다시 zipWith() 함수로 결합했 습니다.

zipWith()을 호출할때는 앞서 a와 b를 결합햇기때문에 ab로 명명했습니다.


실행결과

main | value = 111
main | value = 222

main | value = 333



댓글