window method

Observable<Stream<T>> window(SamplerBuilder<T, Stream<T>> sampler)

Creates an Observable where each item is a Stream containing the items from the source sequence, batched by the sampler.

Example with onCount

new Observable.range(1, 4)
  .window(onCount(2))
  .doOnData((_) => print('next window'))
  .flatMap((s) => s)
  .listen(print); // prints next window 1, 2, next window 3, 4
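To make the count-based batching concrete, here is a minimal sketch in plain dart:async, assuming Dart 2 syntax. The helper windowByCount is hypothetical and is not part of the library; it is a simplification that emits each window only once it is complete, and it is not meant to reflect how WindowStreamTransformer works internally.

```dart
import 'dart:async';

/// Hypothetical helper, not part of the library: splits [source] into
/// windows of at most [count] items, each exposed as a Stream.
Stream<Stream<T>> windowByCount<T>(Stream<T> source, int count) async* {
  var buffer = <T>[];
  await for (final item in source) {
    buffer.add(item);
    if (buffer.length == count) {
      // The window is full: emit it and start collecting a new one.
      yield Stream<T>.fromIterable(buffer);
      buffer = <T>[];
    }
  }
  // Emit the trailing partial window, if any items remain.
  if (buffer.isNotEmpty) {
    yield Stream<T>.fromIterable(buffer);
  }
}

Future<void> main() async {
  final source = Stream<int>.fromIterable([1, 2, 3, 4, 5]);
  await for (final window in windowByCount(source, 2)) {
    print(await window.toList()); // [1, 2] then [3, 4] then [5]
  }
}
```

Unlike the example above, the sketch is given five items so that the trailing partial window is visible.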

Example with onFuture

new Observable.periodic(const Duration(milliseconds: 100), (int i) => i)
  .window(onFuture(() => new Future.delayed(const Duration(milliseconds: 220))))
  .doOnData((_) => print('next window'))
  .flatMap((s) => s)
  .listen(print); // prints next window 0, 1, next window 2, 3, ...

Example with onTest

new Observable.periodic(const Duration(milliseconds: 100), (int i) => i)
  .window(onTest((i) => i % 2 == 0))
  .doOnData((_) => print('next window'))
  .flatMap((s) => s)
  .listen(print); // prints next window 0, next window 1, 2, next window 3, 4, ...
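The grouping in the onTest example follows from closing a window on every item that satisfies the test, so the matching item ends up as the last element of its window. The sketch below reproduces that grouping with plain dart:async on a finite source, assuming Dart 2 syntax. The helper windowWhen is hypothetical, is not part of the library, and only approximates the behaviour described above.

```dart
import 'dart:async';

/// Hypothetical helper, not part of the library: closes the current window
/// as soon as [test] returns true for the item that was just added.
Stream<Stream<T>> windowWhen<T>(
    Stream<T> source, bool Function(T item) test) async* {
  var buffer = <T>[];
  await for (final item in source) {
    buffer.add(item);
    if (test(item)) {
      // The matching item is the last element of the window it closes.
      yield Stream<T>.fromIterable(buffer);
      buffer = <T>[];
    }
  }
  // Items after the last match form a final, unterminated window.
  if (buffer.isNotEmpty) {
    yield Stream<T>.fromIterable(buffer);
  }
}

Future<void> main() async {
  final source = Stream<int>.fromIterable([0, 1, 2, 3, 4]);
  await for (final window in windowWhen(source, (int i) => i % 2 == 0)) {
    print(await window.toList()); // [0] then [1, 2] then [3, 4]
  }
}
```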

Example with onTime

new Observable.periodic(const Duration(milliseconds: 100), (int i) => i)
  .window(onTime(const Duration(milliseconds: 220)))
  .doOnData((_) => print('next window'))
  .flatMap((s) => s)
  .listen(print); // prints next window 0, 1, next window 2, 3, ...

Example with onStream

new Observable.periodic(const Duration(milliseconds: 100), (int i) => i)
  .window(onStream(new Stream.periodic(const Duration(milliseconds: 220), (int i) => i)))
  .doOnData((_) => print('next window'))
  .flatMap((s) => s)
  .listen(print); // prints next window 0, 1, next window 2, 3, ...

If the samplers above are insufficient for your use case, you can create your own sampler by extending StreamView.

Implementation

Observable<Stream<T>> window(SamplerBuilder<T, Stream<T>> sampler) =>
    transform(new WindowStreamTransformer<T>((Stream<T> stream,
            OnDataTransform<T, Stream<T>> bufferHandler,
            OnDataTransform<Stream<T>, Stream<T>> scheduleHandler) =>
        sampler(stream, bufferHandler, scheduleHandler)));