图解RxJava2(一)

前言

从这篇文章开始,系统地学习RxJava2设计思想和源码实现。说起大热门RxJava,网上有很多例如响应式编程、观察者模式等介绍,也有一些优秀的文章以上、下游等概念引初学者入门,在初步学习之后,可能感觉有所收获,但是总觉得不够解渴,要真正知晓其原理,还得结合源码加深理解。

例子

通过生活中的几个角色来学习RxJava2:饭店、厨师、服务员、顾客。

模拟一个情景:饭店有一个很火的套餐,顾客来店默认就要这个套餐(不存在服务员咨询顾客要什么的过程),所以情况应该是这样的

上面的漫画写成RxJava2就是很多入门文章中看到的:事件发起者(上游)

1
2
3
4
5
6
7
8
9
10
11
12
13
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
System.out.println("服务员从厨师那取得 扁食");
e.onNext("扁食");
System.out.println("服务员从厨师那取得 拌面");
e.onNext("拌面");
System.out.println("服务员从厨师那取得 蒸饺");
e.onNext("蒸饺");
System.out.println("厨师告知服务员菜上好了");
e.onComplete();
}
});

事件接收者(下游)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("来个沙县套餐!!!");
}

@Override
public void onNext(String s) {
System.out.println("服务员端给顾客 " + s);
}

@Override
public void onError(Throwable e) {
}

@Override
public void onComplete() {
System.out.println("服务员告诉顾客菜上好了");
}
};

建立联系

1
observable.subscribe(observer);

打印如下:

1
2
3
4
5
6
7
8
9
来个沙县套餐!!!
服务员从厨师那取得 拌面
服务员端给顾客 拌面
服务员从厨师那取得 扁食
服务员端给顾客 扁食
服务员从厨师那取得 蒸饺
服务员端给顾客 蒸饺
厨师告知服务员菜上好了
服务员告诉顾客菜上好了

下面把一些类代入角色结合源码分析,演员表

源码分析

最初看源码的时候容易因为各个类名字起得很相似而看晕,因此先把涉及到的类之间的关系画出来

Observable 是个抽象类,其子类是 ObservableCreate ,如果把 Observable 比成饭店,那 ObservableCreate 就是沙县小吃,看下 Observable 的 create 方法

1
2
3
4
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

Observable 的 create 方法只是将接收的 ObservableOnSubscribe 作为参数传递给子类 ObservableCreate 真正实例化

1
2
3
4
5
6
7
8
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;

public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
...
}

上面这些代码就是漫画的第一格:饭店要开张(Observable.create),开张的前提是要有一个会做菜的厨师(new ObservableOnSubscribe),接着饭店起名叫沙县小吃(new ObservableCreate),并把这个厨师和沙县小吃建立联系(this.source = source)。厨师有了,但是他并没有立即开始做菜(ObservableOnSubscribe.subscribe()),这个也很好理解,现实生活中厨师也是这样,他做不做菜取决于饭店,毕竟是饭店给他开工资;而饭店是否让厨师做菜很大一个原因取决于有没有顾客上门,看下顾客:

顾客没有什么套路,上菜就吃(onNext),菜上完或菜出问题会有相应的提醒(onComplete/onError),对应上面漫画2。接着看饭店接客 observable.subscribe(observer) 的源码

1
2
3
4
5
6
7
public final void subscribe(Observer<? super T> observer) {
//省略部分代码
subscribeActual(observer);
//省略部分代码
}

protected abstract void subscribeActual(Observer<? super T> observer);

Observable(饭店) 的 subscribe 方法又会调用 subscribeActual 方法,该方法是个抽象方法,具体实现在子类,看看子类 ObservableCreate(沙县小吃)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected void subscribeActual(Observer<? super T> observer) {
//步骤①
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//步骤②
observer.onSubscribe(parent);

try {
//步骤③
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}

先看下涉及到的类以及所属关系

步骤① Emitter 翻译为发射器,这里名字起得也很形象 CreateEmitter(创建发射器) ,即对应服务员,CreateEmitter 创建的时候接收 Observer,就像一个服务员接待一个顾客一样(对应漫画3服务员说话)

步骤② 执行 onSubscribe 方法并接收 CreateEmitter ,所以看到 log 中最先打印该方法的内容,就像顾客认准之后自己的菜是由这个服务员上的(对应漫画3顾客说话)

步骤③ 调用 ObservableOnSubscribe.subscribe ,并接收 CreateEmitter ,就像厨师和该服务员建立联系,之后厨师做的菜都由该服务员端出去。上什么菜取决于 ObservableOnSubscribe.subscribe 中的实现。

分析到这里发现 CreateEmitter(服务员) 起到枢纽作用,看下代码中 e.onNext/e.onComplete 的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}

