본문 바로가기
Rx

Observable -16 (ConnectableObservable 클래스)

by 봄석 2018. 12. 24.

본 내용은 필자가 학습한 내용을 정리하는 내용입니다.

대부분 의 내용이 아래 책의 내용이므로 원서를 구매해서 직접보시는걸 추천드립니다!

RxJava 프로그래밍 리액티브 프로그래밍 기초부터 RxAndroid까지 한 번에

유동환 , 박정준 지음 | 한빛미디어 | 2017년 09월 04일 출간

http://www.kyobobook.co.kr/product/detailViewKor.laf?ejkGb=KOR&mallGb=KOR&barcode=9788968488658&orderClick=LAV&Kc=


ConnectableObservable 클래스

ConnectableObservable 클래스는 Subject 클래스처럼 차가운 Observable을 뜨거운 Observable로 변환합니다 .Observable을 여러 구독자에게 공유할 수 있으므로 원 데이터 하나를 여러 구독자에게 동시에 전달할 때 사용합니다.

특이한 점은 subscribe()함수를 호출해도 아무런 동작이 일어나지 않는다는 점입니다.

새로 추가된 connect() 함수는 호출한 시점부터 subscribe()함수를 호출한 구독자에게 데이터를 발행하기 때문입니다.


ConnectableObservable 객체를 생성하려면 먼저 Observable에 publish()함수를 호출해야합니다. 이 함수는 여러 구독자에게 데이터를 발행하기 위해 connect() 함수를 호출하기 전까지 데이터 발행을 유예하는 역할을 합니다.


 ConnectableObservable.publish()의 마블다이어 그램


출처 - http://reactivex.io/RxJava/javadoc/io/reactivex/observables/ConnectableObservable.html


기존의 마블다이어그램과 달리 Observable에 subscribe()함수를 호출해야 데이터 발생하게되는데 이제는 아무런 일도 일어나지 않습니다. 오로지 connect()함수를 호출해야 그때 까지 구독했던 구독자 모두에게 데이터를 발행합니다.

connect()함수를 호출한 이후에 구독한 구독자 에게는 구독 이후에 발생한 데이터(파랑원)부터 발행합니다.


ConnectableObservable 클래스 활용 예

String[] dt ={"1","3","5"};
Observable<String> balls= Observable.interval(100L, TimeUnit.MILLISECONDS)
            .map(Long::intValue)
            .map(i->dt[i])
            .take(dt.length);

ConnectableObservable<String> source =balls.publish();
source.subscribe(data->System.out.println("Subscriber #1 =>"+data));
source.subscribe(data->System.out.println("Subscriber #2 =>"+data));
source.connect();
        
CommonUtils.sleep(250);
source.subscribe(data->System.out.println("Subscriber #3 =>"+data));
CommonUtils.sleep(100);



발행하려는 데이터는 '1','3','5'입니다 

Observable.interval()는 인자를 2개 받는데 각각 시간과 시간의 단위입니다.

여기서는 100ms(100L,TimeUnit.MILLISECONDS)단위로 0부터 발행합니다.


interval()함수는 테스트 코드를 작성할 때 많이 사용합니다.지금은 100ms간격으로 데이터를 발행한다고 생각하면 됩니다.


다음은 map()함수를 이용해 0부터 발생하는 데이터를 바탕으로 각 데이터를 발생합니다.

interval()함수는 0,1,2,3 ····식으로 숫자만 만들수 있으므로 배열에 들어있는 데이터를 발행하려고 변환한 것입니다.


첫번째와 두번째 구독자가 추가되면 connect() 함수를 호출해 데이터 발생을 시작합니다.

그다음 세번째 구독자가 나오기 전까지 sleep() 함수를 이용해 250ms를 기다리고 세번째 구독자를 추가합니다. connect() 함수를 호출했으므로 이후에는 구독하면 다음의 데이터를 바로 수신할 수 있습니다.


sleep() 함수를 이용해 100ms 기다린 후에는 balls객체의 데이터를 모두 발행합니다.

따라서 세 구독자 모두 구독해지 됩니다.


실행결과

Subscriber #1 =>1
Subscriber #2 =>1
Subscriber #1 =>3
Subscriber #2 =>3
Subscriber #1 =>5
Subscriber #2 =>5
Subscriber #3 =>5


댓글