본문 바로가기
Rx

리액티브연산자[생성연산자]-11(repeat함수,haertbeat)

by 봄석 2018. 12. 26.

본 내용은 필자가 학습한 내용을 정리하는 내용입니다.

대부분 의 내용이 아래 책의 내용이므로 원서를 구매해서 직접보시는걸 추천드립니다!

RxJava 프로그래밍 리액티브 프로그래밍 기초부터 RxAndroid까지 한 번에

유동환 , 박정준 지음 | 한빛미디어 | 2017년 09월 04일 출간

http://www.kyobobook.co.kr/product/detailViewKor.laf?ejkGb=KOR&mallGb=KOR&barcode=9788968488658&orderClick=LAV&Kc=


저자님의 블로그

https://brunch.co.kr/@yudong#info




리액티브 연산자(함수) 분류 - 카테고리

연산자 종류 

연산자 함수 

  생성 연산자 


 just(), fromXXX(), create(), interval(), ragne(), timer(),   intervalRange(), defer(), repeat() 


  변환 연산자


 map(),flatMap(), concatMap(), switchMap() , groupBy(),

 scan(), buffer(), window()


  필터 연산자


 filter(), take(), skip(), distinct()


  결합 연산자

 

 zip(), combineLatest(), merge(), concat()

 

  조건 연산자


 amb(), takeUtil(), skipUtil(), all()

 

  에러 처리 연산자 


 onErrorReturn(), onErrorResumeNext(), retry(), retryUntil()


  기타 연산자 

 

 subscribe(), subscribeOn(), observeOn(), reduce(), count()

 




생성연산자- repeat()함수

repeat()는 함수이름이 말해주듯이 단순히 반복실행을 합니다.

RxJava를 활용하여 코딩하게되면 한번쯤은 사용하게 되는 함수입니다. 

왜냐하면 서버와 통신을 하면 해당 서버가 잘 살아있는지 확인(이 확인 과정을 보통 ping 혹은 heart beat라 합니다) 하는 코드가 필요하기 때문입니다. 이떄 repeat() 함수를 활용하면 유용합니다. 기존 로직에 RxJava를 도입할 때 가장 쉽게 적용해볼 수 있는 상황이기도합니다.


repeat()함수의 마블 다이어그램

출처 - http://reactivex.io/documentation/operators/repeat.html


Observable에서 발행한 '빨강','초록','파랑' 원을 무한히 반복해서 발행합니다.


repeat() 함수 활용 예

String[] balls={"1","3","5"};
Observable<String> source=Observable.fromArray(balls).repeat(3);
        
//onComplete() 함수를 호출했던 로그 출력

 source.doOnComplete(()->Log.d("onComplete")).subscribe(Log::i);



실행결과

main | value = 1
main | value = 3
main | value = 5
main | value = 1
main | value = 3
main | value = 5
main | value = 1
main | value = 3
main | value = 5

main | debug = onComplete



정확히 3번 반복한 후에 onComplete 이벤트가 발생했습니다.



heart beat 구현하기

서버와 연동하는 앱을 작성하다 보면 서버가 동작하는지 확인하는 코드가 필요합니다.

지속적인 통신을 해야 하는 서버의 경우 명세서에 동작 확인 코드를 작성할 것을 명시하기도 합니다.


보통 일정 시간 안에 heart beat 패킷을 보내지 않으면 서버는 클라이언트와의 연결이 종료된 것으로 판단하고 연결을 해제합니다(보통 30초 간격으로 heart beat 신호를 보냅니다)

이럴 때 repeat() 함수를 활용하면 heart beat 패킷을 보내는 프로그램을 간단히 작성할 수 있습니다.


heart beat 전송 프로그램 예

CommonUtils.exampleStart();
String serverUrl= "https://api.github.com/zen";
        
//2초간격으로 서버에 ping 보내기
Observable.timer(2, TimeUnit.SECONDS)
    .map(val->serverUrl)
    .map(OkHttpHelper::get)
    .repeat()
    .subscribe(res->Log.it("Ping Result:"+res));

CommonUtils.sleep(30000);


먼저 서버의 URL을 입력합니다. 간단한 서버인 GitHub API v3의 zen URL을 입력했습니다.

호출할 때마다 매번 다른 문구들을 무작위로 출력합니다.


두번째로 앞서 학습했던 timer() 함수를 하용하여 2초마다 반복하여 실행되도록 했습니다.

이를위해 timer() 함수에서 설정하는 0L값을 serverURL로 바꿧습니다.  map(var->serverURL)


그 다음에는 serverUrl에 저장된 URL의 정보를 얻기위해 OkHttpHelper.get() 메소드를 실행합니다. OkHttpHelper.get() 메소드의 실제 구현코드는 아래와같습니다.

public class OkHttpHelper {
    private static OkHttpClient client = new OkHttpClient();
    public static String ERROR = "ERROR";
    
    public static String get(String url) throws IOException { 
        Request request = new Request.Builder()
                .url(url)
                .build();
        try {
            Response res = client.newCall(request).execute();
            return res.body().string();
        } catch (IOException e) {
            Log.e(e.getMessage());
            throw e;
        } 
    }
}



실행결과

RxComputationThreadPool-1 | 7198 | value = Ping Result:Mind your words, they are important.

RxComputationThreadPool-2 | 9506 | value = Ping Result:Responsive is better than fast.



첫번째로 실행되는 스레는 RxComputationThreadPool 입니다 약 2초간격으로 실행됩니다.

원래 timer() 함수는 한번 호출된 후 종료됩니다. 그런게 계속하여 반복 실행되는 이유는

바로 repeat() 때문입니다. repeat() 함수는 동작이 끝난 다음에 다시 구독하는 방식으로 동작합니다.

그리고 다시 구독할 때마다 동작하는 스레드의 번호가 달라집니다.

댓글