Rx

RxDart에 대하여 알아보기

봄석 2020. 8. 13. 01:38

RxDart에 대하여 알아보기

이번 포스트에서는 RxDart에 대하여 자세하게 알아보려 합니다

 

 

 

목차

 

Table of Contents

 

 

RxDart를 알아보기 전에 

RxDart는`dart:async`패키지의  Dart Streams를 대체하기 위해 자체 Observable 클래스를 제공하지 않습니다.
기본 제공되는 `dart:async`패키지의 Stream 및 StreamController에 확장 기능을 구현한 것입니다.

✏️기본 제공되는 훌륭한 Stream API를 먼저 확인해보는 것을 추천합니다!



 

표준 Rx와 RxDart(Stream API) 비교해보기

많은 상황에서 Streams와 Observable은 같은 방식으로 하지만

표준 Rx에 익숙하다면 RxDart(with Stream API)의 일부 기능이 놀라 울 수 있습니다!

 

아래 표로 표준 Rx와 RxDart(with Stream API)를 비교해보겠습니다👏🏻

 

  표준 Rx Stream
오류가 발생했을때 관잘 가능한 오류와 함께 종료 오류가 발생하고 스트림이 이어짐
Cold Observable 여러 구독자가 동일한 콜드 Observable을들을 수 있으며 각 구독은 고유 한 데이터 스트림을받습니다. 단일 구독만 가능
Hot Observable 사용 그렇다 BraodcastStream으로 사용
Publish, Behavior, ReplaySubject를
HotObservable로 사용하는지?
그렇다 그렇다
Sinlge / Maybe / Completable 사용 그렇다  모두 `Future`로 대체하여 사용
Back pressure(배압) 지원여부 그렇다 그렇다
null을 방출할수 있는지 여부 RxJava는 불가능
그외 Rx는 가능
그렇다
구독을 일시중지, 재개 할수 있는지 ? 아니다 그렇다

 

 

 

Factory(생성) 함수

여러 스트림을 결합하거나 병합하는 것과 같은 특정 기능으로 스트림을 생성하는 
Factory(생성) 함수에 대하여 알아보겠습니다!

 

 

CombineLatest

소스 스트림 중 하나가 항목을 방출할 때마다 결합하여
스트림을 하나의 스트림 시퀀스로 병합합니다.
모든 스트림이 하나 이상의 아이템을 방출할 때까지 스트림이 방출되지 않습니다.

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html

 

🔻샘플 코드

더보기

 

 test('각 스트림에서 모든 값이 한개이상 방출되었을때, 가장 최근값들을 합쳐 방출해야한다', () async {
    // given
    var a = Stream.fromIterable(['a']),
        b = Stream.fromIterable(['b']),
        c = Stream.fromIterable(['C', 'D']);

    // when
    final stream = Rx.combineLatestList([a, b, c]);

    // then
    await expectLater(
        stream,
        emitsInOrder([
          ['a', 'b', 'C'],
          ['a', 'b', 'D'],
          emitsDone
        ]));
  }, timeout: Timeout(Duration(seconds: 5)));

  test('각 스트림에서 모든 값이 한개이상 방출되었을때, 가장 최근값들의 `가장 마지막` 값을 합쳐 방출해야한다', () async {
    // given
    var a = Stream.fromIterable(['a']),
        b = Stream.fromIterable(['b']),
        c = Stream.fromIterable(['C', 'D']);

    // when
    final stream =
        Rx.combineLatest([a, b, c], (List<String> values) => values.last);//결합 조건 추가

    // then
    await expectLater(stream, emitsInOrder(['C', 'D', emitsDone]));
  }, timeout: Timeout(Duration(seconds: 5)));

 

 

 

Concat

이전 스트림 순서가 성공적으로 종료되는 한 지정된 모든 스트림 순서를 연결합니다.
각 스트림을 하나씩 구독하여 모든 항목을 방출하고 다음 스트림을 구독하기 전에 완료하여 이를 수행합니다.

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html

 

🔻샘플 코드

더보기
test('0,1,2,3,4,5가 순서적으로 발행되야한다 emitsInOrder', () {
      // given
      var a = Stream.fromIterable([0, 1, 2]);
      var b = Stream.fromIterable([3, 4, 5]);

      // when
      final stream = Rx.concat([a, b]);

      // then
      expect(stream, emitsInOrder([0, 1, 2, 3, 4, 5]));
    }, timeout: Timeout(Duration(seconds: 5)));

 

 

ConcatEager

이전 스트림 순서가 성공적으로 종료되는 한 지정된 모든 스트림 순서를 연결합니다.
다음 스트림 이후에 하나의 스트림을 구독하지 않고 모든 스트림이 즉시 구독됩니다. 

그런 다음 이전 스트림이 항목 방출을 완료한 후 이벤트가 올바른 시간에 생성됩니다.

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html

🔻샘플 코드

더보기
  test('0,1,2,3,4,5가 순서적으로 발행되야한다 emitsInOrder', () {
      // given
      var a = Stream.fromIterable([0, 1, 2]);
      var b = Stream.fromIterable([3, 4, 5]);

      // when
      final stream = Rx.concatEager([a, b]);

      // then
      expect(stream, emitsInOrder([0, 1, 2, 3, 4, 5]));
    }, timeout: Timeout(Duration(seconds: 5)));

 

 

Defer

Defer는 Stream이 구독할 때까지 기다린 다음 지정된 팩토리 기능으로 스트림을 만듭니다.
경우에 따라 스트림을 생성하기 위해 마지막 순간까지 (즉 구독 시간까지) 대기하면이 스트림에 최신 데이터가 포함됩니다.
기본적으로 DeferStreams는 단일 구독입니다. 그러나 재사용할 수 있습니다.

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html

 

🔻샘플 코드

더보기
test('defer', () {
    // given
    var a = Stream.value(1);

    // when
    final stream = Rx.defer(() => a);

    // then
    stream.listen(expectAsync1(
      (value) {
        expect(value, 1);
      },
      count: 1,
    ));
  }, timeout: Timeout(Duration(seconds: 5)));

  test('defer는 기본적으로 단일구독이므로, 여러번 구독햇을때 실패해야한다', () {
    // given
    var a = Stream.value(1);

    // when
    final stream = Rx.defer(() => a);

    // then
    try {
      stream.listen(null);
      stream.listen(null);
      expect(true, false);
    } catch (e) {
      // 'Bad state: Stream has already been listened to.'
      expect(e, isStateError);
    }
  }, timeout: Timeout(Duration(seconds: 5)));

  test('reusable이 true일때 defer는 재사용 가능해야한다', () {
    // given
    const value = 1;

    // when
    final stream = Rx.defer(
      () => Stream.fromFuture(
        Future.delayed(
          Duration(seconds: 1),
          () => value,
        ),
      ),
      reusable: true,
    );

    // then
    stream.listen(
      expectAsync1(
        (actual) => expect(actual, value),
        count: 1,
      ),
    );
    stream.listen(
      expectAsync1(
        (actual) => expect(actual, value),
        count: 1,
      ),
    );
  }, timeout: Timeout(Duration(seconds: 5)));
}

 

 

ForkJoin

이 연산자는 스트림 그룹이 있고 "각각의 최종 방출 값"에만 관심이 있는 경우에 가장 적합합니다. 
이에 대한 한 가지 일반적인 사용 사례는 페이지 로드 (또는 다른 이벤트)에서 여러 요청을 발행하고,
 모두에 대한 응답이 수신된 경우에만 조치를 수행하려는 경우입니다.
 
forkJoin 오류에 공급된 내부 스트림 중 하나라도 내부 스트림에서 오류를 올바르게 포착하지 않으면,
 이미 완료되었거나 완료된 다른 스트림의 값을 잃게 됩니다.
  
모든 내부 스트림 만 성공적으로 완료되는 데 관심이 있는 경우 외부에서 오류를 잡을 수 있습니다. 
또한 하나 이상의 항목을 방출하는 스트림이 있고 이전 배출 포크 우려가 우려되는 경우 올바른 선택이 아닙니다.
  
이 경우 combineLatest 또는 zip과 같은 연산자를 사용하는 것이 좋습니다.

 

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html

 

🔻샘플 코드

더보기
 test('각 스트림의 가장 최근의 값을 합쳐 List로 방출해야한다', () async {
    // given
    var a = Stream.fromIterable(['a']),
        b = Stream.fromIterable(['b']),
        c = Stream.fromIterable(['C', 'D']);

    // when
    final stream = Rx.forkJoinList([a, b, c]);

    // then
    await expectLater(
      stream,
      emitsInOrder([
        ['a', 'b', 'D'],
        emitsDone
      ]),
    );
  }, timeout: Timeout(Duration(seconds: 5)));

  test('각 스트림의 가장 최근의 값을 합쳐 List로 방출해야한다, 결합 조건 추가', () async {
    // given
    var a = Stream.fromIterable(['a']),
        b = Stream.fromIterable(['b']),
        c = Stream.fromIterable(['C', 'D']);

    // when
    final stream = Rx.forkJoin([a, b, c], (List<String> values) => values.last);

    // then
    await expectLater(stream, emitsInOrder(['D', emitsDone]));
  }, timeout: Timeout(Duration(seconds: 5)));

  test('ForkJoin N개의(특정수의) 스트림 존재', () async {
    // given
    var a = Stream.fromIterable(['1']), b = Stream.fromIterable(['2', '3']);

    // when
    final stream = Rx.forkJoin2(a, b, (a, b) => a + b);

    // then
    await expectLater(stream, emitsInOrder(['13', emitsDone]));
  }, timeout: Timeout(Duration(seconds: 5)));

  test('ForkJoin 스트림중 에러가 포함되어있으면 결합하지 않아야한다', () async {
    // given
    var a = Stream.value(1),
        b = Stream.value(1),
        c = Stream<int>.error(Exception());

    // when
    final stream = Rx.forkJoin3(a, b, c, (a, b, c) => a + b + c);

    // then
    await expectLater(stream, emitsError(TypeMatcher<Exception>()));
  }, timeout: Timeout(Duration(seconds: 5)));

 

 

Merge

지정된 항목에서 방출된 항목을 streams단일 스트림 리스트로 병합합니다.

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html

 

🔻샘플 코드

