图解RxJava2(三)

概述

上篇文章只分析了 RxJava 中 subscribeOn 方法的实现原理,然而只使用 subscribeOn 发现上下游都是执行在子线程中。在日常开发中往往是将上游耗时任务通过 subscribeOn 指定在子线程中,下游通常是更新 UI 等需要在主线程中进行,使用 observeOn(AndroidSchedulers.mainThread()) 就能实现,那么它是怎么做到的呢?

例子

基于上篇文章的代码,修改上下游联系,添加 observeOn(AndroidSchedulers.mainThread())

1
2
3
4
source.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
Log.e(TAG, "其他任务执行");

打印如下

此时主线程中的「其他任务」没有被阻塞,上游的耗时任务在子线程 RxNewThreadScheduler-1 中执行,而下游接收任务在主线程中进行,并且事件传递不保证顺序(多次执行输出可能都不同),这也是多线程执行顺序的不确定性特点,上篇已介绍过。

源码分析

1
2
3
source.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);

有前两篇分析经历,现在就轻车熟路:执行 Observable.create 、 new Observer 、Schedulers.newThread()、

subscribeOn(Scheduler) 后此时主线程应该是下面的样子

AndroidSchedulers.mainThread()

AndroidSchedulers 是 RxAndroid 中提供的,使用前需要在 Android Studio 中添加依赖。mainThread() 最后会创建 HandlerScheduler:

1
new HandlerScheduler(new Handler(Looper.getMainLooper()));

HandlerScheduler 也是 Scheduler 的子类,在初始化 HandlerScheduler 的时候创建了一个持有主线程 Looper 的 Handler ,可以猜想后面线程切换很有可能就是 Handler 机制的那一套。此时的主线程

observeOn(Scheduler scheduler)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public final Observable<T> observeOn(Scheduler scheduler) {
//bufferSize()默认为128
return observeOn(scheduler, false, bufferSize());
}
/**
* @param scheduler 调度器
* @param delayError 发生异常是否马上结束事件流
* @param bufferSize 缓存队列的默认大小
* @return Observable
*/
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
//省略判空代码
//这里的this就是之前传下来的ObservableSubscribeOn(黄焖鸡饭店)
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

该方法返回 Observable ,创建了 ObservableObserveOn(已经习惯了,就这几个英文单词排列组合),它也是 Observable 的子类,结合我们举的例子,就给它起名肯德基,肯德基持有黄焖鸡饭店的引用,初始化如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
//省略其他代码
}

此时的主线程

subscribe(Observer observer)

由上两篇分析可知,这里会先去执行 ObservableObserveOn(肯德基) 的 subscribeActual(observer) 方法,这里的 observer 是顾客小明

1
2
3
4
5
6
7
8
9
10
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//步骤①
Scheduler.Worker w = scheduler.createWorker();
//步骤② source 即 ObservableSubscribeOn(黄焖鸡饭店)
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}

步骤① 和上篇一样,这里也会创建 Worker,具体实现在 HandlerScheduler 中

1
2
3
4
5
6
7
8
9
10
11
12
public Worker createWorker() {
return new HandlerWorker(handler);
}
private static final class HandlerWorker extends Worker {
private final Handler handler;
HandlerWorker(Handler handler) {
this.handler = handler;
}
//省略其他代码
}

并把之前持有主线程 Looper 的 Handler 传进去。

步骤② 先创建了 ObserveOnObserver(总起这种很操蛋的名字),作为 ObservableObserveOn(肯德基)的内部类,它是 BasicIntQueueDisposable 的子类(保证原子性、拥有操作队列功能、保证一次性操作),实现Observer接口(也是个顾客)。结合例子,就给它起名叫顾客小强,只是这个小强功能比较强大,小强持有小明的引用。

接着执行 ObservableSubscribeOn(黄焖鸡饭店)的 subscribe ,具体实现是 subscribeActual :

1
2
3
4
5
6
7
8
public void subscribeActual(final Observer<? super T> s) {
//小红登场,持有小强的引用
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//调用小强的onSubscribe,把小红传进去
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

看下小强(ObserveOnObserver)的 onSubscribe:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void onSubscribe(Disposable s) {
//判空操作
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
//小红不是 QueueDisposable 类型
if (s instanceof QueueDisposable) {
//省略部分代码
}
// 创建一个大小为128的队列
queue = new SpscLinkedArrayQueue<T>(bufferSize);
//actual是小明,执行小明的 onSubscribe ,所以看到 log 打印了
actual.onSubscribe(this);
}
}

这里创建了一个队列,大小为128。到目前为止所有操作都发生在主线程中。

