ReplaySubject<T> class

A special StreamController that captures all of the items that have been added to the controller, and emits those as the first items to any new listener.

This subject allows sending data, error and done events to its listeners. As items are added to the subject, the ReplaySubject stores them. When the stream is listened to, the stored items are emitted to the new listener first; after that, any new events are sent to all listeners as they arrive. It is possible to cap the number of stored events by setting a maxSize value, in which case the oldest items are dropped once the cap is exceeded.

ReplaySubject is, by default, a broadcast (aka hot) controller, in order to fulfill the Rx Subject contract. This means the Subject's stream can be listened to multiple times.

Example

final subject = new ReplaySubject<int>();

subject.add(1);
subject.add(2);
subject.add(3);

subject.stream.listen(print); // prints 1, 2, 3
subject.stream.listen(print); // prints 1, 2, 3
subject.stream.listen(print); // prints 1, 2, 3

Example with maxSize

final subject = new ReplaySubject<int>(maxSize: 2);

subject.add(1);
subject.add(2);
subject.add(3);

subject.stream.listen(print); // prints 2, 3
subject.stream.listen(print); // prints 2, 3
subject.stream.listen(print); // prints 2, 3
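
The examples above only show the replay of stored items. The sketch below, which assumes the rxdart package is available and uses a hypothetical helper named replayThenLive, illustrates the other half of the description: a late listener first receives the stored items and then any event added afterwards.

```dart
import 'package:rxdart/rxdart.dart';

/// Hypothetical helper (not part of rxdart): collects what a late
/// listener receives from a ReplaySubject.
Future<List<int>> replayThenLive() async {
  final subject = ReplaySubject<int>();

  // Added before anyone listens, so these are stored for replay.
  subject.add(1);
  subject.add(2);

  final received = <int>[];
  subject.stream.listen(received.add);

  // Yield to the event loop so the replayed items can be delivered.
  await Future<void>.delayed(Duration.zero);

  // Added after subscribing: delivered as a live event.
  subject.add(3);

  // Wait until the done event has reached the listener.
  await subject.close();

  return received;
}

Future<void> main() async {
  print(await replayThenLive()); // prints [1, 2, 3]
}
```

The short delay is only there to make the ordering easy to follow; it is not something the subject requires.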

Constructors

ReplaySubject({int maxSize, void onListen(), void onCancel(), bool sync: false })
factory
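
As a rough illustration of the constructor parameters, the sketch below passes maxSize, onListen and onCancel together. The helper name trackCallbacks and the log list are hypothetical, and the code assumes the callbacks behave like those of a broadcast StreamController (onListen when the first listener subscribes, onCancel when the last one cancels).

```dart
import 'package:rxdart/rxdart.dart';

/// Hypothetical helper (not part of rxdart): records which of the
/// constructor callbacks fire during a single listen/cancel cycle.
Future<List<String>> trackCallbacks() async {
  final log = <String>[];

  final subject = ReplaySubject<int>(
    maxSize: 1, // keep only the most recent item for replay
    onListen: () => log.add('onListen'),
    onCancel: () => log.add('onCancel'),
  );

  subject.add(1);
  subject.add(2);

  final subscription =
      subject.stream.listen((value) => log.add('data $value'));

  // Yield to the event loop so the replayed item can be delivered.
  await Future<void>.delayed(Duration.zero);

  await subscription.cancel();
  await subject.close();

  return log;
}

Future<void> main() async {
  print(await trackCallbacks());
}
```

With maxSize set to 1, only the last item added before subscribing should appear in the log as a data entry.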

Properties

values List<T>
Synchronously gets the values stored in the subject. The returned list may be empty.
read-only, override
controller StreamController<T>
final, inherited
done Future
Return a future which is completed when the StreamSink is finished. [...]
read-only, inherited
first AsObservableFuture<T>
The first element of this stream. [...]
read-only, inherited
hashCode int
The hash code for this object. [...]
read-only, inherited
hasListener bool
Whether there is a subscriber on the Stream.
read-only, inherited
isBroadcast bool
Whether this stream is a broadcast stream.
read-only, inherited
isClosed bool
Whether the stream controller is closed for adding more events. [...]
read-only, inherited
isEmpty AsObservableFuture<bool>
Whether this stream contains any elements. [...]
read-only, inherited
isPaused bool
Whether the subscription would need to buffer events. [...]
read-only, inherited
last AsObservableFuture<T>
The last element of this stream. [...]
read-only, inherited
length AsObservableFuture<int>
The number of elements in this stream. [...]
read-only, inherited
onCancel ControllerCancelCallback
The callback which is called when the stream is canceled. [...]
read / write, inherited
onListen ControllerCallback
The callback which is called when the stream is listened to. [...]
read / write, inherited
onPause ControllerCallback
The callback which is called when the stream is paused. [...]
read / write, inherited
onResume ControllerCallback
The callback which is called when the stream is resumed. [...]
read / write, inherited
runtimeType Type
A representation of the runtime type of the object.
read-only, inherited
single AsObservableFuture<T>
The single element of this stream. [...]
read-only, inherited
sink StreamSink<T>
Returns a view of this object that only exposes the StreamSink interface.
read-only, inherited
stream Observable<T>
The stream that this controller is controlling.
read-only, inherited
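
The values property listed above can be read without subscribing to the stream. A minimal sketch, assuming the rxdart package is available (the helper name snapshot is hypothetical):

```dart
import 'package:rxdart/rxdart.dart';

/// Hypothetical helper (not part of rxdart): reads the stored items
/// synchronously, without listening to the stream.
List<int> snapshot() {
  final subject = ReplaySubject<int>(maxSize: 2);

  subject.add(1);
  subject.add(2);
  subject.add(3); // pushes 1 out of the buffer, since maxSize is 2

  final stored = subject.values;

  subject.close();
  return stored;
}

void main() {
  print(snapshot()); // prints [2, 3]
}
```

This can be handy when the current buffer is needed in synchronous code, where awaiting a stream event is not an option.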

Methods

onAdd(T event) → void
An extension point for sub-classes. Perform any side-effect / state management you need to here, rather than overriding the add method directly.
override
add(T event) → void
Sends a data event. [...]
inherited
addError(Object error, [ StackTrace stackTrace ]) → void
Sends or enqueues an error event. [...]
inherited
addStream(Stream<T> source, { bool cancelOnError: true }) Future
Receives events from source and puts them into this controller's stream. [...]
inherited
any(bool test(T element)) AsObservableFuture<bool>
Checks whether test accepts any element provided by this stream. [...]
inherited
asBroadcastStream({void onListen(StreamSubscription<T> subscription), void onCancel(StreamSubscription<T> subscription) }) Observable<T>
Returns a multi-subscription stream that produces the same events as this. [...]
inherited
asyncExpand<S>(Stream<S> mapper(T value)) Observable<S>
Maps each emitted item to a new Stream using the given mapper, then subscribes to each new stream one after the next until all values are emitted. [...]
inherited
asyncMap<S>(FutureOr<S> convert(T value)) Observable<S>
Creates an Observable with each data event of this stream asynchronously mapped to a new event. [...]
inherited
buffer(SamplerBuilder<T, List<T>> sampler) Observable<List<T>>
Creates an Observable where each item is a List containing the items from the source sequence, batched by the sampler. [...]
inherited
bufferCount(int count, [ int startBufferEvery = 0 ]) Observable<List<T>>
Buffers a number of values from the source Observable by count then emits the buffer and clears it, and starts a new buffer each startBufferEvery values. If startBufferEvery is not provided or is null, then new buffers are started immediately at the start of the source and when each buffer closes and is emitted. [...]
inherited
bufferFuture<O>(Future<O> onFutureHandler()) Observable<List<T>>
Creates an Observable where each item is a List containing the items from the source sequence, batched whenever onFutureHandler completes. [...]
inherited
bufferTest(bool onTestHandler(T event)) Observable<List<T>>
Creates an Observable where each item is a List containing the items from the source sequence, batched whenever onTestHandler passes. [...]
inherited
bufferTime(Duration duration) Observable<List<T>>
Creates an Observable where each item is a List containing the items from the source sequence, sampled on a time frame with duration. [...]
inherited
bufferWhen<O>(Stream<O> other) Observable<List<T>>
Creates an Observable where each item is a List containing the items from the source sequence, sampled on other. [...]
inherited
cast<R>() Observable<R>
Adapt this stream to be a Stream<R>. [...]
inherited
close() Future
Closes the stream. [...]
inherited
concatMap<S>(Stream<S> mapper(T value)) Observable<S>
Maps each emitted item to a new Stream using the given mapper, then subscribes to each new stream one after the next until all values are emitted. [...]
inherited
concatWith(Iterable<Stream<T>> other) Observable<T>
Returns an Observable that emits all items from the current Observable, then emits all items from each of the given streams, one after the next. [...]
inherited
contains(Object needle) AsObservableFuture<bool>
Returns whether needle occurs in the elements provided by this stream. [...]
inherited
debounce(Duration duration) Observable<T>
Creates an Observable that will only emit items from the source sequence if a particular time span has passed without the source sequence emitting another item. [...]
inherited
defaultIfEmpty(T defaultValue) Observable<T>
Emit items from the source Stream, or a single default item if the source Stream emits nothing. [...]
inherited
delay(Duration duration) Observable<T>
The Delay operator modifies its source Observable by pausing for a particular increment of time (that you specify) before emitting each of the source Observable’s items. This has the effect of shifting the entire sequence of items emitted by the Observable forward in time by that specified increment. [...]
inherited
dematerialize<S>() Observable<S>
Converts the onData, onDone, and onError Notification objects from a materialized stream into normal onData, onDone, and onError events. [...]
inherited
distinct([bool equals(T previous, T next) ]) Observable<T>
WARNING: More commonly known as distinctUntilChanged in other Rx implementations. Creates an Observable where data events are skipped if they are equal to the previous data event. [...]
inherited
distinctUnique({bool equals(T e1, T e2), int hashCode(T e) }) Observable<T>
WARNING: More commonly known as distinct in other Rx implementations. Creates an Observable where data events are skipped if they have already been emitted before. [...]
inherited
doOnCancel(void onCancel()) Observable<T>
Invokes the given callback function when the stream subscription is cancelled. Often called doOnUnsubscribe or doOnDispose in other implementations. [...]
inherited
doOnData(void onData(T event)) Observable<T>
Invokes the given callback function when the stream emits an item. In other implementations, this is called doOnNext. [...]
inherited
doOnDone(void onDone()) Observable<T>
Invokes the given callback function when the stream finishes emitting items. In other implementations, this is called doOnComplete(d). [...]
inherited
doOnEach(void onEach(Notification<T> notification)) Observable<T>
Invokes the given callback function when the stream emits data, emits an error, or emits done. The callback receives a Notification object. [...]
inherited
doOnError(Function onError) Observable<T>
Invokes the given callback function when the stream emits an error. [...]
inherited
doOnListen(void onListen()) Observable<T>
Invokes the given callback function when the stream is first listened to. [...]
inherited
doOnPause(void onPause(Future resumeSignal)) Observable<T>
Invokes the given callback function when the stream subscription is paused. [...]
inherited
doOnResume(void onResume()) Observable<T>
Invokes the given callback function when the stream subscription resumes receiving items. [...]
inherited
drain<S>([S futureValue ]) AsObservableFuture<S>
Discards all data on this stream, but signals when it is done or an error occurred. [...]
inherited
elementAt(int index) AsObservableFuture<T>
Returns the value of the index-th data event of this stream. [...]
inherited
every(bool test(T element)) AsObservableFuture<bool>
Checks whether test accepts all elements provided by this stream. [...]
inherited
exhaustMap<S>(Stream<S> mapper(T value)) Observable<S>
Converts items from the source stream into a new Stream using a given mapper. It ignores all items from the source stream until the new stream completes. [...]
inherited
expand<S>(Iterable<S> convert(T value)) Observable<S>
Creates an Observable from this stream that converts each element into zero or more events. [...]
inherited
firstWhere(bool test(T element), { dynamic defaultValue(), T orElse() }) AsObservableFuture<T>
Finds the first element of this stream matching test. [...]
inherited
flatMap<S>(Stream<S> mapper(T value)) Observable<S>
Converts each emitted item into a new Stream using the given mapper function. The newly created Stream will be listened to and begin emitting items downstream. [...]
inherited
flatMapIterable<S>(Stream<Iterable<S>> mapper(T value)) Observable<S>
Converts each item into a new Stream. The Stream must return an Iterable. Then, each item from the Iterable will be emitted one by one. [...]
inherited
fold<S>(S initialValue, S combine(S previous, T element)) AsObservableFuture<S>
Combines a sequence of values by repeatedly applying combine. [...]
inherited
forEach(void action(T element)) AsObservableFuture
Executes action on each element of this stream. [...]
inherited
handleError(Function onError, { bool test(dynamic error) }) Observable<T>
Creates a wrapper Stream that intercepts some errors from this stream. [...]
inherited
ignoreElements() Observable<T>
Creates an Observable where all emitted items are ignored, only the error / completed notifications are passed [...]
inherited
interval(Duration duration) Observable<T>
Creates an Observable that emits each item in the Stream after a given duration. [...]
inherited
join([String separator = "" ]) AsObservableFuture<String>
Combines the string representation of elements into a single string. [...]
inherited
lastWhere(bool test(T element), { Object defaultValue(), T orElse() }) AsObservableFuture<T>
Finds the last element in this stream matching test. [...]
inherited
listen(void onData(T event), { Function onError, void onDone(), bool cancelOnError }) StreamSubscription<T>
Adds a subscription to this stream. Returns a StreamSubscription which handles events from the stream using the provided onData, onError and onDone handlers. [...]
inherited
map<S>(S convert(T event)) Observable<S>
Maps values from a source sequence through a function and emits the returned values. [...]
inherited
mapTo<S>(S value) Observable<S>
Emits the given constant value on the output Observable every time the source Observable emits a value. [...]
inherited
materialize() Observable<Notification<T>>
Converts the onData, onDone, and onError events into Notification objects that are passed into the downstream onData listener. [...]
inherited
max([Comparator<T> comparator ]) AsObservableFuture<T>
Converts a Stream into a Future that completes with the largest item emitted by the Stream. [...]
inherited
mergeWith(Iterable<Stream<T>> streams) Observable<T>
Combines the items emitted by multiple streams into a single stream of items. The items are emitted in the order they are emitted by their sources. [...]
inherited
min([Comparator<T> comparator ]) AsObservableFuture<T>
Converts a Stream into a Future that completes with the smallest item emitted by the Stream. [...]
inherited
noSuchMethod(Invocation invocation) → dynamic
Invoked when a non-existent method or property is accessed. [...]
inherited
ofType<S>(TypeToken<S> typeToken) Observable<S>
Filters a sequence so that only events of a given type pass [...]
inherited
onErrorResume(Stream<T> recoveryFn(dynamic error)) Observable<T>
Intercepts error events and switches to a recovery stream created by the provided recoveryFn. [...]
inherited
onErrorResumeNext(Stream<T> recoveryStream) Observable<T>
Intercepts error events and switches to the given recovery stream in that case [...]
inherited
onErrorReturn(T returnValue) Observable<T>
Instructs an Observable to emit a particular item when it encounters an error, and then terminate normally. [...]
inherited
onErrorReturnWith(T returnFn(dynamic error)) Observable<T>
Instructs an Observable to emit a particular item created by the returnFn when it encounters an error, and then terminate normally. [...]
inherited
pairwise() Observable<List<T>>
Emits on the second and subsequent events of the source Observable. The Nth emission carries the (N-1)th and Nth source items as a pair. [...]
inherited
pipe(StreamConsumer<T> streamConsumer) AsObservableFuture
Pipes the events of this stream into streamConsumer. [...]
inherited
publish() ConnectableObservable<T>
Convert the current Observable into a ConnectableObservable that can be listened to multiple times. It will not begin emitting items from the original Observable until the connect method is invoked. [...]
inherited
publishReplay({int maxSize }) ReplayConnectableObservable<T>
Convert the current Observable into a ReplayConnectableObservable that can be listened to multiple times. It will not begin emitting items from the original Observable until the connect method is invoked. [...]
inherited
publishValue({T seedValue }) ValueConnectableObservable<T>
Convert the current Observable into a ValueConnectableObservable that can be listened to multiple times. It will not begin emitting items from the original Observable until the connect method is invoked. [...]
inherited
reduce(T combine(T previous, T element)) AsObservableFuture<T>
Combines a sequence of values by repeatedly applying combine. [...]
inherited
sample(Stream sampleStream) Observable<T>
Returns an Observable that, when the specified sample stream emits an item or completes, emits the most recently emitted item (if any) emitted by the source stream since the previous emission from the sample stream. [...]
inherited
scan<S>(S accumulator(S accumulated, T value, int index), [ S seed ]) Observable<S>
Applies an accumulator function over an observable sequence and returns each intermediate result. The optional seed value is used as the initial accumulator value. [...]
inherited
share() Observable<T>
Convert the current Observable into a new Observable that can be listened to multiple times. It will automatically begin emitting items when first listened to, and shut down when no listeners remain. [...]
inherited
shareReplay({int maxSize }) ReplayObservable<T>
Convert the current Observable into a new ReplayObservable that can be listened to multiple times. It will automatically begin emitting items when first listened to, and shut down when no listeners remain. [...]
inherited
shareValue({T seedValue }) ValueObservable<T>
Convert the current Observable into a new ValueObservable that can be listened to multiple times. It will automatically begin emitting items when first listened to, and shut down when no listeners remain. [...]
inherited
singleWhere(bool test(T element), { T orElse() }) AsObservableFuture<T>
Finds the single element in this stream matching test. [...]
inherited
skip(int count) Observable<T>
Skips the first count data events from this stream. [...]
inherited
skipUntil<S>(Stream<S> otherStream) Observable<T>
Starts emitting items only after the given stream emits an item. [...]
inherited
skipWhile(bool test(T element)) Observable<T>
Skip data events from this stream while they are matched by test. [...]
inherited
startWith(T startValue) Observable<T>
Prepends a value to the source Observable. [...]
inherited
startWithMany(List<T> startValues) Observable<T>
Prepends a sequence of values to the source Observable. [...]
inherited
switchIfEmpty(Stream<T> fallbackStream) Observable<T>
When the original observable emits no items, this operator subscribes to the given fallback stream and emits items from that observable instead. [...]
inherited
switchMap<S>(Stream<S> mapper(T value)) Observable<S>
Converts each emitted item into a new Stream using the given mapper function. The newly created Stream will be listened to and begin emitting items, and any previously created Stream will stop emitting. [...]
inherited
take(int count) Observable<T>
Provides at most the first count values of this stream. Forwards the first count data events of this stream, and all error events, to the returned stream, and ends with a done event. [...]
inherited
takeUntil<S>(Stream<S> otherStream) Observable<T>
Returns the values from the source observable sequence until the other observable sequence produces a value. [...]
inherited
takeWhile(bool test(T element)) Observable<T>
Forwards data events while test is successful. [...]
inherited
throttle(Duration duration) Observable<T>
Returns an Observable that emits only the first item emitted by the source Observable during sequential time windows of a specified duration. [...]
inherited
timeInterval() Observable<TimeInterval<T>>
Records the time interval between consecutive values in an observable sequence. [...]
inherited
timeout(Duration timeLimit, { void onTimeout(EventSink<T> sink) }) Observable<T>
The Timeout operator allows you to abort an Observable with an onError termination if that Observable fails to emit any items during a specified duration. You may optionally provide a callback function to execute on timeout.
inherited
timestamp() Observable<Timestamped<T>>
Wraps each item emitted by the source Observable in a Timestamped object that includes the emitted item and the time when the item was emitted. [...]
inherited
toList() AsObservableFuture<List<T>>
Collects all elements of this stream in a List. [...]
inherited
toSet() AsObservableFuture<Set<T>>
Collects the data of this stream in a Set. [...]
inherited
toString() String
Returns a string representation of this object.
inherited
transform<S>(StreamTransformer<T, S> streamTransformer) Observable<S>
Applies streamTransformer to this stream. [...]
inherited
where(bool test(T event)) Observable<T>
Filters the elements of an observable sequence based on the test.
inherited
window(SamplerBuilder<T, Stream<T>> sampler) Observable<Stream<T>>
Creates an Observable where each item is a Stream containing the items from the source sequence, batched by the sampler. [...]
inherited
windowCount(int count, [ int startBufferEvery = 0 ]) Observable<Stream<T>>
Buffers a number of values from the source Observable by count then emits the values inside the buffer as a new Stream, and starts a new buffer each startBufferEvery values. If startBufferEvery is not provided or is null, then new buffers are started immediately at the start of the source and when each buffer closes and is emitted. [...]
inherited
windowFuture<O>(Future<O> onFutureHandler()) Observable<Stream<T>>
Creates an Observable where each item is a Stream containing the items from the source sequence, batched whenever onFutureHandler completes. [...]
inherited
windowTest(bool onTestHandler(T event)) Observable<Stream<T>>
Creates an Observable where each item is a Stream containing the items from the source sequence, batched whenever onTestHandler passes. [...]
inherited
windowTime(Duration duration) Observable<Stream<T>>
Creates an Observable where each item is a Stream containing the items from the source sequence, sampled on a time frame with duration. [...]
inherited
windowWhen<O>(Stream<O> other) Observable<Stream<T>>
Creates an Observable where each item is a Stream containing the items from the source sequence, sampled on other. [...]
inherited
withLatestFrom<S, R>(Stream<S> latestFromStream, R fn(T t, S s)) Observable<R>
Creates an Observable that emits when the source stream emits, combining the latest values from the two streams using the provided function. [...]
inherited
zipWith<S, R>(Stream<S> other, R zipper(T t, S s)) Observable<R>
Returns an Observable that combines the current stream together with another stream using a given zipper function. [...]
inherited
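
Replayed items pass through the inherited operators like any other event. The sketch below chains where, map and toList from the list above on a subject's stream. It assumes the rxdart package is available, and the helper name labelledEvens is hypothetical.

```dart
import 'package:rxdart/rxdart.dart';

/// Hypothetical helper (not part of rxdart): filters and maps the
/// replayed items, then collects them once the subject is closed.
Future<List<String>> labelledEvens() {
  final subject = ReplaySubject<int>();

  subject.add(1);
  subject.add(2);
  subject.add(3);
  subject.add(4);

  // toList() completes when the stream emits its done event.
  final result = subject.stream
      .where((n) => n.isEven)
      .map((n) => 'item $n')
      .toList();

  // Closing the subject sends the done event that completes toList().
  subject.close();

  return result;
}

Future<void> main() async {
  print(await labelledEvens()); // prints [item 2, item 4]
}
```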

Operators

operator ==(dynamic other) bool
The equality operator. [...]
inherited