Sample
emit the most recent items emitted by an Observable within periodic time intervals
The Sample operator periodically looks at an Observable and emits whichever item it has most recently emitted since the previous sampling.
Some implementations also provide a ThrottleFirst operator. It is similar, but it emits the first item that was emitted during each sample period rather than the most recently emitted one.
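The sampling rule can be modeled without any Rx library. The following is a minimal sketch in plain JavaScript, not the operator's actual implementation: the function name `samplePeriodic`, the `{ time, value }` event shape, and the `endTime` parameter are all hypothetical, and the choice to count an item that arrives exactly on a tick toward that tick is an assumption.

```javascript
// Plain-JavaScript model of periodic sampling (illustrative only).
// events:  array of { time, value }, sorted by ascending time in ms.
// period:  sampling interval in ms; ticks fire at period, 2 * period, ...
// endTime: time of the last tick to simulate.
// Assumption: an item arriving exactly on a tick belongs to that tick.
function samplePeriodic(events, period, endTime) {
  const out = [];
  let i = 0;
  for (let tick = period; tick <= endTime; tick += period) {
    let latest;        // most recent item since the previous tick
    let seen = false;  // whether any item arrived in this period
    while (i < events.length && events[i].time <= tick) {
      latest = events[i].value;
      seen = true;
      i++;
    }
    // A period with no new items produces no emission.
    if (seen) out.push({ time: tick, value: latest });
  }
  return out;
}

const sourceEvents = [
  { time: 10, value: 'a' },
  { time: 40, value: 'b' },
  { time: 90, value: 'c' },
  { time: 310, value: 'd' },
];
const sampled = samplePeriodic(sourceEvents, 100, 400);
console.log(sampled);
// [ { time: 100, value: 'c' }, { time: 400, value: 'd' } ]
```

In this model the ticks at 200 ms and 300 ms emit nothing because no item arrived since the previous tick, and only the last of `a`, `b`, `c` survives the first period.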
Language-Specific Information
RxGroovy sample throttleFirst throttleLast
RxGroovy implements this operator as sample and throttleLast.
Note that if the source Observable has emitted no items since the last time it was sampled, the Observable that results from this operator will emit no item for that sampling period.
One variant of sample (or its alias, throttleLast) samples at a periodic time interval that you choose by passing in a TimeUnit and a quantity of such units as parameters to sample.
The following code constructs an Observable that emits the numbers between one and a million, and then samples that Observable every ten milliseconds to see what number it is emitting at that moment.
Sample Code
import rx.Observable

def numbers = Observable.range( 1, 1000000 );
numbers.sample(10, java.util.concurrent.TimeUnit.MILLISECONDS).subscribe(
    { println(it); },                          // onNext
    { println("Error: " + it.getMessage()); }, // onError
    { println("Sequence complete"); }          // onCompleted
);

339707
547810
891282
Sequence complete

This variant of sample operates by default on the computation Scheduler, but you can optionally pass in a Scheduler of your choosing as a third parameter.
There is also a variant of sample (that does not have a throttleLast alias) that samples the source Observable each time a second Observable emits an item (or when it terminates). You pass in that second Observable as the parameter to sample.
This variant of sample does not by default operate on any particular Scheduler.
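The sampler-driven variant can be pictured with a small library-free model. This is a sketch in plain JavaScript rather than RxGroovy code; `sampleOnSignals` and its parameters are hypothetical names, and representing the second Observable as a list of emission times (with its termination, if any, appended as one final time) is an assumption made for illustration.

```javascript
// Plain-JavaScript model of sampling driven by a second Observable.
// events:      array of { time, value }, sorted by ascending time in ms.
// signalTimes: ascending times at which the sampler emits (or terminates).
// Assumption: an item arriving exactly at a signal time is visible to it.
function sampleOnSignals(events, signalTimes) {
  const out = [];
  let i = 0;
  for (const t of signalTimes) {
    let latest;
    let seen = false;
    // Consume every source item that arrived since the previous signal.
    while (i < events.length && events[i].time <= t) {
      latest = events[i].value;
      seen = true;
      i++;
    }
    // Emit only if something new arrived since the last signal.
    if (seen) out.push(latest);
  }
  return out;
}

const sourceItems = [
  { time: 5, value: 1 },
  { time: 15, value: 2 },
  { time: 25, value: 3 },
  { time: 60, value: 4 },
];
const picked = sampleOnSignals(sourceItems, [20, 30, 50, 70]);
console.log(picked); // [ 2, 3, 4 ]
```

The signal at 50 ms produces nothing because the source emitted no item between 30 ms and 50 ms. Unlike the time-based variant, the spacing between samples here is whatever the sampler chooses.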
There is also a throttleFirst operator, which differs from throttleLast/sample in that it emits the first item emitted by the source Observable in each sampling period rather than the most recently emitted item.
Sample Code
import java.util.concurrent.TimeUnit
import rx.schedulers.TestScheduler
import rx.subjects.PublishSubject

TestScheduler s = new TestScheduler();
PublishSubject<Integer> o = PublishSubject.create();
o.throttleFirst(500, TimeUnit.MILLISECONDS, s).subscribe(
    { println(it); },                          // onNext
    { println("Error: " + it.getMessage()); }, // onError
    { println("Sequence complete"); }          // onCompleted
);
// send events with simulated time increments
s.advanceTimeTo(0, TimeUnit.MILLISECONDS);
o.onNext(1); // deliver
o.onNext(2); // skip
s.advanceTimeTo(501, TimeUnit.MILLISECONDS);
o.onNext(3); // deliver
s.advanceTimeTo(600, TimeUnit.MILLISECONDS);
o.onNext(4); // skip
s.advanceTimeTo(700, TimeUnit.MILLISECONDS);
o.onNext(5); // skip
o.onNext(6); // skip
s.advanceTimeTo(1001, TimeUnit.MILLISECONDS);
o.onNext(7); // deliver
s.advanceTimeTo(1501, TimeUnit.MILLISECONDS);
o.onCompleted();

throttleFirst operates by default on the computation Scheduler, but you can optionally pass in a Scheduler of your choosing as a third parameter.
RxJava 1.x sample throttleFirst throttleLast
RxJava implements this operator as sample and throttleLast.
Note that if the source Observable has emitted no items since the last time it was sampled, the Observable that results from this operator will emit no item for that sampling period.
One variant of sample (or its alias, throttleLast) samples at a periodic time interval that you choose by passing in a TimeUnit and a quantity of such units as parameters to sample.
This variant of sample operates by default on the computation Scheduler, but you can optionally pass in a Scheduler of your choosing as a third parameter.
There is also a variant of sample (that does not have a throttleLast alias) that samples the source Observable each time a second Observable emits an item (or when it terminates). You pass in that second Observable as the parameter to sample.
This variant of sample does not by default operate on any particular Scheduler.
There is also a throttleFirst operator, which differs from throttleLast/sample in that it emits the first item emitted by the source Observable in each sampling period rather than the most recently emitted item.
throttleFirst operates by default on the computation Scheduler, but you can optionally pass in a Scheduler of your choosing as a third parameter.
RxJS sample throttleFirst
RxJS implements this operator with two variants of sample.
The first variant accepts as its parameter a sampling period, given as an integer number of milliseconds, and it samples the source Observable once per period.
Sample Code
var source = Rx.Observable.interval(1000)
    .sample(5000)
    .take(2);
var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });

Next: 3
Next: 8
Completed

The second variant accepts as its parameter an Observable, and it samples the source Observable whenever this second Observable emits an item.
Sample Code
var source = Rx.Observable.interval(1000)
    .sample(Rx.Observable.interval(5000))
    .take(2);
var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });

Next: 3
Next: 8
Completed

There is also a throttleFirst operator, which differs from sample in that it emits the first item emitted by the source Observable in each sampling period rather than the most recently emitted item.
It does not have the variant that uses the emissions from a second Observable to regulate the sampling periodicity.
Sample Code
var times = [
    { value: 0, time: 100 },
    { value: 1, time: 600 },
    { value: 2, time: 400 },
    { value: 3, time: 900 },
    { value: 4, time: 200 }
];
// Delay each item by time and project value;
var source = Rx.Observable.from(times)
    .flatMap(function (item) {
        return Rx.Observable
            .of(item.value)
            .delay(item.time);
    })
    .throttleFirst(300 /* ms */);
var subscription = source.subscribe(
    function (x) { console.log('Next: %s', x); },
    function (err) { console.log('Error: %s', err); },
    function () { console.log('Completed'); });

Next: 0
Next: 2
Next: 3
Completed

sample and throttleFirst operate by default on the timeout Scheduler. They are found in each of the following distributions:
rx.all.js
rx.all.compat.js
rx.time.js (requires rx.js or rx.compat.js)
rx.lite.js
rx.lite.compat.js
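The throttleFirst outputs listed in the examples above (0, 2, 3 for the RxJS sample and the deliver/skip comments in the RxGroovy sample) can be checked by hand with a small model. This is a sketch in plain JavaScript, not the library's implementation. It assumes that each delivered item opens a new window, and that an item arriving exactly one window length after the last delivery is itself delivered; both assumptions were chosen because they reproduce the listed outputs. The name `throttleFirstModel` is hypothetical.

```javascript
// Plain-JavaScript model of throttleFirst (illustrative only).
// events:   array of { time, value }, sorted by ascending time in ms.
// windowMs: length of the window opened by each delivered item.
// Assumption: the window restarts at every delivered item, and an item
// arriving exactly windowMs after the last delivery is delivered.
function throttleFirstModel(events, windowMs) {
  const out = [];
  let lastDelivered = -Infinity; // time of the most recent delivery
  for (const e of events) {
    if (e.time - lastDelivered >= windowMs) {
      out.push(e.value);
      lastDelivered = e.time;
    }
    // Otherwise the item falls inside the open window and is skipped.
  }
  return out;
}

// Same data as the RxJS example; the delays decide the arrival order.
const times = [
  { value: 0, time: 100 },
  { value: 1, time: 600 },
  { value: 2, time: 400 },
  { value: 3, time: 900 },
  { value: 4, time: 200 },
];
const ordered = [...times].sort((a, b) => a.time - b.time);
const throttled = throttleFirstModel(ordered, 300);
console.log(throttled); // [ 0, 2, 3 ]
```

Under these assumptions, item 4 (at 200 ms) and item 1 (at 600 ms) are skipped because each arrives less than 300 ms after the previous delivery.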