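Some people have simply never thought about which threads the Publisher's code and the Subscriber's code actually run on.
Android developers are naturally aware of this, because so many examples write subscribeOn(Schedulers.io()) and observeOn(AndroidSchedulers.mainThread()), so they are fine.
I built a test project to check. All of the code is in it.
It gave me a few plain-language conclusions that are not easy to find stated explicitly in the ReactiveX documentation.
Do all those colorful diagrams in the docs really give anyone confidence? They make sense, and even the threads are color-coded, but they are still no match for a plain statement.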
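1. Specifying threads
If neither subscribeOn nor observeOn is specified, everything runs sequentially on the current thread. The two operators specify which thread the Publisher's work and the Subscriber's callbacks run on, respectively.
subscribeOn(scheduler) is really poorly named: it suggests that the subscriber's callbacks run on the given thread, when in fact it makes the Publisher's code run there.
To be pedantic, downstream subscribers are indeed called on that thread by default, and the publisher's work is only executed inside the OnSubscribe callback, so the name is just about defensible. But isn't it the later observeOn that really decides which thread the subscriber callbacks run on?
Publisher and Subscriber are a symmetric pair of nouns, yet we end up with two similar-looking function names, observeOn and subscribeOn. Does that bother no one else?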
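Before the real tests, a toy model may help fix the idea. The sketch below uses no RxJava at all: it is plain Java with two named threads and a queue, and every name in it (ThreadHandoffSketch, WorkThread1, WorkThread2) is made up for illustration. The first thread stands in for what subscribeOn chooses, the second for what observeOn chooses.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy model only: no RxJava involved. All names here are hypothetical.
class ThreadHandoffSketch {

    /** Runs a "publisher" and a "subscriber" on separate named threads and returns a log of who ran where. */
    static List<String> run() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        BlockingQueue<String> resultQueue = new LinkedBlockingQueue<>();

        // Plays the role of subscribeOn(...): decides where the publisher's work runs.
        Thread publisher = new Thread(() -> {
            log.add(Thread.currentThread().getName() + " publish");
            resultQueue.add("SLOW result");
        }, "WorkThread1");

        // Plays the role of observeOn(...): decides where the subscriber's callback runs.
        Thread subscriber = new Thread(() -> {
            try {
                String result = resultQueue.take(); // wait until the publisher hands something over
                log.add(Thread.currentThread().getName() + " got " + result);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "WorkThread2");

        subscriber.start();
        publisher.start();
        try {
            publisher.join();
            subscriber.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The log always lists the publish line first, because the subscriber thread cannot get past take() until the publisher has put its result into the queue.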
Test code excerpt:
    private Observable<String> slowPublisher = Observable.create(subscriber -> {
        sleep(1); //sleep 1ms to just let other thread run so can get predictable output
        println("[SLOW publisher] begin");
        println("[SLOW publisher] do some work");
        sleep(3000);
        println("[SLOW publisher] publish");
        subscriber.onNext("SLOW result");
        subscriber.onCompleted();
        println("[SLOW publisher] end");
    });

    @Test
    public void test_publisher_subscriber_in_current_thread() throws Exception {
        new Thread(() -> {
            println("Enter test function");
            slowPublisher
                    .subscribe(result -> {
                        println("---- subscriber got " + result);
                    });
            println("Leave test function");
        }, "CurrentThread" /*threadName*/).start();

        assertOut("17:02:56.571 @CurrentThread Enter test function");
        assertOut("17:02:56.667 @CurrentThread [SLOW publisher] begin");
        assertOut("17:02:56.667 @CurrentThread [SLOW publisher] do some work");
        assertOut("17:02:59.668 @CurrentThread [SLOW publisher] publish");
        assertOut("17:02:59.668 @CurrentThread ---- subscriber got SLOW result");
        assertOut("17:02:59.668 @CurrentThread [SLOW publisher] end");
        assertOut("17:02:59.669 @CurrentThread Leave test function");
    }

    @Test
    public void test_publisher_in_a_thread_and_subscriber_also_in_same() throws Exception {
        new Thread(() -> {
            println("Enter test function");
            slowPublisher
                    .subscribeOn(schedulerForWorkThread1) //cause publisher run in new thread
                    .subscribe(result -> {
                        println("---- subscriber got " + result);
                    });
            println("Leave test function");
        }, "CurrentThread" /*threadName*/).start();

        assertOut("11:49:51.169 @CurrentThread Enter test function");
        assertOut("11:49:51.217 @CurrentThread Leave test function");
        assertOut("11:49:51.218 @RxWorkThread1 [SLOW publisher] begin");
        assertOut("11:49:51.218 @RxWorkThread1 [SLOW publisher] do some work");
        assertOut("11:49:54.221 @RxWorkThread1 [SLOW publisher] publish");
        assertOut("11:49:54.221 @RxWorkThread1 ---- subscriber got SLOW result");
        assertOut("11:49:54.222 @RxWorkThread1 [SLOW publisher] end");
    }

    @Test
    public void test_publisher_in_a_thread_and_subscriber_in_another() throws Exception {
        new Thread(() -> {
            println("Enter test function");
            slowPublisher
                    .subscribeOn(schedulerForWorkThread1) //cause publisher run in new thread
                    .observeOn(schedulerForWorkThread2) //cause subscriber run in another new thread
                    .subscribe(result -> {
                        sleep(1); //sleep 1ms to just let other thread run so can get predictable output
                        println("---- subscriber got " + result);
                    });
            println("Leave test function");
        }, "CurrentThread" /*threadName*/).start();

        assertOut("12:41:10.523 @CurrentThread Enter test function");
        assertOut("12:41:10.676 @CurrentThread Leave test function");
        assertOut("12:41:10.678 @RxWorkThread1 [SLOW publisher] begin");
        assertOut("12:41:10.678 @RxWorkThread1 [SLOW publisher] do some work");
        assertOut("12:41:13.682 @RxWorkThread1 [SLOW publisher] publish");
        assertOut("12:41:13.682 @RxWorkThread1 [SLOW publisher] end");
        assertOut("12:41:13.683 @RxWorkThread2 ---- subscriber got SLOW result");
    }
After tracing through a big pile of code in rxjava-1.1.5-sources.jar, the overall behavior boils down to roughly this:
[Publisher]
in some thread (the current thread if subscribeOn(...) was not specified):
    Do some long time work
    result_queue.putAndSetSignal(result)
[Subscriber]
in some thread (the same thread as the Publisher if observeOn(...) was not specified, otherwise a different one):
    result = result_queue.getOrWait()
    callback.onNext(result)
    callback.onCompleted()
2. Forcing the result out
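If you have an Rx (say Rx<String>, that is, an Observable<String>) and want to pull the result (say a String) out right on the current thread, the only way is toBlocking() followed by .first(), .toIterable() or the like. That is how it is designed: almost every method still returns an Rx so that calls can be chained.
Taking toBlocking().first() as an example, here is the relevant source from rxjava-1.1.5-sources.jar!/rx/observables/BlockingObservable.java: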
    private T blockForSingle(final Observable<? extends T> observable) {
        final AtomicReference<T> returnItem = new AtomicReference<T>();
        final AtomicReference<Throwable> returnException = new AtomicReference<Throwable>();
        final CountDownLatch latch = new CountDownLatch(1);

        Subscription subscription = ((Observable<T>)observable).subscribe(new Subscriber<T>() {
            @Override
            public void onCompleted() {
                latch.countDown();
            }

            @Override
            public void onError(final Throwable e) {
                returnException.set(e);
                latch.countDown();
            }

            @Override
            public void onNext(final T item) {
                returnItem.set(item);
            }
        });
        BlockingUtils.awaitForComplete(latch, subscription);

        if (returnException.get() != null) {
            if (returnException.get() instanceof RuntimeException) {
                throw (RuntimeException) returnException.get();
            } else {
                throw new RuntimeException(returnException.get());
            }
        }

        return returnItem.get();
    }
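Nothing scary at all. Once you have read it, you are fully in control. Roughly, it means:
Current thread:
    out_result = null
    fire publisher_event to start Publisher
    wait complete_event
Subscriber, on its own thread:
    result = result_queue.getOrWait()
    callback.onNext(result)
    out_result = result
    fire complete_event
That dispels the ambiguity and the mystery. Rolling your own is fairly easy too: it is just a matter of waiting on a signal. Mine was crude and did not handle signaling on error, so I will not post the code.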
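For readers who want to see what such a hand-rolled version might look like, here is a minimal sketch in plain Java, this time with the error signal included. It is not the version from my test project and not RxJava code. Callback, blockForSingle, demoSuccess and demoFailure are hypothetical names, and the "source" is simply any function that accepts a callback and may invoke it from another thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hand-rolled illustration, not RxJava code. All names are hypothetical.
class BlockingSketch {

    interface Callback<T> {
        void onNext(T item);
        void onError(Throwable e);
        void onCompleted();
    }

    /** Starts the source, then parks the calling thread until the source completes or fails. */
    static <T> T blockForSingle(Consumer<Callback<T>> source) {
        AtomicReference<T> item = new AtomicReference<>();
        AtomicReference<Throwable> error = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        source.accept(new Callback<T>() {
            @Override public void onNext(T value) { item.set(value); }
            @Override public void onError(Throwable e) { error.set(e); done.countDown(); }
            @Override public void onCompleted() { done.countDown(); }
        });

        try {
            done.await(); // the "wait complete_event" step
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        // Rethrow on the calling thread, wrapping checked exceptions.
        Throwable failure = error.get();
        if (failure instanceof RuntimeException) throw (RuntimeException) failure;
        if (failure != null) throw new RuntimeException(failure);
        return item.get();
    }

    /** A source that works on its own thread and emits one value. */
    static String demoSuccess() {
        return blockForSingle(cb -> new Thread(() -> {
            cb.onNext("SLOW result");
            cb.onCompleted();
        }, "WorkThread1").start());
    }

    /** A source that fails on its own thread; the error surfaces in the caller. */
    static String demoFailure() {
        try {
            return BlockingSketch.<String>blockForSingle(cb -> new Thread(
                    () -> cb.onError(new NullPointerException("boom")), "WorkThread1").start());
        } catch (NullPointerException e) {
            return "caught " + e.getMessage();
        }
    }
}
```

The point of counting the latch down in onError as well as in onCompleted is that the waiting thread wakes up either way. Forgetting the error path would leave the caller blocked forever.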
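3. Error handling
For a blocking Rx (one that has gone through toBlocking()), errors must be handled in try/catch. For an ordinary Rx, they must be handled in the onError handler passed to subscribe(...).
Besides that error handler there is the confusingly similar doOnError(handler). It does not behave the same way and is not treated as a real error handler: if an error occurs and no onError handler was passed to subscribe(...), an OnErrorNotImplementedException is thrown on the Subscriber's thread, whether or not doOnError was used.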
    @Test
    public void test_error_in_blocking_mode_will_always_throw_out_in_current_thread() throws Exception {
        new Thread(() -> {
            println("Enter test function");
            try {
                errorPublisher
                        .toBlocking().first();
            } catch (Exception e) {
                println("---- test1: " + e);
            }
            try {
                errorPublisher
                        .subscribeOn(schedulerForWorkThread1) //cause publisher run in new thread
                        .toBlocking().first();
            } catch (Exception e) {
                println("---- test2: " + e);
            }
            try {
                errorPublisher
                        .subscribeOn(schedulerForWorkThread1) //cause publisher run in new thread
                        .doOnError(e -> {/*do nothing*/})
                        .toBlocking().first();
            } catch (Exception e) {
                println("---- test3: " + e);
            }
            println("Leave test function");
        }, "CurrentThread" /*threadName*/).start();

        assertOut("14:38:43.078 @CurrentThread Enter test function");
        assertOut("14:38:43.113 @CurrentThread ---- test1: java.lang.NullPointerException");
        assertOut("14:38:43.126 @CurrentThread ---- test2: java.lang.NullPointerException");
        assertOut("14:38:43.152 @CurrentThread ---- test3: java.lang.NullPointerException");
        assertOut("14:38:43.152 @CurrentThread Leave test function");
    }

    @Test
    public void test_error_subscribe_without_error_handler() throws Exception {
        new Thread(() -> {
            println("Enter test function");
            try {
                errorPublisher
                        .subscribeOn(schedulerForWorkThread1) //cause publisher run in new thread
                        .subscribe();
            } catch (Exception e) {
                println("---- should not come here : " + e);
            }
            println("Leave test function");
        }, "CurrentThread" /*threadName*/).start();

        assertOut("21:06:34.342 @CurrentThread Enter test function");
        assertOut("21:06:34.397 @CurrentThread Leave test function");
    }

    @Test
    public void test_error_subscribe_without_error_handler_cause_OnErrorNotImplementedException() throws Exception {
        new Thread(() -> {
            println("Enter test function");
            try {
                errorPublisher
                        .subscribe();
            } catch (Exception e) {
                println("---- test1 error: " + e);
            }
            try {
                errorPublisher
                        .doOnError(e -> {/*do nothing*/})
                        .subscribe();
            } catch (Exception e) {
                println("---- test2 error: " + e);
            }
            println("Leave test function");
        }, "CurrentThread" /*threadName*/).start();

        assertOut("14:15:57.268 @CurrentThread Enter test function");
        assertOut("14:15:57.304 @CurrentThread ---- test1 error: rx.exceptions.OnErrorNotImplementedException");
        assertOut("14:15:57.308 @CurrentThread ---- test2 error: rx.exceptions.OnErrorNotImplementedException");
        assertOut("14:15:57.308 @CurrentThread Leave test function");
    }

    @Test
    public void test_error_subscribe_with_error_handler() throws Exception {
        new Thread(() -> {
            println("Enter test function");
            errorPublisher
                    .subscribe(result -> {
                        //nothing
                    }, e -> { //onError
                        sleep(1); //sleep 1ms to just let other thread run so can get predictable output
                        println("-------- error1: " + e);
                    }, () -> { //onComplete
                        //nothing
                    });
            errorPublisher
                    .subscribeOn(schedulerForWorkThread1) //cause publisher run in new thread
                    .subscribe(result -> {
                        //nothing
                    }, e -> { //onError
                        sleep(1); //sleep 1ms to just let other thread run so can get predictable output
                        println("-------- error2: " + e);
                    }, () -> { //onComplete
                        //nothing
                    });
            println("Leave test function");
        }, "CurrentThread" /*threadName*/).start();

        assertOut("16:19:20.813 @CurrentThread Enter test function");
        assertOut("16:19:20.830 @CurrentThread -------- error1: java.lang.NullPointerException");
        assertOut("16:19:20.840 @CurrentThread Leave test function");
        assertOut("16:19:20.843 @RxWorkThread1 -------- error2: java.lang.NullPointerException");
    }
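Why doOnError does not count as a handler is perhaps easier to see in a toy model. The sketch below is plain Java and only imitates the behavior observed in the tests. MiniSource and its Observer interface are made up, and an IllegalStateException stands in for rx.exceptions.OnErrorNotImplementedException. I make no claim that RxJava is implemented this way.

```java
import java.util.function.Consumer;

// Toy model of the observed behaviour; MiniSource is a hypothetical class, not an RxJava type.
class MiniSource {

    interface Observer {
        void onNext(String item);
        void onError(Throwable e);
    }

    private final Consumer<Observer> onSubscribe;

    MiniSource(Consumer<Observer> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    /** Side-effect hook: sees the error, then passes it on downstream unchanged. */
    MiniSource doOnError(Consumer<Throwable> hook) {
        return new MiniSource(downstream -> onSubscribe.accept(new Observer() {
            @Override public void onNext(String item) { downstream.onNext(item); }
            @Override public void onError(Throwable e) { hook.accept(e); downstream.onError(e); }
        }));
    }

    /** Subscribing without an error handler: any error blows up in the emitting thread. */
    void subscribe(Consumer<String> nextHandler) {
        subscribe(nextHandler, e -> {
            // Stand-in for rx.exceptions.OnErrorNotImplementedException.
            throw new IllegalStateException("no error handler", e);
        });
    }

    /** Subscribing with an error handler: the error ends here. */
    void subscribe(Consumer<String> nextHandler, Consumer<Throwable> errorHandler) {
        onSubscribe.accept(new Observer() {
            @Override public void onNext(String item) { nextHandler.accept(item); }
            @Override public void onError(Throwable e) { errorHandler.accept(e); }
        });
    }

    /** Runs both cases on the calling thread and reports what happened. */
    static String demo() {
        MiniSource errorSource = new MiniSource(o -> o.onError(new NullPointerException()));
        StringBuilder out = new StringBuilder();
        try {
            errorSource.doOnError(e -> out.append("peeked;")).subscribe(item -> { });
        } catch (IllegalStateException e) {
            out.append("unhandled;");
        }
        errorSource.subscribe(item -> { }, e -> out.append("handled"));
        return out.toString();
    }
}
```

In this model the hook only peeks at the error on its way through. Whether the error is considered handled depends solely on what the final subscribe call supplies.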
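4. Merge and Concat ordering
Merge does not guarantee the order of results when the sources run on different threads. Concat subscribes to the next source only after the previous one has completed, so the order of results is guaranteed.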
    @Test
    public void test_merge_in_parallel_but_result_order_not_predictable() throws Exception {
        new Thread(() -> {
            println("Enter test function");
            slowPublisher
                    .subscribeOn(schedulerForWorkThread1) //cause publisher run in new thread
                    .mergeWith(
                            fastPublisher
                                    .subscribeOn(schedulerForWorkThread2)
                    )
                    .subscribe(result -> {
                        println("---- subscriber got " + result);
                    });
            println("Leave test function");
        }, "CurrentThread" /*threadName*/).start();

        assertOut("10:50:56.507 @CurrentThread Enter test function");
        assertOut("10:50:56.637 @CurrentThread Leave test function");
        assertOut("10:50:56.638 @RxWorkThread1 [SLOW publisher] begin");
        assertOut("10:50:56.638 @RxWorkThread1 [SLOW publisher] do some work");
        assertOut("10:50:56.690 @RxWorkThread2 [FAST publisher] begin");
        assertOut("10:50:56.690 @RxWorkThread2 ---- subscriber got FAST result");
        assertOut("10:50:56.691 @RxWorkThread2 [FAST publisher] end");
        assertOut("10:50:59.640 @RxWorkThread1 [SLOW publisher] publish");
        assertOut("10:50:59.640 @RxWorkThread1 ---- subscriber got SLOW result");
        assertOut("10:50:59.641 @RxWorkThread1 [SLOW publisher] end");
    }

    @Test
    public void test_merge_in_serial_if_not_specify_schedule_thread() throws Exception {
        new Thread(() -> {
            println("Enter test function");
            slowPublisher
                    .mergeWith(
                            fastPublisher
                    )
                    .subscribe(result -> {
                        println("---- subscriber got " + result);
                    });
            println("Leave test function");
        }, "CurrentThread" /*threadName*/).start();

        assertOut("10:52:59.765 @CurrentThread Enter test function");
        assertOut("10:52:59.846 @CurrentThread [SLOW publisher] begin");
        assertOut("10:52:59.846 @CurrentThread [SLOW publisher] do some work");
        assertOut("10:53:02.849 @CurrentThread [SLOW publisher] publish");
        assertOut("10:53:02.850 @CurrentThread ---- subscriber got SLOW result");
        assertOut("10:53:02.850 @CurrentThread [SLOW publisher] end");
        assertOut("10:53:02.904 @CurrentThread [FAST publisher] begin");
        assertOut("10:53:02.904 @CurrentThread ---- subscriber got FAST result");
        assertOut("10:53:02.904 @CurrentThread [FAST publisher] end");
        assertOut("10:53:02.905 @CurrentThread Leave test function");
    }

    @Test
    public void test_concat_will_always_serially_so_predictable_order() throws Exception {
        new Thread(() -> {
            println("Enter test function");
            slowPublisher
                    .subscribeOn(schedulerForWorkThread1) //cause publisher run in new thread
                    .concatWith(
                            fastPublisher
                                    .subscribeOn(schedulerForWorkThread2)
                    )
                    .subscribe(result -> {
                        println("---- subscriber got " + result);
                    });
            println("Leave test function");
        }, "CurrentThread" /*threadName*/).start();

        assertOut("10:53:55.314 @CurrentThread Enter test function");
        assertOut("10:53:55.454 @CurrentThread Leave test function");
        assertOut("10:53:55.456 @RxWorkThread1 [SLOW publisher] begin");
        assertOut("10:53:55.456 @RxWorkThread1 [SLOW publisher] do some work");
        assertOut("10:53:58.461 @RxWorkThread1 [SLOW publisher] publish");
        assertOut("10:53:58.461 @RxWorkThread1 ---- subscriber got SLOW result");
        assertOut("10:53:58.463 @RxWorkThread1 [SLOW publisher] end");
        assertOut("10:53:58.516 @RxWorkThread2 [FAST publisher] begin");
        assertOut("10:53:58.516 @RxWorkThread2 ---- subscriber got FAST result");
        assertOut("10:53:58.517 @RxWorkThread2 [FAST publisher] end");
    }
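The same contrast can be reproduced without RxJava, which may make the ordering rule easier to believe. The following is only a plain-Java analogy built on CompletableFuture. mergeLike and concatLike are hypothetical helpers, not RxJava's operators, and the 300 ms sleep merely simulates slow work. It corresponds to the case where each source runs on its own thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Plain-Java analogy only; these helpers are hypothetical and are not RxJava's merge/concat.
class MergeConcatSketch {

    static String slow() {
        try {
            Thread.sleep(300); // pretend to do long work
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "SLOW result";
    }

    static String fast() {
        return "FAST result";
    }

    /** "Merge": start both sources at once and record results in whatever order they arrive. */
    static List<String> mergeLike(Supplier<String> first, Supplier<String> second) {
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture<Void> a = CompletableFuture.supplyAsync(first).thenAccept(received::add);
        CompletableFuture<Void> b = CompletableFuture.supplyAsync(second).thenAccept(received::add);
        CompletableFuture.allOf(a, b).join(); // wait for both before returning
        return received;
    }

    /** "Concat": start the second source only after the first has finished. */
    static List<String> concatLike(Supplier<String> first, Supplier<String> second) {
        List<String> received = new ArrayList<>();
        received.add(CompletableFuture.supplyAsync(first).join());
        received.add(CompletableFuture.supplyAsync(second).join());
        return received;
    }

    public static void main(String[] args) {
        System.out.println("merge : " + mergeLike(MergeConcatSketch::slow, MergeConcatSketch::fast));
        System.out.println("concat: " + concatLike(MergeConcatSketch::slow, MergeConcatSketch::fast));
    }
}
```

With the slow source listed first, the merge-like helper delivers the fast result first, while the concat-like helper keeps the listed order at the cost of running the two one after the other.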
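With that much practice done, on to something harder.
5. Challenge: running different kinds of Rx concurrently under unified control
Suppose there are three Rx of different types, with no telling which one is fastest, and one of them may fail. We want all three to run in parallel, but wait until every result is in, errors included.
Honestly, if you are not set on using RxJava's own methods, just publish and subscribe each one in parallel and signal with a CountDownLatch in the callbacks. It is not hard, and it is easy to follow.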
    private static class RxResult<T> {
        T result;
        Throwable error;
    }

    private static class RxResultAndSignal<T> extends RxResult<T> {
        CountDownLatch latch = new CountDownLatch(1);
    }

    @Test
    public void test_merge_3_different_type_rx_by_self_idea() throws Exception {
        new Thread(() -> {
            println("Enter test function");

            RxResultAndSignal<String> r1 = new RxResultAndSignal<>();
            RxResultAndSignal r2 = new RxResultAndSignal<>();
            RxResultAndSignal r3 = new RxResultAndSignal<>();

            slowPublisher
                    .subscribeOn(schedulerForWorkThread1) //cause publisher run in new thread
                    .subscribe(result -> {
                        r1.result = result;
                    }, e -> { //onError
                        r1.error = e;
                        r1.latch.countDown();
                    }, () -> { //onComplete
                        r1.latch.countDown();
                    });

            incompatiblePublisher
                    .subscribeOn(schedulerForWorkThread2)
                    .subscribe(result -> {
                        r2.result = result;
                    }, e -> { //onError
                        r2.error = e;
                        r2.latch.countDown();
                    }, () -> { //onComplete
                        r2.latch.countDown();
                    });

            buggyPublisher
                    .subscribeOn(schedulerForWorkThread3)
                    .subscribe(result -> {
                        r3.result = result;
                    }, e -> { //onError
                        r3.error = e;
                        r3.latch.countDown();
                    }, () -> { //onComplete
                        r3.latch.countDown();
                    });

            try {
                r1.latch.await();
                r2.latch.await();
                r3.latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            sleep(1); //sleep 1ms to let other thread run so can get predictable output
            println("---- got all result: {" + r1.result + "}, {" + r2.result + "}, {" + r3.result + "}");
            println("---- got all error: {" + r1.error + "}, {" + r2.error + "}, {" + r3.error + "}");
            println("Leave test function");
        }, "CurrentThread" /*threadName*/).start();

        assertOut("13:27:55.033 @CurrentThread Enter test function");
        assertOut("13:27:55.065 @RxWorkThread1 [SLOW publisher] begin");
        assertOut("13:27:55.065 @RxWorkThread1 [SLOW publisher] do some work");
        assertOut("13:27:55.082 @RxWorkThread2 [Incompatible publisher] begin");
        assertOut("13:27:55.082 @RxWorkThread2 [Incompatible publisher] do some work");
        assertOut("13:27:55.086 @RxWorkThread3 [Buggy publisher] begin");
        assertOut("13:27:57.084 @RxWorkThread2 [Incompatible publisher] end");
        assertOut("13:27:58.069 @RxWorkThread1 [SLOW publisher] publish");
        assertOut("13:27:58.070 @RxWorkThread1 [SLOW publisher] end");
        assertOut("13:27:58.071 @CurrentThread ---- got all result: {SLOW result}, {Incompatible result}, {null}");
        assertOut("13:27:58.071 @CurrentThread ---- got all error: {null}, {null}, {java.lang.NullPointerException}");
        assertOut("13:27:58.071 @CurrentThread Leave test function");
    }
If it has to be done entirely with RxJava's own methods, then after some fiddling I found this approach to be fairly robust:
    @Test
    public void test_merge_3_different_type_rx() throws Exception {
        new Thread(() -> {
            println("Enter test function");

            RxResult<String> r1 = new RxResult<>();
            RxResult r2 = new RxResult<>();
            RxResult r3 = new RxResult<>();

            Completable.merge(
                    slowPublisher
                            .subscribeOn(schedulerForWorkThread1) //cause publisher run in new thread
                            .doOnNext(result -> r1.result = result)
                            .doOnError(e -> r1.error = e)
                            .toCompletable() //so can mergeWith other type rx
                            .onErrorComplete() //auto call complete when error
                    ,
                    incompatiblePublisher
                            .subscribeOn(schedulerForWorkThread2)
                            .doOnNext(result -> r2.result = result)
                            .doOnError(e -> r2.error = e)
                            .toCompletable() //so can mergeWith other type rx
                            .onErrorComplete() //auto call complete when error
                    ,
                    buggyPublisher
                            .subscribeOn(schedulerForWorkThread3)
                            .doOnNext(result -> r3.result = result)
                            .doOnError(e -> r3.error = e)
                            .toCompletable() //so can mergeWith other type rx
                            .onErrorComplete() //auto call complete when error
            )
                    .await(/*can specify total timeout*/);

            sleep(1); //sleep 1ms to let other thread run so can get predictable output
            println("---- got all result: {" + r1.result + "}, {" + r2.result + "}, {" + r3.result + "}");
            println("---- got all error: {" + r1.error + "}, {" + r2.error + "}, {" + r3.error + "}");
            println("Leave test function");
        }, "CurrentThread" /*threadName*/).start();

        assertOut("13:47:50.789 @CurrentThread Enter test function");
        assertOut("13:47:50.852 @RxWorkThread1 [SLOW publisher] begin");
        assertOut("13:47:50.852 @RxWorkThread1 [SLOW publisher] do some work");
        assertOut("13:47:50.862 @RxWorkThread2 [Incompatible publisher] begin");
        assertOut("13:47:50.863 @RxWorkThread2 [Incompatible publisher] do some work");
        assertOut("13:47:50.869 @RxWorkThread3 [Buggy publisher] begin");
        assertOut("13:47:52.868 @RxWorkThread2 [Incompatible publisher] end");
        assertOut("13:47:53.854 @RxWorkThread1 [SLOW publisher] publish");
        assertOut("13:47:53.854 @RxWorkThread1 [SLOW publisher] end");
        assertOut("13:47:53.856 @CurrentThread ---- got all result: {SLOW result}, {Incompatible result}, {null}");
        assertOut("13:47:53.856 @CurrentThread ---- got all error: {null}, {null}, {java.lang.NullPointerException}");
        assertOut("13:47:53.856 @CurrentThread Leave test function");
    }
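There are certainly other ways. RxJava builds a world of its own, and nothing you might want to rig up in it would be surprising. But once things get a little complicated they become hard to follow, and reading ambiguous method calls alone cannot really convince you that they behave the way you hope.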