RxJava Source Code Reading (2) — Thread Scheduling
RxJava provides very convenient thread switching capabilities. This article will explore how thread scheduling in RxJava is implemented.
I highly recommend the article “RxJava Detailed Explanation for Android Developers”. Some of the images in this article are sourced from it, thanks to the original author!
Note: The version of RxJava source code studied in this article is 1.1.1, and the tool used for automatically generating class diagrams from code is the Eclipse plugin AmaterasUML.
SubscribeOn and ObserveOn
RxJava implements thread switching through the subscribeOn() and observeOn() methods in Observable.
subscribeOn(): Specifies the thread in which subscribe() occurs, that is, the thread where Observable.OnSubscribe is activated, also known as the event generation thread.
observeOn(): Specifies the thread in which the Subscriber runs, also known as the event consumption thread.
Example
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> s) {
s.onNext("Hello, world!");
s.onCompleted();
}
}).subscribeOn(Schedulers.io()) // Specifies that subscribe() occurs on the IO thread
.observeOn(AndroidSchedulers.mainThread()) // Specifies that the Subscriber's callback occurs on the main thread
.subscribe(new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.d(tag, s);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
});
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> s) {
s.onNext("Hello, world!");
s.onCompleted();
}
}).subscribeOn(Schedulers.io()) // Specifies that subscribe() occurs on the IO thread
.observeOn(AndroidSchedulers.mainThread()) // Specifies that the Subscriber's callback occurs on the main thread
.subscribe(new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.d(tag, s);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
});
SubscribeOn
Principle
The event generation thread is the thread where the call method of the initial Observable's OnSubscribe is invoked.
The subscribeOn method returns a new Observable. When subscription (subscribe) occurs, the call method of the new Observable's OnSubscribe will be invoked, and this method will call the call method of the initial Observable's OnSubscribe in a new thread (specified by the scheduler).
Principle Diagram
Process
The initial Observable calls the subscribeOn method, which then calls the create method with the parameter OperatorSubscribeOn.
public final Observable<T> subscribeOn(Scheduler scheduler) {
return create(new OperatorSubscribeOn<T>(this, scheduler));
}
public final Observable<T> subscribeOn(Scheduler scheduler) {
return create(new OperatorSubscribeOn<T>(this, scheduler));
}
OperatorSubscribeOn is an operator that implements the OnSubscribe interface. OperatorSubscribeOn holds references to the initial Observable and the scheduler, and implements the call method of OnSubscribe.
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
final Scheduler scheduler;
final Observable<T> source;
@Override
public void call(final Subscriber<? super T> subscriber) {
//……
}
}
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
final Scheduler scheduler;
final Observable<T> source;
@Override
public void call(final Subscriber<? super T> subscriber) {
//……
}
}
The create method creates a new Observable, and the OnSubscribe object in this new Observable is an instance of the operator OperatorSubscribeOn.
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}
The SubscribeOn method returns this new Observable.
The subsequent subscribe method is a method of this new Observable, with the parameter being a subscriber. The subscribe method will call the call method of the OnSubscribe in this new Observable, which is the call method of OperatorSubscribeOn, with the parameter being the original subscriber.
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
//……
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
//……
}
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
//……
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
//……
}
OperatorSubscribeOn gets a worker from the scheduler, which knows which thread to schedule the task on. In the call method, a new task is created and given to the worker for scheduling. This task creates a new subscriber based on the subscriber passed into the method, which acts as a proxy for the original subscriber. The onNext, onError, and onComplete methods are directly delegated to the original subscriber, but a setProducer method is overridden (the purpose of which is unclear, likely related to backpressure, but can be ignored for now). The task uses the initial Observable to subscribe (unsafeSubscribe) this new subscriber. unsafeSubscribe is similar to subscribe, except it does not consider error handling. unsafeSubscribe will call the call method of the OnSubscribe object in the initial Observable, and this method will be executed in the thread specified by the worker, meaning the event will be generated in the thread specified by the worker.
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
subscriber.add(inner);
inner.schedule(new Action0() {
@Override
public void call() {
Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
@Override
public void onError(Throwable e) {
try {
subscriber.onError(e);
} finally {
inner.unsubscribe();
}
}
@Override
public void onCompleted() {
try {
subscriber.onCompleted();
} finally {
inner.unsubscribe();
}
}
@Override
public void setProducer(final Producer p) {
//……
}
};
source.unsafeSubscribe(s);
}
});
}
}
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
subscriber.add(inner);
inner.schedule(new Action0() {
@Override
public void call() {
Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
@Override
public void onError(Throwable e) {
try {
subscriber.onError(e);
} finally {
inner.unsubscribe();
}
}
@Override
public void onCompleted() {
try {
subscriber.onCompleted();
} finally {
inner.unsubscribe();
}
}
@Override
public void setProducer(final Producer p) {
//……
}
};
source.unsafeSubscribe(s);
}
});
}
}
ObserveOn
Principle
The event consumption thread is the thread where the target Subscriber's OnNext and other methods are called.
The ObserveOn method returns a new Observable and creates a new Subscriber that wraps the target Subscriber. When subscription (subscribe) occurs, the call method of the new Observable's OnSubscribe will be invoked, which then calls the call method of the initial Observable's OnSubscribe, passing in the new subscriber as the parameter. When methods in the new Subscriber are called, they will be invoked in the new thread (specified by the scheduler) corresponding to the methods in the target Subscriber.
Principle Diagram
Process
The initial Observable calls the observeOn method, which then calls the lift method (which is somewhat different from subscribeOn), with the parameter being OperatorObserveOn.
public final Observable<T> observeOn(Scheduler scheduler) {
//……
return lift(new OperatorObserveOn<T>(scheduler, false));
}
public final Observable<T> observeOn(Scheduler scheduler) {
//……
return lift(new OperatorObserveOn<T>(scheduler, false));
}
OperatorObserveOn is also an operator, but it implements the Operator interface. OperatorObserveOn holds a reference to the scheduler and implements the call method of the Operator interface.
public final class OperatorObserveOn<T> implements Operator<T, T> {
private final Scheduler scheduler;
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
//……
}
}
public final class OperatorObserveOn<T> implements Operator<T, T> {
private final Scheduler scheduler;
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
//……
}
}
Similar to SubscribeOn, the lift method also creates a new Observable. In the call method of the new Observable's OnSubscribe, it calls the call method of the initial Observable's OnSubscribe, with the parameter being the subscriber returned by the call method of OperatorObserveOn, which is of type ObserveOnSubscriber.
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
Subscriber<? super T> st = hook.onLift(operator).call(o);
st.onStart();
onSubscribe.call(st);
}
});
}
public final class OperatorObserveOn<T> implements Operator<T, T> {
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError);
parent.init();
return parent;
}
}
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
Subscriber<? super T> st = hook.onLift(operator).call(o);
st.onStart();
onSubscribe.call(st);
}
});
}
public final class OperatorObserveOn<T> implements Operator<T, T> {
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError);
parent.init();
return parent;
}
}
ObserveOnSubscriber essentially wraps the original Subscriber, holding a reference to a worker that calls the original subscriber's methods within the worker.
private static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
final Subscriber<? super T> child;
final Scheduler.Worker recursiveScheduler;
@Override
public void onNext(final T t) {
//……
schedule();
}
@Override
public void onCompleted() {
//……
schedule();
}
@Override
public void onError(final Throwable e) {
//……
schedule();
}
protected void schedule() {
recursiveScheduler.schedule(this);
}
@Override
public void call() {
//……
localChild.onNext(localOn.getValue(v));
//……
}
}
private static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
final Subscriber<? super T> child;
final Scheduler.Worker recursiveScheduler;
@Override
public void onNext(final T t) {
//……
schedule();
}
@Override
public void onCompleted() {
//……
schedule();
}
@Override
public void onError(final Throwable e) {
//……
schedule();
}
protected void schedule() {
recursiveScheduler.schedule(this);
}
@Override
public void call() {
//……
localChild.onNext(localOn.getValue(v));
//……
}
}
When the subscribe() method is called, the call method of the new Observable's OnSubscribe is invoked, which calls the call method of the initial Observable's OnSubscribe, passing in the new subscriber as the parameter. When the methods of the new subscriber are called, they will be invoked in the specified thread, meaning the events are consumed in the specified thread.
Mixing Multiple SubscribeOn and ObserveOn
There are a total of 5 operations on events in the diagram.
As can be seen from the diagram, operations ① and ② are influenced by the first subscribeOn(), running on the red thread; operations ③ and ④ are influenced by the first observeOn(), running on the green thread; operation ⑤ is influenced by the second observeOn(), running on the purple thread; while the second subscribeOn() has no effect on the entire process because the thread was interrupted by the first subscribeOn() during the notification process.
Therefore, when multiple subscribeOn() are used, only the first subscribeOn() takes effect.
Scheduler
The scheduler, Scheduler, acts as a thread controller, allowing RxJava to specify which thread each segment of code should run on.
Schedulers
Schedulers is a factory for creating Schedulers, providing various types of schedulers.
- Schedulers.immediate(): Runs directly in the current thread, equivalent to not specifying a thread. This is the default Scheduler.
- Schedulers.newThread(): Always enables a new thread and executes operations in a new thread.
- Schedulers.io(): The Scheduler used for I/O operations (reading/writing files, reading/writing databases, network information exchange, etc.). Its behavior is similar to newThread(), except that io() internally uses a thread pool with no upper limit on the number of threads, allowing it to reuse idle threads. Therefore, in most cases, io() is more efficient than newThread(). Do not place computational work in io() to avoid creating unnecessary threads.
- Schedulers.computation(): The Scheduler used for computations. This computation refers to CPU-intensive calculations, which are not limited by I/O operations, such as graphical calculations. This Scheduler uses a fixed thread pool, sized according to the number of CPU cores. Do not place I/O operations in computation(), as the waiting time for I/O operations will waste CPU resources.
Scheduler is an abstract class primarily responsible for creating workers, while specific tasks are completed by the workers. The abstract class Scheduler.Worker represents a sequence of work and contains specific scheduling methods. Worker implements the unsubscribe() method of the Subscription interface to recycle resources when unsubscribing.
We will illustrate using Schedulers.newThread() and Schedulers.io() as examples.
Schedulers.newThread()
Schedulers.newThread() returns a NewThreadScheduler object.
NewThreadScheduler extends Scheduler and overrides the createWorker() method, where it creates a NewThreadWorker. NewThreadWorker extends Scheduler.Worker and overrides the schedule() method.
NewThreadScheduler creates a NewThreadWorker each time through createWorker, and tasks are completed by the new worker without requiring additional operations.
NewThreadWorker creates a thread pool with a size of 1 each time.
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
ScheduledExecutorService is a class provided in the java.util.concurrent package that can execute tasks at fixed rates or periodically.
The schedule methods in NewThreadWorker are completed by calling methods from ScheduledExecutorService. This way, the code in NewThreadWorker should be minimal, but it also considers the issue of tasks being canceled before execution in the new thread. In Java 7+, simply enabling the RemoveOnCancel strategy allows tasks to be immediately removed from the task queue when canceled, preventing memory leaks. For Java versions that do not support this strategy, NewThreadWorker will create a new static thread shared by all NewThreadWorkers, which will periodically call exec.purge() to remove any tasks that have been canceled but not yet executed from each NewThreadWorker.
Schedulers.io()
Schedulers.io() returns a CachedThreadScheduler object.
As shown in the class diagram, EventLoopWorker, CachedWorkerPool, and ThreadWorker are all inner classes of CachedThreadScheduler.
CachedThreadScheduler is used to create EventLoopWorker and also holds a CachedWorkerPool.
CachedWorkerPool manages a queue of ThreadWorkers that can be reused. ThreadWorker extends NewThreadWorker and adds an expiration time property. CachedWorkerPool also has an evictorService thread that periodically removes expired ThreadWorkers from the queue.
EventLoopWorker retrieves a ThreadWorker from CachedWorkerPool each time. The createWorker method of CachedThreadScheduler creates an EventLoopWorker, but the actual scheduling is done by the ThreadWorker obtained from CachedWorkerPool.
AndroidSchedulers
AndroidSchedulers is located in the rx.android.schedulers package of RxAndroid, which contains only two files: AndroidSchedulers.java and HandlerScheduler.java.
AndroidSchedulers has only one public method, mainThread(), which returns a HandlerScheduler object. When creating this Scheduler object, new Handler(Looper.getMainLooper()) is passed as a constructor parameter.
Similarly, HandlerScheduler creates a new HandlerWorker through the createWorker method, where HandlerWorker is an inner class of HandlerScheduler. In the schedule method of HandlerWorker, tasks are handed over to the handler for execution.
handler.postDelayed(scheduledAction, unit.toMillis(delayTime));
handler.postDelayed(scheduledAction, unit.toMillis(delayTime));
Comments
No comments yet. Be the first to comment!