더보기
 test('각스트림에서 방출된 값을 리스트로 방출해야한다', () async {
    // given
    var a = Stream.periodic(const Duration(seconds: 1), (count) => count)
            .take(3),
        b = Stream.fromIterable(const [1, 2, 3, 4]);

    // when
    final stream = Rx.merge([a, b]);

    // then
    await expectLater(stream, emitsInOrder([1, 2, 3, 4, 0, 1, 2]));
  }, timeout: Timeout(Duration(seconds: 5)));
  

  test('merge중 에러 발생시, 에러발생하기 전 값들까지만 합쳐서 리스트로 반환하고 에러를 방출한다', () async {
    // given
    var a = Stream.periodic(const Duration(seconds: 1), (count) => count)
            .take(3),
        b = Stream.fromIterable(const [1, 2, 3, 4]),
        c = Stream<int>.error(Exception());

    // when
    final streamWithError = Rx.merge([a, b, c]);

    //then
    streamWithError.listen(
      null,
      onError: expectAsync2((e, s) => expect(e, isException)),
    );
  }, timeout: Timeout(Duration(seconds: 5)));

 

 

Never

무한 지속 시간을 나타내는 데 사용할 수 있는 종료되지 않는 스트림 시퀀스를 반환합니다.
never 연산자는 매우 구체적이고 제한된 동작을 가진 연산자입니다. 
이는 테스트 목적으로 유용하며 

때로는 다른 스트림과 함께

또는 다른 스트림을 매개 변수로 기대하는 스트림에 매개 변수로 결합하는 데 유용합니다.

 

🔻샘플 코드

더보기
 test('어떤 에러나 데이터등을 리턴하지않아야한다', () async {
    // given
    var onDataCalled = false, onDoneCalled = false, onErrorCalled = false;

    // when
    final stream = Rx.never<Null>();

    final subscription = stream.listen(
        expectAsync1((_) {
          onDataCalled = true;
        }, count: 0),
        onError: expectAsync2((Exception e, StackTrace s) {
          onErrorCalled = false;
        }, count: 0),
        onDone: expectAsync0(() {
          onDataCalled = true;
        }, count: 0));

    await Future<Null>.delayed(Duration(milliseconds: 10));

    await subscription.cancel();

    // 어떤 에러나 데이터등을 리턴하는 콜백함수가 모두 호출되지않아 초기상태 모두 false임
    await expectLater(onDataCalled, isFalse);
    await expectLater(onDoneCalled, isFalse);
    await expectLater(onErrorCalled, isFalse);
  }, timeout: Timeout(Duration(seconds: 5)));

 

 

Race

두 개 이상의 source가 주어지면 streams처음 항목에서만 모든 항목 streams을 내보내 항목이나 알림을 내 보냅니다.

 

https://rxmarbles.com/#race

 

🔻샘플 코드

더보기
test('race', () {
    // given
    var a = Rx.timer(1, Duration(seconds: 3)),
        b = Rx.timer(2, Duration(seconds: 2)),
        c = Rx.timer(3, Duration(seconds: 1));

    // when
    final stream = Rx.race([a, b, c]);

    // then
    stream.listen(expectAsync1(
      (value) => expect(value, 3),
      count: 1,
    ));
  }, timeout: Timeout(Duration(seconds: 5)));

  test('race 수행중 에러발생', () async {
    // given
    var a = Rx.timer(1, Duration(seconds: 1)),
        b = Stream<Null>.error(Exception('oh noes!'));

    // when
    final stream = Rx.race([a, b]);

    // then
    stream.listen(
      null,
      onError: expectAsync2((e, s) => expect(e, isException)),
    );
  });

 

 

Range

지정된 범위 내에서 일련의 정수를 방출하는 Stream을 반환합니다.

 

http://reactivex.io/documentation/operators/range.html

🔻샘플 코드

더보기
  test('1-3범위 안의 값을 방출해야 한다', () async {
    // given

    // when
    final stream = Rx.range(1, 3);

    // then
    await expectLater(stream, emitsInOrder([1, 2, 3, emitsDone]));
  }, timeout: Timeout(Duration(seconds: 5)));

 
  test('start와 end가 같으면 1개의 항목만 방출해야한다', () async {
    // given

    // when
    final stream = Rx.range(1, 1);

    // then
    stream.listen(expectAsync1((actual) {
      expect(actual, 1);
    }, count: 1));
  }, timeout: Timeout(Duration(seconds: 5)));

  test('역순으로 배출해야한다', () async {
    // given

    // when
    final stream = Rx.range(3, 1);

    // then
    await expectLater(stream, emitsInOrder([3, 2, 1, emitsDone]));
  }, timeout: Timeout(Duration(seconds: 5)));

 

Reapeat

Stream 이 성공적으로 종료될 때까지 지정된 횟수만큼 소스 스트림을 재생성하고 다시 수신할 Stream을 만듭니다.

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html

 

🔻샘플 코드

더보기
Stream<String> Function(int) _getRepeatStream(String symbol) =>
      (int repeatIndex) async* {
        yield await Future.delayed(
          const Duration(milliseconds: 20),
          () => '$symbol$repeatIndex',
        );
      };

  Stream<String> Function(int) _getErroneusRepeatStream(String symbol) =>
      (int repeatIndex) {
        return Stream.value('A0')
            // Emit the error
            .concatWith([Stream<String>.error(Error())]);
      };




  test('repeat count 3일때 3번 반복 해야한다', () async {
    // given
    const repeatCount = 3;
    var a = _getRepeatStream('A');

    // when
    final stream = Rx.repeat(a, repeatCount);

    // then
    await expectLater(
      stream,
      emitsInOrder(<dynamic>['A0', 'A1', 'A2', emitsDone]),
    );
  }, timeout: Timeout(Duration(seconds: 5)));


 test('반복 도중 에러가 발생하여도 지정된 횟수를 반복해야한다', () async {
    // given
    const repeatCount = 2;
    var a = _getErroneusRepeatStream('A');

    // when
    final streamWithError = Rx.repeat(a, repeatCount);

    // then
    await expectLater(
        streamWithError,
        emitsInOrder(<dynamic>[
          'A0',
          emitsError(TypeMatcher<Error>()),
          'A0',
          emitsError(TypeMatcher<Error>()),
          emitsDone
        ]));
  });

 

 

Retry

Stream이 성공적으로 종료될 때까지 지정된 횟수만큼 소스 스트림을 재생성하고 다시 수신할 Stream을 만듭니다.
재시도 횟수를 지정하지 않으면 무기한 재 시도합니다. 재시도 횟수에 도달했지만 스트림이 성공적으로 종료되지 않은 경우 RetryError 가 발생합니다.
RetryError에는 오류를 일으킨 모든 오류 및 StackTrace가 포함됩니다.

 

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html

 

🔻샘플 코드

더보기
 Stream<int> Function() _getRetryStream(int failCount) {
    var count = 0;

    return () {
      if (count < failCount) {
        count++;
        return Stream<int>.error(Error(), StackTrace.fromString('S'));
      } else {
        return Stream.value(1);
      }
    };
  }

 
 test('3번 재시도 해야한다', () async {
    // given
    const retries = 3;
    var a = _getRetryStream(retries);

    // when
    final stream = Rx.retry(a, retries);

    await expectLater(
      stream,
      emitsInOrder(<dynamic>[1, emitsDone]),
    );
  }, timeout: Timeout(Duration(seconds: 5)));

 

  test('무한 재시도해야한다', () async {
    // given
    const retries = 1000;
    var a = _getRetryStream(retries);

    // when
    final stream = Rx.retry(a);

    await expectLater(
      stream,
      emitsInOrder(<dynamic>[1, emitsDone]),
    );
  }, timeout: Timeout(Duration(seconds: 5)));


 

RetryWhen

에러가 발생하였을 때 소스 스트림을 재생성하고 다시들을 스트림을 만듭니다. 

소스 스트림에 오류가 발생하거나 완료되면 스트림이 종료됩니다.
retryWhenFactory오류를 방출하는 RetryError가 발생합니다. 

RetryError는 실패를 일으킨 모든 Error 및 StackTrace를 포함합니다.

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html

 

🔻샘플 코드

더보기
Stream<int> Function() _sourceStream(int i, [int throwAt]) {
  return throwAt == null
      ? () => Stream.fromIterable(range(i))
      : () =>
          Stream.fromIterable(range(i)).map((i) => i == throwAt ? throw i : i);
}

Stream<int> _alwaysThrow(dynamic e, StackTrace s) =>
    Stream<int>.error(Error(), StackTrace.fromString('S'));

Stream<void> _neverThrow(dynamic e, StackTrace s) => Stream.value('');

Iterable<int> range(int startOrStop, [int stop, int step]) sync* {
  final start = stop == null ? 0 : startOrStop;
  stop ??= startOrStop;
  step ??= 1;

  if (step == 0) throw ArgumentError('step cannot be 0');
  if (step > 0 && stop < start) {
    throw ArgumentError('if step is positive,'
        ' stop must be greater than start');
  }
  if (step < 0 && stop > start) {
    throw ArgumentError('if step is negative,'
        ' stop must be less than start');
  }

  for (var value = start;
      step < 0 ? value > stop : value < stop;
      value += step) {
    yield value;
  }
}

