Rx
Observable -6 (fromIterable함수,List,Set,BlockingQueue)
봄석
2018. 12. 23. 16:48
Observable의 팩토리 함수 구분
팩토리함수 | 함수 |
RxJava 1.x의 기본 팩토리 함수 | create(),just(),from() |
RxJava 2.x의 기본 팩토리 함수 | fromArray(),fromlterable(),fromCallable(),fromFuture(), fromPublisher() |
기타 팩토리 함수 | interval(),range(),timer(),defer()
|
fromIterable() 함수
Observable을 만드는 다른 방법은 Iterable 인터페이스를 구현한 클래스에서 Observable객체를 생성하는 것입니다. Iterable 인터페이스는 반복자(iterator)를 반환합니다.
Iterator 인터페이스는 이터레이터패턴*을 구현한 것으로 다음에 어떤 데이터(아이템)가 있는지와 그 값을 얻어오는 것만 관여할 뿐 특정 데이터 타입에 의존하지 않는 장점이 있습니다.
자바의 많은 컬렉션 클래스가 이 인터페이스를 구현합니다.
*이터레이터패턴 - https://en.wikipedia.org/wiki/Iterator_pattern
public interface Iterator<E>{
boolean hasNext();
E next();
}
Iterable<E> 인터페이스를 구현하는 대표적인 클래스는 ArrayList(List 인터페이스),Array BlockingQueue(BlockingQueue 인터페이스),HashSet(Set 인터페이스),LinkedList,Stack,TreeSet,Vector 등이 있습니다.
fromIterable()에 List사용
List<String> names=new ArrayList<>();
names.add("Jerry");
names.add("william");
names.add("Tom");
Observable<String> source =Observable.fromIterable(names);
source.subscribe(System.out::println);
fromIterable()에 Set사용
Set<String> cities=new HashSet<>();
cities.add("Seoul");
cities.add("London");
cities.add("Paris");
Observable<String> source= Observable.fromIterable(cities);
source.subscribe(System.out::println);
fromIterable()에 BlockingQueue사용
BlockingQueue 인터페이스의 객체로 Observable 만들기
BlockingQueue<Order> orderQueue=new ArrayBlockingQueue<>(100);
orderQueue.add(new Order("ORD-1"));
orderQueue.add(new Order("ORD-2"));
orderQueue.add(new Order("ORD-3"));
Observable<Order> source=Observable.fromIterable(orderQueue);
source.subscribe(order->System.out.println(order.getId()));
Order클래스
public class Order {
private String mId;
public Order(String id) {
mId = id;
}
public String getId() {
return mId;
}
@Override
public String toString() {
return "Order ID: " + mId;
}
}
BlockingQueue 객체는 구현클래스로 ArrayBlockingQueue를 사용했고 최대 대기 행렬수는 100으로 지정.
ORD-1에서 ORD-3까지 Order 객체를 입력했으므로 출력 결과는 Order 객체의 ID를 출력합니다.
BlockingQueue
멀티 스레드 환경에서 Queue는 생산 및 소비의 구조에 필수적인 자료구조이다.
여기서 우리는 BlockingQueue라는 interface를 구현한 객체를 가져다 쓸 수 있다.
Block 이라는 것은 먼저 무엇일까? '막는다'는 뜻이다.
그럼 무엇을 막는 다는 것인가? 그것은 바로!
Queue가 꽉찼을때의 삽입 시도 / Queue가 비어있을때의 추출 시도
를 막는 다는 것이다.
이 자동으로 '막는' 기능이 있어 BlockingQueue 의 구현체는 모두 Thread-safe 하다.
<BlockingQueue의 종류와 특징>
ArrayBlockingQueue
- 고정배열에 일반적인 Queue를 구현한 클래스, 생성 후 크기변경 불가
- 꽉찼을때 추가 block, 비었을때 추출 block
- 선택적으로 공평성 정책을 두어 block한 thread들의 순차적 대기열 생성
(대기열 처리에 대해 정확한 순서 보장 X, 공평성 따짐 - throughput 감소되나 variability를 줄이고 starvation을 해소한다.)