본문 바로가기
Rx

스케줄러 -7( observeOn함수의 활용)

by 봄석 2018. 12. 28.

본 내용은 필자가 학습한 내용을 정리하는 내용입니다.

대부분 의 내용이 아래 책의 내용이므로 원서를 구매해서 직접보시는걸 추천드립니다!

RxJava 프로그래밍 리액티브 프로그래밍 기초부터 RxAndroid까지 한 번에

유동환 , 박정준 지음 | 한빛미디어 | 2017년 09월 04일 출간

http://www.kyobobook.co.kr/product/detailViewKor.laf?ejkGb=KOR&mallGb=KOR&barcode=9788968488658&orderClick=LAV&Kc=


저자님의 블로그

https://brunch.co.kr/@yudong#info




RxJava의 버전별 스케줄러 종류

스케줄러 

RxJava 2.x 

RxJava 1.x 

  뉴 스레드 스케줄러

newThread()

 newThread()

 싱글 스레드 스케줄러

single() 

지원 안 함 

계산 스케줄러 

computation() 

computation() 

IO 스케줄러 

io() 

io() 

트램펄린 스케줄러 

trampoline() 

trampoline() 

메인 스레드 스케줄러 

지원 안 함 

im 

 테스트 스케줄러

지원 안 함 

test() 


스케줄러 동작 방법

1. 스케줄러는 RxJava 코드를 어느 스레드에서 실행할지 지정할 수 있다.

2. subscribeOn() 함수와 observeOn() 함수를 모두 지정하면 Observable에서 데이터 흐름이 

발생하는 스레드와 처리된 결과를 구독자에게 발행하는 스레드를 분리할 수 있다.

3. subscribeOn() 함수만 호출하면 Observable의 모든 흐름이 동일한 스레드에서 실행된다.

4. 스케줄러를 별도로 지정하지 않으면 현재(main)스레드에서 동작을 실행한다



observeOn() 함수의 활용
RxJava 스케줄러의 핵심은 결국 제공되는 스케줄러의 종류를 선택한 후 subscribeOn과 observeOn() 함수를 호출하는 것입니다 .

subscribeOn() 함수는 Observable에서 구독자가 subscribe() 함수를 호출했을 때 데이터 흐름을 발행하는 스레드를 지정하고, observeOn() 함수는 처리된 결과를 구독자에게 전달하는 스레드를 지정합니다. 또한 subscribeOn() 함수는 처음 지정한 스레드를 고정시키므로 다시 subscribeOn() 함수를 호출해도 무시합니다. 하지만 observeOn() 함수는 다릅니다.


observeOn()의 다양한 활용

출처 - http://reactivex.io/documentation/scheduler.html


위 그림은 스케줄러에 대한 많은 내용을 말해줍니다.

- subscribeOn(A)를 호출했을 때는 데이터를 발행하는 첫 줄이 스레드A(파랑실행줄)

에서 실행됩니다. 이후에는 observeOn() 함수가 호출될때 까지 스레드A(파랑실행줄)에서 실행됩니다.

- observeOn(B)를 호출하면 그 다음인 두번째 줄부터는 스레드B(주황실행줄)에서 실행됩니다.

- map( 0 - - ->ㅁ) 함수는 스레드 변경과 상관없으므로 계속 스레드B(주황실행줄)에서 실행됩니다.

- 이제 observeOn(C) 함수를 호출하면 그 다음 데이터 흐름은 스레드C(분홍실행줄)에서 실행됩니다.


요약하면 

1) subscribeOn() 함수는 한번 호출했을 때 결정한 스레드를 고정하며 이후에는 다시 호출해도 스레드가 바뀌지 않는다.

2) obseveOn()은 여러번 호출할 수 있으며 호출되면 그 다음부터 동작하는 스레드를 바꿀수 있다.


전통적인 스레드 프로그래밍에서는 일일히 스레드를 만들어야 하고 스레드가 늘어날 때마다 동기화 하는것이 매우 부담스럽기때문에 이런 로직을 구현하는것이 매우힘듭니다. 하지만 observeOn() 함수는 스레드 변경이 쉬우므로 활용할 수 있는 범위가 넓습니다.



실습예제: OpenWeatherMap 연동