main() {
  test('retryWhen 에러가 발생하지 않았을때', () {
    // givne
    var a = _sourceStream(3);
    var whenFactory = _alwaysThrow;

    // when
    var stream = Rx.retryWhen(a, whenFactory);

    //then
    expect(
      stream,
      emitsInOrder(<dynamic>[0, 1, 2, emitsDone]),
    );
  });

  test('retryWhen Stream 에러가 발생하지 않았을때', () {
    // givne
    var a = _sourceStream(3);
    var whenFactory = _alwaysThrow;

    // when
    var stream = Rx.retryWhen(a, whenFactory);

    //then
    expect(
      stream,
      emitsInOrder(<dynamic>[0, 1, 2, emitsDone]),
    );
  });

  test('retryWhen 에러발생시에 whenFactory에서 다시 스트림으로 변환 하여, 무한으로 재시도 해야한다', () {
    // given
    var a = _sourceStream(1000, 2);
    var whenFactory = _neverThrow;

    // when
    final stream = Rx.retryWhen(a, whenFactory).take(6);

    // then
    expect(
      stream,
      emitsInOrder(<dynamic>[0, 1, 0, 1, 0, 1, emitsDone]),
    );
  });
  test('retryWhen 에러발생시에 whenFactory에서도 에러가 발생하면 RetryError를 반환해야한다', () {
    // given
    var a = _sourceStream(3, 0);
    var whenFactory = _alwaysThrow;

    // when

    final streamWithError = Rx.retryWhen(a, whenFactory);

    // then
    expect(
      streamWithError,
      emitsInOrder(<dynamic>[emitsError(TypeMatcher<RetryError>()), emitsDone]),
    );
  });

  test('retryWhen 에러발생시에 whenFactory에서도 에러가 발생하면 RetryError를 반환해야한다', () async {
    // given
    var a = _sourceStream(3, 0);
    var whenFactory = _alwaysThrow;

    // when
    final streamWithError = Rx.retryWhen(a, whenFactory);

    // then
    await expectLater(
        streamWithError,
        emitsInOrder(<dynamic>[
          emitsError(
            predicate<RetryError>((a) {
              return a.errors.length == 1 &&
                  a.errors
                      .every((es) => es.error != null && es.stackTrace != null);
            }),
          ),
          emitsDone,
        ]));
  });

 

 

SequenceEqual

두 스트림이 동일한 순서의 항목을 방출하는지 확인합니다. 등식을 결정하기 위해 선택적인 등호 처리기를 제공할 수 있습니다.

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html

 

🔻샘플 코드 

더보기
test('SequenceEqual 두 스트림이 같아야한다', () {
      // given
      var a = Stream.fromIterable([0, 1, 2, 3, 4]);
      var b = Stream.fromIterable([0, 1, 2, 3, 4]);

      // when
      final stream = Rx.sequenceEqual(a, b);

      // then
      expect(stream, emitsInOrder([true]));
    }, timeout: Timeout(Duration(seconds: 5)));

    test('시간차가 있어도 두스트림이 같은지 비교해야한다', () {
      // given
      var a = Stream.periodic(const Duration(milliseconds: 100), (i) => i + 1)
          .take(5);
      var b = Stream.fromIterable(const [1, 2, 3, 4, 5]);

      // when
      final stream = Rx.sequenceEqual(a, b);

      // then
      expect(stream, emitsInOrder([true]));
    }, timeout: Timeout(Duration(seconds: 5)));

    test('비교 조건을 커스텀하여 항상 true일때 비교값은 true이다', () {
      // given
      var a = Stream.fromIterable(const [1, 1, 1, 1, 1]);
      var b = Stream.fromIterable(const [2, 2, 2, 2, 2]);

      // when
      final stream = Rx.sequenceEqual(a, b, equals: (int a, int b) => true);

      // then
      expect(stream, emitsInOrder([true]));
    }, timeout: Timeout(Duration(seconds: 5)));
  });

  test('비교하여 같지않아야한다', () async {
    // given
    var a = Stream.fromIterable(const [1, 1, 1, 1, 1]);
    var b = Stream.fromIterable(const [2, 2, 2, 2, 2]);

    // when
    final stream = Rx.sequenceEqual(a, b);

    // then
    expect(stream, emitsInOrder([false]));
  }, timeout: Timeout(Duration(seconds: 5)));

  test('에러가 같은지 비교해야한다', () async {
    // given
    var a = Stream<void>.error(ArgumentError('error A'));
    var b = Stream<void>.error(ArgumentError('error A'));

    // when
    final stream = Rx.sequenceEqual(a, b);

    // then
    expect(stream, emitsInOrder([true]));
  }, timeout: Timeout(Duration(seconds: 5)));

  test('에러가 같은지 비교하여 달라야한다', () async {
    // given
    var a = Stream<void>.error(ArgumentError('error A'));
    var b = Stream<void>.error(ArgumentError('error B'));

    // when
    final stream = Rx.sequenceEqual(a, b);

    // then
    expect(stream, emitsInOrder([false]));
  }, timeout: Timeout(Duration(seconds: 5)));

 

SwitchLatest

상위 스트림에서 가장 최근 방출된 스트림의 항목만을 방출하는 용도
이 스트림은 소스 스트림에서 새 스트림이 방출될 때 이전에 방출된 스트림에서 구독을 취소합니다.

 

상위 스트림 ex) Stream.value(Stream.value())

 

🔻샘플 코드 

더보기
 Stream<Stream<String>> get testStream => Stream.fromIterable([
      Rx.timer('A', Duration(seconds: 2)),
      Rx.timer('B', Duration(seconds: 1)),
      Stream.value('C'),
    ]);
    
 test('상위 스트림의에서 리스트데이터를 방출', () {
      // given
      var a = Stream.value(Stream.fromIterable(const ['A', 'B', 'C']));

      // when
      final stream = Rx.switchLatest(a);

      // then
      expect(
        stream,
        emitsInOrder(<dynamic>['A', 'B', 'C', emitsDone]),
      );
    }, timeout: Timeout(Duration(seconds: 5)));

    test('상위스트림의 가장 최근값이 먼저 방출되어야한다', () {
      // given

      // when
      final stream = Rx.switchLatest(testStream);

      // then
      expect(
        stream,
        emits('C'),
      );
    }, timeout: Timeout(Duration(seconds: 5)));

    test('상위 스트림에서 방출된 오류를 방출해야한다', () {
      // given
      var a = Stream<Stream<void>>.error(Exception());

      // when
      final stream = Rx.switchLatest(a);

      // then
      expect(
        stream,
        emitsError(isException),
      );
    }, timeout: Timeout(Duration(seconds: 5)));

 

Timer

지정된 시간이 지나면 주어진 값을 방출합니다.

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html

 

🔻샘플 코드 

더보기
 test('타이머', () async {
    // given
    const value = 1;

    // when
    final stream = Rx.timer(value, Duration(milliseconds: 1));

    // then
    await expectLater(stream, emitsInOrder(<dynamic>[value, emitsDone]));
  }, timeout: Timeout(Duration(seconds: 5)));

 

Zip

각각 스트림이 ​ 최소 1개씩 값을 방출할 때마다 
지정된 지퍼 함수를 ​​사용하여 지정된 스트림을 하나의 스트림 시퀀스로 병합합니다.

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html

 

🔻샘플 코드 

더보기
test('Zip', () async {
    // given
    var a = Stream.fromIterable(['A']),
        b = Stream.fromIterable(['B']),
        c = Stream.fromIterable(['C', 'D']);

    // when
    final stream = Rx.zip([a, b, c], (values) => values);

    // then
    await expectLater(
        stream,
        emitsInOrder([
          ['A', 'B', 'C'],
          emitsDone
        ]));
  }, timeout: Timeout(Duration(seconds: 5)));

  test('Zip N개의(특정수의) 스트림 존재', () async {
    // given
    var a = Stream.fromIterable(['1']), b = Stream.fromIterable(['2', '3']);

    // when
    final stream = Rx.zip2(a, b, (a, b) => a + b);

    // then
    await expectLater(stream, emitsInOrder(['12', emitsDone]));
  }, timeout: Timeout(Duration(seconds: 5)));

  test('Zip 스트림중 에러가 포함되어있으면 에러를 방출한다', () async {
    // given
    var a = Stream.value(1),
        b = Stream.value(1),
        c = Stream<int>.error(Exception());

    // when
    final stream = Rx.zip3(a, b, c, (a, b, c) => a + b + c);

    // then
    await expectLater(stream, emitsError(TypeMatcher<Exception>()));
  }, timeout: Timeout(Duration(seconds: 5)));

 

Transformer(변형) 배압 함수

 

Window

소스 Stream에서 수집 한 항목의 `창`을 내보내는 Stream을 반환합니다.
출력 Stream은 겹치지 않는 연결된 `창`을 내 보냅니다.
Stream 항목을 내보낼 때마다 현재 `창`을 내보내고 새 `창`을 엽니다.
각 `윈도(창)`는 Stream이므로 출력은 상위 Stream입니다.( Stream <Stream>())

 

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html
// time    0ms    100ms    150ms    200ms    250ms    300ms    350ms    400ms    450ms    500ms    550ms    600ms
// source |       0                 1                 2                 3                 4        
// window |       0                              1                         2                                     3

 

🔻샘플 코드 

더보기
test('160ms마다 새 창을 만들어야한다', () async {
    // given
    var a = getStream(4);

    // when
    Stream<List<int>> result = a
        .window(
            Stream<Null>.periodic(const Duration(milliseconds: 160)).take(3))
        .asyncMap((stream) => stream.toList());

    // then
    expectLater(
      result,
      emitsInOrder(<dynamic>[
        const [0, 1],
        const [2, 3],
        emitsDone,
      ]),
    );
  }, timeout: Timeout(Duration(seconds: 5)));

 

 

WindowCount

소스 Stream에서 여러 값을 count, 버퍼링 한 다음 `창` a로 내보내고 
Stream은 각 startBufferEvery값마다 새 `창`을 시작합니다. 
경우에는 startBufferEvery 제공하지 않으면 
새로운 count개수 때마다 창이 닫히고 방출됩니다

 

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html

 

🔻샘플 코드  

더보기
Stream<int> getStream(int n) async* {
  var k = 0;

  while (k < n) {
    await Future<Null>.delayed(const Duration(milliseconds: 100));

    yield k++;
  }
}

  test('2개를 카운트하여 새 창을 열어야한다', () async {
    // given
    var a = Rx.range(0, 4);

    // when
    Stream<List<int>> result =
        a.windowCount(2).asyncMap((stream) => stream.toList());

    // then
    await expectLater(
        result,
        emitsInOrder(<dynamic>[
          const [0, 1],
          const [2, 3],
          const [4],
          emitsDone
        ]));
  }, timeout: Timeout(Duration(seconds: 5)));

  test('2개를 카운트하여 새 창을 열고, startBufferEvery값부터 다시 창을 열기 시작해야한다 ', () async {
    // given
    var a = Rx.range(0, 4);

    // when
    Stream<List<int>> result =
        a.windowCount(2, 1).asyncMap((stream) => stream.toList());

    // then
    await expectLater(
        result,
        emitsInOrder(<dynamic>[
          const [0, 1],
          const [1, 2],
          const [2, 3],
          const [3, 4],
          const [4],
          emitsDone
        ]));
  }, timeout: Timeout(Duration(seconds: 5)));

  test('windowCount count3, startBufferEvery2', () async {
    // given
    var a = Rx.range(0, 8);

    // when
    Stream<List<int>> result =
        a.windowCount(3, 2).asyncMap((stream) => stream.toList());

    // then
    await expectLater(
        result,
        emitsInOrder(<dynamic>[
          const [0, 1, 2],
          const [2, 3, 4],
          const [4, 5, 6],
          const [6, 7, 8],
          const [8],
          emitsDone
        ]));
  }, timeout: Timeout(Duration(seconds: 5)));

  test('windowCount count3, startBufferEvery4', () async {
    // given
    var a = Rx.range(0, 8);

    // when
    Stream<List<int>> result =
        a.windowCount(3, 4).asyncMap((stream) => stream.toList());

    // then
    await expectLater(
        result,
        emitsInOrder(<dynamic>[
          const [0, 1, 2],
          const [4, 5, 6],
          const [8],
          emitsDone
        ]));
  }, timeout: Timeout(Duration(seconds: 5)));

 

 

WindowTest

 항목이 Stream소스 시퀀스의 항목을 포함하는 스트림을 작성하고 테스트(조건)를 통과될 때마다 일괄 처리합니다. 

🔻샘플 코드  

더보기
Stream<int> getStream(int n) async* {
  var k = 0;

  while (k < n) {
    await Future<Null>.delayed(const Duration(milliseconds: 100));

    yield k++;
  }
}


