convert various other objects and data types into Observables
decode from fromAction fromCallable fromFunc0 fromRunnable runAsync
In the separate RxJavaAsyncUtil
package, which is not included by default with RxGroovy, there is also a runAsync
function. Pass runAsync
an Action
and a Scheduler
, and it will return a StoppableObservable
that uses the specified Action
to generate items that it emits.
The Action
accepts an Observer
and a Subscription
. It uses the Subscription
to check for the isUnsubscribed
condition, upon which it will stop emitting items. You can also manually stop a StoppableObservable
at any time by calling its unsubscribe
method (which will also unsubscribe the Subscription
you have associated with the StoppableObservable
).
Because runAsync
immediately invokes the Action
and begins emitting the items, it is possible that some items may be lost in the interval between when you establish the StoppableObservable
with this method and when your Observer
is ready to receive items. If this is a problem, you can use the variant of runAsync
that also accepts a Subject
and pass a ReplaySubject
with which you can retrieve the otherwise-missing items.
decode from fromAction fromCallable fromFunc0 fromRunnable runAsync
In the separate RxJavaAsyncUtil
package, which is not included by default with RxJava, there is also a runAsync
function. Pass runAsync
an Action
and a Scheduler
, and it will return a StoppableObservable
that uses the specified Action
to generate items that it emits.
The Action
accepts an Observer
and a Subscription
. It uses the Subscription
to check for the isUnsubscribed
condition, upon which it will stop emitting items. You can also manually stop a StoppableObservable
at any time by calling its unsubscribe
method (which will also unsubscribe the Subscription
you have associated with the StoppableObservable
).
Because runAsync
immediately invokes the Action
and begins emitting the items, it is possible that some items may be lost in the interval between when you establish the StoppableObservable
with this method and when your Observer
is ready to receive items. If this is a problem, you can use the variant of runAsync
that also accepts a Subject
and pass a ReplaySubject
with which you can retrieve the otherwise-missing items.
from fromCallback fromEvent fromEventPattern fromNodeCallback fromPromise of ofArrayChanges ofObjectChanges ofWithScheduler pairs
There are several, specialized From variants in RxJS:
There is also a fromPromise
operator that converts a Promise into an Observable, converting its resolve
calls into onNext
notifications, and its reject
calls into onError
notifications.
fromPromise
is found in the following distributions:
rx.async.js
(requires rx.binding.js
and either rx.js
or rx.compat.js
)rx.async.compat.js
(requires rx.binding.js
and either rx.js
or rx.compat.js
)rx.lite.js
rx.lite.compat.js
var promise = new RSVP.Promise(function (resolve, reject) { resolve(42); }); var source = Rx.Observable.fromPromise(promise); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (e) { console.log('Error: ' + e); }, function ( ) { console.log('Completed'); });
Next: 42: Completed
var promise = new RSVP.Promise(function (resolve, reject) { reject(new Error('reason')); }); var source = Rx.Observable.fromPromise(promise); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (e) { console.log('Error: ' + e); }, function ( ) { console.log('Completed'); });
Error: Error: reject
There is also an ofArrayChanges
operator that monitors an Array with the Array.observe
method, and returns an Observable that emits any changes that take place in the array. This operator is found only in the rx.all.js
distribution.
var arr = [1,2,3]; var source = Rx.Observable.ofArrayChanges(arr); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (e) { console.log('Error: ' + e); }, function ( ) { console.log('Completed'); }); arr.push(4)
Next: {type: "splice", object: Array[4], index: 3, removed: Array[0], addedCount: 1}
A similar operator is ofObjectChanges
. It returns an Observable that emits any changes made to a particular object, as reported by its Object.observe
method. It is also found only in the rx.all.js
distribution.
var obj = {x: 1}; var source = Rx.Observable.ofObjectChanges(obj); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (e) { console.log('Error: ' + e); }, function ( ) { console.log('Completed'); }); obj.x = 42;
Next: {type: "update", object: Object, name: "x", oldValue: 1}
There is also a pairs
operator. This operator accepts an Object, and returns an Observable that emits, as key/value pairs, the attributes of that object.
var obj = { foo: 42, bar: 56, baz: 78 }; var source = Rx.Observable.pairs(obj); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (e) { console.log('Error: ' + e); }, function ( ) { console.log('Completed'); });
Next: ['foo', 42] Next: ['bar', 56] Next: ['baz', 78] Completed
pairs
is found in the following distributions:
rx.js
rx.all.js
rx.all.compat.js
rx.compat.js
rx.lite.js
rx.lite.compat.js
fromArray fromIterator asObservable fromPromise
from toObservable
In Swift, this is implemented using the Observable.from
class method.
Each element of the array is produced as an emission. The difference between this method and Observable.just
is that the latter emits the whole array as one emission.
let numbers = [1,2,3,4,5] let source = Observable.from(numbers) source.subscribe { print($0) }
next(1) next(2) next(3) next(4) next(5) completed
© ReactiveX contributors
Licensed under the Apache License 2.0.
http://reactivex.io/documentation/operators/from.html