Zip

Combine the emissions of multiple Observables together via a specified function, and emit a single item for each combination based on the results of this function.

The Zip method returns an Observable that applies a function of your choosing to the combination of items emitted, in sequence, by two (or more) other Observables, with the results of this function becoming the items emitted by the returned Observable. It applies this function in strict sequence, so the first item emitted by the new Observable will be the result of the function applied to the first item emitted by Observable #1 and the first item emitted by Observable #2; the second item emitted by the new zip-Observable will be the result of the function applied to the second item emitted by Observable #1 and the second item emitted by Observable #2; and so forth. It will only emit as many items as the number of items emitted by the source Observable that emits the fewest items.
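The pairing rule described above can be modelled without any Rx library. The following is a minimal sketch in plain JavaScript that treats each source as an array. The helper name zipArrays is hypothetical and not part of any ReactiveX API, and the sketch ignores timing, errors, and unsubscription.

```javascript
// Hypothetical helper (not an Rx API): models zip over plain arrays.
// `combine` receives the nth item of every source and returns the nth result.
function zipArrays(combine, ...sources) {
  if (sources.length === 0) return [];
  // The output is only as long as the shortest source.
  const length = Math.min(...sources.map((s) => s.length));
  const results = [];
  for (let i = 0; i < length; i++) {
    results.push(combine(...sources.map((s) => s[i])));
  }
  return results;
}

const odds = [1, 3, 5, 7, 9];
const evens = [2, 4, 6];
const zipped = zipArrays((o, e) => [o, e], odds, evens);
console.log(JSON.stringify(zipped)); // [[1,2],[3,4],[5,6]]
```

The trailing items 7 and 9 never appear in the output because the shorter source has no fourth or fifth item to pair them with.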

Language-Specific Information

RxGroovy implements this operator as several variants of zip and also as zipWith, an instance function version of the operator.

zip

The last argument to zip is a function that accepts an item from each of the Observables being zipped and returns the item that the Observable returned from zip will emit in response. You can provide the Observables to be zipped either as between two and nine individual parameters, or as a single parameter: either an Iterable of Observables or an Observable that emits Observables (as in the illustration above).

Sample Code

odds  = Observable.from([1, 3, 5, 7, 9]);
evens = Observable.from([2, 4, 6]);

Observable.zip(odds, evens, {o, e -> [o, e]}).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
[1, 2]
[3, 4]
[5, 6]
Sequence complete

Note that in this example, the resulting Observable completes normally after emitting three items, which is the number of items emitted by the shorter of the two component Observables (evens, which emits three items).

zipWith

The zipWith instance version of this operator always takes two parameters. The first parameter may be either a simple Observable, or an iterable (as in the illustration above).

zip and zipWith do not by default operate on any particular Scheduler.

RxJava implements this operator as several variants of zip and also as zipWith, an instance function version of the operator.

zip

The last argument to zip is a function that accepts an item from each of the Observables being zipped and returns the item that the Observable returned from zip will emit in response. You can provide the Observables to be zipped either as between two and nine individual parameters, or as a single parameter: either an Iterable of Observables or an Observable that emits Observables (as in the illustration above).

zipWith

The zipWith instance version of this operator always takes two parameters. The first parameter may be either a simple Observable, or an iterable (as in the illustration above).
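As a rough illustration of pairing a source with an iterable, here is a small library-free sketch in plain JavaScript. The helper zipWithIterable and the generator naturals are hypothetical names, not RxJava APIs. The sketch assumes that pairing stops as soon as either side runs out, in line with the general zip rule.

```javascript
// Hypothetical helper (not an RxJava API): models zipWith(iterable, function).
function zipWithIterable(items, iterable, combine) {
  const iterator = iterable[Symbol.iterator]();
  const results = [];
  for (const item of items) {
    const next = iterator.next();
    if (next.done) break; // the iterable ran out first
    results.push(combine(item, next.value));
  }
  return results;
}

// An unbounded iterable: 1, 2, 3, ...
function* naturals() {
  let n = 1;
  while (true) yield n++;
}

const labelled = zipWithIterable(['a', 'b', 'c'], naturals(), (s, n) => n + ':' + s);
console.log(labelled.join(' ')); // 1:a 2:b 3:c
```

Because the iterable is only advanced once per source item, an unbounded iterable is safe here: the result ends when the source does.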

zip and zipWith do not by default operate on any particular Scheduler.

RxJS implements this operator as zip and zipArray.

zip

zip accepts a variable number of Observables or Promises as parameters, followed by a function that accepts one item emitted by each of those Observables or resolved by those Promises as input and produces a single item to be emitted by the resulting Observable.

Sample Code

/* Using arguments */
var range = Rx.Observable.range(0, 5);

var source = Rx.Observable.zip(
    range,
    range.skip(1),
    range.skip(2),
    function (s1, s2, s3) {
        return s1 + ':' + s2 + ':' + s3;
    }
);

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });
Next: 0:1:2
Next: 1:2:3
Next: 2:3:4
Completed

/* Using promises and Observables */
var range = Rx.Observable.range(0, 5);

var source = Rx.Observable.zip(
    RSVP.Promise.resolve(0),
    RSVP.Promise.resolve(1),
    Rx.Observable.return(2),
    function (s1, s2, s3) {
        return s1 + ':' + s2 + ':' + s3;
    }
);

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });
Next: 0:1:2
Completed

zipArray

zipArray accepts a variable number of Observables as parameters and returns an Observable that emits arrays, each one containing the nth item from each source Observable.

Sample Code

var range = Rx.Observable.range(0, 5);

var source = Rx.Observable.zipArray(
    range,
    range.skip(1), 
    range.skip(2)
);

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);   
    },
    function () {
        console.log('Completed');   
    });
Next: [0,1,2]
Next: [1,2,3]
Next: [2,3,4]
Completed

forkJoin

RxJS also implements a similar operator, forkJoin. There are two varieties of this operator. The first collects the last element emitted by each of the source Observables into an array and emits this array as its own sole emitted item. You can pass the source Observables to forkJoin either as individual parameters or as a single array of Observables.

var source = Rx.Observable.forkJoin(
    Rx.Observable.return(42),
    Rx.Observable.range(0, 10),
    Rx.Observable.fromArray([1,2,3]),
    RSVP.Promise.resolve(56)
);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: [42, 9, 3, 56]
Completed

forkJoin

A second variant of forkJoin exists as a prototype function: you call it on an instance of one source Observable, passing it another source Observable as the first parameter. As the second parameter, you pass a function that combines the final item emitted by each of the two source Observables into the sole item to be emitted by the resulting Observable.

var source1 = Rx.Observable.return(42);
var source2 = Rx.Observable.range(0, 3);

var source = source1.forkJoin(source2, function (s1, s2) {
    return s1 + s2;
});

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
Next: 44
Completed
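
To contrast forkJoin with zip, both variants can be sketched over plain arrays. This is a simplified model only: forkJoinArrays and range are hypothetical helpers, not Rx APIs, and the sketch assumes every source emits at least one item and completes.

```javascript
// Hypothetical helper (not an Rx API): models forkJoin over plain arrays.
// Assumes every source is non-empty.
function forkJoinArrays(sources, combine) {
  // Keep only the last item of each source.
  const lasts = sources.map((s) => s[s.length - 1]);
  // With a combining function, emit its result; otherwise emit the array.
  return combine ? combine(...lasts) : lasts;
}

// Stand-in for Rx.Observable.range(start, count).
const range = (start, count) => Array.from({ length: count }, (_, i) => start + i);

const collected = forkJoinArrays([[42], range(0, 10), [1, 2, 3]]);
console.log(JSON.stringify(collected)); // [42,9,3]

const summed = forkJoinArrays([[42], range(0, 3)], (s1, s2) => s1 + s2);
console.log(summed); // 44
```

Unlike zip, which produces one result per index, this model produces a single result built from the final items alone.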

forkJoin is found in the following distributions:

  • rx.all.js
  • rx.all.compat.js
  • rx.experimental.js (requires rx.js, rx.compat.js, rx.lite.js, or rx.lite.compat.js)

RxPHP implements this operator as zip.

Merges the specified observable sequences into one observable sequence by using the selector function whenever all of the observable sequences have produced an element at a corresponding index. If the result selector function is omitted, a list with the elements of the observable sequences at corresponding indexes will be yielded.

Sample Code

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/zip/zip.php

//Without a result selector
$range = \Rx\Observable::fromArray(range(0, 4));

$source = $range
    ->zip([
        $range->skip(1),
        $range->skip(2)
    ]);

$observer = $createStdoutObserver();

$subscription = $source
    ->subscribe(new CallbackObserver(
        function ($array) use ($observer) {
            $observer->onNext(json_encode($array));
        },
        [$observer, 'onError'],
        [$observer, 'onCompleted']
    ));
Next value: [0,1,2]
Next value: [1,2,3]
Next value: [2,3,4]
Complete!
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/zip/zip-result-selector.php

//With a result selector
$range = \Rx\Observable::fromArray(range(0, 4));

$source = $range
    ->zip([
        $range->skip(1),
        $range->skip(2)
    ], function ($s1, $s2, $s3) {
        return $s1 . ':' . $s2 . ':' . $s3;
    });

$subscription = $source->subscribe($createStdoutObserver());
Next value: 0:1:2
Next value: 1:2:3
Next value: 2:3:4
Complete!

RxPHP also has an operator forkJoin.

Runs all observable sequences in parallel and collects their last elements.

Sample Code

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/forkJoin/forkJoin.php

use Rx\Observable;

$obs1 = Observable::range(1, 4);
$obs2 = Observable::range(3, 5);
$obs3 = Observable::fromArray(['a', 'b', 'c']);

$observable = Observable::forkJoin([$obs1, $obs2, $obs3], function($v1, $v2, $v3) {
    return $v1 . $v2 . $v3;
});
$observable->subscribe($stdoutObserver);
Next value: 47c
Complete!

© ReactiveX contributors
Licensed under the Apache License 2.0.
http://reactivex.io/documentation/operators/zip.html