RxGroovy implements the groupBy
operator. The Observable it returns emits items of a particular subclass of Observable — the GroupedObservable
. Objects that implement the GroupedObservable
interface have an additional method — getkey
— by which you can retrieve the key by which items were designated for this particular GroupedObservable
.
The following sample code uses groupBy
to transform a list of numbers into two lists, grouped by whether or not the numbers are even:
Sample Code
def numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9]);
def groupFunc = { return(0 == (it % 2)); };
numbers.groupBy(groupFunc).flatMap({ it.reduce([it.getKey()], {a, b -> a << b}) }).subscribe(
{ println(it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);
[false, 1, 3, 5, 7, 9]
[true, 2, 4, 6, 8]
Sequence complete
Another version of groupBy
allows you to pass in a transformative function that changes the elements before they are emitted by the resulting GroupedObservable
s.
Note that when groupBy
splits up the source Observable into an Observable that emits GroupedObservable
s, each of these GroupedObservable
s begins to buffer the items that it will emit upon subscription. For this reason, if you ignore any of these GroupedObservable
s (you neither subscribe to it or apply an operator to it that subscribes to it), this buffer will present a potential memory leak. For this reason, rather than ignoring a GroupedObservable
that you have no interest in observing, you should instead apply an operator like take(0)
to it as a way of signalling to it that it may discard its buffer.
If you unsubscribe from one of the GroupedObservable
s, or if an operator like take
that you apply to the GroupedObservable
unsubscribes from it, that GroupedObservable
will be terminated. If the source Observable later emits an item whose key matches the GroupedObservable
that was terminated in this way, groupBy
will create and emit a new GroupedObservable
to match the key. In other words, unsubscribing from a GroupedObservable
will not cause groupBy
to swallow items from its group. For example, see the following code:
Sample Code
Observable.range(1,5)
.groupBy({ 0 })
.flatMap({ this.take(1) })
.subscribe(
{ println(it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);
In the above code, the source Observable emits the sequence { 1 2 3 4 5 }
. When it emits the first item in this sequence, the groupBy
operator creates and emits a GroupedObservable
with the key of 0
. The flatMap
operator applies the take(1)
operator to that GroupedObservable
, which gives it the item (1
) that it emits and that also unsubscribes from the GroupedObservable
, which is terminated. When the source Observable emits the second item in its sequence, the groupBy
operator creates and emits a second GroupedObservable
with the same key (0
) to replace the one that was terminated. flatMap
again applies take(1)
to this new GroupedObservable
to retrieve the new item to emit (2
) and to unsubscribe from and terminate the GroupedObservable
, and this process repeats for the remaining items in the source sequence.
groupBy
does not by default operate on any particular Scheduler.