Start
create an Observable that emits the return value of a function-like directive
Programming languages offer a number of ways to obtain values as the result of calculations, with names like functions, futures, actions, callables, runnables, and so forth. The operators grouped here under the Start operator category make these things behave like Observables so that they can be chained with other Observables in an Observable cascade.
Language-Specific Information
RxGroovy asyncAction asyncFunc deferFuture forEachFuture fromAction fromCallable fromFunc0 fromRunnable start startFuture toAsync
The various RxGroovy implementations of Start are found in the optional rxjava-async module.
The rxjava-async module includes the start operator, which accepts a function as its parameter, calls that function to retrieve a value, and then returns an Observable that will emit that value to each subsequent observer.
Note that the function will only be executed once, even if more than one observer subscribes to the resulting Observable.
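The run-once behavior described above can be modeled without any Rx library. The following JavaScript sketch is only an illustration under stated assumptions, not the rxjava-async implementation: startOnce and its subscribe method are hypothetical names, and the function is called synchronously rather than on a scheduler.

```javascript
// Hypothetical helper (not part of any Rx library): calls `fn` exactly once,
// right away, and replays the cached outcome to every later subscriber.
function startOnce(fn) {
  let outcome;
  try {
    outcome = { ok: true, value: fn() };
  } catch (err) {
    outcome = { ok: false, error: err };
  }
  return {
    subscribe(onNext, onError = () => {}, onCompleted = () => {}) {
      if (outcome.ok) {
        onNext(outcome.value);
        onCompleted();
      } else {
        onError(outcome.error);
      }
    },
  };
}

// Count how often the wrapped function actually runs.
let calls = 0;
const source = startOnce(() => {
  calls += 1;
  return 42;
});

// Two independent subscribers share the single cached result.
const seen = [];
source.subscribe((x) => seen.push(x));
source.subscribe((x) => seen.push(x));
console.log('function calls:', calls, 'values seen:', seen);
```

Both subscribers receive 42, while the counter shows that the wrapped function ran only once.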
The rxjava-async module also includes the toAsync, asyncAction, and asyncFunc operators. These accept a function or an Action as their parameter. In the case of a function, this variant of the operator calls that function to retrieve a value, and then returns an Observable that will emit that value to each subsequent observer (just as the start operator does).
In the case of Action, the process is similar, but there is no return value. In this case, the Observable created by this operator will emit a null before terminating.
Note that the function or Action will only be executed once, even if more than one observer subscribes to the resulting Observable.
The rxjava-async module also includes the startFuture operator. You pass it a function that returns a Future. startFuture calls this function immediately to obtain the Future, and calls the Future’s get method to try to obtain its value. It returns an Observable that will emit this value to any subsequent observers.
The rxjava-async module also includes the deferFuture operator. You pass it a function that returns a Future that returns an Observable. deferFuture returns an Observable, but does not call the function you provide until such time as an observer subscribes to the Observable it returns. When it does so, it immediately calls get on the resulting Future, and then mirrors the emissions from the Observable returned by the Future as its own emissions.
In this way you can include a Future that returns an Observable in a cascade of Observables as a peer to other Observables.
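The difference between the eager startFuture and the deferred deferFuture is mainly one of timing. The JavaScript sketch below models it with a Promise standing in for the Future. It is a simplification, not the library's code: startPromise and deferPromise are hypothetical names, and the Promise resolves to a plain value rather than to an Observable whose emissions are mirrored.

```javascript
// Hypothetical helpers (not part of any Rx library). A Promise plays the role
// of the Future; `factory` is the function that creates it.

// Eager, like the startFuture description: the factory runs immediately.
function startPromise(factory) {
  const promise = factory();
  return { subscribe: (onNext, onError) => promise.then(onNext, onError) };
}

// Lazy, like the deferFuture description: the factory runs only when an
// observer subscribes.
function deferPromise(factory) {
  return { subscribe: (onNext, onError) => factory().then(onNext, onError) };
}

let eagerCalls = 0;
let lazyCalls = 0;
const eager = startPromise(() => {
  eagerCalls += 1;
  return Promise.resolve('eager');
});
const lazy = deferPromise(() => {
  lazyCalls += 1;
  return Promise.resolve('lazy');
});

// Before anyone subscribes, only the eager factory has been called.
const callsBeforeSubscribe = { eager: eagerCalls, lazy: lazyCalls };

// Subscribing is what triggers the lazy factory.
lazy.subscribe((x) => console.log('Next: ' + x));
const lazyCallsAfterSubscribe = lazyCalls;
```

Here callsBeforeSubscribe records one call for the eager variant and none for the lazy one; the lazy counter only moves once subscribe is called.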
The rxjava-async module also includes the fromAction operator. It accepts an Action as its parameter, and returns an Observable that emits the item you pass to fromAction upon termination of the Action.
The rxjava-async module also includes the fromCallable operator. It accepts a Callable as its parameter, and returns an Observable that emits the result of this callable as its sole emission.
The rxjava-async module also includes the fromRunnable operator. It accepts a Runnable as its parameter, and returns an Observable that emits the item you pass to fromRunnable upon termination of the Runnable.
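The pattern shared by fromAction and fromRunnable (run a side effect, then emit an item supplied by the caller) can be sketched in a few lines of JavaScript. This is an illustration only: fromActionSketch is a hypothetical name, and running the action at subscription time is an assumption, since the text above does not say when the action is invoked.

```javascript
// Hypothetical helper (not part of any Rx library): on subscription, runs a
// side-effecting `action`, then emits the caller-supplied `result`.
function fromActionSketch(action, result) {
  return {
    subscribe(onNext, onError = () => {}, onCompleted = () => {}) {
      try {
        action();
      } catch (err) {
        // A failing action is reported through onError instead of onNext.
        onError(err);
        return;
      }
      onNext(result);
      onCompleted();
    },
  };
}

// Record the order of events to show that the action finishes first.
const log = [];
fromActionSketch(() => log.push('action ran'), 'done').subscribe(
  (x) => log.push('Next: ' + x),
  (err) => log.push('Error: ' + err),
  () => log.push('Completed')
);
console.log(log);
```

The log shows the action running first, followed by the supplied item and then completion.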
The rxjava-async module also includes the forEachFuture operator. It is not really a variant of the Start operator, but something all its own. You pass forEachFuture some subset of the typical observer methods (onNext, onError, and onCompleted) and the Observable will call these methods in the usual way. But forEachFuture itself returns a Future that blocks on get until the source Observable completes, then returns either the completion or error, depending on how the Observable completed.
You can use this if you need a function that blocks until the completion of an Observable.
The rxjava-async module also includes the runAsync operator. It is peculiar in that it creates a specialization of an Observable called a StoppableObservable.
Pass runAsync an Action and a Scheduler, and it will return a StoppableObservable that uses the specified Action to generate items that it emits. The Action accepts an Observer and a Subscription. It uses the Subscription to check for the unsubscribed condition, upon which it will stop emitting items. You can also manually stop a StoppableObservable at any time by calling its unsubscribe method (which will also unsubscribe the Subscription you have associated with the StoppableObservable).
Because runAsync immediately invokes the Action and begins emitting the items (that is, it produces a hot Observable), it is possible that some items may be lost in the interval between when you establish the StoppableObservable with this operator and when your Observer is ready to receive items. If this is a problem, you can use the variant of runAsync that also accepts a Subject and pass a ReplaySubject with which you can retrieve the otherwise-missing items.
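The lost-items problem and the replay remedy can be demonstrated with a small library-free JavaScript model. Everything in it is a hypothetical stand-in: PassThrough and ReplayBuffer loosely play the roles of a plain Subject and a ReplaySubject, runImmediately mimics only the "invoke the Action right away" aspect of runAsync, and the producer runs synchronously rather than on a Scheduler.

```javascript
// Hypothetical stand-ins (not part of any Rx library).
// PassThrough forwards items only to observers that are already subscribed.
class PassThrough {
  constructor() { this.observers = []; }
  onNext(item) { this.observers.forEach((o) => o(item)); }
  subscribe(onNext) { this.observers.push(onNext); }
}

// ReplayBuffer also records every item and replays it to late subscribers.
class ReplayBuffer extends PassThrough {
  constructor() { super(); this.items = []; }
  onNext(item) { this.items.push(item); super.onNext(item); }
  subscribe(onNext) {
    this.items.forEach((item) => onNext(item));
    super.subscribe(onNext);
  }
}

// Invokes `producer` immediately (synchronously here, for simplicity) with
// the subject and a subscription whose flag the producer is expected to poll.
function runImmediately(producer, subject) {
  const subscription = {
    unsubscribed: false,
    unsubscribe() { this.unsubscribed = true; },
  };
  producer(subject, subscription);
  return {
    subscribe: (onNext) => subject.subscribe(onNext),
    unsubscribe: () => subscription.unsubscribe(),
  };
}

// Emits 1..3, checking the unsubscribed flag before each item.
const producer = (observer, subscription) => {
  for (let i = 1; i <= 3 && !subscription.unsubscribed; i += 1) {
    observer.onNext(i);
  }
};

// Subscribing after the producer has run: the plain subject has nothing left.
const missed = [];
runImmediately(producer, new PassThrough()).subscribe((x) => missed.push(x));

// With the replay buffer, the late subscriber still receives every item.
const replayed = [];
runImmediately(producer, new ReplayBuffer()).subscribe((x) => replayed.push(x));
```

In this model the late subscriber to the pass-through subject receives nothing, while the one attached to the replay buffer receives all three items.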
In RxGroovy there is also a version of the From operator that converts a Future into an Observable, and in this way resembles the Start operator.
RxJava 1․x asyncAction asyncFunc deferFuture forEachFuture fromAction fromCallable fromFunc0 fromRunnable start startFuture toAsync
The various RxJava implementations of Start are found in the optional rxjava-async module.
The rxjava-async module includes the start operator, which accepts a function as its parameter, calls that function to retrieve a value, and then returns an Observable that will emit that value to each subsequent observer.
Note that the function will only be executed once, even if more than one observer subscribes to the resulting Observable.
The rxjava-async module also includes the toAsync, asyncAction, and asyncFunc operators. These accept a function or an Action as their parameter. In the case of a function, this variant of the operator calls that function to retrieve a value, and then returns an Observable that will emit that value to each subsequent observer (just as the start operator does).
In the case of Action, the process is similar, but there is no return value. In this case, the Observable created by this operator will emit a null before terminating.
Note that the function or Action will only be executed once, even if more than one observer subscribes to the resulting Observable.
The rxjava-async module also includes the startFuture operator. You pass it a function that returns a Future. startFuture calls this function immediately to obtain the Future, and calls the Future’s get method to try to obtain its value. It returns an Observable that will emit this value to any subsequent observers.
The rxjava-async module also includes the deferFuture operator. You pass it a function that returns a Future that returns an Observable. deferFuture returns an Observable, but does not call the function you provide until such time as an observer subscribes to the Observable it returns. When it does so, it immediately calls get on the resulting Future, and then mirrors the emissions from the Observable returned by the Future as its own emissions.
In this way you can include a Future that returns an Observable in a cascade of Observables as a peer to other Observables.
The rxjava-async module also includes the fromAction operator. It accepts an Action as its parameter, and returns an Observable that emits the item you pass to fromAction upon termination of the Action.
The rxjava-async module also includes the fromCallable operator. It accepts a Callable as its parameter, and returns an Observable that emits the result of this callable as its sole emission.
The rxjava-async module also includes the fromRunnable operator. It accepts a Runnable as its parameter, and returns an Observable that emits the item you pass to fromRunnable upon termination of the Runnable.
The rxjava-async module also includes the forEachFuture operator. It is not really a variant of the Start operator, but something all its own. You pass forEachFuture some subset of the typical observer methods (onNext, onError, and onCompleted) and the Observable will call these methods in the usual way. But forEachFuture itself returns a Future that blocks on get until the source Observable completes, then returns either the completion or error, depending on how the Observable completed.
You can use this if you need a function that blocks until the completion of an Observable.
The rxjava-async module also includes the runAsync operator. It is peculiar in that it creates a specialization of an Observable called a StoppableObservable.
Pass runAsync an Action and a Scheduler, and it will return a StoppableObservable that uses the specified Action to generate items that it emits. The Action accepts an Observer and a Subscription. It uses the Subscription to check for the unsubscribed condition, upon which it will stop emitting items. You can also manually stop a StoppableObservable at any time by calling its unsubscribe method (which will also unsubscribe the Subscription you have associated with the StoppableObservable).
Because runAsync immediately invokes the Action and begins emitting the items (that is, it produces a hot Observable), it is possible that some items may be lost in the interval between when you establish the StoppableObservable with this operator and when your Observer is ready to receive items. If this is a problem, you can use the variant of runAsync that also accepts a Subject and pass a ReplaySubject with which you can retrieve the otherwise-missing items.
In RxJava there is also a version of the From operator that converts a Future into an Observable, and in this way resembles the Start operator.
RxJS start startAsync toAsync
RxJS implements the start operator. It takes as its parameters a function whose return value will be the emission from the resulting Observable, and, optionally, a context object to use as this when calling that function and a Scheduler on which to run the function.
Sample Code
// Object that will be bound as `this` inside the started function
var context = { value: 42 };
var source = Rx.Observable.start(
    function () {
        return this.value;
    },
    context,
    Rx.Scheduler.timeout
);
// Subscribe with onNext, onError, and onCompleted handlers
var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });
start is found in the following distributions:
rx.async.js (requires rx.binding.js and either rx.js or rx.compat.js)
rx.async.compat.js (requires rx.binding.js and either rx.js or rx.compat.js)
rx.lite.js
rx.lite.compat.js
RxJS also implements the startAsync operator. It takes as its parameter an asynchronous function whose return value will be the emission from the resulting Observable.
You can convert a function into an asynchronous function with the toAsync method. It takes a function, function parameter, and Scheduler as parameters, and returns an asynchronous function that will be invoked on the specified Scheduler. The last two parameters are optional; if you do not specify a Scheduler, the timeout Scheduler will be used by default.
Sample Code
// The asynchronous function returns a Promise (here from the RSVP library)
var source = Rx.Observable.startAsync(function () {
    return RSVP.Promise.resolve(42);
});
// Subscribe with onNext, onError, and onCompleted handlers
var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });
startAsync is found in the following distributions:
rx.async.js (requires rx.binding.js and either rx.js or rx.compat.js)
rx.async.compat.js (requires rx.binding.js and either rx.js or rx.compat.js)
rx.lite.js
rx.lite.compat.js
toAsync is found in the following distributions:
rx.async.js (requires rx.binding.js and either rx.js or rx.compat.js)
rx.async.compat.js (requires rx.binding.js and either rx.js or rx.compat.js)
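The idea behind toAsync, turning an ordinary function into one whose result is delivered through a subscription on a chosen scheduler, can be sketched without RxJS. The code below is a simplified model and not the RxJS implementation: toAsyncSketch, timeoutScheduler, and immediateScheduler are hypothetical names, a scheduler is reduced to a function that decides when a task runs, and the optional middle parameter mentioned above is left out.

```javascript
// Hypothetical stand-ins for Rx Schedulers: each decides when a task runs.
const timeoutScheduler = (task) => setTimeout(task, 0);
const immediateScheduler = (task) => task();

// Hypothetical helper (not the RxJS API): wraps `fn` so that each call
// schedules one invocation and returns an object observers can subscribe to.
function toAsyncSketch(fn, schedule = timeoutScheduler) {
  return function (...args) {
    let state = 'pending'; // becomes 'done' or 'failed' once `fn` has run
    let value;
    const waiting = [];
    const deliver = (o) => {
      if (state === 'done') {
        o.onNext(value);
        o.onCompleted();
      } else {
        o.onError(value);
      }
    };
    schedule(() => {
      try {
        value = fn(...args);
        state = 'done';
      } catch (err) {
        value = err;
        state = 'failed';
      }
      // Flush observers that subscribed before the result was ready.
      waiting.splice(0).forEach(deliver);
    });
    return {
      subscribe(onNext, onError = () => {}, onCompleted = () => {}) {
        const o = { onNext, onError, onCompleted };
        if (state === 'pending') {
          waiting.push(o);
        } else {
          deliver(o);
        }
      },
    };
  };
}

// The immediate scheduler keeps this usage example synchronous.
const add = toAsyncSketch((a, b) => a + b, immediateScheduler);
const received = [];
add(3, 4).subscribe((x) => received.push(x));
```

Omitting the second argument makes the sketch fall back to its timeout-based scheduler, loosely mirroring the default described above; in that case the result arrives on a later turn of the event loop.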
RxPHP start
RxPHP implements this operator as start.
Invokes the specified function asynchronously on the specified scheduler, surfacing the result through an observable sequence.
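Sample Code
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/start/start.php
$source = Rx\Observable::start(function () {
    return 42;
});
// $stdoutObserver is not defined in this snippet; it is assumed to be provided by the demo's shared setup code
$source->subscribe($stdoutObserver);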