RxDart에 대하여 알아보기
이번 포스트에서는 RxDart에 대하여 자세하게 알아보려 합니다
목차
RxDart를 알아보기 전에
RxDart는`dart:async`패키지의 Dart Streams를 대체하기 위해 자체 Observable 클래스를 제공하지 않습니다.
기본 제공되는 `dart:async`패키지의 Stream 및 StreamController에 확장 기능을 구현한 것입니다.
✏️기본 제공되는 훌륭한 Stream API를 먼저 확인해보는 것을 추천합니다!
표준 Rx와 RxDart(Stream API) 비교해보기
많은 상황에서 Streams와 Observable은 같은 방식으로 하지만
표준 Rx에 익숙하다면 RxDart(with Stream API)의 일부 기능이 놀라 울 수 있습니다!
아래 표로 표준 Rx와 RxDart(with Stream API)를 비교해보겠습니다👏🏻
표준 Rx | Stream | |
오류가 발생했을때 | 관잘 가능한 오류와 함께 종료 | 오류가 발생하고 스트림이 이어짐 |
Cold Observable | 여러 구독자가 동일한 콜드 Observable을들을 수 있으며 각 구독은 고유 한 데이터 스트림을받습니다. | 단일 구독만 가능 |
Hot Observable 사용 | 그렇다 | BraodcastStream으로 사용 |
Publish, Behavior, ReplaySubject를 HotObservable로 사용하는지? |
그렇다 | 그렇다 |
Sinlge / Maybe / Completable 사용 | 그렇다 | 모두 `Future`로 대체하여 사용 |
Back pressure(배압) 지원여부 | 그렇다 | 그렇다 |
null을 방출할수 있는지 여부 | RxJava는 불가능 그외 Rx는 가능 |
그렇다 |
구독을 일시중지, 재개 할수 있는지 ? | 아니다 | 그렇다 |
Factory(생성) 함수
여러 스트림을 결합하거나 병합하는 것과 같은 특정 기능으로 스트림을 생성하는
Factory(생성) 함수에 대하여 알아보겠습니다!
CombineLatest
소스 스트림 중 하나가 항목을 방출할 때마다 결합하여
스트림을 하나의 스트림 시퀀스로 병합합니다.
모든 스트림이 하나 이상의 아이템을 방출할 때까지 스트림이 방출되지 않습니다.
🔻샘플 코드
test('각 스트림에서 모든 값이 한개이상 방출되었을때, 가장 최근값들을 합쳐 방출해야한다', () async {
// given
var a = Stream.fromIterable(['a']),
b = Stream.fromIterable(['b']),
c = Stream.fromIterable(['C', 'D']);
// when
final stream = Rx.combineLatestList([a, b, c]);
// then
await expectLater(
stream,
emitsInOrder([
['a', 'b', 'C'],
['a', 'b', 'D'],
emitsDone
]));
}, timeout: Timeout(Duration(seconds: 5)));
test('각 스트림에서 모든 값이 한개이상 방출되었을때, 가장 최근값들의 `가장 마지막` 값을 합쳐 방출해야한다', () async {
// given
var a = Stream.fromIterable(['a']),
b = Stream.fromIterable(['b']),
c = Stream.fromIterable(['C', 'D']);
// when
final stream =
Rx.combineLatest([a, b, c], (List<String> values) => values.last);//결합 조건 추가
// then
await expectLater(stream, emitsInOrder(['C', 'D', emitsDone]));
}, timeout: Timeout(Duration(seconds: 5)));
Concat
이전 스트림 순서가 성공적으로 종료되는 한 지정된 모든 스트림 순서를 연결합니다.
각 스트림을 하나씩 구독하여 모든 항목을 방출하고 다음 스트림을 구독하기 전에 완료하여 이를 수행합니다.
🔻샘플 코드
test('0,1,2,3,4,5가 순서적으로 발행되야한다 emitsInOrder', () {
// given
var a = Stream.fromIterable([0, 1, 2]);
var b = Stream.fromIterable([3, 4, 5]);
// when
final stream = Rx.concat([a, b]);
// then
expect(stream, emitsInOrder([0, 1, 2, 3, 4, 5]));
}, timeout: Timeout(Duration(seconds: 5)));
ConcatEager
이전 스트림 순서가 성공적으로 종료되는 한 지정된 모든 스트림 순서를 연결합니다.
다음 스트림 이후에 하나의 스트림을 구독하지 않고 모든 스트림이 즉시 구독됩니다.
그런 다음 이전 스트림이 항목 방출을 완료한 후 이벤트가 올바른 시간에 생성됩니다.
🔻샘플 코드
test('0,1,2,3,4,5가 순서적으로 발행되야한다 emitsInOrder', () {
// given
var a = Stream.fromIterable([0, 1, 2]);
var b = Stream.fromIterable([3, 4, 5]);
// when
final stream = Rx.concatEager([a, b]);
// then
expect(stream, emitsInOrder([0, 1, 2, 3, 4, 5]));
}, timeout: Timeout(Duration(seconds: 5)));
Defer
Defer는 Stream이 구독할 때까지 기다린 다음 지정된 팩토리 기능으로 스트림을 만듭니다.
경우에 따라 스트림을 생성하기 위해 마지막 순간까지 (즉 구독 시간까지) 대기하면이 스트림에 최신 데이터가 포함됩니다.
기본적으로 DeferStreams는 단일 구독입니다. 그러나 재사용할 수 있습니다.
🔻샘플 코드
test('defer', () {
// given
var a = Stream.value(1);
// when
final stream = Rx.defer(() => a);
// then
stream.listen(expectAsync1(
(value) {
expect(value, 1);
},
count: 1,
));
}, timeout: Timeout(Duration(seconds: 5)));
test('defer는 기본적으로 단일구독이므로, 여러번 구독햇을때 실패해야한다', () {
// given
var a = Stream.value(1);
// when
final stream = Rx.defer(() => a);
// then
try {
stream.listen(null);
stream.listen(null);
expect(true, false);
} catch (e) {
// 'Bad state: Stream has already been listened to.'
expect(e, isStateError);
}
}, timeout: Timeout(Duration(seconds: 5)));
test('reusable이 true일때 defer는 재사용 가능해야한다', () {
// given
const value = 1;
// when
final stream = Rx.defer(
() => Stream.fromFuture(
Future.delayed(
Duration(seconds: 1),
() => value,
),
),
reusable: true,
);
// then
stream.listen(
expectAsync1(
(actual) => expect(actual, value),
count: 1,
),
);
stream.listen(
expectAsync1(
(actual) => expect(actual, value),
count: 1,
),
);
}, timeout: Timeout(Duration(seconds: 5)));
}
ForkJoin
이 연산자는 스트림 그룹이 있고 "각각의 최종 방출 값"에만 관심이 있는 경우에 가장 적합합니다.
이에 대한 한 가지 일반적인 사용 사례는 페이지 로드 (또는 다른 이벤트)에서 여러 요청을 발행하고,
모두에 대한 응답이 수신된 경우에만 조치를 수행하려는 경우입니다.
forkJoin 오류에 공급된 내부 스트림 중 하나라도 내부 스트림에서 오류를 올바르게 포착하지 않으면,
이미 완료되었거나 완료된 다른 스트림의 값을 잃게 됩니다.
모든 내부 스트림 만 성공적으로 완료되는 데 관심이 있는 경우 외부에서 오류를 잡을 수 있습니다.
또한 하나 이상의 항목을 방출하는 스트림이 있고 이전 배출 포크 우려가 우려되는 경우 올바른 선택이 아닙니다.
이 경우 combineLatest 또는 zip과 같은 연산자를 사용하는 것이 좋습니다.
🔻샘플 코드
test('각 스트림의 가장 최근의 값을 합쳐 List로 방출해야한다', () async {
// given
var a = Stream.fromIterable(['a']),
b = Stream.fromIterable(['b']),
c = Stream.fromIterable(['C', 'D']);
// when
final stream = Rx.forkJoinList([a, b, c]);
// then
await expectLater(
stream,
emitsInOrder([
['a', 'b', 'D'],
emitsDone
]),
);
}, timeout: Timeout(Duration(seconds: 5)));
test('각 스트림의 가장 최근의 값을 합쳐 List로 방출해야한다, 결합 조건 추가', () async {
// given
var a = Stream.fromIterable(['a']),
b = Stream.fromIterable(['b']),
c = Stream.fromIterable(['C', 'D']);
// when
final stream = Rx.forkJoin([a, b, c], (List<String> values) => values.last);
// then
await expectLater(stream, emitsInOrder(['D', emitsDone]));
}, timeout: Timeout(Duration(seconds: 5)));
test('ForkJoin N개의(특정수의) 스트림 존재', () async {
// given
var a = Stream.fromIterable(['1']), b = Stream.fromIterable(['2', '3']);
// when
final stream = Rx.forkJoin2(a, b, (a, b) => a + b);
// then
await expectLater(stream, emitsInOrder(['13', emitsDone]));
}, timeout: Timeout(Duration(seconds: 5)));
test('ForkJoin 스트림중 에러가 포함되어있으면 결합하지 않아야한다', () async {
// given
var a = Stream.value(1),
b = Stream.value(1),
c = Stream<int>.error(Exception());
// when
final stream = Rx.forkJoin3(a, b, c, (a, b, c) => a + b + c);
// then
await expectLater(stream, emitsError(TypeMatcher<Exception>()));
}, timeout: Timeout(Duration(seconds: 5)));
Merge
지정된 항목에서 방출된 항목을 streams단일 스트림 리스트로 병합합니다.
🔻샘플 코드
test('각스트림에서 방출된 값을 리스트로 방출해야한다', () async {
// given
var a = Stream.periodic(const Duration(seconds: 1), (count) => count)
.take(3),
b = Stream.fromIterable(const [1, 2, 3, 4]);
// when
final stream = Rx.merge([a, b]);
// then
await expectLater(stream, emitsInOrder([1, 2, 3, 4, 0, 1, 2]));
}, timeout: Timeout(Duration(seconds: 5)));
test('merge중 에러 발생시, 에러발생하기 전 값들까지만 합쳐서 리스트로 반환하고 에러를 방출한다', () async {
// given
var a = Stream.periodic(const Duration(seconds: 1), (count) => count)
.take(3),
b = Stream.fromIterable(const [1, 2, 3, 4]),
c = Stream<int>.error(Exception());
// when
final streamWithError = Rx.merge([a, b, c]);
//then
streamWithError.listen(
null,
onError: expectAsync2((e, s) => expect(e, isException)),
);
}, timeout: Timeout(Duration(seconds: 5)));
Never
무한 지속 시간을 나타내는 데 사용할 수 있는 종료되지 않는 스트림 시퀀스를 반환합니다.
never 연산자는 매우 구체적이고 제한된 동작을 가진 연산자입니다.
이는 테스트 목적으로 유용하며
때로는 다른 스트림과 함께
또는 다른 스트림을 매개 변수로 기대하는 스트림에 매개 변수로 결합하는 데 유용합니다.
🔻샘플 코드
test('어떤 에러나 데이터등을 리턴하지않아야한다', () async {
// given
var onDataCalled = false, onDoneCalled = false, onErrorCalled = false;
// when
final stream = Rx.never<Null>();
final subscription = stream.listen(
expectAsync1((_) {
onDataCalled = true;
}, count: 0),
onError: expectAsync2((Exception e, StackTrace s) {
onErrorCalled = false;
}, count: 0),
onDone: expectAsync0(() {
onDataCalled = true;
}, count: 0));
await Future<Null>.delayed(Duration(milliseconds: 10));
await subscription.cancel();
// 어떤 에러나 데이터등을 리턴하는 콜백함수가 모두 호출되지않아 초기상태 모두 false임
await expectLater(onDataCalled, isFalse);
await expectLater(onDoneCalled, isFalse);
await expectLater(onErrorCalled, isFalse);
}, timeout: Timeout(Duration(seconds: 5)));
Race
두 개 이상의 source가 주어지면 streams처음 항목에서만 모든 항목 streams을 내보내 항목이나 알림을 내 보냅니다.
🔻샘플 코드
test('race', () {
// given
var a = Rx.timer(1, Duration(seconds: 3)),
b = Rx.timer(2, Duration(seconds: 2)),
c = Rx.timer(3, Duration(seconds: 1));
// when
final stream = Rx.race([a, b, c]);
// then
stream.listen(expectAsync1(
(value) => expect(value, 3),
count: 1,
));
}, timeout: Timeout(Duration(seconds: 5)));
test('race 수행중 에러발생', () async {
// given
var a = Rx.timer(1, Duration(seconds: 1)),
b = Stream<Null>.error(Exception('oh noes!'));
// when
final stream = Rx.race([a, b]);
// then
stream.listen(
null,
onError: expectAsync2((e, s) => expect(e, isException)),
);
});
Range
지정된 범위 내에서 일련의 정수를 방출하는 Stream을 반환합니다.
🔻샘플 코드
test('1-3범위 안의 값을 방출해야 한다', () async {
// given
// when
final stream = Rx.range(1, 3);
// then
await expectLater(stream, emitsInOrder([1, 2, 3, emitsDone]));
}, timeout: Timeout(Duration(seconds: 5)));
test('start와 end가 같으면 1개의 항목만 방출해야한다', () async {
// given
// when
final stream = Rx.range(1, 1);
// then
stream.listen(expectAsync1((actual) {
expect(actual, 1);
}, count: 1));
}, timeout: Timeout(Duration(seconds: 5)));
test('역순으로 배출해야한다', () async {
// given
// when
final stream = Rx.range(3, 1);
// then
await expectLater(stream, emitsInOrder([3, 2, 1, emitsDone]));
}, timeout: Timeout(Duration(seconds: 5)));
Reapeat
Stream 이 성공적으로 종료될 때까지 지정된 횟수만큼 소스 스트림을 재생성하고 다시 수신할 Stream을 만듭니다.
🔻샘플 코드
Stream<String> Function(int) _getRepeatStream(String symbol) =>
(int repeatIndex) async* {
yield await Future.delayed(
const Duration(milliseconds: 20),
() => '$symbol$repeatIndex',
);
};
Stream<String> Function(int) _getErroneusRepeatStream(String symbol) =>
(int repeatIndex) {
return Stream.value('A0')
// Emit the error
.concatWith([Stream<String>.error(Error())]);
};
test('repeat count 3일때 3번 반복 해야한다', () async {
// given
const repeatCount = 3;
var a = _getRepeatStream('A');
// when
final stream = Rx.repeat(a, repeatCount);
// then
await expectLater(
stream,
emitsInOrder(<dynamic>['A0', 'A1', 'A2', emitsDone]),
);
}, timeout: Timeout(Duration(seconds: 5)));
test('반복 도중 에러가 발생하여도 지정된 횟수를 반복해야한다', () async {
// given
const repeatCount = 2;
var a = _getErroneusRepeatStream('A');
// when
final streamWithError = Rx.repeat(a, repeatCount);
// then
await expectLater(
streamWithError,
emitsInOrder(<dynamic>[
'A0',
emitsError(TypeMatcher<Error>()),
'A0',
emitsError(TypeMatcher<Error>()),
emitsDone
]));
});
Retry
Stream이 성공적으로 종료될 때까지 지정된 횟수만큼 소스 스트림을 재생성하고 다시 수신할 Stream을 만듭니다.
재시도 횟수를 지정하지 않으면 무기한 재 시도합니다. 재시도 횟수에 도달했지만 스트림이 성공적으로 종료되지 않은 경우 RetryError 가 발생합니다.
RetryError에는 오류를 일으킨 모든 오류 및 StackTrace가 포함됩니다.
🔻샘플 코드
Stream<int> Function() _getRetryStream(int failCount) {
var count = 0;
return () {
if (count < failCount) {
count++;
return Stream<int>.error(Error(), StackTrace.fromString('S'));
} else {
return Stream.value(1);
}
};
}
test('3번 재시도 해야한다', () async {
// given
const retries = 3;
var a = _getRetryStream(retries);
// when
final stream = Rx.retry(a, retries);
await expectLater(
stream,
emitsInOrder(<dynamic>[1, emitsDone]),
);
}, timeout: Timeout(Duration(seconds: 5)));
test('무한 재시도해야한다', () async {
// given
const retries = 1000;
var a = _getRetryStream(retries);
// when
final stream = Rx.retry(a);
await expectLater(
stream,
emitsInOrder(<dynamic>[1, emitsDone]),
);
}, timeout: Timeout(Duration(seconds: 5)));
RetryWhen
에러가 발생하였을 때 소스 스트림을 재생성하고 다시들을 스트림을 만듭니다.
소스 스트림에 오류가 발생하거나 완료되면 스트림이 종료됩니다.
retryWhenFactory오류를 방출하는 RetryError가 발생합니다.
RetryError는 실패를 일으킨 모든 Error 및 StackTrace를 포함합니다.
🔻샘플 코드
Stream<int> Function() _sourceStream(int i, [int throwAt]) {
return throwAt == null
? () => Stream.fromIterable(range(i))
: () =>
Stream.fromIterable(range(i)).map((i) => i == throwAt ? throw i : i);
}
Stream<int> _alwaysThrow(dynamic e, StackTrace s) =>
Stream<int>.error(Error(), StackTrace.fromString('S'));
Stream<void> _neverThrow(dynamic e, StackTrace s) => Stream.value('');
Iterable<int> range(int startOrStop, [int stop, int step]) sync* {
final start = stop == null ? 0 : startOrStop;
stop ??= startOrStop;
step ??= 1;
if (step == 0) throw ArgumentError('step cannot be 0');
if (step > 0 && stop < start) {
throw ArgumentError('if step is positive,'
' stop must be greater than start');
}
if (step < 0 && stop > start) {
throw ArgumentError('if step is negative,'
' stop must be less than start');
}
for (var value = start;
step < 0 ? value > stop : value < stop;
value += step) {
yield value;
}
}
main() {
test('retryWhen 에러가 발생하지 않았을때', () {
// givne
var a = _sourceStream(3);
var whenFactory = _alwaysThrow;
// when
var stream = Rx.retryWhen(a, whenFactory);
//then
expect(
stream,
emitsInOrder(<dynamic>[0, 1, 2, emitsDone]),
);
});
test('retryWhen Stream 에러가 발생하지 않았을때', () {
// givne
var a = _sourceStream(3);
var whenFactory = _alwaysThrow;
// when
var stream = Rx.retryWhen(a, whenFactory);
//then
expect(
stream,
emitsInOrder(<dynamic>[0, 1, 2, emitsDone]),
);
});
test('retryWhen 에러발생시에 whenFactory에서 다시 스트림으로 변환 하여, 무한으로 재시도 해야한다', () {
// given
var a = _sourceStream(1000, 2);
var whenFactory = _neverThrow;
// when
final stream = Rx.retryWhen(a, whenFactory).take(6);
// then
expect(
stream,
emitsInOrder(<dynamic>[0, 1, 0, 1, 0, 1, emitsDone]),
);
});
test('retryWhen 에러발생시에 whenFactory에서도 에러가 발생하면 RetryError를 반환해야한다', () {
// given
var a = _sourceStream(3, 0);
var whenFactory = _alwaysThrow;
// when
final streamWithError = Rx.retryWhen(a, whenFactory);
// then
expect(
streamWithError,
emitsInOrder(<dynamic>[emitsError(TypeMatcher<RetryError>()), emitsDone]),
);
});
test('retryWhen 에러발생시에 whenFactory에서도 에러가 발생하면 RetryError를 반환해야한다', () async {
// given
var a = _sourceStream(3, 0);
var whenFactory = _alwaysThrow;
// when
final streamWithError = Rx.retryWhen(a, whenFactory);
// then
await expectLater(
streamWithError,
emitsInOrder(<dynamic>[
emitsError(
predicate<RetryError>((a) {
return a.errors.length == 1 &&
a.errors
.every((es) => es.error != null && es.stackTrace != null);
}),
),
emitsDone,
]));
});
SequenceEqual
두 스트림이 동일한 순서의 항목을 방출하는지 확인합니다. 등식을 결정하기 위해 선택적인 등호 처리기를 제공할 수 있습니다.
🔻샘플 코드
test('SequenceEqual 두 스트림이 같아야한다', () {
// given
var a = Stream.fromIterable([0, 1, 2, 3, 4]);
var b = Stream.fromIterable([0, 1, 2, 3, 4]);
// when
final stream = Rx.sequenceEqual(a, b);
// then
expect(stream, emitsInOrder([true]));
}, timeout: Timeout(Duration(seconds: 5)));
test('시간차가 있어도 두스트림이 같은지 비교해야한다', () {
// given
var a = Stream.periodic(const Duration(milliseconds: 100), (i) => i + 1)
.take(5);
var b = Stream.fromIterable(const [1, 2, 3, 4, 5]);
// when
final stream = Rx.sequenceEqual(a, b);
// then
expect(stream, emitsInOrder([true]));
}, timeout: Timeout(Duration(seconds: 5)));
test('비교 조건을 커스텀하여 항상 true일때 비교값은 true이다', () {
// given
var a = Stream.fromIterable(const [1, 1, 1, 1, 1]);
var b = Stream.fromIterable(const [2, 2, 2, 2, 2]);
// when
final stream = Rx.sequenceEqual(a, b, equals: (int a, int b) => true);
// then
expect(stream, emitsInOrder([true]));
}, timeout: Timeout(Duration(seconds: 5)));
});
test('비교하여 같지않아야한다', () async {
// given
var a = Stream.fromIterable(const [1, 1, 1, 1, 1]);
var b = Stream.fromIterable(const [2, 2, 2, 2, 2]);
// when
final stream = Rx.sequenceEqual(a, b);
// then
expect(stream, emitsInOrder([false]));
}, timeout: Timeout(Duration(seconds: 5)));
test('에러가 같은지 비교해야한다', () async {
// given
var a = Stream<void>.error(ArgumentError('error A'));
var b = Stream<void>.error(ArgumentError('error A'));
// when
final stream = Rx.sequenceEqual(a, b);
// then
expect(stream, emitsInOrder([true]));
}, timeout: Timeout(Duration(seconds: 5)));
test('에러가 같은지 비교하여 달라야한다', () async {
// given
var a = Stream<void>.error(ArgumentError('error A'));
var b = Stream<void>.error(ArgumentError('error B'));
// when
final stream = Rx.sequenceEqual(a, b);
// then
expect(stream, emitsInOrder([false]));
}, timeout: Timeout(Duration(seconds: 5)));
SwitchLatest
상위 스트림에서 가장 최근 방출된 스트림의 항목만을 방출하는 용도
이 스트림은 소스 스트림에서 새 스트림이 방출될 때 이전에 방출된 스트림에서 구독을 취소합니다.
상위 스트림 ex) Stream.value(Stream.value())
🔻샘플 코드
Stream<Stream<String>> get testStream => Stream.fromIterable([
Rx.timer('A', Duration(seconds: 2)),
Rx.timer('B', Duration(seconds: 1)),
Stream.value('C'),
]);
test('상위 스트림의에서 리스트데이터를 방출', () {
// given
var a = Stream.value(Stream.fromIterable(const ['A', 'B', 'C']));
// when
final stream = Rx.switchLatest(a);
// then
expect(
stream,
emitsInOrder(<dynamic>['A', 'B', 'C', emitsDone]),
);
}, timeout: Timeout(Duration(seconds: 5)));
test('상위스트림의 가장 최근값이 먼저 방출되어야한다', () {
// given
// when
final stream = Rx.switchLatest(testStream);
// then
expect(
stream,
emits('C'),
);
}, timeout: Timeout(Duration(seconds: 5)));
test('상위 스트림에서 방출된 오류를 방출해야한다', () {
// given
var a = Stream<Stream<void>>.error(Exception());
// when
final stream = Rx.switchLatest(a);
// then
expect(
stream,
emitsError(isException),
);
}, timeout: Timeout(Duration(seconds: 5)));
Timer
지정된 시간이 지나면 주어진 값을 방출합니다.
🔻샘플 코드
test('타이머', () async {
// given
const value = 1;
// when
final stream = Rx.timer(value, Duration(milliseconds: 1));
// then
await expectLater(stream, emitsInOrder(<dynamic>[value, emitsDone]));
}, timeout: Timeout(Duration(seconds: 5)));
Zip
각각 스트림이 최소 1개씩 값을 방출할 때마다
지정된 지퍼 함수를 사용하여 지정된 스트림을 하나의 스트림 시퀀스로 병합합니다.
🔻샘플 코드
test('Zip', () async {
// given
var a = Stream.fromIterable(['A']),
b = Stream.fromIterable(['B']),
c = Stream.fromIterable(['C', 'D']);
// when
final stream = Rx.zip([a, b, c], (values) => values);
// then
await expectLater(
stream,
emitsInOrder([
['A', 'B', 'C'],
emitsDone
]));
}, timeout: Timeout(Duration(seconds: 5)));
test('Zip N개의(특정수의) 스트림 존재', () async {
// given
var a = Stream.fromIterable(['1']), b = Stream.fromIterable(['2', '3']);
// when
final stream = Rx.zip2(a, b, (a, b) => a + b);
// then
await expectLater(stream, emitsInOrder(['12', emitsDone]));
}, timeout: Timeout(Duration(seconds: 5)));
test('Zip 스트림중 에러가 포함되어있으면 에러를 방출한다', () async {
// given
var a = Stream.value(1),
b = Stream.value(1),
c = Stream<int>.error(Exception());
// when
final stream = Rx.zip3(a, b, c, (a, b, c) => a + b + c);
// then
await expectLater(stream, emitsError(TypeMatcher<Exception>()));
}, timeout: Timeout(Duration(seconds: 5)));
Transformer(변형) 배압 함수
Window
소스 Stream에서 수집 한 항목의 `창`을 내보내는 Stream을 반환합니다.
출력 Stream은 겹치지 않는 연결된 `창`을 내 보냅니다.
Stream 항목을 내보낼 때마다 현재 `창`을 내보내고 새 `창`을 엽니다.
각 `윈도(창)`는 Stream이므로 출력은 상위 Stream입니다.( Stream <Stream>())
// time 0ms 100ms 150ms 200ms 250ms 300ms 350ms 400ms 450ms 500ms 550ms 600ms
// source | 0 1 2 3 4
// window | 0 1 2 3
🔻샘플 코드
test('160ms마다 새 창을 만들어야한다', () async {
// given
var a = getStream(4);
// when
Stream<List<int>> result = a
.window(
Stream<Null>.periodic(const Duration(milliseconds: 160)).take(3))
.asyncMap((stream) => stream.toList());
// then
expectLater(
result,
emitsInOrder(<dynamic>[
const [0, 1],
const [2, 3],
emitsDone,
]),
);
}, timeout: Timeout(Duration(seconds: 5)));
WindowCount
소스 Stream에서 여러 값을 count, 버퍼링 한 다음 `창` a로 내보내고
Stream은 각 startBufferEvery값마다 새 `창`을 시작합니다.
경우에는 startBufferEvery 제공하지 않으면
새로운 count개수 때마다 창이 닫히고 방출됩니다
🔻샘플 코드
Stream<int> getStream(int n) async* {
var k = 0;
while (k < n) {
await Future<Null>.delayed(const Duration(milliseconds: 100));
yield k++;
}
}
test('2개를 카운트하여 새 창을 열어야한다', () async {
// given
var a = Rx.range(0, 4);
// when
Stream<List<int>> result =
a.windowCount(2).asyncMap((stream) => stream.toList());
// then
await expectLater(
result,
emitsInOrder(<dynamic>[
const [0, 1],
const [2, 3],
const [4],
emitsDone
]));
}, timeout: Timeout(Duration(seconds: 5)));
test('2개를 카운트하여 새 창을 열고, startBufferEvery값부터 다시 창을 열기 시작해야한다 ', () async {
// given
var a = Rx.range(0, 4);
// when
Stream<List<int>> result =
a.windowCount(2, 1).asyncMap((stream) => stream.toList());
// then
await expectLater(
result,
emitsInOrder(<dynamic>[
const [0, 1],
const [1, 2],
const [2, 3],
const [3, 4],
const [4],
emitsDone
]));
}, timeout: Timeout(Duration(seconds: 5)));
test('windowCount count3, startBufferEvery2', () async {
// given
var a = Rx.range(0, 8);
// when
Stream<List<int>> result =
a.windowCount(3, 2).asyncMap((stream) => stream.toList());
// then
await expectLater(
result,
emitsInOrder(<dynamic>[
const [0, 1, 2],
const [2, 3, 4],
const [4, 5, 6],
const [6, 7, 8],
const [8],
emitsDone
]));
}, timeout: Timeout(Duration(seconds: 5)));
test('windowCount count3, startBufferEvery4', () async {
// given
var a = Rx.range(0, 8);
// when
Stream<List<int>> result =
a.windowCount(3, 4).asyncMap((stream) => stream.toList());
// then
await expectLater(
result,
emitsInOrder(<dynamic>[
const [0, 1, 2],
const [4, 5, 6],
const [8],
emitsDone
]));
}, timeout: Timeout(Duration(seconds: 5)));
WindowTest
항목이 Stream소스 시퀀스의 항목을 포함하는 스트림을 작성하고 테스트(조건)를 통과될 때마다 일괄 처리합니다.
🔻샘플 코드
Stream<int> getStream(int n) async* {
var k = 0;
while (k < n) {
await Future<Null>.delayed(const Duration(milliseconds: 100));
yield k++;
}
}
test('지정한 조건마다 창을열어야한다', () async {
// given
var a = Rx.range(0, 4);
// when
Stream<List<int>> result =
a.windowTest((i) => i % 2 == 0).asyncMap((stream) => stream.toList());
// then
expectLater(
result,
emitsInOrder(<dynamic>[
const [0],
const [1, 2],
const [3, 4],
emitsDone,
]),
);
}, timeout: Timeout(Duration(seconds: 5)));
test('windowTest Transformer함수 사용', () async {
// given
var a = Rx.range(0, 4);
final transformer = WindowTestStreamTransformer<int>((i) => i % 2 == 0);
// when
Stream<List<int>> result =
a.transform(transformer).asyncMap((stream) => stream.toList());
// then
expectLater(
result,
emitsInOrder(<dynamic>[
const [0],
const [1, 2],
const [3, 4],
emitsDone,
]),
);
}, timeout: Timeout(Duration(seconds: 5)));
WindowTime
각 항목이 Stream소스 시퀀스의 항목을 포함하는 스트림을 생성하고
주어진 시간마다 샘플링하여 창을 내보냅니다.
// time 0ms 100ms 150ms 200ms 250ms 300ms 350ms 400ms 450ms 500ms 550ms 600ms
// source | 0 1 2 3 4
// window | 0 1 2 3
🔻샘플 코드
Stream<int> getStream(int n) async* {
var k = 0;
while (k < n) {
await Future<Null>.delayed(const Duration(milliseconds: 100));
yield k++;
}
}
test('160ms마다 창을 새로 열어야한다', () async {
// given
var a = getStream(4);
// when
Stream<List<int>> result = a
.windowTime(Duration(milliseconds: 160))
.asyncMap((stream) => stream.toList());
// then
await expectLater(
result,
emitsInOrder(<dynamic>[
const [0, 1],
const [2, 3],
emitsDone
]));
}, timeout: Timeout(Duration(seconds: 5)));
Buffer
각 항목을 Buffer에 쌓아 List로 방출하는 스트림을 만들어 window이벤트를 방출합니다.
🔻샘플 코드
Stream<int> getStream(int n) async* {
var k = 0;
while (k < n) {
await Future<Null>.delayed(const Duration(milliseconds: 100));
yield k++;
}
}
test('160ms동안 버퍼에 쌓아 List로 방출해야한다', () async {
// given
var a = getStream(4);
// when
final result = a.buffer(
Stream<Null>.periodic(const Duration(milliseconds: 160)).take(3));
// then
await expectLater(
result,
emitsInOrder(<dynamic>[
const [0, 1],
const [2, 3],
emitsDone
]));
}, timeout: Timeout(Duration(seconds: 5)));
BufferCount
소스 스트림에서 count만큼 버퍼링 한 다음 버퍼를 내보내고 지운 후
Stream은 각 startBufferEvery값마다 새 버퍼를 시작합니다.
startBufferEvery 제공하지 경우에는, 새로운 버퍼는 소스의 개시 때마다 버퍼가 닫히고 즉시 방출됩니다.
🔻샘플 코드
Stream<int> getStream(int n) async* {
var k = 0;
while (k < n) {
await Future<Null>.delayed(const Duration(milliseconds: 100));
yield k++;
}
}
test('2개 마다 buffer에 쌓아 내보내야한다', () async {
// given
var a = Rx.range(0, 4);
// when
Stream<List<int>> result =
a.bufferCount(2).asyncMap((stream) => stream.toList());
// then
await expectLater(
result,
emitsInOrder(<dynamic>[
const [0, 1],
const [2, 3],
const [4],
emitsDone
]));
}, timeout: Timeout(Duration(seconds: 5)));
test('2개씩 buffer에 쌓아 내보낸뒤, startBufferEvery값부터 새 버퍼링을 시작해야한다', () async {
// given
var a = Rx.range(0, 4);
// when
Stream<List<int>> result =
a.bufferCount(2, 1).asyncMap((stream) => stream.toList());
// then
await expectLater(
result,
emitsInOrder(<dynamic>[
const [0, 1],
const [1, 2],
const [2, 3],
const [3, 4],
const [4],
emitsDone
]));
}, timeout: Timeout(Duration(seconds: 5)));
test('bufferCount count3, startBufferEvery2', () async {
// given
var a = Rx.range(0, 8);
// when
Stream<List<int>> result =
a.bufferCount(3, 2).asyncMap((stream) => stream.toList());
// then
await expectLater(
result,
emitsInOrder(<dynamic>[
const [0, 1, 2],
const [2, 3, 4],
const [4, 5, 6],
const [6, 7, 8],
const [8],
emitsDone
]));
}, timeout: Timeout(Duration(seconds: 5)));
test('bufferCount count3, startBufferEvery4', () async {
// given
var a = Rx.range(0, 8);
// when
Stream<List<int>> result =
a.bufferCount(3, 4).asyncMap((stream) => stream.toList());
// then
await expectLater(
result,
emitsInOrder(<dynamic>[
const [0, 1, 2],
const [4, 5, 6],
const [8],
emitsDone
]));
}, timeout: Timeout(Duration(seconds: 5)));
BufferTest
각 항목이 Stream소스 시퀀스의 항목을 포함하는 스트림을 작성하고 테스트(조건)를 통과될 때마다 일괄 처리합니다.
🔻샘플 코드
Stream<int> getStream(int n) async* {
var k = 0;
while (k < n) {
await Future<Null>.delayed(const Duration(milliseconds: 100));
yield k++;
}
}
test('테스트 조건을 통과할때까지 buffer에 쌓고, 조건을 통과하면 방출해야한다', () async {
// given
var a = Rx.range(0, 4);
// when
Stream<List<int>> result =
a.bufferTest((i) => i % 2 == 0).asyncMap((stream) => stream.toList());
// then
expectLater(
result,
emitsInOrder(<dynamic>[
const [0],
const [1, 2],
const [3, 4],
emitsDone,
]),
);
}, timeout: Timeout(Duration(seconds: 5)));
test('BufferTest Transformer사용', () async {
// given
var a = Rx.range(0, 4);
final transformer = BufferTestStreamTransformer<int>((i) => i % 2 == 0);
// when
Stream<List<int>> result =
a.transform(transformer).asyncMap((stream) => stream.toList());
// then
expectLater(
result,
emitsInOrder(<dynamic>[
const [0],
const [1, 2],
const [3, 4],
emitsDone,
]),
);
}, timeout: Timeout(Duration(seconds: 5)));
BufferTime
각 항목이 Stream소스 시퀀스의 항목을 포함하는 스트림을 생성하고
주어진 시간마다 샘플링하여 창을 내보냅니다.
// time 0ms 100ms 150ms 200ms 250ms 300ms 350ms 400ms 450ms 500ms 550ms 600ms
// source | 0 1 2 3 4
// window | 0 1 2 3
🔻샘플 코드
Stream<int> getStream(int n) async* {
var k = 0;
while (k < n) {
await Future<Null>.delayed(const Duration(milliseconds: 100));
yield k++;
}
}
test('지정된 시간(160ms)동안 버퍼에 쌓고 방출해야한다', () async {
// given
var a = getStream(4);
// when
Stream<List<int>> result = a
.bufferTime(Duration(milliseconds: 160))
.asyncMap((stream) => stream.toList());
// then
await expectLater(
result,
emitsInOrder(<dynamic>[
const [0, 1],
const [2, 3],
emitsDone
]));
}, timeout: Timeout(Duration(seconds: 5)));
Debounce
Stream소스 시퀀스가 다른 항목을 방출하지 않고 완료된 경우에만 소스 시퀀스에서 항목을 방출하도록을 변환합니다.
// time 0ms 100ms 150ms 200ms 250ms 300ms 350ms 400ms 450ms 500ms 550ms 600ms 650ms 700ms 750ms 800ms 850ms 900ms
// source | 1 2 3 4
// debounce | ------------------------------------> ----------------------------------> ----------------------------------> ---------------------------------> 4
🔻샘플 코드
Stream<int> getStream() {
final controller = StreamController<int>();
Timer(const Duration(milliseconds: 100), () => controller.add(1));
Timer(const Duration(milliseconds: 200), () => controller.add(2));
Timer(const Duration(milliseconds: 300), () => controller.add(3));
Timer(const Duration(milliseconds: 400), () {
controller.add(4);
controller.close();
});
return controller.stream;
}
test('200ms동안 값이 방출되지 않았을때, 값을 방출한다 ', () async {
// given
var a = getStream();
// when
final result = a.debounce((_) => Stream<void>.fromFuture(
Future<void>.delayed(const Duration(milliseconds: 200))));
// then
await expectLater(result, emitsInOrder(<dynamic>[4, emitsDone]));
}, timeout: Timeout(Duration(seconds: 5)));
DebounceTime
DebounceTimeStream소스 시퀀스가 지정한 시간 동안
다른 항목을 방출하지 않고 완료된 경우에만 소스 시퀀스에서 항목을 방출하도록을 변환합니다.
// time 0ms 100ms 150ms 200ms 250ms 300ms 350ms 400ms 450ms 500ms 550ms 600ms 650ms 700ms 750ms 800ms 850ms 900ms
// source | 1 2 3 4
// debounce | ------------------------------------> ----------------------------------> ----------------------------------> ---------------------------------> 4
🔻샘플 코드
Stream<int> getStream() {
final controller = StreamController<int>();
Timer(const Duration(milliseconds: 100), () => controller.add(1));
Timer(const Duration(milliseconds: 200), () => controller.add(2));
Timer(const Duration(milliseconds: 300), () => controller.add(3));
Timer(const Duration(milliseconds: 400), () {
controller.add(4);
controller.close();
});
return controller.stream;
}
test('200ms동안 값이 방출되지 않았을때, 값을 방출한다 ', () async {
// given
var a = getStream();
// when
final result = a.debounceTime(Duration(milliseconds: 200));
// then
await expectLater(result, emitsInOrder(<dynamic>[4, emitsDone]));
}, timeout: Timeout(Duration(seconds: 5)));
Sample
sampleStream에서 방출된 Stream이후 소스에서 방출된 가장 최근에 방출된 항목 (있는 경우)을 방출합니다.
// sample
// time 0ms 20ms 40ms 60ms 80ms 100ms 120ms 140ms
// source | 0 1 2 3 4
// sampleStream | 1 3 4
// s_event | O O o
🔻샘플 코드
Stream<int> getStream() =>
Stream<int>.periodic(const Duration(milliseconds: 20), (count) => count)
.take(5);
Stream<int> getSampleStream() =>
Stream<int>.periodic(const Duration(milliseconds: 35), (count) => count)
.take(10);
test('샘플 스트림이 방출될때마다 소스스트림에서 가장 최근에 방출한 값을 방출한다', () async {
// given
var a = getStream();
// when
final result = a.sample(getSampleStream());
// then
await expectLater(
result,
emitsInOrder(<dynamic>[
1,
3,
4,
emitsDone,
]));
}, timeout: Timeout(Duration(seconds: 5)));
SampleTime
Stream반복 방출 시간 범위 내에서 이전 방출 이후 소스에서 방출된 가장 최근에 방출된 값이 있는 경우 방출합니다.
// sample
// time 0ms 20ms 40ms 60ms 80ms 100ms 120ms 140ms
// source | 0 1 2 3 4
// sampleStream | 1 3 4
// s_event | O O o
🔻샘플 코드
Stream<int> getStream() =>
Stream<int>.periodic(const Duration(milliseconds: 20), (count) => count)
.take(5);
test('SampleTime', () async {
// given
// when
final stream = getStream().sampleTime(const Duration(milliseconds: 35));
// then
await expectLater(stream, emitsInOrder(<dynamic>[1, 3, 4, emitsDone]));
}, timeout: Timeout(Duration(seconds: 5)));
Throttle (ThrottleFirst)
열려있는 Stream동안 소스에서 방출 한 첫 번째 항목만 방출합니다
trailing 값을 사용하여 다음 처음 스로틀을 시작할 시간을 결정할 수 있습니다.
// throttle
// time 0ms 100ms 150ms 200ms 250ms 300ms 350ms 400ms 450ms 500ms 550ms 600ms 650ms 700ms
// source | 1 2 3 4 5 6 7
// throttle | 1 4 7
// t_event | O O O
// throttle trailing=true
// time 0ms 100ms 150ms 200ms 250ms 300ms 350ms 400ms 450ms 500ms 550ms 600ms 650ms 700ms
// source | 1 2 3 4 5 6
// throttle | 3 6
// t_event | O O
🔻샘플 코드
Stream<int> getStream() =>
Stream.periodic(const Duration(milliseconds: 100), (i) => i + 1).take(10);
test('throttle스트림이 값을 방출할때마다 소스스트림의 첫 번째 항목을 방출해야한다', () async {
// given
var a = getStream();
// when
Stream<int> result = a
.throttle(
(_) => Stream<void>.periodic(const Duration(milliseconds: 250)))
.take(3);
// then
expectLater(
result,
emitsInOrder(<dynamic>[
1,
4,
7,
emitsDone,
]),
);
}, timeout: Timeout(Duration(seconds: 5)));
test('trailing=true이면 throttle을 시간만큼 시작시간이 지연되서 시작되야한다', () async {
// given
var a = getStream();
// when
Stream<int> result = a
.throttle(
(_) => Stream<void>.periodic(const Duration(milliseconds: 250)),
trailing: true,
)
.take(3);
// then
expectLater(
result,
emitsInOrder(<dynamic>[
3,
6,
9,
emitsDone,
]),
);
}, timeout: Timeout(Duration(seconds: 5)));
ThrottleTime
Stream의 시간 범위 내에 소스에서 방출 한 첫 번째 항목만 표시합니다.
// throttle
// time 0ms 100ms 150ms 200ms 250ms 300ms 350ms 400ms 450ms 500ms 550ms 600ms 650ms 700ms
// source | 1 2 3 4 5 6 7
// throttle | 1 4 7
// t_event | O O O
🔻샘플 코드
Stream<int> getStream() =>
Stream.periodic(const Duration(milliseconds: 100), (i) => i + 1).take(10);
test('throttleTime', () async {
// given
var a = getStream();
// when
Stream<int> result = a.throttleTime(Duration(milliseconds: 250)).take(3);
// then
await expectLater(
result,
emitsInOrder(<dynamic>[
1,
4,
7,
emitsDone,
]));
}, timeout: Timeout(Duration(seconds: 5)));
Transformer(변형) 함수
ConcatWith
현재 Stream에서 모든 항목을 내 보낸 다음 지정된 스트림의 모든 항목을 차례로 내보내는 Stream을 반환합니다.
🔻샘플 코드
test('소스스트림을 내보낸뒤 지정된 스트림을 순서대로 내보내야한다', () async {
// given
final delayedStream = Rx.timer(1, Duration(milliseconds: 10));
final immediateStream = Stream.value(2);
// when
final result = delayedStream.concatWith([immediateStream]);
// then
await expectLater(result, emitsInOrder(<dynamic>[1, 2, emitsDone]));
}, timeout: Timeout(Duration(seconds: 5)));
StartWith
소스 스트림이 값을 방출할 때, 방출 값 앞에 값을 추가합니다
🔻샘플 코드
Stream<int> _getStream() => Stream.fromIterable(const [1, 2, 3, 4]);
test('소스스트림이 값을 방출할때, 앞에 0을 추가해아한다 ', () async {
// given
final a = _getStream();
// when
final result = a.startWith(0);
// then
await expectLater(result, emitsInOrder([0, 1, 2, 3, 4]));
}, timeout: Timeout(Duration(seconds: 5)));
StartWithMany
소스 스트림이 값을 방출할 때, 방출 값 앞에 List 값을 추가합니다
🔻샘플 코드
Stream<int> _getStream() => Stream.fromIterable(const [1, 2, 3, 4]);
test('소스스트림이 값을 방출할때, 앞에 -1,0을 추가해아한다', () async {
// given
final a = _getStream();
// when
final result = a.startWithMany([-1, 0]);
// then
await expectLater(result, emitsInOrder([-1, 0, 1, 2, 3, 4]));
}, timeout: Timeout(Duration(seconds: 5)));
EndWith
소스 스트림이 값을 방출한 후, 방출한 값 뒤에 값을 추가합니다
🔻샘플 코드
Stream<int> _getStream() => Stream.fromIterable(const [1, 2, 3, 4]);
test('소스트림이 값을 방출한후 ,5을 추가해야한다', () async {
// given
final a = _getStream();
// when
final result = a.endWith(5);
// then
await expectLater(result, emitsInOrder([1, 2, 3, 4, 5]));
}, timeout: Timeout(Duration(seconds: 5)));
EndWithMany
소스 Stream이 닫기 전에 값 List를 최종 이벤트로 소스에 추가합니다.
🔻샘플 코드
Stream<int> _getStream() => Stream.fromIterable(const [1, 2, 3, 4]);
test('소스스트림이 값을 방출한 후 5,6을 방출해야한다', () async {
// given
final a = _getStream();
// when
final result = a.endWithMany([5, 6]);
// then
await expectLater(result, emitsInOrder([1, 2, 3, 4, 5, 6]));
}, timeout: Timeout(Duration(seconds: 5)));
}
ZipWith
주어진 지퍼 함수를 사용하여 현재 스트림을 다른 스트림과 결합하는 Stream을 반환합니다.
🔻샘플 코드
test('ZipWith', () async {
// given
final a = Stream<int>.value(1);
// when
final result =
a.zipWith(Stream<int>.value(2), (int one, int two) => one + two);
// then
await expectLater(result, emitsInOrder([3]));
}, timeout: Timeout(Duration(seconds: 5)));
MergeWith
여러 스트림에서 내 보낸 항목을 단일 항목 스트림으로 결합합니다. 항목은 소스에서 방출되는 순서대로 방출됩니다.
🔻샘플코드
test('mergeWith', () async {
//given
final delayedStream = Rx.timer(1, Duration(milliseconds: 10));
final immediateStream = Stream.value(2);
//when
final result = delayedStream.mergeWith([immediateStream]);
//then
await expectLater(result, emitsInOrder([2, 1]));
}, timeout: Timeout(Duration(seconds: 5)));
DefaultIfEmpty
소스 스트림에서 아무것도 내 보내지 않는 경우 단일 기본 항목을 내 보냅니다.
🔻샘플코드
test('소스스트림이 값을 방출하지 않았을때, default값(true_를 내보내야한다', () async {
// given
var a = Stream<bool>.empty();
// when
final result = a.defaultIfEmpty(true);
// then
await expectLater(result, emitsInOrder(<dynamic>[true, emitsDone]));
}, timeout: Timeout(Duration(seconds: 5)));
test('소스스트림이 값을 방출했을때, default값을 내보내지않아야한다', () async {
// given
var a = Stream.fromIterable(const [false, false, false]);
// when
final result = a.defaultIfEmpty(true);
// then
await expectLater(
result,
emitsInOrder(<dynamic>[false, false, false, emitsDone]),
);
}, timeout: Timeout(Duration(seconds: 5)));
SwitchIfEmpty
원래 Stream이 항목을 내 보내지 않으면이 연산자는 지정된 대체 스트림을 구독하고 대신 해당 Stream에서 항목을 내 보냅니다.
이는 여러 소스의 데이터를 사용할 때 특히 유용할 수 있습니다.
예를 들어, 레포지토리 패턴을 사용할 때. 로드해야 할 데이터가 있다고 가정하면
가장 빠른 액세스 포인트로 시작하고 가장 느린 포인트로 계속 돌아가는 것이 좋습니다.
예를 들어 먼저 메모리 내 데이터베이스를 쿼리 한 다음 파일 시스템의 데이터베이스를 쿼리 한 다음
데이터가 로컬 시스템에 없으면 네트워크 호출을 쿼리 합니다.
이것은 switchIfEmpty로 아주 간단하게 달성할 수 있습니다
ex)
Stream<Data> memory;
Stream<Data> disk;
Stream<Data> network;
Stream<Data> getThatData =
memory.switchIfEmpty(disk).switchIfEmpty(network);
🔻샘플 코드
test('switchIfEmpty is Empty', () async {
// given
final a = Stream<int>.empty();
// when
final result = a.switchIfEmpty(Stream.value(1));
// then
await expectLater(
result,
emitsInOrder(<dynamic>[1, emitsDone]),
);
}, timeout: Timeout(Duration(seconds: 5)));
test('switchIfEmpty is Not Empty', () async {
// given
final a = Stream.value(99);
// when
final result = a.switchIfEmpty(Stream.value(1));
// then
await expectLater(
result,
emitsInOrder(<dynamic>[99, emitsDone]),
);
}, timeout: Timeout(Duration(seconds: 5)));
Distinct
현재 스트림에서 중복을 제거합니다.
🔻샘플코드
test('distinct', () async {
// given
final a = Stream.fromIterable(const [1, 1]);
// when
final result = a.distinct();
// then
await expectLater(result, emitsInOrder(<dynamic>[1, emitsDone]));
}, timeout: Timeout(Duration(seconds: 5)));
DistinctUnique
데이터 이벤트가 이전에 이미 발생한 경우 건너뛰는 Stream을 만듭니다.
같음은 제공된 equals 및 hashCode 메서드에 의해 결정됩니다.
생략하면 마지막으로 제공된 데이터 요소의 '=='연산자와 hashCode가 사용됩니다.
이 스트림이 있는 경우 반환된 스트림은 브로드 캐스트 스트림입니다.
브로드 캐스트 스트림이 두 번 이상 수신되는 경우 각 구독은 equals 및 hashCode 테스트를 개별적으로 수행합니다.
🔻샘플 코드
class _TestObject {
final String key;
const _TestObject(this.key);
@override
bool operator ==(Object other) =>
identical(this, other) ||
other is _TestObject &&
runtimeType == other.runtimeType &&
key == other.key;
@override
int get hashCode => key.hashCode;
@override
String toString() => key;
}
test('distinctUnique (클래스의 equals 및 hascode와 함께 작동)', () async {
// given
final a = Stream.fromIterable(const [
_TestObject('a'),
_TestObject('a'),
_TestObject('b'),
_TestObject('a'),
_TestObject('b'),
_TestObject('c'),
_TestObject('a'),
_TestObject('b'),
_TestObject('c'),
_TestObject('a')
]);
// when
final result = a.distinctUnique();
// then
await expectLater(
result,
emitsInOrder(<dynamic>[
const _TestObject('a'),
const _TestObject('b'),
const _TestObject('c'),
emitsDone
]),
);
}, timeout: Timeout(Duration(seconds: 5)));
test('distinctUnique (조건으로 제공된 equals 및 해시 코드로 작동)', () async {
// given
final a = Stream.fromIterable(const [
_TestObject('a'),
_TestObject('a'),
_TestObject('b'),
_TestObject('a'),
_TestObject('b'),
_TestObject('c'),
_TestObject('a'),
_TestObject('b'),
_TestObject('c'),
_TestObject('a')
]);
// when
final result = a.distinctUnique(
equals: (a, b) => a.key == b.key, hashCode: (o) => o.key.hashCode);
// then
await expectLater(
result,
emitsInOrder(<dynamic>[
const _TestObject('a'),
const _TestObject('b'),
const _TestObject('c'),
emitsDone
]),
);
}, timeout: Timeout(Duration(seconds: 5)));
FlatMap
주어진 매퍼 함수를 사용하여 방출된 각 항목을 Stream으로 변환합니다.
새로 생성된 Stream이 수신되고 항목을 다운 스트림으로 방출하기 시작합니다.
각 스트림에서 방출되는 항목은 도착한 순서대로 다운 스트림으로 방출됩니다. 즉, 시퀀스가 함께 병합됩니다.
🔻샘플 코드
class _TestObject {
final String key;
const _TestObject(this.key);
@override
bool operator ==(Object other) =>
identical(this, other) ||
other is _TestObject &&
runtimeType == other.runtimeType &&
key == other.key;
@override
int get hashCode => key.hashCode;
@override
String toString() => key;
}
test('distinctUnique (클래스의 equals 및 hascode와 함께 작동)', () async {
// given
final a = Stream.fromIterable(const [
_TestObject('a'),
_TestObject('a'),
_TestObject('b'),
_TestObject('a'),
_TestObject('b'),
_TestObject('c'),
_TestObject('a'),
_TestObject('b'),
_TestObject('c'),
_TestObject('a')
]);
// when
final result = a.distinctUnique();
// then
await expectLater(
result,
emitsInOrder(<dynamic>[
const _TestObject('a'),
const _TestObject('b'),
const _TestObject('c'),
emitsDone
]),
);
}, timeout: Timeout(Duration(seconds: 5)));
test('distinctUnique (조건으로 제공된 equals 및 해시 코드로 작동)', () async {
// given
final a = Stream.fromIterable(const [
_TestObject('a'),
_TestObject('a'),
_TestObject('b'),
_TestObject('a'),
_TestObject('b'),
_TestObject('c'),
_TestObject('a'),
_TestObject('b'),
_TestObject('c'),
_TestObject('a')
]);
// when
final result = a.distinctUnique(
equals: (a, b) => a.key == b.key, hashCode: (o) => o.key.hashCode);
// then
await expectLater(
result,
emitsInOrder(<dynamic>[
const _TestObject('a'),
const _TestObject('b'),
const _TestObject('c'),
emitsDone
]),
);
}, timeout: Timeout(Duration(seconds: 5)));
FlatMapIteratorble
소스 스트림이 방출될 때마다 Stream <List>로 변환하여 방출합니다
🔻샘플 코드
test('flatMapIteratorble', () async {
//given
var a = Rx.range(1, 4);
//when
Stream<int> stream =
a.flatMapIterable((int i) => Stream<List<int>>.value(<int>[i, i]));
//then
await expectLater(
stream, emitsInOrder(<dynamic>[1, 1, 2, 2, 3, 3, 4, 4, emitsDone]));
}, timeout: Timeout(Duration(seconds: 5)));
SwitchMap
주어진 매퍼 함수를 사용하여 방출된 각 항목을 Stream으로 변환합니다.
새로 생성된 스트림은 항목을 수신하고 방출을 시작하며 이전에 생성된 스트림은 방출을 중지합니다.
switchMap 연산자는 flatMap 및 concatMap 메서드와 유사하지만 가장 최근에 생성된 Stream에서만 항목을 내 보냅니다.
예를 들어 비동기 API의 최신 상태만 원할 때 유용할 수 있습니다.
🔻샘플 코드
Stream<int> _getStream() {
final controller = StreamController<int>();
Timer(const Duration(milliseconds: 10), () => controller.add(1));
Timer(const Duration(milliseconds: 20), () => controller.add(2));
Timer(const Duration(milliseconds: 30), () => controller.add(3));
Timer(const Duration(milliseconds: 40), () {
controller.add(4);
controller.close();
});
return controller.stream;
}
Stream<int> _getOtherStream(int value) {
final controller = StreamController<int>();
Timer(const Duration(milliseconds: 15), () => controller.add(value + 1));
Timer(const Duration(milliseconds: 25), () => controller.add(value + 2));
Timer(const Duration(milliseconds: 35), () => controller.add(value + 3));
Timer(const Duration(milliseconds: 45), () {
controller.add(value + 4);
controller.close();
});
return controller.stream;
}
test('소스스트림과 다른스트림중, 가장 최근에 값을 방출한 스트림의 값들만 방출해야한다', () async {
//given
var a = _getStream();
//when
final result = a.switchMap(_getOtherStream);
//then
await expectLater(
result,
emitsInOrder(
[5, 6, 7, 8],
),
);
}, timeout: Timeout(Duration(seconds: 5)));
ExhaustMap
지정된 매퍼를 사용하여 소스 스트림의 항목들을 Stream으로 변환합니다.
새 스트림이 완료될 때까지 소스 스트림의 모든 항목을 무시합니다.
소스 스트림의 이전 비동기 작업이 완료된 후에 만 응답하려는 경우 유용합니다.
🔻샘플 코드
test('MapperStream을 방출하는 동안 소스스트림을 무시해야한다', () async {
//given
var calls = 0;
var a = Rx.range(0, 9);
//when
final stream = a.exhaustMap((i) {
calls++;
return Rx.timer(i, Duration(milliseconds: 100));
});
//then
await expectLater(stream, emitsInOrder(<dynamic>[0, emitsDone]));
await expectLater(calls, 1);
}, timeout: Timeout(Duration(seconds: 5)));
test('MapperStream을 방출하는 동안 소스스트림을 무시해야한다 2', () async {
//given
var a = Stream.fromIterable(const [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
//when
final stream =
a.interval(Duration(milliseconds: 30)).exhaustMap((i) async* {
yield await Future.delayed(Duration(milliseconds: 70), () => i);
});
//then
await expectLater(stream, emitsInOrder(<dynamic>[0, 3, 6, 9, emitsDone]));
}, timeout: Timeout(Duration(seconds: 5)));
MapTo
소스 Stream이 값을 내보낼 때마다, 무조건 출력 Stream에서 주어진 상수 값을 내 보냅니다.
🔻샘플 코드
test('mapTo', () async {
//given
var a = Rx.range(1, 4);
//when
final result = a.mapTo(true);
//then
await expectLater(
result,
emitsInOrder(<dynamic>[
true,
true,
true,
true,
emitsDone,
]),
);
}, timeout: Timeout(Duration(seconds: 5)));
test('mapTo with Error', () async {
//given
var a = Rx.range(1, 4).concatWith([Stream<int>.error(Error())]);
//when
final result = a.mapTo(true);
//then
await expectLater(
result,
emitsInOrder(<dynamic>[
true,
true,
true,
true,
emitsError(TypeMatcher<Error>()),
emitsDone,
]),
);
}, timeout: Timeout(Duration(seconds: 5)));
GroupBy
각 값을 그룹 지어 GroupByStream으로 방출합니다
GroupByStream은 일반 스트림처럼 작동하지만
Function Type에서 값 을받는 'key'속성이 있습니다.
🔻샘플 코드
String _toEventOdd(int value) => value == 0 ? 'even' : 'odd';
test('groupBy', () async {
//given
var a = Stream.fromIterable([1, 2, 3, 4]);
//when
final result = a.groupBy((value) => value);
//then
await expectLater(
result,
emitsInOrder(<Matcher>[
TypeMatcher<GroupByStream<int, int>>()
.having((stream) => stream.key, 'key', 1),
TypeMatcher<GroupByStream<int, int>>()
.having((stream) => stream.key, 'key', 2),
TypeMatcher<GroupByStream<int, int>>()
.having((stream) => stream.key, 'key', 3),
TypeMatcher<GroupByStream<int, int>>()
.having((stream) => stream.key, 'key', 4),
emitsDone
]),
);
}, timeout: Timeout(Duration(seconds: 5)));
test('groupBy, Map으로 각각 방출', () async {
//given
var a = Stream.fromIterable([1, 2, 3, 4]);
//when
final result = a
.groupBy((value) => _toEventOdd(value % 2))
.flatMap((stream) => stream.map((event) => {stream.key: event}));
//then
await expectLater(
result,
emitsInOrder(<dynamic>[
{'odd': 1},
{'even': 2},
{'odd': 3},
{'even': 4},
emitsDone
]),
);
}, timeout: Timeout(Duration(seconds: 5)));
test('groupBy, Map으로 묶어 방출', () async {
//given
var a = Stream.fromIterable([1, 2, 3, 4]);
//when
final result = a
.groupBy((value) => _toEventOdd(value % 2))
.map((stream) async => await stream.fold(
{stream.key: <int>[]},
(Map<String, List<int>> previous, element) {
return previous..[stream.key].add(element);
},
));
// fold is called when onDone triggers on the Stream
//then
await expectLater(
result,
emitsInOrder(<dynamic>[
{
'odd': [1, 3]
},
{
'even': [2, 4]
},
emitsDone
]),
);
}, timeout: Timeout(Duration(seconds: 5)));
Interval
지정된 기간마다 소스 스트림의 각 항목을 내보내는 Stream을 만듭니다.
🔻샘플 코드
Stream<int> _getStream() => Stream.fromIterable(const [0, 1, 2, 3, 4]);
test('interval', () async {
//given
var a = _getStream();
const expectedOutput = [0, 1, 2, 3, 4];
var count = 0, lastInterval = -1;
final stopwatch = Stopwatch()..start();
//when
final result = a.interval(const Duration(milliseconds: 1));
//then
result.listen(
expectAsync1((result) {
expect(expectedOutput[count++], result);
if (lastInterval != -1) {
expect(stopwatch.elapsedMilliseconds - lastInterval >= 1, true);
}
lastInterval = stopwatch.elapsedMilliseconds;
}, count: expectedOutput.length),
onDone: stopwatch.stop);
}, timeout: Timeout(Duration(seconds: 5)));
Max
Stream에서 방출된 가장 큰 항목으로 완성되는 Future로 Stream을 변환합니다.
이는 목록에서 최대 값을 찾는 것과 유사하지만 값은 비동기 적입니다.
반환 값은 Future입니다
*
completion
성공적으로 완료된 Future와 일치하는 값을 찾습니다 matcher.
이것은 비동기 기대를 생성합니다. expect 호출이 즉시 반환되고 실행이 계속됩니다.
나중에 Future가 완료되면 기대치 matcher가 실행됩니다.
Future가 완료되고 실행될 것으로 예상되기를 기다리 려면 expectLater를 사용하고 반환된 Future를 기다립니다.
🔻샘플 코드
class _Class implements Comparable<_Class> {
final int value;
const _Class(this.value);
@override
String toString() => '_Class2{value: $value}';
@override
bool operator ==(Object other) =>
identical(this, other) ||
other is _Class &&
runtimeType == other.runtimeType &&
value == other.value;
@override
int get hashCode => value.hashCode;
@override
int compareTo(_Class other) => value.compareTo(other.value);
}
Stream<int> _getStream() =>
Stream<int>.fromIterable(const <int>[2, 3, 3, 5, 2, 9, 1, 2, 0]);
test('max', () async {
//given
var a = _getStream();
//when
final result = a.max();
//then
await expectLater(result, completion(9));
expect(
await Stream.fromIterable(<num>[1, 2, 3, 3.5]).max(),
3.5,
);
}, timeout: Timeout(Duration(seconds: 5)));
test('max with Comparable', () async {
//given
const expected = _Class(3);
var a = Stream.fromIterable(const [
_Class(0),
expected,
_Class(2),
_Class(-1),
_Class(2),
]);
//when
final result = await a.max();
//then
expect(
result,
expected,
);
}, timeout: Timeout(Duration(seconds: 5)));
Min
Stream에서 내 보낸 가장 작은 항목으로 완료되는 Future로 Stream을 변환합니다.
이것은 List에서 최솟값을 찾는 것과 비슷하지만 값은 비동기 적입니다!
completion
성공적으로 완료된 Future와 일치하는 값을 찾습니다 matcher.
이것은 비동기 기대를 생성합니다. expect 호출이 즉시 반환되고 실행이 계속됩니다.
나중에 Future가 완료되면 기대치 matcher가 실행됩니다.
Future가 완료되고 실행될 것으로 예상되기를 기다리 려면 expectLater를 사용하고 반환된 Future를 기다립니다.
🔻샘플 코드
class _Class implements Comparable<_Class> {
final int value;
const _Class(this.value);
@override
String toString() => '_Class2{value: $value}';
@override
bool operator ==(Object other) =>
identical(this, other) ||
other is _Class &&
runtimeType == other.runtimeType &&
value == other.value;
@override
int get hashCode => value.hashCode;
@override
int compareTo(_Class other) => value.compareTo(other.value);
}
Stream<int> _getStream() =>
Stream<int>.fromIterable(const <int>[2, 3, 3, 5, 2, 9, 1, 2, 0]);
test('min', () async {
//given
var a = _getStream();
//when
final result = a.min();
//then
await expectLater(result, completion(0));
expect(
await Stream.fromIterable(<num>[1, 2, 3, 3.5]).min(),
1,
);
}, timeout: Timeout(Duration(seconds: 5)));
test('min with Comparable', () async {
//given
const expected = _Class(-1);
var a = Stream.fromIterable(const [
_Class(0),
expected,
_Class(2),
_Class(3),
_Class(2),
]);
//when
final result = await a.min();
//then
expect(
result,
expected,
);
}, timeout: Timeout(Duration(seconds: 5)));
PairWise
n 번째 및 n-1 번째 이벤트를 쌍으로 표시합니다.
🔻샘플 코드
Stream<int> getStream(int n) async* {
var k = 0;
while (k < n) {
await Future<Null>.delayed(const Duration(milliseconds: 100));
yield k++;
}
}
test('Pairwise', () async {
// given
var a = Rx.range(0, 4);
// when
Stream<List<int>> result = a.pairwise();
// then
await expectLater(
result,
emitsInOrder(<dynamic>[
const [0, 1],
const [1, 2],
const [2, 3],
const [3, 4],
emitsDone
]));
}, timeout: Timeout(Duration(seconds: 5)));
SkipUntil
지정된 스트림이 항목을 방출한 후에 만 항목 방출을 시작합니다.
🔻샘플 코드
Stream<int> _getStream() {
final controller = StreamController<int>();
Timer(const Duration(milliseconds: 100), () => controller.add(1));
Timer(const Duration(milliseconds: 200), () => controller.add(2));
Timer(const Duration(milliseconds: 300), () => controller.add(3));
Timer(const Duration(milliseconds: 400), () {
controller.add(4);
controller.close();
});
return controller.stream;
}
Stream<int> _getOtherStream() {
final controller = StreamController<int>();
Timer(const Duration(milliseconds: 250), () {
controller.add(1);
controller.close();
});
return controller.stream;
}
test('other스트림이 값을 방출한 후에, 소스스트림값들만 방출해야한다', () async {
// given
final a = _getStream();
// when
final result = a.skipUntil(_getOtherStream());
// then
await expectLater(result, emitsInOrder([3, 4]));
}, timeout: Timeout(Duration(seconds: 5)));
TakeUntil
다른 Stream 시퀀스가 값을 생성할 때까지만
소스 Stream 시퀀스의 값을 반환합니다.
🔻샘플 코드
Stream<int> _getStream() {
final controller = StreamController<int>();
Timer(const Duration(milliseconds: 100), () => controller.add(1));
Timer(const Duration(milliseconds: 200), () => controller.add(2));
Timer(const Duration(milliseconds: 300), () => controller.add(3));
Timer(const Duration(milliseconds: 400), () {
controller.add(4);
controller.close();
});
return controller.stream;
}
Stream<int> _getOtherStream() {
final controller = StreamController<int>();
Timer(const Duration(milliseconds: 250), () {
controller.add(1);
controller.close();
});
return controller.stream;
}
test('다른스트림의 값이 방출될때까지만, 소스스트림의 값들을 방출해야한다', () async {
// given
final a = _getStream();
// when
final result = a.takeUntil(_getOtherStream());
// then
await expectLater(result, emitsInOrder([1, 2]));
}, timeout: Timeout(Duration(seconds: 5)));
TakeWhileInclusive
각 값이 주어진 테스트(조건)를 충족할 때까지만 소스 스트림에서 값을 내 보냅니다.
테스트가 값으로 만족되지 않으면이 값을 모두 내보냅니다.
🔻샘플 코드
test('조건을 만족할때까지만 소스스트림의 값을 내보내야한다', () async {
// given
final a = Stream.fromIterable([2, 3, 4, 5, 6, 1, 2, 3]);
// when
final result = a.takeWhileInclusive((i) => i < 4);
// then
await expectLater(result, emitsInOrder([2, 3]));
}, timeout: Timeout(Duration(seconds: 5)));
test('TakeWhilInclusive 조건에 일치하는 값이 없을때, 소스스트림의 첫 번째 값을 방출한다', () async {
// given
final a = Stream.fromIterable([2, 3, 4, 5, 6, 1, 2, 3]);
// when
final result = a.takeWhileInclusive((i) => i > 9);
// then
await expectLater(result, emitsInOrder([2]));
}, timeout: Timeout(Duration(seconds: 5)));
Timestamp
소스 스트림에서 내 보낸 각 항목을 내 보낸 Timestamped항목과 항목이 내 보낸 시간을 포함하는 개체에 래핑 합니다.
🔻샘플 코드
test('timeStamp', () async {
// given
final a = Stream.fromIterable(const [1, 2, 3]);
// when
final result = a.timestamp();
// then
await expectLater(
result,
emitsInOrder([
TypeMatcher<Timestamped>(),
TypeMatcher<Timestamped>(),
TypeMatcher<Timestamped>(),
]));
// TimeStamp{timestamp: 2020-08-10 00:58:57.052087, value: 1}
// TimeStamp{timestamp: 2020-08-10 00:58:57.053311, value: 2}
// TimeStamp{timestamp: 2020-08-10 00:58:57.053397, value: 3}
}, timeout: Timeout(Duration(seconds: 5)));
WhereType
조건의 타입과 일치하지 않는 이벤트는 필터링되며 결과 Stream는 Type이 T 됩니다.
🔻샘플 코드
test('WhereType', () async {
// given
final a = Stream.fromIterable([1, 'two', 3, 'four']);
// when
final result = a.whereType<int>();
// then
await expectLater(result, emitsInOrder([1, 3]));
}, timeout: Timeout(Duration(seconds: 5)));
DoXX 함수
doOnData
스트림이 항목을 내보낼 때 지정된 콜백 함수를 호출합니다. 다른 rx구현에서는 이를 doOnNext라고 합니다.
🔻샘플 코드
test('아이템이 방출되었을때, doOnData가 호출되어야한다', () async {
//given
var onDataCalled = false;
var a = Stream.value(1);
//when
final stream = a.doOnData((_) => onDataCalled = true);
//then
await expectLater(stream, emits(1));
await expectLater(onDataCalled, isTrue);
});
test('브로드캐스트 스트림에서 doOnData는 1번만 호출되어야한다', () async {
//given
final actual = <int>[];
final controller = BehaviorSubject<int>(sync: true);
//when
final stream =
controller.stream.transform(DoStreamTransformer(onData: actual.add));
stream.listen(null);
stream.listen(null);
controller.add(1);
controller.add(2);
//then
await expectLater(actual, const [1, 2]);
await controller.close();
});
doOnDone
스트림이 항목 방출을 완료하면 지정된 콜백 함수를 호출합니다. 다른 rx구현에서는 이를 doOnComplete라고 합니다.
🔻샘플 코드
test('스트림이 종료되면 doOnDone이 호출되어야한다', () async {
//given
var onDoneCalled = false;
final a = Stream<void>.empty();
//when
final stream = a.doOnDone(() => onDoneCalled = true);
//then
await expectLater(stream, emitsDone);
await expectLater(onDoneCalled, isTrue);
});
doOnError
스트림에서 오류가 발생하면 지정된 콜백 함수를 호출합니다.
🔻샘플 코드
test('에러가 방출되었을때 doOnError가 호출되어야한다', () async {
//given
var onErrorCalled = false;
final a = Stream<void>.error(Exception());
//when
final stream = a.doOnError((dynamic e, dynamic s) => onErrorCalled = true);
//then
await expectLater(stream, emitsError(isException));
await expectLater(onErrorCalled, isTrue);
});
test('브로드캐스트 스트림에서 에러가 발생했을때, doOnError는 1번만 호출되어야한다', () async {
//given
var count = 0;
final subject = BehaviorSubject<int>(sync: true);
//when
final stream = subject.stream.doOnError(
(dynamic e, dynamic s) => count++,
);
stream.listen(null, onError: (dynamic e, dynamic s) {});
stream.listen(null, onError: (dynamic e, dynamic s) {});
subject.addError(Exception());
subject.addError(Exception());
//then
await expectLater(count, 2);
await subject.close();
});
doOnEach
스트림이 데이터를 내보내거나 error를 내거나 done를 내 보낸 경우 지정된 콜백 함수를 호출합니다.
콜백은 Notification객체를 수신합니다.
Notification(onData, onDone OnError)
🔻샘플 코드
test('Data, Error, Done알림이 있을때 doOnEach를 호출해야한다', () async {
//given
StackTrace stacktrace;
final actual = <Notification<int>>[];
final exception = Exception();
final a = Stream.value(1).concatWith([Stream<int>.error(exception)]);
//when
final stream = a.doOnEach((notification) {
actual.add(notification);
if (notification.isOnError) {
stacktrace = notification.stackTrace;
}
});
//then
await expectLater(
stream,
emitsInOrder(<dynamic>[1, emitsError(isException), emitsDone]),
);
await expectLater(actual, [
Notification.onData(1),
Notification<void>.onError(exception, stacktrace),
Notification<void>.onDone()
]);
});
test('브로드캐스트 스트림에서 doOnEach는 1번만 호출되어야한다', () async {
//given
var count = 0;
final controller = StreamController<int>.broadcast(sync: true);
final stream = controller.stream.transform(DoStreamTransformer(onEach: (_) {
count++;
}));
//when
stream.listen(null);
stream.listen(null);
controller.add(1);
controller.add(2);
//then
await expectLater(count, 2);
await controller.close();
});
doOnCancel
스트림 구독이 취소되면 지정된 콜백 함수를 호출합니다.
다른 rx구현에서는 종종 doOnUnsubscribe 또는 doOnDispose라고 합니다.
🔻샘플 코드
test('subscription을 취소했을때, doOnCancel이 호출되어야한다', () async {
//given
var onCancelCalled = false;
final stream = Stream.value(1);
//when
await stream.doOnCancel(() => onCancelCalled = true).listen(null).cancel();
//then
await expectLater(onCancelCalled, isTrue);
});
test('브로드캐스트 스트림에서 onCanceld은 1번만 호출되어야한다', () async {
// given
var count = 0;
final subject = BehaviorSubject<int>(sync: true);
// when
final stream = subject.doOnCancel(() => count++);
await stream.listen(null).cancel();
await stream.listen(null).cancel();
// then
await expectLater(count, 2);
await subject.close();
});
doOnPasue, doOnResume
doOnPause
스트림 구독이 일시 중지되면 지정된 콜백 함수를 호출합니다.
doOnResume
스트림 구독이 항목 수신을 재개할 때 지정된 콜백 함수를 호출합니다.
🔻샘플 코드
test('subscriptiond에서 호출시 onPause와 onResume이 호출되어야한다', () async {
//given
var onPauseCalled = false, onResumeCalled = false;
var a = Stream.value(1);
//when
final stream = a.doOnPause((_) {
onPauseCalled = true;
}).doOnResume(() {
onResumeCalled = true;
});
//then
stream.listen(null, onDone: expectAsync0(() {
expect(onPauseCalled, isTrue);
expect(onResumeCalled, isTrue);
}))
..pause()
..resume();
});
Subject
PublishSubject
PublishSubject는 구독 이후에 소스 Stream이 배출한 항목들만 옵서버에게 방출합니다.
BehaviorSubject
BehavoirSubject는 구독하기 시작하면,
소스 Stream이 가장 최근에 발행한 항목(또는 아직 아무 값도 발행되지 않았다면 맨 처음 값이나 기본 값)의 발행을 시작하며
그 이후 소스 Stream에 의해 발행된 항목들을 계속 발행합니다
ReplaySubject
ReplaySubject는 옵서버가 구독을 시작한 시점과 관계없이 소스 스트림을 처음부터 모두 방출합니다
전체 샘플 코드 보러 가기
https://github.com/qjatjr1108/rxdart_playground/blob/master/README.md
..
참고
- https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html
'Rx' 카테고리의 다른 글
Debounce , Throttle (0) | 2019.09.10 |
---|---|
TestObserver ,TestScheduler 를 이용한 테스트작성하기 (0) | 2019.09.10 |
RxAndroid Scheduler 자세히 알아보기 (0) | 2019.09.09 |
RxJava 테스팅과 Flowable-5( Flowable을 활용한 배압 이슈 대응하기) (2) | 2019.01.01 |
RxJava 테스팅과 Flowable-4( Flowable 클래스) (0) | 2019.01.01 |
댓글