test('지정한 조건마다 창을열어야한다', () async {
    // given
    var a = Rx.range(0, 4);

    // when
    Stream<List<int>> result =
        a.windowTest((i) => i % 2 == 0).asyncMap((stream) => stream.toList());

    // then
    expectLater(
      result,
      emitsInOrder(<dynamic>[
        const [0],
        const [1, 2],
        const [3, 4],
        emitsDone,
      ]),
    );
  }, timeout: Timeout(Duration(seconds: 5)));

  test('windowTest Transformer함수 사용', () async {
    // given
    var a = Rx.range(0, 4);
    final transformer = WindowTestStreamTransformer<int>((i) => i % 2 == 0);

    // when
    Stream<List<int>> result =
        a.transform(transformer).asyncMap((stream) => stream.toList());

    // then
    expectLater(
      result,
      emitsInOrder(<dynamic>[
        const [0],
        const [1, 2],
        const [3, 4],
        emitsDone,
      ]),
    );
  }, timeout: Timeout(Duration(seconds: 5)));

 

 

 

WindowTime

각 항목이 Stream소스 시퀀스의 항목을 포함하는 스트림을 생성하고
주어진 시간마다 샘플링하여 창을 내보냅니다.

// time    0ms    100ms    150ms    200ms    250ms    300ms    350ms    400ms    450ms    500ms    550ms    600ms
// source |       0                 1                 2                 3                 4
// window |       0                              1                         2                                     3

 

🔻샘플 코드  

더보기
Stream<int> getStream(int n) async* {
  var k = 0;

  while (k < n) {
    await Future<Null>.delayed(const Duration(milliseconds: 100));

    yield k++;
  }
}

test('160ms마다 창을 새로 열어야한다', () async {
    // given
    var a = getStream(4);

    // when
    Stream<List<int>> result = a
        .windowTime(Duration(milliseconds: 160))
        .asyncMap((stream) => stream.toList());

    // then
    await expectLater(
        result,
        emitsInOrder(<dynamic>[
          const [0, 1],
          const [2, 3],
          emitsDone
        ]));
  }, timeout: Timeout(Duration(seconds: 5)));

 

 

Buffer

각 항목을 Buffer에 쌓아 List로 방출하는 스트림을 만들어 window이벤트를 방출합니다.

 

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html

 

🔻샘플 코드  

더보기
Stream<int> getStream(int n) async* {
  var k = 0;

  while (k < n) {
    await Future<Null>.delayed(const Duration(milliseconds: 100));

    yield k++;
  }
}

test('160ms동안 버퍼에 쌓아 List로 방출해야한다', () async {
    // given
    var a = getStream(4);

    // when
    final result = a.buffer(
        Stream<Null>.periodic(const Duration(milliseconds: 160)).take(3));

    // then

    await expectLater(
        result,
        emitsInOrder(<dynamic>[
          const [0, 1],
          const [2, 3],
          emitsDone
        ]));
  }, timeout: Timeout(Duration(seconds: 5)));

 

 

BufferCount

소스 스트림에서 count만큼 버퍼링 한 다음 버퍼를 내보내고 지운 후
Stream은 각 startBufferEvery값마다 새 버퍼를 시작합니다.
startBufferEvery 제공하지 경우에는, 새로운 버퍼는 소스의 개시 때마다 버퍼가 닫히고 즉시 방출됩니다. 

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html

 

🔻샘플 코드   

더보기
Stream<int> getStream(int n) async* {
  var k = 0;

  while (k < n) {
    await Future<Null>.delayed(const Duration(milliseconds: 100));

    yield k++;
  }
}

 test('2개 마다 buffer에 쌓아 내보내야한다', () async {
    // given
    var a = Rx.range(0, 4);

    // when
    Stream<List<int>> result =
        a.bufferCount(2).asyncMap((stream) => stream.toList());

    // then
    await expectLater(
        result,
        emitsInOrder(<dynamic>[
          const [0, 1],
          const [2, 3],
          const [4],
          emitsDone
        ]));
  }, timeout: Timeout(Duration(seconds: 5)));

  test('2개씩 buffer에 쌓아 내보낸뒤, startBufferEvery값부터 새 버퍼링을 시작해야한다', () async {
    // given
    var a = Rx.range(0, 4);

    // when
    Stream<List<int>> result =
        a.bufferCount(2, 1).asyncMap((stream) => stream.toList());

    // then
    await expectLater(
        result,
        emitsInOrder(<dynamic>[
          const [0, 1],
          const [1, 2],
          const [2, 3],
          const [3, 4],
          const [4],
          emitsDone
        ]));
  }, timeout: Timeout(Duration(seconds: 5)));

  test('bufferCount count3, startBufferEvery2', () async {
    // given
    var a = Rx.range(0, 8);

    // when
    Stream<List<int>> result =
        a.bufferCount(3, 2).asyncMap((stream) => stream.toList());

    // then
    await expectLater(
        result,
        emitsInOrder(<dynamic>[
          const [0, 1, 2],
          const [2, 3, 4],
          const [4, 5, 6],
          const [6, 7, 8],
          const [8],
          emitsDone
        ]));
  }, timeout: Timeout(Duration(seconds: 5)));

  test('bufferCount count3, startBufferEvery4', () async {
    // given
    var a = Rx.range(0, 8);

    // when
    Stream<List<int>> result =
        a.bufferCount(3, 4).asyncMap((stream) => stream.toList());

    // then
    await expectLater(
        result,
        emitsInOrder(<dynamic>[
          const [0, 1, 2],
          const [4, 5, 6],
          const [8],
          emitsDone
        ]));
  }, timeout: Timeout(Duration(seconds: 5)));

 

 

BufferTest

각 항목이 Stream소스 시퀀스의 항목을 포함하는 스트림을 작성하고 테스트(조건)를 통과될 때마다 일괄 처리합니다.

🔻샘플 코드   

더보기
Stream<int> getStream(int n) async* {
  var k = 0;

  while (k < n) {
    await Future<Null>.delayed(const Duration(milliseconds: 100));

    yield k++;
  }
}

test('테스트 조건을 통과할때까지 buffer에 쌓고, 조건을 통과하면 방출해야한다', () async {
    // given
    var a = Rx.range(0, 4);

    // when
    Stream<List<int>> result =
        a.bufferTest((i) => i % 2 == 0).asyncMap((stream) => stream.toList());

    // then
    expectLater(
      result,
      emitsInOrder(<dynamic>[
        const [0],
        const [1, 2],
        const [3, 4],
        emitsDone,
      ]),
    );
  }, timeout: Timeout(Duration(seconds: 5)));

  test('BufferTest Transformer사용', () async {
    // given
    var a = Rx.range(0, 4);
    final transformer = BufferTestStreamTransformer<int>((i) => i % 2 == 0);

    // when
    Stream<List<int>> result =
        a.transform(transformer).asyncMap((stream) => stream.toList());

    // then
    expectLater(
      result,
      emitsInOrder(<dynamic>[
        const [0],
        const [1, 2],
        const [3, 4],
        emitsDone,
      ]),
    );
  }, timeout: Timeout(Duration(seconds: 5)));

 

 

BufferTime

각 항목이 Stream소스 시퀀스의 항목을 포함하는 스트림을 생성하고
주어진 시간마다 샘플링하여 창을 내보냅니다.

// time    0ms    100ms    150ms    200ms    250ms    300ms    350ms    400ms    450ms    500ms    550ms    600ms
// source |       0                 1                 2                 3                 4
// window |       0                              1                         2                                     3

 

🔻샘플 코드   

더보기
Stream<int> getStream(int n) async* {
  var k = 0;

  while (k < n) {
    await Future<Null>.delayed(const Duration(milliseconds: 100));

    yield k++;
  }
}

  test('지정된 시간(160ms)동안 버퍼에 쌓고 방출해야한다', () async {
    // given
    var a = getStream(4);

    // when
    Stream<List<int>> result = a
        .bufferTime(Duration(milliseconds: 160))
        .asyncMap((stream) => stream.toList());

    // then
    await expectLater(
        result,
        emitsInOrder(<dynamic>[
          const [0, 1],
          const [2, 3],
          emitsDone
        ]));
  }, timeout: Timeout(Duration(seconds: 5)));

 

Debounce

Stream소스 시퀀스가 다른 항목을 방출하지 않고 완료된 경우에만 소스 시퀀스에서 항목을 방출하도록을 변환합니다.

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html
// time      0ms    100ms    150ms    200ms    250ms    300ms    350ms    400ms    450ms    500ms    550ms    600ms    650ms    700ms    750ms    800ms    850ms    900ms
// source   |       1                 2                 3                 4                                                                                             
// debounce |       ------------------------------------> ----------------------------------> ----------------------------------> ---------------------------------> 4

 

🔻샘플 코드   

더보기
Stream<int> getStream() {
  final controller = StreamController<int>();

  Timer(const Duration(milliseconds: 100), () => controller.add(1));
  Timer(const Duration(milliseconds: 200), () => controller.add(2));
  Timer(const Duration(milliseconds: 300), () => controller.add(3));
  Timer(const Duration(milliseconds: 400), () {
    controller.add(4);
    controller.close();
  });

  return controller.stream;
}

 test('200ms동안 값이 방출되지 않았을때, 값을 방출한다 ', () async {
    // given
    var a = getStream();

    // when
    final result = a.debounce((_) => Stream<void>.fromFuture(
        Future<void>.delayed(const Duration(milliseconds: 200))));

    // then
    await expectLater(result, emitsInOrder(<dynamic>[4, emitsDone]));
  }, timeout: Timeout(Duration(seconds: 5)));

 

DebounceTime

DebounceTimeStream소스 시퀀스가 지정한 시간 동안 
다른 항목을 방출하지 않고 완료된 경우에만 소스 시퀀스에서 항목을 방출하도록을 변환합니다.

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html
// time      0ms    100ms    150ms    200ms    250ms    300ms    350ms    400ms    450ms    500ms    550ms    600ms    650ms    700ms    750ms    800ms    850ms    900ms
// source   |       1                 2                 3                 4                                                                                             
// debounce |       ------------------------------------> ----------------------------------> ----------------------------------> ---------------------------------> 4

 

🔻샘플 코드   

더보기
Stream<int> getStream() {
  final controller = StreamController<int>();

  Timer(const Duration(milliseconds: 100), () => controller.add(1));
  Timer(const Duration(milliseconds: 200), () => controller.add(2));
  Timer(const Duration(milliseconds: 300), () => controller.add(3));
  Timer(const Duration(milliseconds: 400), () {
    controller.add(4);
    controller.close();
  });

  return controller.stream;
}


  test('200ms동안 값이 방출되지 않았을때, 값을 방출한다 ', () async {
    // given
    var a = getStream();

    // when
    final result = a.debounceTime(Duration(milliseconds: 200));

    // then
    await expectLater(result, emitsInOrder(<dynamic>[4, emitsDone]));
  }, timeout: Timeout(Duration(seconds: 5)));

 

 

