You can implement your own Observable operators. This page shows you how.
If your operator is designed to originate an Observable, rather than to transform or react to a source Observable, use the create( )
method rather than trying to implement Observable
manually. Otherwise, follow the instructions below.
The following example shows how you can chain a custom operator (in this example: myOperator
) along with standard RxJava operators by using the lift( )
operator:
Observable foo = barObservable.ofType(Integer).map({it*2}).lift(new myOperator<T>()).map({"transformed by myOperator: " + it});
The following section will show how to form the scaffolding of your operator so that it will work correctly with lift( )
.
Define your operator as a public class that implements the Operator
interface, like so:
public class myOperator<T> implements Operator<T> { public myOperator( /* any necessary params here */ ) { /* any necessary initialization here */ } @Override public Subscriber<? super T> call(final Subscriber<? super T> s) { return new Subscriber<t>(s) { @Override public void onCompleted() { /* add your own onCompleted behavior here, or just pass the completed notification through: */ if(!s.isUnsubscribed()) { s.onCompleted(); } } @Override public void onError(Throwable t) { /* add your own onError behavior here, or just pass the error notification through: */ if(!s.isUnsubscribed()) { s.onError(t); } } @Override public void onNext(T item) { /* this example performs some sort of simple transformation on each incoming item and then passes it along */ if(!s.isUnsubscribed()) { transformedItem = myOperatorTransformOperation(item); s.onNext(transformedItem); } } }; } }
isUnsubscribed( )
status before it emits any item to (or sends any notification to) the Subscriber. Do not waste time generating items that no Subscriber is interested in seeing.onNext( )
method any number of times, but these calls must be non-overlapping.onCompleted( )
or onError( )
method, but not both, exactly once, and it may not subsequently call a Subscriber's onNext( )
method.serialize( )
operator to it to force the correct behavior.first( )
is defined as take(1)
.
single( )
ignoreElements( )
is defined as filter(alwaysFalse( ))
reduce(a)
is defined as scan(a)
.
last( )
onError( )
calls.
© ReactiveX contributors
Licensed under the Apache License 2.0.
http://reactivex.io/documentation/implement-operator.html