세계의 날씨 정보를 제공하는 API 중 OpenWeatherMap(http://openweathermap.org/)가 있습니다. 무료로 제공되는 API고 REST 방식으로 제공됩니다.


이번 실습 예제에서는 OpenWeatherMap에서 제공하는 API key를 연동해 어떤 국가에 속한 도시의 온도를 나타내겠습니다.


먼저 API key를 생성하기 위해 OpenWeatherMap의 홈페이지에 접속한 후 중간 위의 [Sign Up]메뉴를 눌러 회원가입합니다.


회원가입을 완료하면 API 메뉴를 눌러 접속하고 Current weather data를 누른뒤 <Subscribe>를 누릅니다. 무료API 사용할 것이므로 [Free] 항목에 있는 <Get API key and Start>를 누릅니다 


그리고 맨위 중간에 [Sign in]을 누르고  그 항목에서 [API keys]를 누르면 Default API key가 하나 생성되어있을 것입니다.


이제 이 발급된 API key를 바탕으로 간단한 REST API 호출을 이용해 특정 도시의 현재 날씨를 얻어오도록 하겠습니다. 


json형식

{

"coord": {

"lon": -0.13,

"lat": 51.51

},

"weather": [

{

"id": 701,

"main": "Mist",

"description": "mist",

"icon": "50n"

},

{

"id": 721,

"main": "Haze",

"description": "haze",

"icon": "50n"

}

],

"base": "stations",

"main": {

"temp": 279.39,

"pressure": 1028,

"humidity": 93,

"temp_min": 278.15,

"temp_max": 280.15

},

"visibility": 7000,

"wind": {

"speed": 3.1,

"deg": 230

},

"clouds": {

"all": 90

},

"dt": 1545972600,

"sys": {

"type": 1,

"id": 1414,

"message": 0.0059,

"country": "GB",

"sunrise": 1545984367,

"sunset": 1546012697

},

"id": 2643743,

"name": "London",

"cod": 200

}

cs

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

{

"coord": {

"lon": -0.13,

"lat": 51.51

},

"weather": [

{

"id": 701,

"main": "Mist",

"description": "mist",

"icon": "50n"

},

{

"id": 721,

"main": "Haze",

"description": "haze",

"icon": "50n"

}

],

"base": "stations",

"main": {

"temp": 279.39,

"pressure": 1028,

"humidity": 93,

"temp_min": 278.15,

"temp_max": 280.15

},

"visibility": 7000,

"wind": {

"speed": 3.1,

"deg": 230

},

"clouds": {

"all": 90

},

"dt": 1545972600,

"sys": {

"type": 1,

"id": 1414,

"message": 0.0059,

"country": "GB",

"sunrise": 1545984367,

"sunset": 1546012697

},

"id": 2643743,

"name": "London",

"cod": 200

}




Json데이터에서 온도 도시,국가 이름 추출

public class OpenWeatherMapV1 {
    private static final String URL =
            "http://api.openweathermap.org/data/2.5/weather?q=London&APPID=";
    
    public void run(){
        Observable<String> source= Observable.just(URL+"API_KEY")
        .map(OkHttpHelper::getWithLog)
        .subscribeOn(Schedulers.io());
        
        //어떻게  한 번만 호출하게 만들 수 있을까?
        Observable<String> temperature=source.map(this::parseTemperature);
        Observable<String> city= source.map(this::parseCityName);
        Observable<String> country= source.map(this::parseCountry);
        CommonUtils.exampleStart();
        
        Observable.concat(temperature, city, country)
            .observeOn(Schedulers.newThread())
            .subscribe(Log::it);
        CommonUtils.sleep(5000);
    }
    private String parseTemperature(String json){
        return parse(json, "\"temp\":[0-9]*.[0-9]*");
    }
    private String parseCityName(String json){
        return parse(json, "\"name\":\"[a-zA-Z]*\"");
    }
    private String parseCountry(String json){
        return parse(json, "\"country\":\"[a-zA-Z]*\"");
    }
    private String parse(String json, String regex){
        Pattern pattern=Pattern.compile(regex);
        Matcher match =pattern.matcher(json);
        if(match.find()){
            return match.group();
        }
        return "N/A";
    }
    
    public static void main(String args[]){
        OpenWeatherMapV1 demo=new OpenWeatherMapV1();
        demo.run();
    }
}




먼저 API Call URL을 URL 변수에 넣습니다. APP_KEY 부분에는 각자 얻어온 API Key를 넣으면됩니다.

우리가 원하는 정보는 REST API 에서 가져온 JSON 데이터를 파싱하여 얻으면 됩니다. 

source 변수는 JSON 데이터 원본을 의미합니다.


여기에 내가 원하고자 하는 정보인 temperature, city, country 각각에 map() 함수를 호출하여 파싱합니다. 데이터를 파싱하는 것은 정규표현식을 활용했습니다.

JSON 정보의 내용이 <제목>: <정보> 형태를 띄고 있으므로 정규 표현식 패턴이 매우 간편합니다. 만약 좀 더 복잡한 내용을 파싱할 때는 Gson 등의 라이브러리를 활용하는 것이 더 좋습니다.


다음에는 파싱해서 얻은 각각의 정보를 취합하기 위해서 concat() 함수를 호출했습니다.

RxJava를 배울 때는 내가 원하는 정보 단위로 Observable을 분리한다고 생각하는 데 익숙하지 않으므로 어떻게 보면 가장 핵심 로직에 해당합니다 .


실행결과

RxCachedThreadScheduler-1 | debug = OkHttp call URL = http://api.openweathermap.org/data/2.5/weather?...
RxNewThreadScheduler-1 | 1883 | value = "temp":279.39
RxCachedThreadScheduler-2 | debug = OkHttp call URL = http://api.openweathermap.org/data/2.5/weather?...
RxNewThreadScheduler-1 | 2093 | value = "name":"London"
RxCachedThreadScheduler-1 | debug = OkHttp call URL = http://api.openweathermap.org/data/2.5/weather?....
RxNewThreadScheduler-1 | 2306 | value = "country":"GB"
 
    }
}