Sample

 sampleStream에서 방출된 Stream이후 소스에서 방출된 가장 최근에 방출된 항목 (있는 경우)을 방출합니다.

 

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html
// sample
// time          0ms    20ms    40ms    60ms    80ms    100ms    120ms    140ms
// source       |       0       1       2       3       4
// sampleStream |                    1               3                 4
// s_event      |                    O               O                 o

 

🔻샘플 코드   

더보기
Stream<int> getStream() =>
    Stream<int>.periodic(const Duration(milliseconds: 20), (count) => count)
        .take(5);

Stream<int> getSampleStream() =>
    Stream<int>.periodic(const Duration(milliseconds: 35), (count) => count)
        .take(10);
        
        
 test('샘플 스트림이 방출될때마다 소스스트림에서 가장 최근에 방출한 값을 방출한다', () async {
    // given
    var a = getStream();

    // when
    final result = a.sample(getSampleStream());

    // then

    await expectLater(
        result,
        emitsInOrder(<dynamic>[
          1,
          3,
          4,
          emitsDone,
        ]));
  }, timeout: Timeout(Duration(seconds: 5)));

 

 

SampleTime

Stream반복 방출 시간 범위 내에서 이전 방출 이후 소스에서 방출된 가장 최근에 방출된 값이 있는 경우 방출합니다.

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html
// sample
// time          0ms    20ms    40ms    60ms    80ms    100ms    120ms    140ms
// source       |       0       1       2       3       4
// sampleStream |                    1               3                 4
// s_event      |                    O               O                 o

 

🔻샘플 코드   

더보기
Stream<int> getStream() =>
    Stream<int>.periodic(const Duration(milliseconds: 20), (count) => count)
        .take(5);


  test('SampleTime', () async {
    // given

    // when
    final stream = getStream().sampleTime(const Duration(milliseconds: 35));

    // then
    await expectLater(stream, emitsInOrder(<dynamic>[1, 3, 4, emitsDone]));
  }, timeout: Timeout(Duration(seconds: 5)));

 

 

Throttle (ThrottleFirst)

 열려있는 Stream동안 소스에서 방출 한 첫 번째 항목만 방출합니다
 trailing 값을 사용하여 다음 처음 스로틀을 시작할 시간을 결정할 수 있습니다.

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html
// throttle
// time      0ms    100ms    150ms    200ms    250ms    300ms    350ms    400ms    450ms    500ms    550ms    600ms    650ms    700ms
// source   |       1                 2                 3                 4                 5                 6                 7
// throttle |       1                                                     4                                                     7
// t_event  |       O                                            O                                            O

// throttle trailing=true
// time      0ms    100ms    150ms    200ms    250ms    300ms    350ms    400ms    450ms    500ms    550ms    600ms    650ms    700ms        
// source   |       1                 2                 3                 4                 5                 6                              
// throttle |                                           3                                                     6                                      
// t_event  |                                  O                                            O                                                

 

🔻샘플 코드   

더보기
Stream<int> getStream() =>
    Stream.periodic(const Duration(milliseconds: 100), (i) => i + 1).take(10);
    
    
 test('throttle스트림이 값을 방출할때마다 소스스트림의 첫 번째 항목을 방출해야한다', () async {
    // given
    var a = getStream();

    // when
    Stream<int> result = a
        .throttle(
            (_) => Stream<void>.periodic(const Duration(milliseconds: 250)))
        .take(3);

    // then
    expectLater(
      result,
      emitsInOrder(<dynamic>[
        1,
        4,
        7,
        emitsDone,
      ]),
    );
  }, timeout: Timeout(Duration(seconds: 5)));

  test('trailing=true이면 throttle을 시간만큼 시작시간이 지연되서 시작되야한다', () async {
    // given
    var a = getStream();

    // when
    Stream<int> result = a
        .throttle(
          (_) => Stream<void>.periodic(const Duration(milliseconds: 250)),
          trailing: true,
        )
        .take(3);

    // then
    expectLater(
      result,
      emitsInOrder(<dynamic>[
        3,
        6,
        9,
        emitsDone,
      ]),
    );
  }, timeout: Timeout(Duration(seconds: 5)));

 

ThrottleTime

Stream의 시간 범위 내에 소스에서 방출 한 첫 번째 항목만 표시합니다.

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html
// throttle
// time      0ms    100ms    150ms    200ms    250ms    300ms    350ms    400ms    450ms    500ms    550ms    600ms    650ms    700ms
// source   |       1                 2                 3                 4                 5                 6                 7
// throttle |       1                                                     4                                                     7
// t_event  |       O                                            O                                            O

 

🔻샘플 코드   

더보기
Stream<int> getStream() =>
    Stream.periodic(const Duration(milliseconds: 100), (i) => i + 1).take(10);
    
    
 test('throttleTime', () async {
    // given
    var a = getStream();

    // when
    Stream<int> result = a.throttleTime(Duration(milliseconds: 250)).take(3);

    // then
    await expectLater(
        result,
        emitsInOrder(<dynamic>[
          1,
          4,
          7,
          emitsDone,
        ]));
  }, timeout: Timeout(Duration(seconds: 5)));

 

Transformer(변형) 함수

 

 

ConcatWith

현재 Stream에서 모든 항목을 내 보낸 다음 지정된 스트림의 모든 항목을 차례로 내보내는 Stream을 반환합니다.

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html

 

 

🔻샘플 코드   

더보기
test('소스스트림을 내보낸뒤 지정된 스트림을 순서대로 내보내야한다', () async {
    // given
    final delayedStream = Rx.timer(1, Duration(milliseconds: 10));
    final immediateStream = Stream.value(2);

    // when
    final result = delayedStream.concatWith([immediateStream]);

    // then
    await expectLater(result, emitsInOrder(<dynamic>[1, 2, emitsDone]));
  }, timeout: Timeout(Duration(seconds: 5)));

 

StartWith

소스 스트림이 값을 방출할 때, 방출 값 앞에 값을 추가합니다 

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html

 

🔻샘플 코드   

더보기
  Stream<int> _getStream() => Stream.fromIterable(const [1, 2, 3, 4]);
  test('소스스트림이 값을 방출할때, 앞에 0을 추가해아한다 ', () async {
    // given
    final a = _getStream();

    // when
    final result = a.startWith(0);

    // then
    await expectLater(result, emitsInOrder([0, 1, 2, 3, 4]));
  }, timeout: Timeout(Duration(seconds: 5)));

 

 

StartWithMany

소스 스트림이 값을 방출할 때, 방출 값 앞에 List 값을 추가합니다 

🔻샘플 코드   

더보기
Stream<int> _getStream() => Stream.fromIterable(const [1, 2, 3, 4]);

  test('소스스트림이 값을 방출할때, 앞에 -1,0을 추가해아한다', () async {
    // given
    final a = _getStream();

    // when
    final result = a.startWithMany([-1, 0]);

    // then
    await expectLater(result, emitsInOrder([-1, 0, 1, 2, 3, 4]));
  }, timeout: Timeout(Duration(seconds: 5)));

 

 

EndWith

소스 스트림이 값을 방출한 후, 방출한 값 뒤에 값을 추가합니다 

🔻샘플 코드   

더보기
 Stream<int> _getStream() => Stream.fromIterable(const [1, 2, 3, 4]);
 
  test('소스트림이 값을 방출한후 ,5을 추가해야한다', () async {
    // given
    final a = _getStream();

    // when
    final result = a.endWith(5);

    // then
    await expectLater(result, emitsInOrder([1, 2, 3, 4, 5]));
  }, timeout: Timeout(Duration(seconds: 5)));

 

 

EndWithMany

소스 Stream이 닫기 전에 값 List를 최종 이벤트로 소스에 추가합니다.

🔻샘플 코드   

더보기
 Stream<int> _getStream() => Stream.fromIterable(const [1, 2, 3, 4]);
 
  test('소스스트림이 값을 방출한 후 5,6을 방출해야한다', () async {
    // given
    final a = _getStream();

    // when
    final result = a.endWithMany([5, 6]);

    // then
    await expectLater(result, emitsInOrder([1, 2, 3, 4, 5, 6]));
  }, timeout: Timeout(Duration(seconds: 5)));
}

 

 

ZipWith

주어진 지퍼 함수를 ​​사용하여 현재 스트림을 다른 스트림과 결합하는 Stream을 반환합니다.

 

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html

 

🔻샘플 코드   

더보기
  test('ZipWith', () async {
    // given
    final a = Stream<int>.value(1);

    // when
    final result =
        a.zipWith(Stream<int>.value(2), (int one, int two) => one + two);

    // then
    await expectLater(result, emitsInOrder([3]));
  }, timeout: Timeout(Duration(seconds: 5)));

 

MergeWith

여러 스트림에서 내 보낸 항목을 단일 항목 스트림으로 결합합니다. 항목은 소스에서 방출되는 순서대로 방출됩니다.

https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html

 

🔻샘플코드   

더보기
test('mergeWith', () async {
    //given
    final delayedStream = Rx.timer(1, Duration(milliseconds: 10));
    final immediateStream = Stream.value(2);

    //when
    final result = delayedStream.mergeWith([immediateStream]);

    //then
    await expectLater(result, emitsInOrder([2, 1]));
  }, timeout: Timeout(Duration(seconds: 5)));

 

 

DefaultIfEmpty

 소스 스트림에서 아무것도 내 보내지 않는 경우 단일 기본 항목을 내 보냅니다.

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html

 

🔻샘플코드   

더보기
  test('소스스트림이 값을 방출하지 않았을때, default값(true_를 내보내야한다', () async {
    // given
    var a = Stream<bool>.empty();

    // when
    final result = a.defaultIfEmpty(true);

    // then
    await expectLater(result, emitsInOrder(<dynamic>[true, emitsDone]));
  }, timeout: Timeout(Duration(seconds: 5)));

  test('소스스트림이 값을 방출했을때, default값을 내보내지않아야한다', () async {
    // given
    var a = Stream.fromIterable(const [false, false, false]);

    // when
    final result = a.defaultIfEmpty(true);

    // then
    await expectLater(
      result,
      emitsInOrder(<dynamic>[false, false, false, emitsDone]),
    );
  }, timeout: Timeout(Duration(seconds: 5)));

 

SwitchIfEmpty

원래 Stream이 항목을 내 보내지 않으면이 연산자는 지정된 대체 스트림을 구독하고 대신 해당 Stream에서 항목을 내 보냅니다.
이는 여러 소스의 데이터를 사용할 때 특히 유용할 수 있습니다. 
  
