Rx chain initialization

Hello everyone, my name is Ivan and I'm an Android developer. Today I want to share my experience with RxJava2 and explain how a chain is initialized. Why bring this up at all? After talking with fellow developers, I realized that not everyone who uses this tool understands how it works. So I decided to figure out how subscriptions are arranged in RxJava2 and in what order all the work is initialized. I could not find a single article explaining this, so I went into the source code to see how everything works and sketched a small cheat sheet for myself, which grew into this article.





In this article I will not describe what Observable, Observer and all the other entities used in RxJava2 are. If you have decided to read this article, I assume you are already familiar with them. If you are not, I recommend getting to know these concepts before reading on.





Here is where you can start:

  • Grokking RxJava
  • Exploring RxJava 2 for Android





Let's see how the simplest chain works:





Observable.just(1, 2, 3, 4, 5)
    .map {…}
    .filter {…}
    .subscribe();
      
      



A quick overview

First, I will briefly describe each step that we go through in this chain (the steps go from top to bottom):





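  • The just operator creates an ObservableFromArray object.
  • The map operator creates an ObservableMap object, whose constructor takes a reference to the object previously created by just.
  • The filter operator creates an ObservableFilter object, whose constructor takes a reference to the object created by map, which in turn holds the reference to the object created by just.
  • Once all the Observables have been created, subscribe() is called on the last Observable in the chain (here, the ObservableFilter created by filter), and a new Observer is created to handle the events.
  • ObservableFilter.subscribe() calls ObservableFilter.subscribeActual(), which creates a new Observer; for filter this is a FilterObserver. Its constructor takes a reference to the Observer that was passed to ObservableFilter.subscribe().
  • ObservableMap.subscribe() calls ObservableMap.subscribeActual(), which creates a new Observer; for map this is a MapObserver, which receives a reference to the FilterObserver created earlier.
  • ObservableFromArray.subscribe() calls ObservableFromArray.subscribeActual(), which calls onSubscribe() on the Observer passed to ObservableFromArray.subscribeActual().
  • onSubscribe() is then called on each Observer further down the chain.
  • ObservableFromArray starts emitting the items by calling onNext() on the Observer below it.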
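
To make the order of these steps easier to follow, here is a minimal sketch in plain Java that imitates the chain without depending on RxJava. Every class name in it (MiniObserver, MiniObservable, MiniFromArray, MiniMap, MiniFilter, MiniChainDemo) is hypothetical and only models the idea; the real classes also deal with disposal, errors and operator fusion, which are left out here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical, heavily simplified stand-ins for Observer and Observable.
interface MiniObserver<T> {
    void onSubscribe();
    void onNext(T value);
    void onComplete();
}

abstract class MiniObservable<T> {
    static final List<String> LOG = new ArrayList<>();

    MiniObservable() { LOG.add("new " + getClass().getSimpleName()); }

    // Like Observable.subscribe(): delegates to the operator's subscribeActual().
    final void subscribe(MiniObserver<? super T> observer) {
        LOG.add(getClass().getSimpleName() + ".subscribe");
        subscribeActual(observer);
    }

    abstract void subscribeActual(MiniObserver<? super T> observer);

    <R> MiniObservable<R> map(Function<? super T, ? extends R> mapper) { return new MiniMap<T, R>(this, mapper); }

    MiniObservable<T> filter(Predicate<? super T> predicate) { return new MiniFilter<T>(this, predicate); }
}

final class MiniFromArray<T> extends MiniObservable<T> {
    private final List<T> items;
    MiniFromArray(List<T> items) { this.items = items; }

    @Override void subscribeActual(MiniObserver<? super T> observer) {
        observer.onSubscribe();                      // starts the onSubscribe() cascade
        for (T item : items) observer.onNext(item);  // then emits the items
        observer.onComplete();
    }
}

final class MiniMap<T, R> extends MiniObservable<R> {
    private final MiniObservable<T> source;
    private final Function<? super T, ? extends R> mapper;
    MiniMap(MiniObservable<T> source, Function<? super T, ? extends R> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override void subscribeActual(MiniObserver<? super R> downstream) {
        source.subscribe(new MiniObserver<T>() {     // plays the role of MapObserver
            @Override public void onSubscribe() { LOG.add("MapObserver.onSubscribe"); downstream.onSubscribe(); }
            @Override public void onNext(T value) { downstream.onNext(mapper.apply(value)); }
            @Override public void onComplete() { downstream.onComplete(); }
        });
    }
}

final class MiniFilter<T> extends MiniObservable<T> {
    private final MiniObservable<T> source;
    private final Predicate<? super T> predicate;
    MiniFilter(MiniObservable<T> source, Predicate<? super T> predicate) {
        this.source = source;
        this.predicate = predicate;
    }

    @Override void subscribeActual(MiniObserver<? super T> downstream) {
        source.subscribe(new MiniObserver<T>() {     // plays the role of FilterObserver
            @Override public void onSubscribe() { LOG.add("FilterObserver.onSubscribe"); downstream.onSubscribe(); }
            @Override public void onNext(T value) { if (predicate.test(value)) downstream.onNext(value); }
            @Override public void onComplete() { downstream.onComplete(); }
        });
    }
}

final class MiniChainDemo {
    static List<String> run() {
        MiniObservable.LOG.clear();
        List<Integer> received = new ArrayList<>();
        new MiniFromArray<Integer>(Arrays.asList(1, 2, 3, 4, 5))
            .map(v -> v * 10)
            .filter(v -> v > 20)
            .subscribe(new MiniObserver<Integer>() {  // plays the role of LambdaObserver
                @Override public void onSubscribe() { MiniObservable.LOG.add("LambdaObserver.onSubscribe"); }
                @Override public void onNext(Integer value) { received.add(value); }
                @Override public void onComplete() { MiniObservable.LOG.add("onComplete " + received); }
            });
        return new ArrayList<>(MiniObservable.LOG);
    }
}
```

The log returned by MiniChainDemo.run() lists the three constructors from top to bottom, then the subscribe() calls from the bottom of the chain to the top, then the onSubscribe() calls from top to bottom again, and only after that the result of the emission.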





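A visual representation of the steps described above.

Let's start from the beginning, with just(): each value is checked for null, and then fromArray() is called, which returns an Observable.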





public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5) {
   ObjectHelper.requireNonNull(item1, "item1 is null");
   ObjectHelper.requireNonNull(item2, "item2 is null");
   ObjectHelper.requireNonNull(item3, "item3 is null");
   ObjectHelper.requireNonNull(item4, "item4 is null");
   ObjectHelper.requireNonNull(item5, "item5 is null");

   return fromArray(item1, item2, item3, item4, item5);
}
      
      



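fromArray() checks that the array is not null, and handles separately the cases where it is empty or contains a single element.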





public static <T> Observable<T> fromArray(T... items) {
   ObjectHelper.requireNonNull(items, "items is null");
   if (items.length == 0) {
       return empty();
   }
   if (items.length == 1) {
       return just(items[0]);
   }
   return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}
      
      



Otherwise a new ObservableFromArray is created, which receives the array in its constructor.





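onAssembly() checks whether a hook has been registered for this kind of Observable; if one has, the hook is applied to the new Observable, otherwise the Observable is returned unchanged.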





public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
   Function<? super Observable, ? extends Observable> f = onObservableAssembly;
   if (f != null) {
       return apply(f, source);
   }
   return source;
}
      
      



With onAssembly() you can, for example, replace an Observable with a different one at the moment it is created:





RxJavaPlugins.setOnObservableAssembly(o -> {
    if (o instanceof ObservableFromArray) {
        return new ObservableFromArray<>(new Integer[] { 4, 5, 6 });
    }
    return o;
});

Observable.just(1, 2, 3)
    .filter(v -> v > 3)
    .test()
    .assertResult(4, 5, 6);
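
The hook mechanism itself is small enough to sketch in plain Java. The example below is only an illustration under stated assumptions: Source, MiniPlugins and AssemblyHookDemo are hypothetical names, and a Source is reduced to a label so that the effect of the hook is easy to see.

```java
import java.util.function.UnaryOperator;

// Hypothetical stand-in for an Observable: all we track is a label.
final class Source {
    final String label;
    Source(String label) { this.label = label; }
}

// Hypothetical miniature of the assembly hook in RxJavaPlugins.
final class MiniPlugins {
    // null means "no hook installed"
    static volatile UnaryOperator<Source> onAssemblyHook;

    static Source onAssembly(Source source) {
        UnaryOperator<Source> f = onAssemblyHook;  // read the field once
        return f != null ? f.apply(source) : source;
    }
}

final class AssemblyHookDemo {
    // Every factory method routes its result through the hook.
    static Source fromArray() {
        return MiniPlugins.onAssembly(new Source("fromArray"));
    }

    static String[] run() {
        String before = fromArray().label;
        MiniPlugins.onAssemblyHook = s -> new Source("hooked:" + s.label);
        String during = fromArray().label;
        MiniPlugins.onAssemblyHook = null;         // the hook is global, so clear it again
        String after = fromArray().label;
        return new String[] { before, during, after };
    }
}
```

Because the real hook is also global static state, a test that installs one should clear it afterwards, for example with RxJavaPlugins.reset().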
      
      



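Schema with the newly created ObservableFromArray

Next, map() is called on the ObservableFromArray. Things are similar here: the mapper is checked for null, and then a new ObservableMap is created.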





public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
   ObjectHelper.requireNonNull(mapper, "mapper is null");
   return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
      
      



Note that the ObservableMap constructor receives not only the mapper but also this (the source). In our case this is the ObservableFromArray. ObservableMap extends AbstractObservableWithUpstream, which stores the source.





AbstractObservableWithUpstream is an abstract class that extends Observable.





After that, onAssembly() is called and the Observable is returned.







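Updated schema with the created ObservableMap

Next, filter() is called on the ObservableMap. Nothing new happens here, except that an ObservableFilter is created and this is now the ObservableMap (which holds a reference to the ObservableFromArray, as we saw above).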





public final Observable<T> filter(Predicate<? super T> predicate) {
   ObjectHelper.requireNonNull(predicate, "predicate is null");
   return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));
}
      
      



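Updated schema with the created ObservableFilter

Next, subscribe() is called on the ObservableFilter, which is the last Observable in our chain. subscribe() has several overloads; here we look at the one that takes an onNext() callback.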





public final Disposable subscribe(Consumer<? super T> onNext) {
   return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
      
      



After the null checks, a new LambdaObserver is created.





public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
       Action onComplete, Consumer<? super Disposable> onSubscribe) {
   ObjectHelper.requireNonNull(onNext, "onNext is null");
   ObjectHelper.requireNonNull(onError, "onError is null");
   ObjectHelper.requireNonNull(onComplete, "onComplete is null");
   ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

   LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

   subscribe(ls);

   return ls;
}
      
      



The LambdaObserver is then passed to the subscribe() overload that accepts an Observer.





public final void subscribe(Observer<? super T> observer) {
   ObjectHelper.requireNonNull(observer, "observer is null");
   try {
       observer = RxJavaPlugins.onSubscribe(this, observer);

       ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

       subscribeActual(observer);
   } catch (NullPointerException e) {
       ......
   }
}
      
      



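This calls subscribeActual() with our LambdaObserver. Since subscribe() was called on the ObservableFilter, it is the ObservableFilter implementation of subscribeActual() that runs. Let's look at it.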





public void subscribeActual(Observer<? super T> observer) {
   source.subscribe(new FilterObserver<T>(observer, predicate));
}
      
      



A new FilterObserver is created, which receives in its constructor the LambdaObserver and the predicate stored in the ObservableFilter.





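FilterObserver extends BasicFuseableObserver, which already implements onSubscribe(). BasicFuseableObserver is an abstract class that implements Observer. If you look at the sources, only 6 classes extend it, among them FilterObserver and MapObserver. BasicFuseableObserver.onSubscribe() in turn calls onSubscribe() on the downstream Observer that was passed to the constructor. It looks like this: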





public final void onSubscribe(Disposable d) {
   if (DisposableHelper.validate(this.upstream, d)) {
       this.upstream = d;
       if (d instanceof QueueDisposable) {
           this.qd = (QueueDisposable<T>)d;
       }
       if (beforeDownstream()) {

           downstream.onSubscribe(this);

           afterDownstream();
       }
   }
}
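
One detail worth noting in this method is that the observer passes itself downstream as the Disposable, so a dispose call made at the bottom of the chain travels back up to the source. Below is a minimal sketch of that idea in plain Java. The names Cancellable, SimpleObserver, ForwardingObserver, PassThrough and OnSubscribeDemo are hypothetical, and the "already subscribed" check is a simplified stand-in for DisposableHelper.validate(); fusion and the beforeDownstream()/afterDownstream() hooks are omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Disposable.
interface Cancellable {
    void cancel();
}

// Hypothetical stand-in for Observer, reduced to two callbacks.
interface SimpleObserver<T> {
    void onSubscribe(Cancellable upstream);
    void onNext(T value);
}

// Hypothetical base class, loosely modelled on BasicFuseableObserver.
abstract class ForwardingObserver<T, R> implements SimpleObserver<T>, Cancellable {
    protected final SimpleObserver<? super R> downstream;
    private Cancellable upstream;

    ForwardingObserver(SimpleObserver<? super R> downstream) {
        this.downstream = downstream;
    }

    @Override public final void onSubscribe(Cancellable newUpstream) {
        if (upstream != null) {        // simplified check: a second subscription is rejected
            newUpstream.cancel();
            return;
        }
        upstream = newUpstream;
        downstream.onSubscribe(this);  // hand ourselves downstream as the handle
    }

    @Override public void cancel() {
        upstream.cancel();             // forward the cancellation towards the source
    }
}

// An operator that changes nothing, just to build a chain.
final class PassThrough<T> extends ForwardingObserver<T, T> {
    PassThrough(SimpleObserver<? super T> downstream) { super(downstream); }
    @Override public void onNext(T value) { downstream.onNext(value); }
}

final class OnSubscribeDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        SimpleObserver<Integer> last = new SimpleObserver<Integer>() {
            @Override public void onSubscribe(Cancellable upstream) {
                log.add("last.onSubscribe");
                upstream.cancel();     // cancelling here travels back up the chain
            }
            @Override public void onNext(Integer value) { log.add("onNext " + value); }
        };
        SimpleObserver<Integer> first = new PassThrough<Integer>(new PassThrough<Integer>(last));
        first.onSubscribe(() -> log.add("source cancelled"));
        return log;
    }
}
```

In this sketch the last observer cancels as soon as it is subscribed, and the call passes through both intermediate observers before it reaches the source.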
      
      



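After the ObservableFilter has created the FilterObserver, it passes it to source.subscribe(). Recall that source is the ObservableMap that was passed in earlier, so subscribe() is now called on the ObservableMap.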





public final void subscribe(Observer<? super T> observer) {
   ObjectHelper.requireNonNull(observer, "observer is null");
   try {
       observer = RxJavaPlugins.onSubscribe(this, observer);

       ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

       subscribeActual(observer);
   } catch (NullPointerException e) {
       ......
   }
}
      
      



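As before, subscribe() calls subscribeActual(), this time the ObservableMap implementation. In subscribeActual() a new MapObserver is created, which receives a reference to the FilterObserver and the mapper function.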





public void subscribeActual(Observer<? super U> t) {
   source.subscribe(new MapObserver<T, U>(t, function));
}

Here source is the ObservableFromArray, so its subscribe() is called, which in turn calls ObservableFromArray.subscribeActual():
      
      



public void subscribeActual(Observer<? super T> observer) {
   FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);

   observer.onSubscribe(d);

   if (d.fusionMode) {
       return;
   }
   d.run();
}
      
      



The Observers in our chain extend BasicFuseableObserver, which already implements onSubscribe(), so each Observer in turn calls onSubscribe() on the Observer below it.





At the end of subscribeActual(), run() is called, which starts emitting the items to the Observer.





void run() {
   T[] a = array;
   int n = a.length;

   for (int i = 0; i < n && !isDisposed(); i++) {
       T value = a[i];
       if (value == null) {
           downstream.onError(new NullPointerException("The element at index " + i + " is null"));
           return;
       }
       downstream.onNext(value);
   }
   if (!isDisposed()) {
       downstream.onComplete();
   }
}
      
      



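So onNext() is called on the Observer for each item, followed by onComplete() once the array is exhausted, or by onError() if a null element is encountered.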
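
The loop in run() also checks isDisposed() before every item, so disposing in the middle of the emission stops it without any terminal event. A minimal sketch of that behaviour in plain Java follows; ArrayEmitter and ArrayEmitterDemo are hypothetical names, and the terminal event is returned as a string instead of being delivered to an Observer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical miniature of the emission loop shown above.
final class ArrayEmitter<T> {
    private final T[] array;
    private volatile boolean disposed;

    ArrayEmitter(T[] array) { this.array = array; }

    void dispose() { disposed = true; }

    boolean isDisposed() { return disposed; }

    // Emits the items and returns a description of how the emission ended.
    String run(Consumer<? super T> onNext) {
        for (int i = 0; i < array.length && !isDisposed(); i++) {
            T value = array[i];
            if (value == null) {
                return "onError at index " + i;  // a null element ends the stream with an error
            }
            onNext.accept(value);
        }
        // After a dispose the real loop emits nothing more, not even onComplete().
        return isDisposed() ? "disposed" : "onComplete";
    }
}

final class ArrayEmitterDemo {
    // Disposes after `disposeAfter` items have been received; 0 means never.
    static String run(Integer[] items, int disposeAfter) {
        List<Integer> received = new ArrayList<>();
        ArrayEmitter<Integer> emitter = new ArrayEmitter<>(items);
        String terminal = emitter.run(v -> {
            received.add(v);
            if (received.size() == disposeAfter) {
                emitter.dispose();
            }
        });
        return received + " " + terminal;
    }
}
```

For example, ArrayEmitterDemo.run(new Integer[] { 1, 2, 3, 4, 5 }, 2) returns "[1, 2] disposed", while the same array with disposeAfter set to 0 returns "[1, 2, 3, 4, 5] onComplete".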





Visual representation of the creation and subscription process





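Each Observable holds a reference to the Observable above it, and the callbacks of each Observer are called by the Observer above it in the chain.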





onSubscribe() is called before any items are emitted, which is worth keeping in mind when using operators such as doOnSubscribe().





Each operator creates 3 objects, including:

  • Observable
  • Observer







So when using operators, keep in mind that each one allocates memory for several objects, and do not add operators to a chain just because you can.





RxJava is a powerful tool, but you need to understand how it works and what to use it for. If all you need is to run a network request on a background thread and then handle the result on the main thread, using it is like “shooting sparrows with a cannon”: you may hit the target, but the consequences can be serious.