public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}

onNext 中首先判空,服务员端个空盘子出来要被顾客锤成麻瓜;接着发送之前需要执行 isDisposed() 判断,可以理解成顾客是否还需要菜,默认情况下是需要的(!isDisposed() 为 true ),当执行完 onComplete() 方法后会执行 dispose() ,表明顾客不再需要菜了,后续的菜服务员不会再端上来给顾客了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Observer<String> observer = new Observer<String>() {
Disposable disposable;
@Override
public void onSubscribe(Disposable d) {
this.disposable = d;
System.out.println("来个沙县套餐!!!");

}

@Override
public void onNext(String s) {
if (s.equals("拌面")) {
disposable.dispose();
}
System.out.println("服务员端给顾客 " + s);
}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {
System.out.println("服务员告诉顾客菜上好了");
}
};

打印如下:

1
2
3
4
5
6
来个沙县套餐!!!
服务员从厨师那取得 拌面
服务员端给顾客 拌面
服务员从厨师那取得 扁食
服务员从厨师那取得 蒸饺
厨师告知服务员菜上好了

从上面可以看到一旦执行完 Disposable.dispose() 方法,顾客和服务员就没有后续交流了,就像 Disposable 翻译的那样「一次性」,理解成顾客对服务员说「后续的菜都别上了,你也不要再出现在我面前」;但是服务员和厨师的交流还是保持着,默认情况下厨师并不知道顾客不需要菜了,因此他还是继续做菜,然后交给服务员端出去。当然我们也可以在厨师做下一道菜的之前,判断下顾客还要不要:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
if (!e.isDisposed()) {
System.out.println("服务员从厨师那取得 拌面");
e.onNext("拌面");
}
if (!e.isDisposed()) {
System.out.println("服务员从厨师那取得 扁食");
e.onNext("扁食");
}
if (!e.isDisposed()) {
System.out.println("服务员从厨师那取得 蒸饺");
e.onNext("蒸饺");
}
if (!e.isDisposed()) {
System.out.println("厨师告知服务员菜上好了");
e.onComplete();
}

}
});

打印如下:

1
2
3
来个沙县套餐!!!
服务员从厨师那取得 拌面
服务员端给顾客 拌面

再看下另一种情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
System.out.println("服务员从厨师那取得 拌面");
e.onNext("拌面");
System.out.println("服务员从厨师那取得 扁食");
e.onNext("扁食");
System.out.println("服务员从厨师那取得 蒸饺");
e.onNext("蒸饺");
System.out.println("厨师告知服务员菜上好了");
e.onComplete();
}
});
observable.subscribe();

打印如下

1
2
3
4
服务员从厨师那取得 拌面
服务员从厨师那取得 扁食
服务员从厨师那取得 蒸饺
厨师告知服务员菜上好了

上面分析了要有顾客厨师才会做菜,这都没顾客怎么也做菜呢?看下源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public final Disposable subscribe() {
return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
//省略判空
//默认的顾客
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

subscribe(ls);

return ls;
}

public final void subscribe(Observer<? super T> observer) {
subscribeActual(observer);
}

protected abstract void subscribeActual(Observer<? super T> observer);

原来系统会默认创建一个 LambdaObserver(默认顾客) ,服务员从厨师那端的菜会传给这个顾客。所以可以看出厨师做不做菜只取决于饭店(Observable.subscribe),后面的流程和上面分析的一致。另外上面的代码还出现了Consumer、Action类,这些类里也有对事件的处理,可以理解成顾客选择接收服务员的哪些信息,在 functions 包下还有其他实现

subscribe() 有下面几个重载方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//顾客只关心上什么菜
public final Disposable subscribe() {}
public final Disposable subscribe(Consumer<? super T> onNext) {}

//顾客关心上什么菜以及菜是不是出问题
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}

//顾客关心上什么菜、菜是不是有问题、菜是不是上完了
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}

//顾客关心上什么菜、菜是不是有问题、菜是不是上完了、
//onSubscribe()中可以获取Disposable引用,之后选择告诉服务员是否继续上菜
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}

//由顾客自己决定关心哪些事件,和上一条效果一样
public final void subscribe(Observer<? super T> observer) {}

如果顾客只关心上什么菜,我们可以这么写:

1
2
3
4
5
6
7
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("服务员端给顾客 " + s);
}
};
observable.subscribe(consumer);

打印如下:

1
2
3
4
5
6
7
服务员从厨师那取得 拌面
服务员端给顾客 拌面
服务员从厨师那取得 扁食
服务员端给顾客 扁食
服务员从厨师那取得 蒸饺
服务员端给顾客 蒸饺
厨师告知服务员菜上好了