예를 들어, 레포지토리 패턴을 사용할 때. 로드해야 할 데이터가 있다고 가정하면 
가장 빠른 액세스 포인트로 시작하고 가장 느린 포인트로 계속 돌아가는 것이 좋습니다. 
  
예를 들어 먼저 메모리 내 데이터베이스를 쿼리 한 다음 파일 시스템의 데이터베이스를 쿼리 한 다음 
데이터가 로컬 시스템에 없으면 네트워크 호출을 쿼리 합니다.
 
이것은 switchIfEmpty로 아주 간단하게 달성할 수 있습니다

 

https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html


 

ex)
Stream<Data> memory;
Stream<Data> disk;
Stream<Data> network;

 
Stream<Data> getThatData =
   memory.switchIfEmpty(disk).switchIfEmpty(network);

 

🔻샘플 코드   

더보기
test('switchIfEmpty is Empty', () async {
    // given
    final a = Stream<int>.empty();

    // when
    final result = a.switchIfEmpty(Stream.value(1));

    // then
    await expectLater(
      result,
      emitsInOrder(<dynamic>[1, emitsDone]),
    );
  }, timeout: Timeout(Duration(seconds: 5)));
  test('switchIfEmpty is Not Empty', () async {
    // given
    final a = Stream.value(99);

    // when
    final result = a.switchIfEmpty(Stream.value(1));

    // then
    await expectLater(
      result,
      emitsInOrder(<dynamic>[99, emitsDone]),
    );
  }, timeout: Timeout(Duration(seconds: 5)));

 

 

 

Distinct

현재 스트림에서 중복을 제거합니다.

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html

 

🔻샘플코드   

더보기
  test('distinct', () async {
    // given
    final a = Stream.fromIterable(const [1, 1]);

    // when
    final result = a.distinct();

    // then
    await expectLater(result, emitsInOrder(<dynamic>[1, emitsDone]));
  }, timeout: Timeout(Duration(seconds: 5)));

 

 

 

DistinctUnique

 데이터 이벤트가 이전에 이미 발생한 경우 건너뛰는 Stream을 만듭니다.
 
 같음은 제공된 equals 및 hashCode 메서드에 의해 결정됩니다.
 생략하면 마지막으로 제공된 데이터 요소의 '=='연산자와 hashCode가 사용됩니다.
 이 스트림이 있는 경우 반환된 스트림은 브로드 캐스트 스트림입니다. 
 브로드 캐스트 스트림이 두 번 이상 수신되는 경우 각 구독은 equals 및 hashCode 테스트를 개별적으로 수행합니다.

 

🔻샘플 코드   

더보기
class _TestObject {
  final String key;

  const _TestObject(this.key);

  @override
  bool operator ==(Object other) =>
      identical(this, other) ||
      other is _TestObject &&
          runtimeType == other.runtimeType &&
          key == other.key;

  @override
  int get hashCode => key.hashCode;

  @override
  String toString() => key;
}





  test('distinctUnique (클래스의 equals 및 hascode와 함께 작동)', () async {
    // given
    final a = Stream.fromIterable(const [
      _TestObject('a'),
      _TestObject('a'),
      _TestObject('b'),
      _TestObject('a'),
      _TestObject('b'),
      _TestObject('c'),
      _TestObject('a'),
      _TestObject('b'),
      _TestObject('c'),
      _TestObject('a')
    ]);

    // when
    final result = a.distinctUnique();

    // then
    await expectLater(
      result,
      emitsInOrder(<dynamic>[
        const _TestObject('a'),
        const _TestObject('b'),
        const _TestObject('c'),
        emitsDone
      ]),
    );
  }, timeout: Timeout(Duration(seconds: 5)));

  test('distinctUnique (조건으로 제공된 equals 및 해시 코드로 작동)', () async {
    // given
    final a = Stream.fromIterable(const [
      _TestObject('a'),
      _TestObject('a'),
      _TestObject('b'),
      _TestObject('a'),
      _TestObject('b'),
      _TestObject('c'),
      _TestObject('a'),
      _TestObject('b'),
      _TestObject('c'),
      _TestObject('a')
    ]);

    // when
    final result = a.distinctUnique(
        equals: (a, b) => a.key == b.key, hashCode: (o) => o.key.hashCode);

    // then
    await expectLater(
      result,
      emitsInOrder(<dynamic>[
        const _TestObject('a'),
        const _TestObject('b'),
        const _TestObject('c'),
        emitsDone
      ]),
    );
  }, timeout: Timeout(Duration(seconds: 5)));


 

 

FlatMap

주어진 매퍼 함수를 ​​사용하여 방출된 각 항목을 Stream으로 변환합니다.
새로 생성된 Stream이 수신되고 항목을 다운 스트림으로 방출하기 시작합니다.
각 스트림에서 방출되는 항목은 도착한 순서대로 다운 스트림으로 방출됩니다. 즉, 시퀀스가 ​​함께 병합됩니다.

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html

 

🔻샘플 코드   

더보기
class _TestObject {
  final String key;

  const _TestObject(this.key);

  @override
  bool operator ==(Object other) =>
      identical(this, other) ||
      other is _TestObject &&
          runtimeType == other.runtimeType &&
          key == other.key;

  @override
  int get hashCode => key.hashCode;

  @override
  String toString() => key;
}





  test('distinctUnique (클래스의 equals 및 hascode와 함께 작동)', () async {
    // given
    final a = Stream.fromIterable(const [
      _TestObject('a'),
      _TestObject('a'),
      _TestObject('b'),
      _TestObject('a'),
      _TestObject('b'),
      _TestObject('c'),
      _TestObject('a'),
      _TestObject('b'),
      _TestObject('c'),
      _TestObject('a')
    ]);

    // when
    final result = a.distinctUnique();

    // then
    await expectLater(
      result,
      emitsInOrder(<dynamic>[
        const _TestObject('a'),
        const _TestObject('b'),
        const _TestObject('c'),
        emitsDone
      ]),
    );
  }, timeout: Timeout(Duration(seconds: 5)));

  test('distinctUnique (조건으로 제공된 equals 및 해시 코드로 작동)', () async {
    // given
    final a = Stream.fromIterable(const [
      _TestObject('a'),
      _TestObject('a'),
      _TestObject('b'),
      _TestObject('a'),
      _TestObject('b'),
      _TestObject('c'),
      _TestObject('a'),
      _TestObject('b'),
      _TestObject('c'),
      _TestObject('a')
    ]);

    // when
    final result = a.distinctUnique(
        equals: (a, b) => a.key == b.key, hashCode: (o) => o.key.hashCode);

    // then
    await expectLater(
      result,
      emitsInOrder(<dynamic>[
        const _TestObject('a'),
        const _TestObject('b'),
        const _TestObject('c'),
        emitsDone
      ]),
    );
  }, timeout: Timeout(Duration(seconds: 5)));


 

 

FlatMapIteratorble

소스 스트림이 방출될 때마다 Stream <List>로 변환하여 방출합니다

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html

 

🔻샘플 코드   

더보기
 test('flatMapIteratorble', () async {
    //given
    var a = Rx.range(1, 4);

    //when
    Stream<int> stream =
        a.flatMapIterable((int i) => Stream<List<int>>.value(<int>[i, i]));

    //then
    await expectLater(
        stream, emitsInOrder(<dynamic>[1, 1, 2, 2, 3, 3, 4, 4, emitsDone]));
  }, timeout: Timeout(Duration(seconds: 5)));

 

 

SwitchMap

주어진 매퍼 함수를 ​​사용하여 방출된 각 항목을 Stream으로 변환합니다. 
새로 생성된 스트림은 항목을 수신하고 방출을 시작하며 이전에 생성된 스트림은 방출을 중지합니다.
switchMap 연산자는 flatMap 및 concatMap 메서드와 유사하지만 가장 최근에 생성된 Stream에서만 항목을 내 보냅니다.
예를 들어 비동기 API의 최신 상태만 원할 때 유용할 수 있습니다.

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html

 

🔻샘플 코드   

더보기
Stream<int> _getStream() {
    final controller = StreamController<int>();

    Timer(const Duration(milliseconds: 10), () => controller.add(1));
    Timer(const Duration(milliseconds: 20), () => controller.add(2));
    Timer(const Duration(milliseconds: 30), () => controller.add(3));
    Timer(const Duration(milliseconds: 40), () {
      controller.add(4);
      controller.close();
    });

    return controller.stream;
  }

  Stream<int> _getOtherStream(int value) {
    final controller = StreamController<int>();

    Timer(const Duration(milliseconds: 15), () => controller.add(value + 1));
    Timer(const Duration(milliseconds: 25), () => controller.add(value + 2));
    Timer(const Duration(milliseconds: 35), () => controller.add(value + 3));
    Timer(const Duration(milliseconds: 45), () {
      controller.add(value + 4);
      controller.close();
    });

    return controller.stream;
  }
  
  
  test('소스스트림과 다른스트림중, 가장 최근에 값을 방출한 스트림의 값들만 방출해야한다', () async {
    //given
    var a = _getStream();

    //when
    final result = a.switchMap(_getOtherStream);

    //then
    await expectLater(
      result,
      emitsInOrder(
        [5, 6, 7, 8],
      ),
    );
  }, timeout: Timeout(Duration(seconds: 5)));

 

 

ExhaustMap

지정된 매퍼를 사용하여 소스 스트림의 항목들을 Stream으로 변환합니다. 
새 스트림이 완료될 때까지 소스 스트림의 모든 항목을 무시합니다.
소스 스트림의 이전 비동기 작업이 완료된 후에 만 ​​응답하려는 경우 유용합니다.

 

🔻샘플 코드   

