본문 바로가기
Rx

Observable -13 (Subject클래스-AsyncSubject클래스)

by 봄석 2018. 12. 23.

본 내용은 필자가 학습한 내용을 정리하는 내용입니다.

대부분 의 내용이 아래 책의 내용이므로 원서를 구매해서 직접보시는걸 추천드립니다!

RxJava 프로그래밍 리액티브 프로그래밍 기초부터 RxAndroid까지 한 번에

유동환 , 박정준 지음 | 한빛미디어 | 2017년 09월 04일 출간

http://www.kyobobook.co.kr/product/detailViewKor.laf?ejkGb=KOR&mallGb=KOR&barcode=9788968488658&orderClick=LAV&Kc=



Subject 클래스

Subject 클래스는 차가운 Observable을 뜨거운 Observable로 바꿔주는 클래스입니다.

Subject 클래스의 특성은 Observable의 속성과 구독자의 속성이 모두 있다는 점입니다.

Observable 처럼 데이터를 발행할 수도 있고, 구독자처럼 발행된 데이터를 바로 처리할 수도 있습니다.


RxJava에서 제공하는 주요 Subject 클래스에는 AsyncSubject, BehaviorSubject, PublishSubject, ReplaySubject 등이 있습니다.




AsyncSubject 클래스

AsyncSubject 클래스는 Observable에서 발행한 마지막 데이터를 얻어올 수 있는 Subject 클래스입니다. 완료되기 전 마지막 데이터에만 관심이 있으며,이전 데이터는 무시합니다.



AsyncSubject 클래스의 마블 다이어 그램

 출처http://reactivex.io/RxJava/javadoc/io/reactivex/subjects/AsyncSubject.html


 출처- http://reactivex.io/RxJava/javadoc/rx/subjects/AsyncSubject.html


AsyncSubject 클래스는 지금까지와 다르게 마블다이어그램의 아래쪽에 있는 구독자의 시간 표시줄이 여러개인 것이 다릅니다. 처리흐름은 다음과 같습니다.


1) 처음 구독자가 subscribe()함수를 호출합니다.

2) 이후에 빨간원,초록원이 발행된 후 두번째 구독자가 subscribe를 호출합니다.

3) 마지막으로 파란원이 발행되고 데이터 발행을 완료(onComplete)합니다.



이떄 완료되기 전까지는 구독자에게 데이터를 전달하지 않다가 완료됨과 동시에 첫번째와 두번째 구독자에게 마지막 데이터를 발행하고 종료합니다.


AsyncSubject 클래스 활용 예


AsyncSubject<String> subject =AsyncSubject.create();
subject.subscribe(data->System.out.println("Subscriber #1 => " +data));
subject.onNext("1");
subject.onNext("3");
subject.subscribe(data->System.out.println("Subscriber #2 => " +data));
subject.onNext("5");
subject.onComplete();


AysncSubject 객체인 subject 는 정적팩토리 함수인 create()로 생성합니다.

subscribe() 함수를 호출하여 구독을 시작합니다. subscribe() 함수의 원형은 Observable의 subscribe() 함수와 같습니다. 따라서 data->{} 형식의 람다 표현식을 활용합니다.


onNext() 함수를 호출하면 데이터를 발행합니다. subject 변수에서 String 타입을 지정했으므로 

String 타입을 onNext()의 인자로 넣어야 합니다. '2' 라는 데이터를 발행한 후에는 두번째 구독자가 subscribe()를 호출합니다.


'3' 이라는 데이터를 발행한 후에는 마지막으로 onComplete()함수를 호출했습니다.

이떄 마지막으로 입력된 데이터가 두 구독자에게 최종 전달 됩니다.


Subscriber #1 => 5

Subscriber #2 => 5


AsyncSubject 클래스가 Observable의 구독자로 동작하는 예제

AsyncSubject 클래스는 구독자로도 동작할 수 있습니다. 


Float[] temperature= {10.1f,13.4f,12.5f};
Observable<Float> source =Observable.fromArray(temperature);
        
AsyncSubject<Float> subject =AsyncSubject.create();
subject.subscribe(data->System.out.println("Subscriber #1 =>" +data));
source.subscribe(subject);



먼저 10.1f 등의 Float 타입 온도 데이터를 담는 Observable을 생성합니다. Observable 타입의 변수 이름은 source입니다. 다음은 subject 변수에 AsyncSubject 객체를 생성하고 data를 수신할 수 있도록 subscribe() 함수를 호출 합니다. 마지막으로 subject 변수는 Observable인 source를 구독합니다.


이런과정이 가능 한 이유는  아래처럼 Subject 클래스가 Observable을 상속하고 동시에 Observable 인터페이스를 구현하기 떄문입니다.


public abstract class Subject<T> extends Observable<T> implements Observer<T>


위 예제의 실행 결과


Subscriber #1 =>12.5


AsyncSubject 클래스에서 onComplete() 함수를 호출한 후에 구독할 때


AsyncSubject<Integer> subject = AsyncSubject.create(); subject.onNext(10);
subject.onNext(11);
subject.subscribe(data->System.out.println("Subscriber #1 => " + data ));
subject.onNext(12);
subject.onComplete();
subject.onNext(13);
subject.subscribe(data->System.out.println("Subscriber #2 => " + data ));

 subject.subscribe(data->System.out.println("Subscriber #3 => " + data ));




'10','11'이라는 데이터를 발행한 후 첫 번째 구독자가 subscribe()함수를 호출하고 '12'라는 데이터를 발행한 후 onComplete()함수를 호출했습니다. Observable과 마찬가지로 

onComplete()함수 호출 이후에는 onNext() 이벤트를 무시 합니다.

그다음 두번째와 세번째 구독자가 subscribe() 함수를 호출 했습니다.


실행결과는 모두 12라는 값을 전달 받습니다.

Subscriber #1 => 12
Subscriber #2 => 12

Subscriber #3 => 12


댓글