stream_transformers 0.3.0+3

stream_transformers

This library provides a set of stream transformers for Dart's Stream class.

These transformers are used internally by Frappe. If you're looking for a more fully featured functional reactive programming (FRP) library for Dart, you should look there.

Transformers

BufferWhen

Pauses the delivery of events from the source stream when the signal stream delivers a value of true. The buffered events are delivered when the signal delivers a value of false. Errors originating from the source and signal streams will be forwarded to the transformed stream and will not be buffered. If the source stream is a broadcast stream, then the transformed stream will also be a broadcast stream.

Example:

var controller = new StreamController();
var signal = new StreamController();

var stream = controller.stream;

var buffered = stream.transform(new BufferWhen(signal.stream));

controller.add(1);
signal.add(true);
controller.add(2);

buffered.listen(print); // Prints: 1

Combine

Combines the latest values of two streams using a two argument function. The combining function will not be called until each stream delivers its first value. After the first value of each stream is delivered, the combining function will be invoked for each event from the source streams. Errors occurring on the streams will be forwarded to the transformed stream. If the source stream is a broadcast stream, then the transformed stream will also be a broadcast stream.

Example:

var controller1 = new StreamController();
var controller2 = new StreamController();

var combined = controller1.stream.transform(new Combine(controller2.stream, (a, b) => a + b));

combined.listen(print);

controller1.add(1);
controller2.add(1); // Prints: 2
controller1.add(2); // Prints: 3
controller2.add(2); // Prints: 4

Use the static function Combine.all(List<Stream>) to combine a list of streams together. The returned stream will contain a List that contains the current values of each of the streams.

Example:

var controller1 = new StreamController();
var controller2 = new StreamController();

var combined = Combine.all([controller1.stream, controller2.stream]);

combined.listen(print);

controller1.add(1);
controller2.add(2); // Prints: [1, 2]
controller1.add(3); // Prints: [3, 2]
controller2.add(4); // Prints: [3, 4]

Concat

Concatenates two streams into one stream by delivering the values of the source stream, and then delivering the values of the other stream once the source stream completes. This means that it's possible that events from the second stream might not be included if the source stream hasn't completed. Use Concat.all() to concatenate many streams.

Errors will be forwarded from either stream, whether or not the source stream has completed. If the source stream is a broadcast stream, then the transformed stream will also be a broadcast stream.

Example:

var source = new StreamController();
var other = new StreamController();

var stream = source.stream.transform(new Concat(other.stream));
stream.listen(print);

other..add(1)..add(2);
source..add(3)..add(4)..close();

// 3
// 4
// 1
// 2
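
The ordering rule can also be approximated with dart:async alone. What follows is a minimal sketch under stated assumptions, not the library's implementation: concatSketch and concatAllSketch are hypothetical names, the code targets current null-safe Dart, and unlike Concat it does not listen to a later stream (or forward its errors) until the earlier streams have completed.

```dart
import 'dart:async';

// Hypothetical helper (not part of stream_transformers): emits every event
// of `first`, then every event of `second`.
Stream<T> concatSketch<T>(Stream<T> first, Stream<T> second) async* {
  yield* first;
  yield* second;
}

// Hypothetical helper: the same idea for any number of streams.
Stream<T> concatAllSketch<T>(Iterable<Stream<T>> streams) async* {
  for (final stream in streams) {
    yield* stream;
  }
}

Future<void> main() async {
  final source = Stream.fromIterable([3, 4]);
  final other = Stream.fromIterable([1, 2]);
  print(await concatSketch(source, other).toList()); // [3, 4, 1, 2]

  final many = [
    Stream.fromIterable(['a']),
    Stream.fromIterable(['b', 'c']),
    Stream.fromIterable(['d']),
  ];
  print(await concatAllSketch(many).toList()); // [a, b, c, d]
}
```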

ConcatAll

Concatenates a stream of streams into a single stream, by delivering the first stream's values, and then delivering the next stream's values after the previous stream has completed.

This means that events from a later stream might not be included if an earlier stream hasn't completed. Use Concat.all() to concatenate many streams.

Errors will be forwarded from either stream, whether or not the source stream has completed. If the source stream is a broadcast stream, then the transformed stream will also be a broadcast stream.

Example:

var source = new StreamController();
var other1 = new StreamController();
var other2 = new StreamController();

source..add(other1.stream)..add(other2.stream);

other2..add(1)..add(2);
other1..add(3)..add(4)..close();

var stream = source.stream.transform(new ConcatAll());
stream.listen(print);

// 3
// 4
// 1
// 2

Debounce

Delivers the last event in the stream after the duration has passed without receiving an event.

Errors occurring on the source stream will not be ignored. If the source stream is a broadcast stream, then the transformed stream will also be a broadcast stream.

source:             asdf----asdf----
source.debounce(2): -----f-------f--

Example:

var controller = new StreamController();

var debounced = controller.stream.transform(new Debounce(new Duration(seconds:1)));
debounced.listen(print);

controller.add(1);
controller.add(2);
controller.add(3);

// Prints: 3
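
To make the timing rule concrete, here is a rough sketch of debouncing built only on dart:async. It is an illustration, not the code behind Debounce: debounceSketch is a hypothetical name, the code targets current null-safe Dart, and it assumes that an event still waiting for its quiet period is dropped when the source closes.

```dart
import 'dart:async';

// Hypothetical helper (not part of stream_transformers): forwards an event
// only once `quiet` has elapsed without a newer event arriving.
Stream<T> debounceSketch<T>(Stream<T> source, Duration quiet) {
  late StreamController<T> controller;
  StreamSubscription<T>? subscription;
  Timer? timer;

  controller = StreamController<T>(
    onListen: () {
      subscription = source.listen(
        (value) {
          // Every new event restarts the quiet period.
          timer?.cancel();
          timer = Timer(quiet, () => controller.add(value));
        },
        // Errors are passed straight through rather than debounced.
        onError: controller.addError,
        onDone: () {
          // Assumption: a still-pending event is dropped on close.
          timer?.cancel();
          controller.close();
        },
      );
    },
    onCancel: () async {
      timer?.cancel();
      await subscription?.cancel();
    },
  );
  return controller.stream;
}

Future<void> main() async {
  final controller = StreamController<int>();
  debounceSketch(controller.stream, const Duration(milliseconds: 50))
      .listen(print);

  controller..add(1)..add(2)..add(3);

  // Wait past the quiet period so the timer can fire.
  await Future<void>.delayed(const Duration(milliseconds: 200));
  await controller.close();
}

// Prints: 3
```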

Delay

Delays the delivery of each event by a given duration. Errors occurring on the source stream will not be delayed. If the source stream is a broadcast stream, then the transformed stream will also be a broadcast stream.

Example:

var controller = new StreamController();
var delayed = controller.stream.transform(new Delay(new Duration(seconds: 2)));

// source:              asdf----
// source.delayed(2):   --a--s--d--f---

DoAction

Invokes a side-effect function for each value, error and done event in the stream.

This is useful for debugging, but also for invoking preventDefault on browser events. Side effects will only be invoked once if the transformed stream has multiple subscribers.

Errors occurring on the source stream will be forwarded to the returned stream, even when passing an error handler to DoAction. If the source stream is a broadcast stream, then the transformed stream will also be a broadcast stream.

Example:

var controller = new StreamController();
var sideEffect = new DoAction((value) => print("Do Next: $value"),
    onError: (error) => print("Do Error: $error"),
    onDone: () => print("Do Done"));
var stream = controller.stream.transform(sideEffect);

stream.listen((value) => print("Next: $value"),
    onError: (e) => print("Error: $e"),
    onDone: () => print("Done"));

controller..add(1)..add(2)..close();

// Do Next: 1
// Next: 1
// Do Next: 2
// Next: 2
// Do Done
// Done

FlatMap

Spawns a new stream from a function for each event in the source stream. The returned stream will contain the events and errors from each of the spawned streams until they're closed. If the source stream is a broadcast stream, then the transformed stream will also be a broadcast stream.

Example:

var controller = new StreamController();
var flatMapped = controller.stream.transform(new FlatMap((value) => new Stream.fromIterable([value + 1])));

flatMapped.listen(print);

controller.add(1); // Prints: 2
controller.add(2); // Prints: 3

FlatMapLatest

Similar to FlatMap, but instead of including events from all spawned streams, only includes the ones from the latest stream. Think of this as stream switching.

Example:

var controller = new StreamController();
var latest = controller.stream.transform(new FlatMapLatest((value) => new Stream.fromIterable([value + 1])));

latest.listen(print);

controller.add(1);
controller.add(2); // Prints: 3

Merge

Combines the events from two streams into a single stream. Errors occurring on a source stream will be forwarded to the transformed stream. If the source stream is a broadcast stream, then the transformed stream will also be a broadcast stream.

Example:

var controller1 = new StreamController();
var controller2 = new StreamController();

var merged = controller1.stream.transform(new Merge(controller2.stream));

merged.listen(print);

controller1.add(1); // Prints: 1
controller2.add(2); // Prints: 2
controller1.add(3); // Prints: 3
controller2.add(4); // Prints: 4

Use the static function Merge.all(List<Stream>) to merge all streams of a list into a single stream.

MergeAll

Combines the events from a stream of streams into a single stream.

The returned stream will contain the errors occurring on any stream. If the source stream is a broadcast stream, then the transformed stream will also be a broadcast stream.

Example:

var source = new StreamController();
var stream1 = new Stream.fromIterable([1, 2]);
var stream2 = new Stream.fromIterable([3, 4]);

var merged = source.stream.transform(new MergeAll());
source..add(stream1)..add(stream2);

merged.listen(print);

// 1
// 2
// 3
// 4

SampleOn

Takes the latest value of the source stream whenever the trigger stream produces an event.

Errors that happen on the source stream will be forwarded to the transformed stream. If the source stream is a broadcast stream, then the transformed stream will also be a broadcast stream.

Example:

// values start at 0
var source = new Stream.periodic(new Duration(seconds: 1), (i) => i);
var trigger = new Stream.periodic(new Duration(seconds: 2), (i) => i);

var stream = source.transform(new SampleOn(trigger)).take(3);

stream.listen(print);

// 0
// 2
// 4

SamplePeriodically

Takes the latest value of the source stream at a specified interval.

Errors that happen on the source stream will be forwarded to the transformed stream. If the source stream is a broadcast stream, then the transformed stream will also be a broadcast stream.

Example:

// values start at 0
var source = new Stream.periodic(new Duration(seconds: 1), (i) => i);
var stream = source.transform(new SamplePeriodically(new Duration(seconds: 2))).take(3);

stream.listen(print);

// 0
// 2
// 4

Scan

Reduces the values of a stream into a single value by using an initial value and an accumulator function. The function is passed the previous accumulated value and the current value of the stream. This is useful for maintaining state using a stream. Errors occurring on the source stream will be forwarded to the transformed stream. If the source stream is a broadcast stream, then the transformed stream will also be a broadcast stream.

Example:

var button = new ButtonElement();

var clickCount = button.onClick.transform(new Scan(0, (previous, current) => previous + 1));

clickCount.listen(print);

// [button click] .. prints: 1
// [button click] .. prints: 2
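
The accumulator pattern can be tried outside the browser with a small dart:async sketch. This is an illustration rather than the library's Scan: scanSketch is a hypothetical name, the code targets current null-safe Dart, and it emits only the accumulated results, so it makes no claim about whether Scan itself also delivers the initial value.

```dart
import 'dart:async';

// Hypothetical helper (not part of stream_transformers): folds each event
// into a running value and emits the running value after every event.
Stream<S> scanSketch<T, S>(
    Stream<T> source, S initial, S Function(S previous, T current) combine) async* {
  var accumulated = initial;
  await for (final value in source) {
    accumulated = combine(accumulated, value);
    yield accumulated;
  }
}

Future<void> main() async {
  // Count events, like the click counter above, but without dart:html.
  final clicks = Stream.fromIterable(['click', 'click', 'click']);
  final counts =
      scanSketch<String, int>(clicks, 0, (previous, _) => previous + 1);
  print(await counts.toList()); // [1, 2, 3]

  // Running sum of the values themselves.
  final sums = scanSketch<int, int>(Stream.fromIterable([1, 2, 3]), 0,
      (previous, current) => previous + current);
  print(await sums.toList()); // [1, 3, 6]
}
```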

SelectFirst

Forwards events from the first stream to deliver an event.

Errors are forwarded from both streams until a stream is selected. Once a stream is selected, only errors from the selected stream are forwarded. If the source stream is a broadcast stream, then the transformed stream will also be a broadcast stream.

Example:

var stream1 = new Stream.periodic(new Duration(seconds: 1)).map((_) => "Stream 1");
var stream2 = new Stream.periodic(new Duration(seconds: 2)).map((_) => "Stream 2");

var selected = stream1.transform(new SelectFirst(stream2)).take(1);
selected.listen(print);

// Stream 1

SkipUntil

Waits to deliver events from a stream until the signal stream delivers a value. Errors that happen on the source stream will be forwarded once the signal stream delivers its value. Errors happening on the signal stream will be forwarded immediately. If the source stream is a broadcast stream, then the transformed stream will also be a broadcast stream.

Example:

var signal = new StreamController();
var controller = new StreamController();

var skipStream = controller.stream.transform(new SkipUntil(signal.stream));

skipStream.listen(print);

controller.add(1);
controller.add(2);
signal.add(true);
controller.add(3); // Prints: 3
controller.add(4); // Prints: 4

StartWith

Prepends values to the beginning of a stream. Use StartWith.many to prepend multiple values.

Errors on the source stream will be forwarded to the transformed stream. If the source stream is a broadcast stream, then the transformed stream will also be a broadcast stream.

Example:

var source = new Stream.fromIterable([2, 3]);
var stream = source.transform(new StartWith(1));
stream.listen(print);

// 1
// 2
// 3

TakeUntil

Delivers events from the source stream until the signal stream produces a value, at which point the transformed stream closes. The returned stream will continue to deliver values if the signal stream closes without a value.

This is useful for automatically cancelling a stream subscription to prevent memory leaks. Errors that happen on the source and signal stream will be forwarded to the transformed stream. If the source stream is a broadcast stream, then the transformed stream will also be a broadcast stream.

Example:

var signal = new StreamController();
var controller = new StreamController();

var takeUntil = controller.stream.transform(new TakeUntil(signal.stream));

takeUntil.listen(print);

controller.add(1); // Prints: 1
controller.add(2); // Prints: 2
signal.add(true);
controller.add(3);
controller.add(4);

When

Starts delivering events from the source stream when the signal stream delivers a value of true. Events are skipped when the signal stream delivers a value of false. Errors from the source or signal stream will be forwarded to the transformed stream. If the source stream is a broadcast stream, then the transformed stream will also be a broadcast stream.

Example:

var controller = new StreamController();
var signal = new StreamController();

var whenStream = controller.stream.transform(new When(signal.stream));

whenStream.listen(print);

controller.add(1);
signal.add(true);
controller.add(2); // Prints: 2
signal.add(false);
controller.add(3);

Zip

Combines the events of two streams into one by pairing them by index: the combiner function is invoked once both streams have delivered their event at a given index. The transformed stream finishes when either source stream finishes. Errors from either stream will be forwarded to the transformed stream. If the source stream is a broadcast stream, then the transformed stream will also be a broadcast stream.

Example:

var controller1 = new StreamController();
var controller2 = new StreamController();

var zipped = controller1.stream.transform(new Zip(controller2.stream, (a, b) => a + b));

zipped.listen(print);

controller1.add(1);
controller1.add(2);
controller2.add(1); // Prints: 2
controller1.add(3);
controller2.add(2); // Prints: 4
controller2.add(3); // Prints: 6
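
Pairing by index can be sketched with StreamIterator from dart:async. This is an illustration under stated assumptions, not the library's Zip: zipSketch is a hypothetical name, the code targets current null-safe Dart, the result is always a single-subscription stream, and an error from either input ends the sketch's output instead of being forwarded while the stream stays open.

```dart
import 'dart:async';

// Hypothetical helper (not part of stream_transformers): pairs the n-th
// event of `left` with the n-th event of `right`.
Stream<R> zipSketch<A, B, R>(
    Stream<A> left, Stream<B> right, R Function(A a, B b) combiner) async* {
  final leftEvents = StreamIterator<A>(left);
  final rightEvents = StreamIterator<B>(right);
  try {
    // Pull one event from each side; stop as soon as either side is done.
    while (await leftEvents.moveNext() && await rightEvents.moveNext()) {
      yield combiner(leftEvents.current, rightEvents.current);
    }
  } finally {
    // Release both inputs, including the one that has not finished yet.
    await leftEvents.cancel();
    await rightEvents.cancel();
  }
}

Future<void> main() async {
  final zipped = zipSketch<int, int, int>(Stream.fromIterable([1, 2, 3]),
      Stream.fromIterable([10, 20]), (a, b) => a + b);
  // The third event on the left never finds a partner.
  print(await zipped.toList()); // [11, 22]
}
```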

Running Tests

Tests are run using test_runner.

  • Install test_runner: pub global activate test_runner
  • Run test_runner inside stream_transformers: pub global run run_tests

Features and bugs

Please file feature requests and bugs at the issue tracker.

Changelog

Note: Patch versions that only include documentation changes are omitted.

0.3.0+3 (09/16/2015)

0.3.0 (02/20/2015)

  • Add Concat [#15]
  • Add ConcatAll [#16]
  • Add DoAction [#4]
  • Add MergeAll [#2]
  • Add SampleOn [#1]
  • Add SamplePeriodically [#3]
  • Add SelectFirst [#13]
  • Add StartWith [#14]
  • FlatMap now closes only when its source is closed.

0.2.0 (02/01/2015)

  • Changed the behavior of debounce to debounce the first value [#5]
  • Rename StreamConverter to Mapper, and generalized type signature to R Mapper<A,R>(A value) [#10]
  • Make sure errors for transformers are forwarded as documented
  • Make sure internal subscriptions are cancelled when the transformed stream subscriptions are cancelled

0.1.0+3 (01/25/2015)

  • Fixed issue #6 where Scan doesn't include the initial value in the transformed stream.

0.1.0+1 (01/14/2015)

  • Fixed an issue where When and Zip transformers would not return the same stream type.

0.1.0 (01/10/2015)

Initial version

  • Add BufferWhen transformer
  • Add Combine transformer
  • Add Debounce transformer
  • Add Delay transformer
  • Add FlatMap transformer
  • Add FlatMapLatest transformer
  • Add Merge transformer
  • Add Scan transformer
  • Add SkipUntil transformer
  • Add TakeUntil transformer
  • Add When transformer
  • Add Zip transformer

1. Depend on it

Add this to your package's pubspec.yaml file:


dependencies:
  stream_transformers: "^0.3.0+3"

2. Install it

You can install packages from the command line:

with pub:


$ pub get

Alternatively, your editor might support pub get. Check the docs for your editor to learn more.

3. Import it

Now in your Dart code, you can use:


import 'package:stream_transformers/stream_transformers.dart';
        
Version    Uploaded
0.3.0+3    Sep 26, 2015
0.3.0+2    Mar 10, 2015
0.3.0+1    Feb 20, 2015
0.3.0      Feb 20, 2015
0.2.0+1    Feb 15, 2015
0.2.0      Feb 1, 2015
0.1.0+3    Jan 26, 2015
0.1.0+2    Jan 26, 2015
0.1.0+1    Jan 14, 2015
0.1.0      Jan 11, 2015

Analysis

We analyzed this package, and provided a score, details, and suggestions below.

  • tool failures on Dec 6, 2017
  • Dart: 2.0.0-dev.8.0
  • pana: 0.7.3+1

Scores

  • Popularity: 93. Describes how popular the package is relative to other packages.
  • Health: 51. Code health derived from static analysis.
  • Maintenance: 0. Reflects how tidy and up-to-date the package is.
  • Overall score: 62. Weighted score of the above.

Platforms

Detected platforms:

Error(s) prevent platform classification.

Suggestions

  • Fix lib/src/combine.dart.

    Strong-mode analysis of lib/src/combine.dart failed with the following error:

    line: 28 col: 24
    The argument type '(EventSink<List<dynamic>>) → StreamSubscription<List<Object>>' can't be assigned to the parameter type '(EventSink<dynamic>) → StreamSubscription<dynamic>'.

  • Fix lib/src/zip.dart.

    Strong-mode analysis of lib/src/zip.dart failed with the following error:

    line: 50 col: 38
    The argument type '(EventSink<R>) → StreamSubscription<R>' can't be assigned to the parameter type '(EventSink<dynamic>) → StreamSubscription<dynamic>'.

  • Fix further 9 Dart files.

    Similar analysis of the following files failed:

    • lib/src/take_until.dart
    • lib/src/when.dart
    • lib/src/flat_map.dart
    • lib/src/sample_on.dart
    • lib/src/flat_map_latest.dart
    • lib/src/scan.dart
    • lib/src/do_action.dart
    • lib/src/merge_all.dart
    • lib/src/util.dart

Dependencies

Dev dependencies (package: constraint):

  • guinness: >=0.1.16 <0.2.0
  • unittest: >=0.11.4 <0.12.0