더보기
test('MapperStream을 방출하는 동안 소스스트림을 무시해야한다', () async {
    //given
    var calls = 0;
    var a = Rx.range(0, 9);

    //when
    final stream = a.exhaustMap((i) {
      calls++;
      return Rx.timer(i, Duration(milliseconds: 100));
    });

    //then
    await expectLater(stream, emitsInOrder(<dynamic>[0, emitsDone]));
    await expectLater(calls, 1);
  }, timeout: Timeout(Duration(seconds: 5)));

  test('MapperStream을 방출하는 동안 소스스트림을 무시해야한다 2', () async {
    //given
    var a = Stream.fromIterable(const [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);

    //when
    final stream =
        a.interval(Duration(milliseconds: 30)).exhaustMap((i) async* {
      yield await Future.delayed(Duration(milliseconds: 70), () => i);
    });

    //then
    await expectLater(stream, emitsInOrder(<dynamic>[0, 3, 6, 9, emitsDone]));
  }, timeout: Timeout(Duration(seconds: 5)));

 

 

MapTo

소스 Stream이 값을 내보낼 때마다, 무조건 출력 Stream에서 주어진 상수 값을 내 보냅니다.

https://www.google.com/url?sa=i&url=https%3A%2F%2Frxjs-dev.firebaseapp.com%2Fapi%2Foperators%2FmapTo&psig=AOvVaw0wptxS5i0wWKoZqxvsHgCc&ust=1597412692943000&source=images&cd=vfe&ved=0CAIQjRxqFwoTCKCU7paomOsCFQAAAAAdAAAAABAD

🔻샘플 코드   

더보기
test('mapTo', () async {
    //given
    var a = Rx.range(1, 4);

    //when
    final result = a.mapTo(true);

    //then
    await expectLater(
      result,
      emitsInOrder(<dynamic>[
        true,
        true,
        true,
        true,
        emitsDone,
      ]),
    );
  }, timeout: Timeout(Duration(seconds: 5)));

  test('mapTo with Error', () async {
    //given
    var a = Rx.range(1, 4).concatWith([Stream<int>.error(Error())]);

    //when
    final result = a.mapTo(true);

    //then
    await expectLater(
      result,
      emitsInOrder(<dynamic>[
        true,
        true,
        true,
        true,
        emitsError(TypeMatcher<Error>()),
        emitsDone,
      ]),
    );
  }, timeout: Timeout(Duration(seconds: 5)));

 

 

GroupBy

각 값을 그룹 지어 GroupByStream으로 방출합니다
GroupByStream은 일반 스트림처럼 작동하지만
Function Type에서 값 을받는 'key'속성이 있습니다.

 

🔻샘플 코드   

더보기
 String _toEventOdd(int value) => value == 0 ? 'even' : 'odd';

  test('groupBy', () async {
    //given
    var a = Stream.fromIterable([1, 2, 3, 4]);

    //when
    final result = a.groupBy((value) => value);

    //then
    await expectLater(
      result,
      emitsInOrder(<Matcher>[
        TypeMatcher<GroupByStream<int, int>>()
            .having((stream) => stream.key, 'key', 1),
        TypeMatcher<GroupByStream<int, int>>()
            .having((stream) => stream.key, 'key', 2),
        TypeMatcher<GroupByStream<int, int>>()
            .having((stream) => stream.key, 'key', 3),
        TypeMatcher<GroupByStream<int, int>>()
            .having((stream) => stream.key, 'key', 4),
        emitsDone
      ]),
    );
  }, timeout: Timeout(Duration(seconds: 5)));

  test('groupBy, Map으로 각각 방출', () async {
    //given
    var a = Stream.fromIterable([1, 2, 3, 4]);

    //when
    final result = a
        .groupBy((value) => _toEventOdd(value % 2))
        .flatMap((stream) => stream.map((event) => {stream.key: event}));

    //then

    await expectLater(
      result,
      emitsInOrder(<dynamic>[
        {'odd': 1},
        {'even': 2},
        {'odd': 3},
        {'even': 4},
        emitsDone
      ]),
    );
  }, timeout: Timeout(Duration(seconds: 5)));

  test('groupBy, Map으로 묶어 방출', () async {
    //given
    var a = Stream.fromIterable([1, 2, 3, 4]);

    //when
    final result = a
        .groupBy((value) => _toEventOdd(value % 2))
        .map((stream) async => await stream.fold(
              {stream.key: <int>[]},
              (Map<String, List<int>> previous, element) {
                return previous..[stream.key].add(element);
              },
            ));
    // fold is called when onDone triggers on the Stream

    //then
    await expectLater(
      result,
      emitsInOrder(<dynamic>[
        {
          'odd': [1, 3]
        },
        {
          'even': [2, 4]
        },
        emitsDone
      ]),
    );
  }, timeout: Timeout(Duration(seconds: 5)));

 

 

Interval

지정된 기간마다 소스 스트림의 각 항목을 내보내는 Stream을 만듭니다.

http://reactivex.io/documentation/operators/interval.html

🔻샘플 코드   

더보기
  Stream<int> _getStream() => Stream.fromIterable(const [0, 1, 2, 3, 4]);

  test('interval', () async {
    //given
    var a = _getStream();
    const expectedOutput = [0, 1, 2, 3, 4];
    var count = 0, lastInterval = -1;
    final stopwatch = Stopwatch()..start();

    //when
    final result = a.interval(const Duration(milliseconds: 1));

    //then
    result.listen(
        expectAsync1((result) {
          expect(expectedOutput[count++], result);

          if (lastInterval != -1) {
            expect(stopwatch.elapsedMilliseconds - lastInterval >= 1, true);
          }

          lastInterval = stopwatch.elapsedMilliseconds;
        }, count: expectedOutput.length),
        onDone: stopwatch.stop);
  }, timeout: Timeout(Duration(seconds: 5)));

 

 

Max

Stream에서 방출된 가장 큰 항목으로 완성되는 Future로 Stream을 변환합니다.
이는 목록에서 최대 값을 찾는 것과 유사하지만 값은 비동기 적입니다.
반환 값은 Future입니다
 * 
completion
성공적으로 완료된 Future와 일치하는 값을 찾습니다 matcher.
이것은 비동기 기대를 생성합니다. expect 호출이 즉시 반환되고 실행이 계속됩니다. 
나중에 Future가 완료되면 기대치 matcher가 실행됩니다. 
Future가 완료되고 실행될 것으로 예상되기를 기다리 려면 expectLater를 사용하고 반환된 Future를 기다립니다.

 

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html

 

🔻샘플 코드   

더보기
class _Class implements Comparable<_Class> {
  final int value;

  const _Class(this.value);

  @override
  String toString() => '_Class2{value: $value}';

  @override
  bool operator ==(Object other) =>
      identical(this, other) ||
      other is _Class &&
          runtimeType == other.runtimeType &&
          value == other.value;

  @override
  int get hashCode => value.hashCode;

  @override
  int compareTo(_Class other) => value.compareTo(other.value);
}


Stream<int> _getStream() =>
      Stream<int>.fromIterable(const <int>[2, 3, 3, 5, 2, 9, 1, 2, 0]);



  test('max', () async {
    //given
    var a = _getStream();

    //when
    final result = a.max();

    //then

    await expectLater(result, completion(9));
    expect(
      await Stream.fromIterable(<num>[1, 2, 3, 3.5]).max(),
      3.5,
    );
  }, timeout: Timeout(Duration(seconds: 5)));

  test('max with Comparable', () async {
    //given
    const expected = _Class(3);
    var a = Stream.fromIterable(const [
      _Class(0),
      expected,
      _Class(2),
      _Class(-1),
      _Class(2),
    ]);

    //when
    final result = await a.max();

    //then
    expect(
      result,
      expected,
    );
  }, timeout: Timeout(Duration(seconds: 5)));

 

 

 

Min

Stream에서 내 보낸 가장 작은 항목으로 완료되는 Future로 Stream을 변환합니다.
이것은 List에서 최솟값을 찾는 것과 비슷하지만 값은 비동기 적입니다!
 
completion
성공적으로 완료된 Future와 일치하는 값을 찾습니다 matcher.
이것은 비동기 기대를 생성합니다. expect 호출이 즉시 반환되고 실행이 계속됩니다. 
나중에 Future가 완료되면 기대치 matcher가 실행됩니다. 
Future가 완료되고 실행될 것으로 예상되기를 기다리 려면 expectLater를 사용하고 반환된 Future를 기다립니다.

 

🔻샘플 코드   

더보기
class _Class implements Comparable<_Class> {
  final int value;

  const _Class(this.value);

  @override
  String toString() => '_Class2{value: $value}';

  @override
  bool operator ==(Object other) =>
      identical(this, other) ||
      other is _Class &&
          runtimeType == other.runtimeType &&
          value == other.value;

  @override
  int get hashCode => value.hashCode;

  @override
  int compareTo(_Class other) => value.compareTo(other.value);
}


Stream<int> _getStream() =>
      Stream<int>.fromIterable(const <int>[2, 3, 3, 5, 2, 9, 1, 2, 0]);

  test('min', () async {
    //given
    var a = _getStream();

    //when
    final result = a.min();

    //then

    await expectLater(result, completion(0));
    expect(
      await Stream.fromIterable(<num>[1, 2, 3, 3.5]).min(),
      1,
    );
  }, timeout: Timeout(Duration(seconds: 5)));

  test('min with Comparable', () async {
    //given
    const expected = _Class(-1);
    var a = Stream.fromIterable(const [
      _Class(0),
      expected,
      _Class(2),
      _Class(3),
      _Class(2),
    ]);

    //when
    final result = await a.min();

    //then
    expect(
      result,
      expected,
    );
  }, timeout: Timeout(Duration(seconds: 5)));

 

 

PairWise

n 번째 및 n-1 번째 이벤트를 쌍으로 표시합니다.

🔻샘플 코드   

더보기
Stream<int> getStream(int n) async* {
  var k = 0;

  while (k < n) {
    await Future<Null>.delayed(const Duration(milliseconds: 100));

    yield k++;
  }
}


  test('Pairwise', () async {
    // given
    var a = Rx.range(0, 4);

    // when
    Stream<List<int>> result = a.pairwise();

    // then
    await expectLater(
        result,
        emitsInOrder(<dynamic>[
          const [0, 1],
          const [1, 2],
          const [2, 3],
          const [3, 4],
          emitsDone
        ]));
  }, timeout: Timeout(Duration(seconds: 5)));

 

 

SkipUntil

지정된 스트림이 항목을 방출한 후에 만 ​​항목 방출을 시작합니다.

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html

 

🔻샘플 코드   

더보기
  Stream<int> _getStream() {
    final controller = StreamController<int>();

    Timer(const Duration(milliseconds: 100), () => controller.add(1));
    Timer(const Duration(milliseconds: 200), () => controller.add(2));
    Timer(const Duration(milliseconds: 300), () => controller.add(3));
    Timer(const Duration(milliseconds: 400), () {
      controller.add(4);
      controller.close();
    });

    return controller.stream;
  }

  Stream<int> _getOtherStream() {
    final controller = StreamController<int>();

    Timer(const Duration(milliseconds: 250), () {
      controller.add(1);
      controller.close();
    });

    return controller.stream;
  }

  test('other스트림이 값을 방출한 후에, 소스스트림값들만 방출해야한다', () async {
    // given
    final a = _getStream();

    // when
    final result = a.skipUntil(_getOtherStream());

    // then
    await expectLater(result, emitsInOrder([3, 4]));
  }, timeout: Timeout(Duration(seconds: 5)));

 

TakeUntil

다른 Stream 시퀀스가 ​​값을 생성할 때까지만
소스 Stream 시퀀스의 값을 반환합니다.

 

 

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html

🔻샘플 코드   

더보기
 Stream<int> _getStream() {
    final controller = StreamController<int>();

    Timer(const Duration(milliseconds: 100), () => controller.add(1));
    Timer(const Duration(milliseconds: 200), () => controller.add(2));
    Timer(const Duration(milliseconds: 300), () => controller.add(3));
    Timer(const Duration(milliseconds: 400), () {
      controller.add(4);
      controller.close();
    });

    return controller.stream;
  }

  Stream<int> _getOtherStream() {
    final controller = StreamController<int>();

    Timer(const Duration(milliseconds: 250), () {
      controller.add(1);
      controller.close();
    });

    return controller.stream;
  }

  test('다른스트림의 값이 방출될때까지만, 소스스트림의 값들을 방출해야한다', () async {
    // given
    final a = _getStream();

    // when
    final result = a.takeUntil(_getOtherStream());

    // then
    await expectLater(result, emitsInOrder([1, 2]));
  }, timeout: Timeout(Duration(seconds: 5)));

 

 

TakeWhileInclusive

각 값이 주어진 테스트(조건)를 충족할 때까지만 소스 스트림에서 값을 내 보냅니다.
테스트가 값으로 만족되지 않으면이 값을 모두 내보냅니다.

🔻샘플 코드   

더보기
   test('조건을 만족할때까지만 소스스트림의 값을 내보내야한다', () async {
    // given
    final a = Stream.fromIterable([2, 3, 4, 5, 6, 1, 2, 3]);

    // when
    final result = a.takeWhileInclusive((i) => i < 4);

    // then
    await expectLater(result, emitsInOrder([2, 3]));
  }, timeout: Timeout(Duration(seconds: 5)));

  test('TakeWhilInclusive 조건에 일치하는 값이 없을때, 소스스트림의 첫 번째 값을 방출한다', () async {
    // given
    final a = Stream.fromIterable([2, 3, 4, 5, 6, 1, 2, 3]);

    // when
    final result = a.takeWhileInclusive((i) => i > 9);

    // then
    await expectLater(result, emitsInOrder([2]));
  }, timeout: Timeout(Duration(seconds: 5)));

 

 

Timestamp

소스 스트림에서 내 보낸 각 항목을 내 보낸 Timestamped항목과 항목이 내 보낸 시간을 포함하는 개체에 래핑 합니다.

🔻샘플 코드   

더보기
 test('timeStamp', () async {
    // given
    final a = Stream.fromIterable(const [1, 2, 3]);

    // when
    final result = a.timestamp();

    // then
    await expectLater(
        result,
        emitsInOrder([
          TypeMatcher<Timestamped>(),
          TypeMatcher<Timestamped>(),
          TypeMatcher<Timestamped>(),
        ]));

    // TimeStamp{timestamp: 2020-08-10 00:58:57.052087, value: 1}
    // TimeStamp{timestamp: 2020-08-10 00:58:57.053311, value: 2}
    // TimeStamp{timestamp: 2020-08-10 00:58:57.053397, value: 3}
  }, timeout: Timeout(Duration(seconds: 5)));

 

 

WhereType

조건의 타입과 일치하지 않는 이벤트는 필터링되며 결과 Stream는 Type이 T 됩니다.

🔻샘플 코드   

더보기
test('WhereType', () async {
    // given
    final a = Stream.fromIterable([1, 'two', 3, 'four']);

    // when
    final result = a.whereType<int>();

    // then
    await expectLater(result, emitsInOrder([1, 3]));
  }, timeout: Timeout(Duration(seconds: 5)));

 

 

 

DoXX 함수

 

 

doOnData

스트림이 항목을 내보낼 때 지정된 콜백 함수를 호출합니다. 다른 rx구현에서는 이를 doOnNext라고 합니다.

🔻샘플 코드   

더보기
test('아이템이 방출되었을때, doOnData가 호출되어야한다', () async {
    //given
    var onDataCalled = false;
    var a = Stream.value(1);

    //when
    final stream = a.doOnData((_) => onDataCalled = true);

    //then
    await expectLater(stream, emits(1));
    await expectLater(onDataCalled, isTrue);
  });

  test('브로드캐스트 스트림에서 doOnData는 1번만 호출되어야한다', () async {
    //given
    final actual = <int>[];
    final controller = BehaviorSubject<int>(sync: true);

    //when
    final stream =
        controller.stream.transform(DoStreamTransformer(onData: actual.add));
    stream.listen(null);
    stream.listen(null);
    controller.add(1);
    controller.add(2);

    //then
    await expectLater(actual, const [1, 2]);
    await controller.close();
  });

 

 

doOnDone

스트림이 항목 방출을 완료하면 지정된 콜백 함수를 호출합니다. 다른 rx구현에서는 이를 doOnComplete라고 합니다.

🔻샘플 코드   

더보기
  test('스트림이 종료되면 doOnDone이 호출되어야한다', () async {
    //given
    var onDoneCalled = false;
    final a = Stream<void>.empty();

    //when
    final stream = a.doOnDone(() => onDoneCalled = true);

    //then
    await expectLater(stream, emitsDone);
    await expectLater(onDoneCalled, isTrue);
  });

 

 

doOnError

스트림에서 오류가 발생하면 지정된 콜백 함수를 호출합니다.

🔻샘플 코드   

더보기
 test('에러가 방출되었을때 doOnError가 호출되어야한다', () async {
    //given
    var onErrorCalled = false;
    final a = Stream<void>.error(Exception());

    //when
    final stream = a.doOnError((dynamic e, dynamic s) => onErrorCalled = true);

    //then

    await expectLater(stream, emitsError(isException));
    await expectLater(onErrorCalled, isTrue);
  });

  test('브로드캐스트 스트림에서 에러가 발생했을때, doOnError는 1번만 호출되어야한다', () async {
    //given
    var count = 0;
    final subject = BehaviorSubject<int>(sync: true);

    //when
    final stream = subject.stream.doOnError(
      (dynamic e, dynamic s) => count++,
    );
    stream.listen(null, onError: (dynamic e, dynamic s) {});
    stream.listen(null, onError: (dynamic e, dynamic s) {});
    subject.addError(Exception());
    subject.addError(Exception());

    //then
    await expectLater(count, 2);
    await subject.close();
  });

 

 

doOnEach

스트림이 데이터를 내보내거나 error를 내거나 done를 내 보낸 경우 지정된 콜백 함수를 호출합니다.
콜백은 Notification객체를 수신합니다.
Notification(onData, onDone OnError)

🔻샘플 코드   

더보기
 test('Data, Error, Done알림이 있을때 doOnEach를 호출해야한다', () async {
    //given
    StackTrace stacktrace;
    final actual = <Notification<int>>[];
    final exception = Exception();
    final a = Stream.value(1).concatWith([Stream<int>.error(exception)]);

    //when
    final stream = a.doOnEach((notification) {
      actual.add(notification);
      if (notification.isOnError) {
        stacktrace = notification.stackTrace;
      }
    });

    //then
    await expectLater(
      stream,
      emitsInOrder(<dynamic>[1, emitsError(isException), emitsDone]),
    );
    await expectLater(actual, [
      Notification.onData(1),
      Notification<void>.onError(exception, stacktrace),
      Notification<void>.onDone()
    ]);
  });

  test('브로드캐스트 스트림에서 doOnEach는 1번만 호출되어야한다', () async {
    //given
    var count = 0;
    final controller = StreamController<int>.broadcast(sync: true);
    final stream = controller.stream.transform(DoStreamTransformer(onEach: (_) {
      count++;
    }));

    //when
    stream.listen(null);
    stream.listen(null);
    controller.add(1);
    controller.add(2);

    //then
    await expectLater(count, 2);
    await controller.close();
  });

 

 

doOnCancel

스트림 구독이 취소되면 지정된 콜백 함수를 호출합니다.
다른 rx구현에서는 종종 doOnUnsubscribe 또는 doOnDispose라고 합니다.

🔻샘플 코드   

더보기
  test('subscription을 취소했을때, doOnCancel이 호출되어야한다', () async {
    //given
    var onCancelCalled = false;
    final stream = Stream.value(1);

    //when
    await stream.doOnCancel(() => onCancelCalled = true).listen(null).cancel();

    //then
    await expectLater(onCancelCalled, isTrue);
  });

  test('브로드캐스트 스트림에서 onCanceld은 1번만 호출되어야한다', () async {
    // given
    var count = 0;
    final subject = BehaviorSubject<int>(sync: true);

    // when
    final stream = subject.doOnCancel(() => count++);
    await stream.listen(null).cancel();
    await stream.listen(null).cancel();

    // then
    await expectLater(count, 2);
    await subject.close();
  });

 

 

doOnPasue, doOnResume

doOnPause
스트림 구독이 일시 중지되면 지정된 콜백 함수를 호출합니다.
  
doOnResume
스트림 구독이 항목 수신을 재개할 때 지정된 콜백 함수를 호출합니다.

🔻샘플 코드   

더보기
test('subscriptiond에서 호출시 onPause와 onResume이 호출되어야한다', () async {
    //given
    var onPauseCalled = false, onResumeCalled = false;
    var a = Stream.value(1);

    //when
    final stream = a.doOnPause((_) {
      onPauseCalled = true;
    }).doOnResume(() {
      onResumeCalled = true;
    });

    //then
    stream.listen(null, onDone: expectAsync0(() {
      expect(onPauseCalled, isTrue);
      expect(onResumeCalled, isTrue);
    }))
      ..pause()
      ..resume();
  });

 

 

 

 

Subject

 

PublishSubject

PublishSubject는 구독 이후에 소스 Stream이 배출한 항목들만 옵서버에게 방출합니다.

http://reactivex.io/documentation/ko/subject.html

 

BehaviorSubject

BehavoirSubject는 구독하기 시작하면,

소스 Stream이 가장 최근에 발행한 항목(또는 아직 아무 값도 발행되지 않았다면 맨 처음 값이나 기본 값)의 발행을 시작하며

그 이후 소스 Stream에 의해 발행된 항목들을 계속 발행합니다

http://reactivex.io/documentation/ko/subject.html

 

ReplaySubject

ReplaySubject는 옵서버가 구독을 시작한 시점과 관계없이 소스 스트림을 처음부터 모두 방출합니다

http://reactivex.io/documentation/ko/subject.html

 

전체 샘플 코드 보러 가기

https://github.com/qjatjr1108/rxdart_playground/blob/master/README.md

 

qjatjr1108/rxdart_playground

Contribute to qjatjr1108/rxdart_playground development by creating an account on GitHub.

github.com

 

 

 

 

..

참고

- https://rxmarbles.com

- https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html

- http://reactivex.io/documentation/operators

https://pub.dev/packages/rxdart