RxJava 源码阅读(1)— 关键类

本文将介绍 RxJava 的实现中几个比较重要的类。

强烈推荐 《给 Android 开发者的 RxJava 详解》 这篇文章!

注:本文研究的 RxJava 源码版本是 1.1.1,使用的根据代码自动生成类图的工具为 Eclipse 插件 AmaterasUML。

RxJavaImportantClass.png

RxJava 的源码主要可以分为三个部分:发布订阅(Observable 相关)、操作符(Operator 相关)、线程调度(Scheduler 相关)

1. 发布订阅

Observable:可观察对象,持有一个 OnSubscribe 的引用,Observable.create 方法用来创建一个 Observable, 入参为 OnSubscribe,当 Observable.subscribe(subscriber) 被调用时,Observable 会调用 OnSubscribe 的 call 方法。

OnSubscribe:Observable 的 create 方法的入参,OnSubscribe 会被存储在返回的 Observable 对象中,它的作用相当于一个计划表,当 Observable 被订阅的时候,OnSubscribe 的 call() 方法会自动被调用,事件序列就会依照设定依次触发。

Observer:观察者,它决定事件触发的时候将有怎样的行为:onNext,onCompleted,onError。

Subscriber:实现了 Observer 的抽象类,对 Observer 接口进行了一些扩展:

  • onStart(): 在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置
  • isUnsubscribed(): 判断状态
  • unsubscribe(): 用于取消订阅,在这个方法被调用后,Subscriber 将不再接收事件

Observer 和 Subscriber 具有相同的角色,而且 Observer 在 subscribe() 过程中最终会被转换成 Subscriber 对象。

Subscription:定义了观察者与被观察者之间的订阅关系的接口,是 subscribe 方法的返回值,可查看订阅状态或者用于取消订阅。

Function 和 Action 将方法包装起来,通过参数传递。它们都只有一个方法 call(),区别是 Function 包装有返回值的方法,Action 包装无返回值的方法。

Action0 无参无返回值,由于 onCompleted() 方法也是无参无返回值的,因此 Action0 可以被当成一个包装对象,将 onCompleted() 的内容打包起来将自己作为一个参数传入 subscribe() 以实现不完整定义的回调。

Action1 无返回值,但有一个参数,由于 onNext(T obj) 和 onError(Throwable error) 也是单参数无返回值的,因此 Action1 可以将 onNext(obj) 和 onError(error) 打包起来传入 subscribe() 以实现不完整定义的回调。

2. 操作符

RxJava 中的操作符可以分为两类。

第一类操作符实现了 Observable.OnSubscribe 接口,在使用时会调用 Observable 的 create(onsubscirbe),如 OperatorSubscribeOn。

Observable 中的 subscribeOn():

java
public final Observable<T> subscribeOn(Scheduler scheduler) {
    if (this instanceof ScalarSynchronousObservable) {
      return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
    }
    return create(new OperatorSubscribeOn<T>(this, scheduler));
}
public final Observable<T> subscribeOn(Scheduler scheduler) {
    if (this instanceof ScalarSynchronousObservable) {
      return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
    }
    return create(new OperatorSubscribeOn<T>(this, scheduler));
}

第二类操作符实现了 Observable.Operator 接口,在使用时会调用 Observable 的 lift(operator),如 OperatorMap。

Observable 中的 map():

java
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    return lift(new OperatorMap<T, R>(func));
}
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    return lift(new OperatorMap<T, R>(func));
}

3. 线程调度

subscribeOn 和 observeOn 两个操作符对应的两个类 OperatorSubscribeOn 和 OperatorObserveOn,这两个操作符都持有一个 Scheduler 的引用。

Schedulers 是一个创建 Scheduler 的工厂,可以提供多种调度器 (Scheduler)。

Scheduler 是一个抽象类,抽象类 Scheduler.Worker 代表了一个工作序列,worker.schedule() 包含了具体的调度方法。以 NewThreadScheduler 为例,NewThreadScheduler 继承了 Scheduler,并重写了 createWorker() 方法,在该方法中创建了一个 NewThreadWorker。NewThreadWork 继承了 Scheduler.Worker,重写了 schedule() 方法。

4. 其他

其他种类的 Obervable:在 rx.observables 包中,包含了一些具有其他附加功能的 Observable,如 BlockingObservable、GroupedObservable 等,可以通过操作符将 Observable 转化成其他的 Observable。

其他种类的 Observer:在 rx.observers 包中,也是附加了功能。

插件:在 rx.plugins 中。

Subject:代表既是可观察者,又是观察者的类型。

Single:一种特殊的 Observable,只发出一个事件。

Subscription:一些 Subscription 的特定实现。

评论

后继续评论需要管理员审核后可见

暂无评论,来发表第一条评论吧