RxJava (ReactiveX, Observable) in Plain Language
Published: 2019-06-24


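Some people have never even thought about which threads the Publisher's code and the Subscriber's code run on.

Android developers tend to be aware of this naturally, because so many examples write subscribeOn(Schedulers.io()) and observeOn(AndroidSchedulers.mainThread()), so they are fine.

I put together a test project, and all the code is in it.

I came away with a few plain-language conclusions that are not easy to find stated explicitly in the ReactiveX documentation.

Do all those colorful diagrams in the documentation really give anyone confidence? They are drawn sensibly enough, down to different colors for different threads, but they are still no match for plain language.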

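1. Specifying threads

If neither subscribeOn nor observeOn is specified, every action runs sequentially on the current thread. The two operators specify which thread the Publisher's actions and the Subscriber's actions run on, respectively.

subscribeOn(scheduler) is a really poorly chosen name. It suggests that the subscriber's callbacks run on the specified thread, when what it actually does is make the Publisher's code run on the specified thread.

If you want to nitpick: the downstream subscriber is indeed called on that thread by default, and the publisher's work is actually executed inside the OnSubscribe callback, so the name is just about defensible. But it is the later observeOn that really specifies the thread for the subscriber's callbacks.

Publisher and Subscriber are a symmetric pair of nouns, yet we end up with two similar-looking function names, observeOn and subscribeOn. Does nobody else find this annoying?

Test code summary: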

    private Observable<String> slowPublisher = Observable.create(subscriber -> {
        sleep(1); //sleep 1ms to just let other thread run so can get predictable output
        println("[SLOW publisher] begin");
        println("[SLOW publisher] do some work");
        sleep(3000);
        println("[SLOW publisher] publish");
        subscriber.onNext("SLOW result");
        subscriber.onCompleted();
        println("[SLOW publisher] end");
    });

    @Test
    public void test_publisher_subscriber_in_current_thread() throws Exception {
        new Thread(() -> {
            println("Enter test function");

            slowPublisher
                    .subscribe(result -> {
                        println("---- subscriber got " + result);
                    });

            println("Leave test function");
        }, "CurrentThread" /*threadName*/).start();

        assertOut("17:02:56.571 @CurrentThread Enter test function");
        assertOut("17:02:56.667 @CurrentThread [SLOW publisher] begin");
        assertOut("17:02:56.667 @CurrentThread [SLOW publisher] do some work");
        assertOut("17:02:59.668 @CurrentThread [SLOW publisher] publish");
        assertOut("17:02:59.668 @CurrentThread ---- subscriber got SLOW result");
        assertOut("17:02:59.668 @CurrentThread [SLOW publisher] end");
        assertOut("17:02:59.669 @CurrentThread Leave test function");
    }

    @Test
    public void test_publisher_in_a_thread_and_subscriber_also_in_same() throws Exception {
        new Thread(() -> {
            println("Enter test function");

            slowPublisher
                    .subscribeOn(schedulerForWorkThread1) //cause publisher run in new thread
                    .subscribe(result -> {
                        println("---- subscriber got " + result);
                    });

            println("Leave test function");
        }, "CurrentThread" /*threadName*/).start();

        assertOut("11:49:51.169 @CurrentThread Enter test function");
        assertOut("11:49:51.217 @CurrentThread Leave test function");
        assertOut("11:49:51.218 @RxWorkThread1 [SLOW publisher] begin");
        assertOut("11:49:51.218 @RxWorkThread1 [SLOW publisher] do some work");
        assertOut("11:49:54.221 @RxWorkThread1 [SLOW publisher] publish");
        assertOut("11:49:54.221 @RxWorkThread1 ---- subscriber got SLOW result");
        assertOut("11:49:54.222 @RxWorkThread1 [SLOW publisher] end");
    }

    @Test
    public void test_publisher_in_a_thread_and_subscriber_in_another() throws Exception {
        new Thread(() -> {
            println("Enter test function");

            slowPublisher
                    .subscribeOn(schedulerForWorkThread1) //cause publisher run in new thread
                    .observeOn(schedulerForWorkThread2) //cause subscriber run in another new thread
                    .subscribe(result -> {
                        sleep(1); //sleep 1ms to just let other thread run so can get predictable output
                        println("---- subscriber got " + result);
                    });

            println("Leave test function");
        }, "CurrentThread" /*threadName*/).start();

        assertOut("12:41:10.523 @CurrentThread Enter test function");
        assertOut("12:41:10.676 @CurrentThread Leave test function");
        assertOut("12:41:10.678 @RxWorkThread1 [SLOW publisher] begin");
        assertOut("12:41:10.678 @RxWorkThread1 [SLOW publisher] do some work");
        assertOut("12:41:13.682 @RxWorkThread1 [SLOW publisher] publish");
        assertOut("12:41:13.682 @RxWorkThread1 [SLOW publisher] end");
        assertOut("12:41:13.683 @RxWorkThread2 ---- subscriber got SLOW result");
    }

After tracing through a big pile of code in rxjava-1.1.5-sources.jar, the behavior roughly boils down to:

[Publisher]

in some thread (the current thread if subscribeOn(...) is not specified):
        Do some long time work
        result_queue.putAndSetSignal(result)

[Subscriber]

in some thread (the same thread as the Publisher if observeOn(...) is not specified, otherwise another thread):
        result = result_queue.getOrWait()
        callback.onNext(result)
        callback.onCompleted()
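
The two-thread model above can be approximated in plain Java, without RxJava. The following is only a minimal sketch under stated assumptions: a LinkedBlockingQueue stands in for result_queue, and the names HandoffSketch, PublisherThread and SubscriberThread are made up for illustration. It is an analogy for the pseudocode, not RxJava's actual implementation.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the pseudocode above; not RxJava internals.
final class HandoffSketch {

    /** Runs one publisher thread and one subscriber thread; returns what each logged, in order. */
    static List<String> run() {
        BlockingQueue<String> resultQueue = new LinkedBlockingQueue<>();
        List<String> log = new CopyOnWriteArrayList<>();

        // [Publisher]: does its work (omitted here), then puts the result on the queue.
        Thread publisher = new Thread(() -> {
            log.add(Thread.currentThread().getName() + " publish");
            resultQueue.add("SLOW result");
        }, "PublisherThread");

        // [Subscriber]: waits for the result, then runs its callback.
        Thread subscriber = new Thread(() -> {
            try {
                String result = resultQueue.take(); // blocks until a result is available
                log.add(Thread.currentThread().getName() + " got " + result);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "SubscriberThread");

        subscriber.start();
        publisher.start();
        try {
            publisher.join();
            subscriber.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Even though the subscriber thread is started first, it cannot log anything until the publisher has handed a result over, which is the ordering the test output above shows for the observeOn case.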

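2. Forcing the result out

If you have an Rx (for example Rx<String>) and want to pull the result (for example a String) out right in the current thread, the only way is toBlocking() followed by .first(), .toIterable(), or the like. That is simply how it is designed: almost every method still returns an Rx so that calls can be chained.

Taking toBlocking().first() as an example, here is the relevant source from rxjava-1.1.5-sources.jar!/rx/observables/BlockingObservable.java: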

    private T blockForSingle(final Observable<? extends T> observable) {
        final AtomicReference<T> returnItem = new AtomicReference<T>();
        final AtomicReference<Throwable> returnException = new AtomicReference<Throwable>();
        final CountDownLatch latch = new CountDownLatch(1);

        Subscription subscription = ((Observable<T>)observable).subscribe(new Subscriber<T>() {
            @Override
            public void onCompleted() {
                latch.countDown();
            }

            @Override
            public void onError(final Throwable e) {
                returnException.set(e);
                latch.countDown();
            }

            @Override
            public void onNext(final T item) {
                returnItem.set(item);
            }
        });
        BlockingUtils.awaitForComplete(latch, subscription);

        if (returnException.get() != null) {
            if (returnException.get() instanceof RuntimeException) {
                throw (RuntimeException) returnException.get();
            } else {
                throw new RuntimeException(returnException.get());
            }
        }

        return returnItem.get();
    }

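Really nothing scary; once you have read it you are fully in control. Roughly, it means:

Current thread:

    out_result = null
    fire publisher_event to start Publisher
    wait complete_event

Subscriber, in its own thread:

    result = result_queue.getOrWait()
    callback.onNext(result)
    out_result = result
    fire complete_event

That dispels the ambiguity and the mystery. Building one yourself is fairly easy too: it is nothing more than setting up a signal and waiting on it. My version was crude and did not account for signaling on error, so I will not paste the code.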
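
For readers who want to see the "wait on a signal" idea spelled out, here is one possible plain-Java sketch that also signals on error. It is not the author's unpublished code and not RxJava's: the class BlockingFirstSketch, the method blockFor and the thread name WorkerThread are all hypothetical, and a java.util.function.Supplier stands in for the Publisher.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical illustration of blocking on a latch; names are made up.
final class BlockingFirstSketch {

    /** Runs work on a new thread and blocks the caller until a value or an error arrives. */
    static <T> T blockFor(Supplier<T> work) {
        AtomicReference<T> item = new AtomicReference<>();
        AtomicReference<Throwable> error = new AtomicReference<>();
        CountDownLatch latch = new CountDownLatch(1);

        new Thread(() -> {
            try {
                item.set(work.get());   // plays the role of onNext
            } catch (Throwable t) {
                error.set(t);           // plays the role of onError
            } finally {
                latch.countDown();      // signal on completion and on error alike
            }
        }, "WorkerThread").start();

        try {
            latch.await();              // the calling thread waits here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }

        // Rethrow in the calling thread, mirroring the shape of blockForSingle above.
        Throwable failure = error.get();
        if (failure instanceof RuntimeException) {
            throw (RuntimeException) failure;
        }
        if (failure != null) {
            throw new RuntimeException(failure);
        }
        return item.get();
    }

    public static void main(String[] args) {
        System.out.println(blockFor(() -> "SLOW result"));
        try {
            blockFor(() -> { throw new NullPointerException(); });
        } catch (NullPointerException e) {
            System.out.println("caught in caller: " + e);
        }
    }
}
```

Counting the latch down in a finally block is the design choice that matters: the caller is released whether the work succeeds or fails, so an error can never leave it waiting forever.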

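3. Error handling

For a blocking Rx (one that has been converted with toBlocking()), errors must be handled in a try/catch. For an ordinary Rx, they must be handled in the onError handler passed to subscribe(...).

Alongside this error handler there is a confusingly similar doOnError(handler). It behaves differently and is not treated as a real error handler: if an error occurs and no onError handler was passed to subscribe(...), an OnErrorNotImplementedException blows up in the Subscriber's thread.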
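
The difference between the two kinds of handler can be pictured with a toy model in plain Java. This is only a sketch of the behavior described above, not RxJava code: ErrorHandlerSketch and deliver are hypothetical names, and an IllegalStateException stands in for OnErrorNotImplementedException.

```java
import java.util.function.Consumer;

// Toy model only; it imitates the described behavior and uses no RxJava classes.
final class ErrorHandlerSketch {

    /**
     * Delivers an error. The hook only observes the error;
     * only a real handler counts as handling it.
     */
    static void deliver(Throwable error, Consumer<Throwable> hook, Consumer<Throwable> handler) {
        if (hook != null) {
            hook.accept(error); // like doOnError: a side effect, the error keeps travelling
        }
        if (handler == null) {
            // like subscribe() with no onError argument
            throw new IllegalStateException("error not handled", error);
        }
        handler.accept(error); // like the onError argument of subscribe(...)
    }

    public static void main(String[] args) {
        Throwable boom = new NullPointerException();
        deliver(boom, null, e -> System.out.println("handled: " + e));
        try {
            deliver(boom, e -> System.out.println("observed: " + e), null);
        } catch (IllegalStateException e) {
            System.out.println("still unhandled: " + e.getCause());
        }
    }
}
```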

    @Test
    public void test_error_in_blocking_mode_will_always_throw_out_in_current_thread() throws Exception {
        new Thread(() -> {
            println("Enter test function");

            try {
                errorPublisher
                        .toBlocking().first();
            } catch (Exception e) {
                println("---- test1: " + e);
            }

            try {
                errorPublisher
                        .subscribeOn(schedulerForWorkThread1) //cause publisher run in new thread
                        .toBlocking().first();
            } catch (Exception e) {
                println("---- test2: " + e);
            }

            try {
                errorPublisher
                        .subscribeOn(schedulerForWorkThread1) //cause publisher run in new thread
                        .doOnError(e -> {/*do nothing*/})
                        .toBlocking().first();
            } catch (Exception e) {
                println("---- test3: " + e);
            }

            println("Leave test function");
        }, "CurrentThread" /*threadName*/).start();

        assertOut("14:38:43.078 @CurrentThread Enter test function");
        assertOut("14:38:43.113 @CurrentThread ---- test1: java.lang.NullPointerException");
        assertOut("14:38:43.126 @CurrentThread ---- test2: java.lang.NullPointerException");
        assertOut("14:38:43.152 @CurrentThread ---- test3: java.lang.NullPointerException");
        assertOut("14:38:43.152 @CurrentThread Leave test function");
    }

    @Test
    public void test_error_subscribe_without_error_handler() throws Exception {
        new Thread(() -> {
            println("Enter test function");

            try {
                errorPublisher
                        .subscribeOn(schedulerForWorkThread1) //cause publisher run in new thread
                        .subscribe();
            } catch (Exception e) {
                println("---- should not come here : " + e);
            }

            println("Leave test function");
        }, "CurrentThread" /*threadName*/).start();

        assertOut("21:06:34.342 @CurrentThread Enter test function");
        assertOut("21:06:34.397 @CurrentThread Leave test function");
    }

    @Test
    public void test_error_subscribe_without_error_handler_cause_OnErrorNotImplementedException() throws Exception {
        new Thread(() -> {
            println("Enter test function");

            try {
                errorPublisher
                        .subscribe();
            } catch (Exception e) {
                println("---- test1 error: " + e);
            }

            try {
                errorPublisher
                        .doOnError(e -> {/*do nothing*/})
                        .subscribe();
            } catch (Exception e) {
                println("---- test2 error: " + e);
            }

            println("Leave test function");
        }, "CurrentThread" /*threadName*/).start();

        assertOut("14:15:57.268 @CurrentThread Enter test function");
        assertOut("14:15:57.304 @CurrentThread ---- test1 error: rx.exceptions.OnErrorNotImplementedException");
        assertOut("14:15:57.308 @CurrentThread ---- test2 error: rx.exceptions.OnErrorNotImplementedException");
        assertOut("14:15:57.308 @CurrentThread Leave test function");
    }

    @Test
    public void test_error_subscribe_with_error_handler() throws Exception {
        new Thread(() -> {
            println("Enter test function");

            errorPublisher
                    .subscribe(result -> {
                        //nothing
                    }, e -> { //onError
                        sleep(1); //sleep 1ms to just let other thread run so can get predictable output
                        println("-------- error1: " + e);
                    }, () -> { //onComplete
                        //nothing
                    });

            errorPublisher
                    .subscribeOn(schedulerForWorkThread1) //cause publisher run in new thread
                    .subscribe(result -> {
                        //nothing
                    }, e -> { //onError
                        sleep(1); //sleep 1ms to just let other thread run so can get predictable output
                        println("-------- error2: " + e);
                    }, () -> { //onComplete
                        //nothing
                    });

            println("Leave test function");
        }, "CurrentThread" /*threadName*/).start();

        assertOut("16:19:20.813 @CurrentThread Enter test function");
        assertOut("16:19:20.830 @CurrentThread -------- error1: java.lang.NullPointerException");
        assertOut("16:19:20.840 @CurrentThread Leave test function");
        assertOut("16:19:20.843 @RxWorkThread1 -------- error2: java.lang.NullPointerException");
    }

4. Merge and Concat ordering

Merge does not guarantee the order of results. Concat does.

    @Test
    public void test_merge_in_parallel_but_result_order_not_predictable() throws Exception {
        new Thread(() -> {
            println("Enter test function");

            slowPublisher
                    .subscribeOn(schedulerForWorkThread1) //cause publisher run in new thread
                    .mergeWith(
                            fastPublisher
                                    .subscribeOn(schedulerForWorkThread2)
                    )
                    .subscribe(result -> {
                        println("---- subscriber got " + result);
                    });

            println("Leave test function");
        }, "CurrentThread" /*threadName*/).start();

        assertOut("10:50:56.507 @CurrentThread Enter test function");
        assertOut("10:50:56.637 @CurrentThread Leave test function");
        assertOut("10:50:56.638 @RxWorkThread1 [SLOW publisher] begin");
        assertOut("10:50:56.638 @RxWorkThread1 [SLOW publisher] do some work");
        assertOut("10:50:56.690 @RxWorkThread2 [FAST publisher] begin");
        assertOut("10:50:56.690 @RxWorkThread2 ---- subscriber got FAST result");
        assertOut("10:50:56.691 @RxWorkThread2 [FAST publisher] end");
        assertOut("10:50:59.640 @RxWorkThread1 [SLOW publisher] publish");
        assertOut("10:50:59.640 @RxWorkThread1 ---- subscriber got SLOW result");
        assertOut("10:50:59.641 @RxWorkThread1 [SLOW publisher] end");
    }

    @Test
    public void test_merge_in_serial_if_not_specify_schedule_thread() throws Exception {
        new Thread(() -> {
            println("Enter test function");

            slowPublisher
                    .mergeWith(
                            fastPublisher
                    )
                    .subscribe(result -> {
                        println("---- subscriber got " + result);
                    });

            println("Leave test function");
        }, "CurrentThread" /*threadName*/).start();

        assertOut("10:52:59.765 @CurrentThread Enter test function");
        assertOut("10:52:59.846 @CurrentThread [SLOW publisher] begin");
        assertOut("10:52:59.846 @CurrentThread [SLOW publisher] do some work");
        assertOut("10:53:02.849 @CurrentThread [SLOW publisher] publish");
        assertOut("10:53:02.850 @CurrentThread ---- subscriber got SLOW result");
        assertOut("10:53:02.850 @CurrentThread [SLOW publisher] end");
        assertOut("10:53:02.904 @CurrentThread [FAST publisher] begin");
        assertOut("10:53:02.904 @CurrentThread ---- subscriber got FAST result");
        assertOut("10:53:02.904 @CurrentThread [FAST publisher] end");
        assertOut("10:53:02.905 @CurrentThread Leave test function");
    }

    @Test
    public void test_concat_will_always_serially_so_predictable_order() throws Exception {
        new Thread(() -> {
            println("Enter test function");

            slowPublisher
                    .subscribeOn(schedulerForWorkThread1) //cause publisher run in new thread
                    .concatWith(
                            fastPublisher
                                    .subscribeOn(schedulerForWorkThread2)
                    )
                    .subscribe(result -> {
                        println("---- subscriber got " + result);
                    });

            println("Leave test function");
        }, "CurrentThread" /*threadName*/).start();

        assertOut("10:53:55.314 @CurrentThread Enter test function");
        assertOut("10:53:55.454 @CurrentThread Leave test function");
        assertOut("10:53:55.456 @RxWorkThread1 [SLOW publisher] begin");
        assertOut("10:53:55.456 @RxWorkThread1 [SLOW publisher] do some work");
        assertOut("10:53:58.461 @RxWorkThread1 [SLOW publisher] publish");
        assertOut("10:53:58.461 @RxWorkThread1 ---- subscriber got SLOW result");
        assertOut("10:53:58.463 @RxWorkThread1 [SLOW publisher] end");
        assertOut("10:53:58.516 @RxWorkThread2 [FAST publisher] begin");
        assertOut("10:53:58.516 @RxWorkThread2 ---- subscriber got FAST result");
        assertOut("10:53:58.517 @RxWorkThread2 [FAST publisher] end");
    }
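
The ordering difference can be imitated with two plain threads, which may help if the RxJava operators feel opaque. This is a rough analogy only, not how merge and concat are implemented: MergeConcatSketch, mergeLike and concatLike are hypothetical names, and the 500 ms and 10 ms delays are arbitrary stand-ins for a slow and a fast publisher.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Plain-thread analogy for merge vs concat ordering; all names are hypothetical.
final class MergeConcatSketch {

    /** Creates (but does not start) a thread that sleeps for delayMs, then delivers its value. */
    static Thread source(String value, long delayMs, List<String> received) {
        return new Thread(() -> {
            try {
                Thread.sleep(delayMs);
                received.add(value);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    static void join(Thread thread) {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Merge-like: both sources start at once, so values arrive in whatever order they finish. */
    static List<String> mergeLike() {
        List<String> received = new CopyOnWriteArrayList<>();
        Thread slow = source("SLOW result", 500, received);
        Thread fast = source("FAST result", 10, received);
        slow.start();
        fast.start();
        join(slow);
        join(fast);
        return received;
    }

    /** Concat-like: the second source is not started until the first has finished. */
    static List<String> concatLike() {
        List<String> received = new CopyOnWriteArrayList<>();
        Thread slow = source("SLOW result", 500, received);
        Thread fast = source("FAST result", 10, received);
        slow.start();
        join(slow);
        fast.start();
        join(fast);
        return received;
    }

    public static void main(String[] args) {
        System.out.println("merge-like:  " + mergeLike());
        System.out.println("concat-like: " + concatLike());
    }
}
```

With these delays the merge-like run normally delivers the fast value first, but that order comes from timing alone, whereas the concat-like run delivers the slow value first by construction.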

 

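That is enough practice.

5. Challenge: running different kinds of Rx concurrently under unified control

Suppose there are three Rx of different types. There is no telling which is fastest, and one of them may fail. The goal is to run all three in parallel but wait until every outcome is in, errors included.

Honestly, if you are not set on using RxJava's own methods, just publish and subscribe to each one in parallel and use a CountDownLatch in the callbacks. It is not hard, and it is easy to follow.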

    private static class RxResult<T> {
        T result;
        Throwable error;
    }

    private static class RxResultAndSignal<T> extends RxResult<T> {
        CountDownLatch latch = new CountDownLatch(1);
    }

    @Test
    public void test_merge_3_different_type_rx_by_self_idea() throws Exception {
        new Thread(() -> {
            println("Enter test function");

            RxResultAndSignal r1 = new RxResultAndSignal<>();
            RxResultAndSignal r2 = new RxResultAndSignal<>();
            RxResultAndSignal r3 = new RxResultAndSignal<>();

            slowPublisher
                    .subscribeOn(schedulerForWorkThread1) //cause publisher run in new thread
                    .subscribe(result -> {
                        r1.result = result;
                    }, e -> { //onError
                        r1.error = e;
                        r1.latch.countDown();
                    }, () -> { //onComplete
                        r1.latch.countDown();
                    });

            incompatiblePublisher
                    .subscribeOn(schedulerForWorkThread2)
                    .subscribe(result -> {
                        r2.result = result;
                    }, e -> { //onError
                        r2.error = e;
                        r2.latch.countDown();
                    }, () -> { //onComplete
                        r2.latch.countDown();
                    });

            buggyPublisher
                    .subscribeOn(schedulerForWorkThread3)
                    .subscribe(result -> {
                        r3.result = result;
                    }, e -> { //onError
                        r3.error = e;
                        r3.latch.countDown();
                    }, () -> { //onComplete
                        r3.latch.countDown();
                    });

            try {
                r1.latch.await();
                r2.latch.await();
                r3.latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            sleep(1); //sleep 1ms to let other thread run so can get predictable output
            println("---- got all result: {" + r1.result + "}, {" + r2.result + "}, {" + r3.result + "}");
            println("---- got all error: {" + r1.error + "}, {" + r2.error + "}, {" + r3.error + "}");

            println("Leave test function");
        }, "CurrentThread" /*threadName*/).start();

        assertOut("13:27:55.033 @CurrentThread Enter test function");
        assertOut("13:27:55.065 @RxWorkThread1 [SLOW publisher] begin");
        assertOut("13:27:55.065 @RxWorkThread1 [SLOW publisher] do some work");
        assertOut("13:27:55.082 @RxWorkThread2 [Incompatible publisher] begin");
        assertOut("13:27:55.082 @RxWorkThread2 [Incompatible publisher] do some work");
        assertOut("13:27:55.086 @RxWorkThread3 [Buggy publisher] begin");
        assertOut("13:27:57.084 @RxWorkThread2 [Incompatible publisher] end");
        assertOut("13:27:58.069 @RxWorkThread1 [SLOW publisher] publish");
        assertOut("13:27:58.070 @RxWorkThread1 [SLOW publisher] end");
        assertOut("13:27:58.071 @CurrentThread ---- got all result: {SLOW result}, {Incompatible result}, {null}");
        assertOut("13:27:58.071 @CurrentThread ---- got all error: {null}, {null}, {java.lang.NullPointerException}");
        assertOut("13:27:58.071 @CurrentThread Leave test function");
    }
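
For comparison, the same "wait for everything, errors included" idea can also be written against the JDK's CompletableFuture, with no RxJava involved. This is a sketch under assumptions, not part of the original test project: AwaitAllSketch, Outcome, capture and runAll are hypothetical names, and the Integer value 42 merely stands in for a result of a different type.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Plain-JDK alternative sketch; every name here is hypothetical.
final class AwaitAllSketch {

    /** Holds either a result or an error, in the spirit of the RxResult class above. */
    static final class Outcome<T> {
        final T result;
        final Throwable error;

        Outcome(T result, Throwable error) {
            this.result = result;
            this.error = error;
        }
    }

    /** Wraps a future so that it always completes normally, carrying its value or its error. */
    static <T> CompletableFuture<Outcome<T>> capture(CompletableFuture<T> future) {
        return future.handle((value, error) -> {
            // Failures from supplyAsync arrive wrapped in CompletionException; unwrap them.
            Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                    ? error.getCause() : error;
            return new Outcome<T>(value, cause);
        });
    }

    /** Runs three tasks of different types in parallel and waits for all three outcomes. */
    static String runAll() {
        CompletableFuture<Outcome<String>> r1 =
                capture(CompletableFuture.supplyAsync(() -> "SLOW result"));
        CompletableFuture<Outcome<Integer>> r2 =
                capture(CompletableFuture.supplyAsync(() -> 42));
        CompletableFuture<Outcome<String>> r3 =
                capture(CompletableFuture.<String>supplyAsync(() -> {
                    throw new NullPointerException(); // the buggy task
                }));

        CompletableFuture.allOf(r1, r2, r3).join(); // blocks until all three have finished

        return "results: {" + r1.join().result + "}, {" + r2.join().result + "}, {" + r3.join().result + "}"
                + " errors: {" + r1.join().error + "}, {" + r2.join().error + "}, {" + r3.join().error + "}";
    }

    public static void main(String[] args) {
        System.out.println(runAll());
    }
}
```

Converting each failure into an ordinary Outcome before calling allOf is what keeps one failing task from cutting the wait short, which plays the same role as counting the latch down in both the onError and the onComplete callbacks above.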

If it has to be done entirely with RxJava's own methods, then after some fiddling I found this approach to be fairly robust:

    @Test
    public void test_merge_3_different_type_rx() throws Exception {
        new Thread(() -> {
            println("Enter test function");

            RxResult r1 = new RxResult<>();
            RxResult r2 = new RxResult<>();
            RxResult r3 = new RxResult<>();

            Completable.merge(
                    slowPublisher
                            .subscribeOn(schedulerForWorkThread1) //cause publisher run in new thread
                            .doOnNext(result -> r1.result = result)
                            .doOnError(e -> r1.error = e)
                            .toCompletable() //so can mergeWith other type rx
                            .onErrorComplete() //auto call complete when error
                    ,
                    incompatiblePublisher
                            .subscribeOn(schedulerForWorkThread2)
                            .doOnNext(result -> r2.result = result)
                            .doOnError(e -> r2.error = e)
                            .toCompletable() //so can mergeWith other type rx
                            .onErrorComplete() //auto call complete when error
                    ,
                    buggyPublisher
                            .subscribeOn(schedulerForWorkThread3)
                            .doOnNext(result -> r3.result = result)
                            .doOnError(e -> r3.error = e)
                            .toCompletable() //so can mergeWith other type rx
                            .onErrorComplete() //auto call complete when error
            )
                    .await(/*can specify total timeout*/);

            sleep(1); //sleep 1ms to let other thread run so can get predictable output
            println("---- got all result: {" + r1.result + "}, {" + r2.result + "}, {" + r3.result + "}");
            println("---- got all error: {" + r1.error + "}, {" + r2.error + "}, {" + r3.error + "}");

            println("Leave test function");
        }, "CurrentThread" /*threadName*/).start();

        assertOut("13:47:50.789 @CurrentThread Enter test function");
        assertOut("13:47:50.852 @RxWorkThread1 [SLOW publisher] begin");
        assertOut("13:47:50.852 @RxWorkThread1 [SLOW publisher] do some work");
        assertOut("13:47:50.862 @RxWorkThread2 [Incompatible publisher] begin");
        assertOut("13:47:50.863 @RxWorkThread2 [Incompatible publisher] do some work");
        assertOut("13:47:50.869 @RxWorkThread3 [Buggy publisher] begin");
        assertOut("13:47:52.868 @RxWorkThread2 [Incompatible publisher] end");
        assertOut("13:47:53.854 @RxWorkThread1 [SLOW publisher] publish");
        assertOut("13:47:53.854 @RxWorkThread1 [SLOW publisher] end");
        assertOut("13:47:53.856 @CurrentThread ---- got all result: {SLOW result}, {Incompatible result}, {null}");
        assertOut("13:47:53.856 @CurrentThread ---- got all error: {null}, {null}, {java.lang.NullPointerException}");
        assertOut("13:47:53.856 @CurrentThread Leave test function");
    }

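There are certainly other ways. RxJava builds a world of its own, so nothing you might cook up in it is surprising. But once things get a little complicated they become hard to follow, and just reading a chain of ambiguous method calls cannot really convince you that it behaves the way you want.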

 

Reposted from: https://my.oschina.net/osexp2003/blog/688428