소스코드에서 subscribeOn(Schedulers.io()) 함수를 호출한 후 REST API 호출은 IO 스케줄러에서 실행했습니다.


API 호출 결과는 observeOn() 함수에 지정한 것처럼 뉴 스레드 스케줄러에서 실행했습니다.

원하는 정보가 나왔지만 REST API 호출이 3번 발생했습니다.


만약 원하는 정보가 10개 였다면 API 호출도 그만큼 발생할 것이므로 비효율적입니다. 어떻게 하면 한번만 호출하고 원하는 결과를 얻을 수 있을까요 ? Observable분리를 생각하지 않으면 해결하기가 어렵습니다.


REST API 한번만 호출하도록 개선한 예

public class OpenWeatherMapV2 {
    private static final String URL =
            "http://api.openweathermap.org/data/2.5/weather?q=London&APPID=";
    
    public void run(){
        Observable<String> source= Observable.just(URL+"API_KEY")
        .map(OkHttpHelper::getWithLog)
        .subscribeOn(Schedulers.io())
        .share()
        .observeOn(Schedulers.newThread());
        
        source.map(this::parseTemperature).subscribe(Log::it);
        source.map(this::parseCityName).subscribe(Log::it);
        source.map(this::parseCountry).subscribe(Log::it);
        CommonUtils.sleep(5000);
    }
    private String parseTemperature(String json){
        return parse(json, "\"temp\":[0-9]*.[0-9]*");
    }
    private String parseCityName(String json){
        return parse(json, "\"name\":\"[a-zA-Z]*\"");
    }
    private String parseCountry(String json){
        return parse(json, "\"country\":\"[a-zA-Z]*\"");
    }
    private String parse(String json, String regex){
        Pattern pattern=Pattern.compile(regex);
        Matcher match =pattern.matcher(json);
        if(match.find()){
            return match.group();
        }
        return "N/A";
    }
    
    public static void main(String args[]){
        OpenWeatherMapV2 demo=new OpenWeatherMapV2();
        demo.run();
    }
}




정답은 바로 ConnetableObservable 클래스 사용입니다. 

ConnectableObservable 클래스는 1개의 Observable을 여러 구독자가 공유하는 방식으로 

차가운 Observable을 뜨거운 Observable로 변환해 줍니다.


여기서는 ConnectableObservable 클래스의 publish() 함수와 refCount() 함수를 활용했습니다.

소스 코드에 보이지 않는 이유는 두 함수를 합하면 Observable의 share() 함수가 되기 때문입니다.


도시의 온도,도시 이름, 국가이름 을 추출하기 위해 map()함수와 subscribe() 함수를 차례로 호출했습니다. subscribe 함수를 호출하면 Observable의 데이터가 다시 발행되기 때문에  서버의 REST API를 다시 호출하지 않아도 됩니다.


실행결과

RxCachedThreadScheduler-1 | debug = OkHttp call URL = http://api.openweathermap.org/data/2.5/weather?q=London&APPID=....
RxNewThreadScheduler-1 | 1545975338647 | value = "temp":279.26
RxNewThreadScheduler-2 | 1545975338647 | value = "name":"London"

RxNewThreadScheduler-3 | 1545975338649 | value = "country":"GB"




ConnectableObservable 클래스 

1) connect() : 이 메서드를 호출하면 그제야 데이터가 나옵니다. 

2) refCount() : 몇 명의 구독자가 있는지 알려줍니다. 

3) share() : publish()함수와 refConut() 함수를 합친것입니다.

4) publish() : Observable을 ConnectableObservable로 변환해줍니다.

(차가운 Observable을 뜨거운 Observable로 변환)




더이상 배출할 Observer가 없을시 refCount는 자동으로 자신을 해지를 하고 다시 새로운 Observer 이 오면 처음부터 자동으로 시작을 한다.

Observable<Long> threeRandoms = Observable.interval(1,TimeUnit.SECONDS)
    .publish()
    .refCount();

//Observable 1 배출
threeRandoms.take(5).subscribe( i->System.out.println("Observer 1 : " + i));

sleep(3000); // 3초동안 잠자기

//Observable 2 배출
threeRandoms.take(2).subscribe( i -> System.out.println("Observer 2: " + i));

sleep(3000); // 3초동안 잠자기

//Observable 3 배출
threeRandoms.subscribe( i-> System.out.println("Observer 3: " + i));

sleep(3000); // 3초동안 잠자기
....................
Observable 1 : 0
Observable 1 : 1
Observable 1 : 2
Observable 1 : 3
Observable 2: 3
Observable 1 : 4
Observable 2: 4 <-- 배출이 완료가 되고 dispose가 됐다. 
Observable 3: 0 <-- 새로운 Observer 왔기 때문에 시작된다. 
Observable 3: 1

그리고 publish().refCount() 을 줄여서 share() 라고 사용가능하다.

Observable.interval(1,TimeUnit.SECONDS).share(); // publish().refCount() 와 같은 의미


댓글