回到上面,继续执行

1
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

上篇文章已经介绍过了,具体流程如下

在上篇介绍到这的时候说,接下来的操作都是在子线程中进行的,那此时这里会有什么转折呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected void subscribeActual(Observer<? super T> observer) {
//创建服务员,并和顾客联系,这里的顾客是小红
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//执行顾客小红的的 onSubscribe
observer.onSubscribe(parent);
try {
//厨师做菜,并和服务员联系
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}

后续还有:服务员端菜(CreateEmitter.onNext) —> 顾客小红拿到菜(SubscribeOnObserver.onNext) ,到这里都是执行在子线程中(卧槽,怎么还没切线程啊,这都快走完了),接着小强拿到菜(ObserveOnObserver.onNext),看下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void onNext(T t) {
if (done) {
return;
}
//上游数据类型不是异步的
if (sourceMode != QueueDisposable.ASYNC) {
//将上游下发的数据放入队列中
queue.offer(t);
}
//调度
schedule();
}
void schedule() {
//保证原子性操作
if (getAndIncrement() == 0) {
//执行HandlerWorker的schedule,传入的this也就是小强
//上面说了小强很强大,除了顾客身份,还是个Runnable
worker.schedule(this);
}
}

最后执行 HandlerScheduler 的 schedule

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
//这里的handler持有mainLooper,所以传进去的Runnable会在主线程中执行
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}

原来用的是 Handler 机制来完成的,那 Runnable 具体执行的是什么呢?看下小强的 run 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public void run() {
//默认为false
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
void drainNormal() {
int missed = 1;
//获取队列引用
final SimpleQueue<T> q = queue;
//获取小明引用
final Observer<? super T> a = actual;
for (;;) {
//检查是否没有数据要发送
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
//从队列中取数据
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
//执行小明的onNext()
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}

就是从队列中取出传过来的数据,交给小明的 onNext 方法执行,所以小明的 onNext 是在主线程中执行,这部分流程如下(Queue 即小强内部维护的队列):

上图的事件调度不保证顺序,只是模拟了其中一种情况。

多次observeOn

1
2
3
4
source.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.newThread())
.subscribe(observer);

上面我先把下游接收事件指定在主线程,再指定在一个新的线程,打印如下:

看到此时下游接收事件被成功执行在后指定的新线程,这里是怎么实现的呢?分解下代码

1
2
source.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())

执行到这的时候应该是这样的

这也是上面分析过的, subscribeOn 返回的 Observable 称为黄焖鸡店(ObservableSubscribeOn),observeOn 返回的 Observable 称为肯德基1号店(ObservableObserveOn),肯德基1号店持有黄焖鸡店的引用;接着

1
.observeOn(Schedulers.newThread())

执行到这的时候应该是这样的

把第二次 observeOn 返回的 Observable 称为肯德基2号店(ObservableObserveOn),肯德基2号店持有1号店的引用;接着

1
.subscribe(observer);

会先执行肯德基2号店的 subscribeActual 方法,这里的 observer 是小明

1
2
3
4
5
6
7
8
9
10
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//此时schduler是NewThreadScheduler,创建的worker是NewThreadWorker
Scheduler.Worker w = scheduler.createWorker();
//source是肯德基1号店,observer是小明
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}

这里会创建小强(ObserveOnObserver),为了和后面区分开,就叫他2号店小强,2号店小强持有小明的引用,之后执行肯德基1号店的 subscribeActual ,observer 是肯德基2号店小强

1
2
3
4
5
6
7
8
9
10
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//此时schduler是HandlerScheduler,创建的Worker是HandlerWorker
Scheduler.Worker w = scheduler.createWorker();
//source是黄焖鸡店,observer是肯德基2号店小强
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}

这里又创建了小强,就叫他肯德基1号店小强,1号店小强持有2号店小强的引用,整个过程如下

接着执行黄焖鸡店的 subscribeActual,observer 是肯德基1号店小强

1
2
3
4
5
6
7
8
public void subscribeActual(final Observer<? super T> s) {
//s 是肯德基1号店小强,小红登场,小红持有1号店小强引用
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//执行1号店小强的onSubscribe方法
s.onSubscribe(parent);
//此时schduler是NewThreadScheduler
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

执行到上面 s.onSubscribe(parent) 是应该是这样的

因此 onSubscribe 方法还是执行在主线程中;其实看到这就有点明白了,就是一层层的回调…接着执行后面的流程,直接上图

上图省略了其他事件,并且省略了事件入队的过程,至此分析完毕。

加个鸡腿