Buffer periodically gather items emitted by an Observable into bundles and emit these bundles rather than emitting the items one at a time The Buffer operator transforms an Observable that emits items into an Observable that emits buffered collections of those items. There are a number of variants in the various language-specific implementations of Buffer that differ in how they choose which items go in which buffers.
Note that if the source Observable issues an onError
notification, Buffer will pass on this notification immediately without first emitting the buffer it is in the process of assembling, even if that buffer contains items that were emitted by the source Observable before it issued the error notification.
The Window operator is similar to Buffer but collects items into separate Observables rather than into data structures before reemitting them.
See Also Language-Specific Information
RxCpp buffer pairwise
RxCpp implements two variants of Buffer :
buffer(count)
buffer(count)
emits non-overlapping buffers in the form of vector
s, each of which contains at most count
items from the source Observable (the final emitted vector
may have fewer than count
items).
buffer(count, skip)
buffer(count, skip)
creates a new buffer starting with the first emitted item from the source Observable, and every skip
items thereafter, and fills each buffer with count
items: the initial item and count-1
subsequent ones. It emits these buffers as vector
s. Depending on the values of count
and skip
these buffers may overlap (multiple buffers may contain the same item), or they may have gaps (where items emitted by the source Observable are not represented in any buffer).
RxGroovy buffer
In RxGroovy there are several variants of Buffer :
buffer(count)
buffer(count)
emits non-overlapping buffers in the form of List
s, each of which contains at most count
items from the source Observable (the final emitted List
may have fewer than count
items).
buffer(count, skip)
buffer(count, skip)
creates a new buffer starting with the first emitted item from the source Observable, and every skip
items thereafter, and fills each buffer with count
items: the initial item and count-1
subsequent ones. It emits these buffers as List
s. Depending on the values of count
and skip
these buffers may overlap (multiple buffers may contain the same item), or they may have gaps (where items emitted by the source Observable are not represented in any buffer).
buffer(bufferClosingSelector)
When it subscribes to the source Observable, buffer(bufferClosingSelector)
begins to collect its emissions into a List
, and it also calls bufferClosingSelector
to generate a second Observable. When this second Observable emits an TClosing
object, buffer
emits the current List
and repeats this process: beginning a new List
and calling bufferClosingSelector
to create a new Observable to monitor. It will do this until the source Observable terminates.
buffer(boundary
[, initialCapacity
])
buffer(boundary)
monitors an Observable, boundary
. Each time that Observable emits an item, it creates a new List
to begin collecting items emitted by the source Observable and emits the previous List
.
buffer(bufferOpenings, bufferClosingSelector)
buffer(bufferOpenings, bufferClosingSelector)
monitors an Observable, bufferOpenings
, that emits BufferOpening
objects. Each time it observes such an emitted item, it creates a new List
to begin collecting items emitted by the source Observable and it passes the bufferOpenings
Observable into the closingSelector
function. That function returns an Observable. buffer
monitors that Observable and when it detects an emitted item from it, it closes the List
and emits it as its own emission.
buffer(timespan, unit
[, scheduler
])
buffer(timespan, unit)
emits a new List
of items periodically, every timespan
amount of time, containing all items emitted by the source Observable since the previous bundle emission or, in the case of the first bundle, since the subscription to the source Observable. There is also a version of this variant of the operator that takes a Scheduler
as a parameter and uses it to govern the timespan; by default this variant uses the computation
Scheduler.
buffer(timespan, unit, count
[, scheduler
])
buffer(timespan, unit, count)
emits a new List
of items for every count
items emitted by the source Observable, or, if timespan
has elapsed since its last bundle emission, it emits a bundle of however many items the source Observable has emitted in that span, even if this is fewer than count
. There is also a version of this variant of the operator that takes a Scheduler
as a parameter and uses it to govern the timespan; by default this variant uses the computation
scheduler.
buffer(timespan, timeshift, unit
[, scheduler
])
buffer(timespan, timeshift, unit)
creates a new List
of items every timeshift
period of time, and fills this bundle with every item emitted by the source Observable from that time until timespan
time has passed since the bundle’s creation, before emitting this List
as its own emission. If timespan
is longer than timeshift
, the emitted bundles will represent time periods that overlap and so they may contain duplicate items. There is also a version of this variant of the operator that takes a Scheduler
as a parameter and uses it to govern the timespan; by default this variant uses the computation
scheduler.
You can use the Buffer operator to implement backpressure (that is, to cope with an Observable that may produce items too quickly for its observer to consume).
Buffer can reduce a sequence of many items to a sequence of fewer buffers-of-items, making them more manageable. You could, for example, close and emit a buffer of items from a bursty Observable periodically, at a regular interval of time.
Sample Code Observable<List<Integer>> burstyBuffered = bursty.buffer(500, TimeUnit.MILLISECONDS); Or you could get fancy, and collect items in buffers during the bursty periods and emit them at the end of each burst, by using the Debounce operator to emit a buffer closing indicator to the buffer operator.
Sample Code // we have to multicast the original bursty Observable so we can use it
// both as our source and as the source for our buffer closing selector:
Observable<Integer> burstyMulticast = bursty.publish().refCount();
// burstyDebounced will be our buffer closing selector:
Observable<Integer> burstyDebounced = burstyMulticast.debounce(10, TimeUnit.MILLISECONDS);
// and this, finally, is the Observable of buffers we're interested in:
Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced);
RxJava 1․x buffer
In RxJava there are several variants of Buffer :
buffer(count)
buffer(count)
emits non-overlapping buffers in the form of List
s, each of which contains at most count
items from the source Observable (the final emitted List
may have fewer than count
items).
buffer(count, skip)
buffer(count, skip)
creates a new buffer starting with the first emitted item from the source Observable, and every skip
items thereafter, and fills each buffer with count
items: the initial item and count-1
subsequent ones. It emits these buffers as List
s. Depending on the values of count
and skip
these buffers may overlap (multiple buffers may contain the same item), or they may have gaps (where items emitted by the source Observable are not represented in any buffer).
buffer(bufferClosingSelector)
When it subscribes to the source Observable, buffer(bufferClosingSelector)
begins to collect its emissions into a List
, and it also calls bufferClosingSelector
to generate a second Observable. When this second Observable emits an TClosing
object, buffer
emits the current List
and repeats this process: beginning a new List
and calling bufferClosingSelector
to create a new Observable to monitor. It will do this until the source Observable terminates.
buffer(boundary)
buffer(boundary)
monitors an Observable, boundary
. Each time that Observable emits an item, it creates a new List
to begin collecting items emitted by the source Observable and emits the previous List
.
buffer(bufferOpenings, bufferClosingSelector)
buffer(bufferOpenings, bufferClosingSelector)
monitors an Observable, bufferOpenings
, that emits BufferOpening
objects. Each time it observes such an emitted item, it creates a new List
to begin collecting items emitted by the source Observable and it passes the bufferOpenings
Observable into the closingSelector
function. That function returns an Observable. buffer
monitors that Observable and when it detects an emitted item from it, it closes the List
and emits it as its own emission.
buffer(timespan, unit
[, scheduler
])
buffer(timespan, unit)
emits a new List
of items periodically, every timespan
amount of time, containing all items emitted by the source Observable since the previous bundle emission or, in the case of the first bundle, since the subscription to the source Observable. There is also a version of this variant of the operator that takes a Scheduler
as a parameter and uses it to govern the timespan; by default this variant uses the computation
scheduler.
buffer(timespan, unit, count
[, scheduler
])
buffer(timespan, unit, count)
emits a new List
of items for every count
items emitted by the source Observable, or, if timespan
has elapsed since its last bundle emission, it emits a bundle of however many items the source Observable has emitted in that span, even if this is fewer than count
. There is also a version of this variant of the operator that takes a Scheduler
as a parameter and uses it to govern the timespan; by default this variant uses the computation
scheduler.
buffer(timespan, timeshift, unit
[, scheduler
])
buffer(timespan, timeshift, unit)
creates a new List
of items every timeshift
period of time, and fills this bundle with every item emitted by the source Observable from that time until timespan
time has passed since the bundle’s creation, before emitting this List
as its own emission. If timespan
is longer than timeshift
, the emitted bundles will represent time periods that overlap and so they may contain duplicate items. There is also a version of this variant of the operator that takes a Scheduler
as a parameter and uses it to govern the timespan; by default this variant uses the computation
scheduler.
You can use the Buffer operator to implement backpressure (that is, to cope with an Observable that may produce items too quickly for its observer to consume).
Buffer can reduce a sequence of many items to a sequence of fewer buffers-of-items, making them more manageable. You could, for example, close and emit a buffer of items from a bursty Observable periodically, at a regular interval of time.
Sample Code Observable<List<Integer>> burstyBuffered = bursty.buffer(500, TimeUnit.MILLISECONDS); Or you could get fancy, and collect items in buffers during the bursty periods and emit them at the end of each burst, by using the Debounce operator to emit a buffer closing indicator to the buffer operator.
Sample Code // we have to multicast the original bursty Observable so we can use it
// both as our source and as the source for our buffer closing selector:
Observable<Integer> burstyMulticast = bursty.publish().refCount();
// burstyDebounced will be our buffer closing selector:
Observable<Integer> burstyDebounced = burstyMulticast.debounce(10, TimeUnit.MILLISECONDS);
// and this, finally, is the Observable of buffers we're interested in:
Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced); See Also
RxJS buffer bufferWithCount bufferWithTime bufferWithTimeOrCount
RxJS has four Buffer operators — buffer
, bufferWithCount
, bufferWithTime
, and bufferWithTimeOrCount
— each of which has variants that have different ways of governing which source Observable items are emitted as part of which buffers.
buffer(bufferBoundaries)
buffer(bufferBoundaries)
monitors an Observable, bufferBoundaries
. Each time that Observable emits an item, it creates a new collection to begin collecting items emitted by the source Observable and emits the previous collection.
buffer(bufferClosingSelector)
When it subscribes to the source Observable, buffer(bufferClosingSelector)
begins to collect its emissions into a collection, and it also calls bufferClosingSelector
to generate a second Observable. When this second Observable emits an item, buffer
emits the current collection and repeats this process: beginning a new collection and calling bufferClosingSelector
to create a new Observable to monitor. It will do this until the source Observable terminates.
buffer(bufferOpenings,bufferClosingSelector)
buffer(bufferOpenings, bufferClosingSelector)
monitors an Observable, bufferOpenings
, that emits BufferOpening
objects. Each time it observes such an emitted item, it creates a new collection to begin collecting items emitted by the source Observable and it passes the bufferOpenings
Observable into the bufferClosingSelector
function. That function returns an Observable. buffer
monitors that Observable and when it detects an emitted item from it, it emits the current collection and begins a new one.
buffer
is found in each of the following distributions:
rx.all.js
rx.all.compat.js
rx.coincidence.js
buffer
requires one of the following distributions:
rx.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
bufferWithCount(count)
bufferWithCount(count)
emits non-overlapping buffers, each of which contains at most count
items from the source Observable (the final emitted buffer may contain fewer than count
items).
bufferWithCount(count, skip)
bufferWithCount(count, skip)
creates a new buffer starting with the first emitted item from the source Observable, and a new one for every skip
items thereafter, and fills each buffer with count
items: the initial item and count-1
subsequent ones, emitting each buffer when it is complete. Depending on the values of count
and skip
these buffers may overlap (multiple buffers may contain the same item), or they may have gaps (where items emitted by the source Observable are not represented in any buffer).
bufferWithCount
is found in each of the following distributions:
rx.js
rx.compat.js
rx.all.js
rx.all.compat.js
rx.lite.extras.js
bufferWithTime(timeSpan)
bufferWithTime(timeSpan)
emits a new collection of items periodically, every timeSpan
milliseconds, containing all items emitted by the source Observable since the previous bundle emission or, in the case of the first bundle, since the subscription to the source Observable. There is also a version of this variant of the operator that takes a Scheduler
as a parameter and uses it to govern the timespan; by default this variant uses the timeout
scheduler.
bufferWithTime(timeSpan, timeShift)
bufferWithTime(timeSpan, timeShift)
creates a new collection of items every timeShift
milliseconds, and fills this bundle with every item emitted by the source Observable from that time until timeSpan
milliseconds has passed since the collection’s creation, before emitting this collection as its own emission. If timeSpan
is longer than timeShift
, the emitted bundles will represent time periods that overlap and so they may contain duplicate items. There is also a version of this variant of the operator that takes a Scheduler
as a parameter and uses it to govern the timespan; by default this variant uses the timeout
scheduler.
bufferWithTimeOrCount(timeSpan, count)
bufferWithTimeOrCount(timeSpan, count)
emits a new collection of items for every count
items emitted by the source Observable, or, if timeSpan
milliseconds have elapsed since its last collection emission, it emits a collection of however many items the source Observable has emitted in that span, even if this is fewer than count
. There is also a version of this variant of the operator that takes a Scheduler
as a parameter and uses it to govern the timespan; by default this variant uses the timeout
scheduler.
bufferWithTime
and bufferWithTimeOrCount
are found in each of the following distributions:
rx.all.js
rx.all.compat.js
rx.time.js
bufferWithTime
and bufferWithTimeOrCount
require one of the following distributions:
rx.time.js
requires rx.js
or rx.compat.js
otherwise: rx.lite.js
or rx.lite.compat.js
RxKotlin buffer
In RxKotlin there are several variants of Buffer :
buffer(count)
buffer(count)
emits non-overlapping buffers in the form of List
s, each of which contains at most count
items from the source Observable (the final emitted List
may have fewer than count
items).
buffer(count, skip)
buffer(count, skip)
creates a new buffer starting with the first emitted item from the source Observable, and every skip
items thereafter, and fills each buffer with count
items: the initial item and count-1
subsequent ones. It emits these buffers as List
s. Depending on the values of count
and skip
these buffers may overlap (multiple buffers may contain the same item), or they may have gaps (where items emitted by the source Observable are not represented in any buffer).
buffer(bufferClosingSelector)
When it subscribes to the source Observable, buffer(bufferClosingSelector)
begins to collect its emissions into a List
, and it also calls bufferClosingSelector
to generate a second Observable. When this second Observable emits an TClosing
object, buffer
emits the current List
and repeats this process: beginning a new List
and calling bufferClosingSelector
to create a new Observable to monitor. It will do this until the source Observable terminates.
buffer(boundary)
buffer(boundary)
monitors an Observable, boundary
. Each time that Observable emits an item, it creates a new List
to begin collecting items emitted by the source Observable and emits the previous List
.
buffer(bufferOpenings, bufferClosingSelector)
buffer(bufferOpenings, bufferClosingSelector)
monitors an Observable, bufferOpenings
, that emits BufferOpening
objects. Each time it observes such an emitted item, it creates a new List
to begin collecting items emitted by the source Observable and it passes the bufferOpenings
Observable into the closingSelector
function. That function returns an Observable. buffer
monitors that Observable and when it detects an emitted item from it, it closes the List
and emits it as its own emission.
buffer(timespan, unit
[, scheduler
])
buffer(timespan, unit)
emits a new List
of items periodically, every timespan
amount of time, containing all items emitted by the source Observable since the previous bundle emission or, in the case of the first bundle, since the subscription to the source Observable. There is also a version of this variant of the operator that takes a Scheduler
as a parameter and uses it to govern the timespan; by default this variant uses the computation
scheduler.
buffer(timespan, unit, count
[, scheduler
])
buffer(timespan, unit, count)
emits a new List
of items for every count
items emitted by the source Observable, or, if timespan
has elapsed since its last bundle emission, it emits a bundle of however many items the source Observable has emitted in that span, even if this is fewer than count
. There is also a version of this variant of the operator that takes a Scheduler
as a parameter and uses it to govern the timespan; by default this variant uses the computation
scheduler.
buffer(timespan, timeshift, unit
[, scheduler
])
buffer(timespan, timeshift, unit)
creates a new List
of items every timeshift
period of time, and fills this bundle with every item emitted by the source Observable from that time until timespan
time has passed since the bundle’s creation, before emitting this List
as its own emission. If timespan
is longer than timeshift
, the emitted bundles will represent time periods that overlap and so they may contain duplicate items. There is also a version of this variant of the operator that takes a Scheduler
as a parameter and uses it to govern the timespan; by default this variant uses the computation
scheduler.
RxNET Buffer
In Rx.NET there are several variants of Buffer . For each variety you can either pass in the source Observable as the first parameter, or you can call it as an instance method of the source Observable (in which case you can omit that parameter):
Buffer(count)
Buffer(count)
emits non-overlapping buffers in the form of IList
s, each of which contains at most count
items from the source Observable (the final emitted IList
may have fewer than count
items).
Buffer(count, skip)
Buffer(count, skip)
creates a new buffer starting with the first emitted item from the source Observable, and every skip
items thereafter, and fills each buffer with count
items: the initial item and count-1
subsequent ones. It emits these buffers as IList
s. Depending on the values of count
and skip
these buffers may overlap (multiple buffers may contain the same item), or they may have gaps (where items emitted by the source Observable are not represented in any buffer).
Buffer(bufferClosingSelector)
When it subscribes to the source Observable, Buffer(bufferClosingSelector)
begins to collect its emissions into an IList
, and it also calls bufferClosingSelector
to generate a second Observable. When this second Observable emits an TBufferClosing
object, Buffer
emits the current IList
and repeats this process: beginning a new IList
and calling bufferClosingSelector
to create a new Observable to monitor. It will do this until the source Observable terminates.
Buffer(bufferOpenings,bufferClosingSelector)
Buffer(bufferOpenings, bufferClosingSelector)
monitors an Observable, BufferOpenings
, that emits TBufferOpening
objects. Each time it observes such an emitted item, it creates a new IList
to begin collecting items emitted by the source Observable and it passes the TBufferOpening
object into the bufferClosingSelector
function. That function returns an Observable. Buffer
monitors that Observable and when it detects an emitted item from it, it closes the IList
and emits it as its own emission.
Buffer(timeSpan)
Buffer(timeSpan)
emits a new IList
of items periodically, every timeSpan
amount of time, containing all items emitted by the source Observable since the previous bundle emission or, in the case of the first list, since the subscription to the source Observable. There is also a version of this variant of the operator that takes an IScheduler
as a parameter and uses it to govern the timespan.
Buffer(timeSpan, count)
Buffer(timeSpan, count)
emits a new IList
of items for every count
items emitted by the source Observable, or, if timeSpan
has elapsed since its last list emission, it emits a list of however many items the source Observable has emitted in that span, even if this is fewer than count
. There is also a version of this variant of the operator that takes an IScheduler
as a parameter and uses it to govern the timespan.
Buffer(timeSpan, timeShift)
Buffer(timeSpan, timeShift)
creates a new IList
of items every timeShift
period of time, and fills this list with every item emitted by the source Observable from that time until timeSpan
time has passed since the list’s creation, before emitting this IList
as its own emission. If timeSpan
is longer than timeShift
, the emitted lists will represent time periods that overlap and so they may contain duplicate items. There is also a version of this variant of the operator that takes an IScheduler
as a parameter and uses it to govern the timespan.
RxPHP bufferWithCount
RxPHP implements this operator as bufferWithCount
.
Projects each element of an observable sequence into zero or more buffers which are produced based on element count information.
Sample Code //from https://github.com/ReactiveX/RxPHP/blob/master/demo/bufferWithCount/bufferWithCount.php
$source = Rx\Observable::range(1, 6)
->bufferWithCount(2)
->subscribe($stdoutObserver); Next value: [1,2]
Next value: [3,4]
Next value: [5,6]
Complete! //from https://github.com/ReactiveX/RxPHP/blob/master/demo/bufferWithCount/bufferWithCountAndSkip.php
$source = Rx\Observable::range(1, 6)
->bufferWithCount(2, 1)
->subscribe($stdoutObserver); Next value: [1,2]
Next value: [2,3]
Next value: [3,4]
Next value: [4,5]
Next value: [5,6]
Next value: [6]
Complete!
RxPY buffer buffer_with_count buffer_with_time buffer_with_time_or_count pairwise
RxPY has several Buffer variants: buffer
, buffer_with_count
, buffer_with_time
, and buffer_with_time_or_count
. For each of these variants there are optional parameters that change the behavior of the operator. As always in RxPY, when an operator may take more than one optional parameter, be sure to name the parameter in the parameter list when you call the operator so as to avoid ambiguity.
buffer(buffer_openings)
buffer(buffer_openings=boundaryObservable)
monitors an Observable, buffer_openings
. Each time that Observable emits an item, it creates a new array to begin collecting items emitted by the source Observable and emits the previous array.
buffer(closing_selector)
buffer(closing_selector=closingSelector)
begins collecting items emitted by the source Observable immediately upon subscription, and also calls the closing_selector
function to generate a second Observable. It monitors this new Observable and, when it completes or emits an item, it emits the current array, begins a new array to collect items from the source Observable, and calls closing_selector
again to generate a new Observable to monitor in order to determine when to emit the new array. It repeats this process until the source Observable terminates, whereupon it emits the final array.
buffer(closing_selector=openingSelector, buffer_closing_selector=closingSelector)
begins by calling closing_selector
to get an Observable. It monitors this Observable, and, whenever it emits an item, buffer
creates a new array, begins to collect items subsequently emitted by the source Observable into this array, and calls buffer_closing_selector
to get a new Observable to govern the closing of that array. When this new Observable emits an item or terminates, buffer
closes and emits the array that the Observable governs.
buffer_with_count(count)
buffer_with_count(count)
emits non-overlapping buffers in the form of arrays, each of which contains at most count
items from the source Observable (the final emitted array may have fewer than count
items).
buffer_with_count(count, skip)
buffer_with_count(count, skip=skip)
creates a new buffer starting with the first emitted item from the source Observable, and every skip
items thereafter, and fills each buffer with count
items: the initial item and count-1
subsequent ones. It emits these buffers as arrays. Depending on the values of count
and skip
these buffers may overlap (multiple buffers may contain the same item), or they may have gaps (where items emitted by the source Observable are not represented in any buffer).
buffer_with_time(timespan)
buffer_with_time(timespan)
emits a new array of items periodically, every timespan
milliseconds, containing all items emitted by the source Observable since the previous bundle emission or, in the case of the first bundle, since the subscription to the source Observable. There is also a version of this variant of the operator that takes a scheduler
parameter and uses it to govern the timespan; by default this variant uses the timeout
scheduler.
buffer_with_time(timespan, timeshift)
buffer(timespan, timeshift=timeshift)
creates a new array of items every timeshift
milliseconds, and fills this array with every item emitted by the source Observable from that time until timespan
milliseconds have passed since the array’s creation, before emitting this array as its own emission. If timespan
is longer than timeshift
, the emitted arrays will represent time periods that overlap and so they may contain duplicate items. There is also a version of this variant of the operator that takes a scheduler
parameter and uses it to govern the timespan; by default this variant uses the timeout
scheduler.
buffer_with_time_or_count(timespan, count)
buffer_with_time_or_count(timespan, count)
emits a new array of items for every count
items emitted by the source Observable, or, if timespan
milliseconds have elapsed since its last bundle emission, it emits an array of however many items the source Observable has emitted in that span, even if this is fewer than count
. There is also a version of this variant of the operator that takes a scheduler
parameter and uses it to govern the timespan; by default this variant uses the timeout
scheduler.
Rxrb buffer_with_count buffer_with_time
Rx.rb has three variants of the Buffer operator:
buffer_with_count(count)
buffer_with_count(count)
emits non-overlapping buffers in the form of arrays, each of which contains at most count
items from the source Observable (the final emitted array may have fewer than count
items).
buffer_with_count(count,skip)
buffer_with_count(count, skip=skip)
creates a new buffer starting with the first emitted item from the source Observable, and every skip
items thereafter, and fills each buffer with count
items: the initial item and count-1
subsequent ones. It emits these buffers as arrays. Depending on the values of count
and skip
these buffers may overlap (multiple buffers may contain the same item), or they may have gaps (where items emitted by the source Observable are not represented in any buffer).
buffer_with_time(timespan)
buffer_with_time(timespan)
emits a new array of items periodically, every timespan
milliseconds, containing all items emitted by the source Observable since the previous bundle emission or, in the case of the first bundle, since the subscription to the source Observable.
RxScala slidingBuffer tumblingBuffer
RxScala has two varieties of Buffer — slidingBuffer
and tumblingBuffer
— each of which has variants with different ways of assembling the buffers they emit:
slidingBuffer(count, skip)
slidingBuffer(count, skip)
creates a new buffer starting with the first emitted item from the source Observable, and every skip
items thereafter, and fills each buffer with count
items: the initial item and count-1
subsequent ones. It emits these buffers as Seq
s. Depending on the values of count
and skip
these buffers may overlap (multiple buffers may contain the same item), or they may have gaps (where items emitted by the source Observable are not represented in any buffer).
slidingBuffer(timespan, timeshift)
slidingBuffer(timespan, timeshift)
creates a new Seq
of items every timeshift
(a Duration
), and fills this buffer with every item emitted by the source Observable from that time until timespan
(also a Duration
) has passed since the buffer’s creation, before emitting this Seq
as its own emission. If timespan
is longer than timeshift
, the emitted arrays will represent time periods that overlap and so they may contain duplicate items. There is also a version of this variant of the operator that takes a Scheduler
as a parameter and uses it to govern the timespan.
slidingBuffer(openings, closings)
slidingBuffer(openings,closings)
monitors the openings
Observable, and, whenever it emits an Opening
item, slidingBuffer
creates a new Seq
, begins to collect items subsequently emitted by the source Observable into this buffer, and calls closings
to get a new Observable to govern the closing of that buffer. When this new Observable emits an item or terminates, slidingBuffer
closes and emits the Seq
that the Observable governs.
tumblingBuffer(count)
tumblingBuffer(count)
emits non-overlapping buffers in the form of Seq
s, each of which contains at most count
items from the source Observable (the final emitted buffer may have fewer than count
items).
tumblingBuffer(boundary)
tumblingBuffer(boundary)
monitors an Observable, boundary
. Each time that Observable emits an item, it creates a new Seq
to begin collecting items emitted by the source Observable and emits the previous Seq
. This variant of the operator has an optional second parameter, initialCapacity
with which you can indicate the expected size of these buffers so as to make memory allocation more efficient.
tumblingBuffer(timespan)
tumblingBuffer(timespan)
emits a new Seq
of items periodically, every timespan
(a Duration
), containing all items emitted by the source Observable since the previous bundle emission or, in the case of the first bundle, since the subscription to the source Observable. This variant of the operator has an optional second parameter, scheduler
, with which you can set the Scheduler
that you want to govern the timespan calculation.
tumblingBuffer(timespan, count)
tumblingBuffer(timespan, count)
emits a new Seq
of items for every count
items emitted by the source Observable, or, if timespan
(a Duration
) has elapsed since its last bundle emission, it emits a Seq
containing however many items the source Observable emitted in that span, even if this is fewer than count
. This variant of the operator has an optional third parameter, scheduler
, with which you can set the Scheduler
that you want to govern the timespan calculation.