Rx中的线程切换

初学者在使用RxJava的过程中,经常搞不清Observable的事件序列和每次操作应该怎样切换线程,切换哪个线程
首先需要搞懂在RxJava.subscribeOn()observeOn() 之间的区别:

  • .subscribeOn() 用来指定Observable应该操作的调度器(Scheduler)
  • .observeOn() 指定 Observable在一个指定的调度器(Scheduler)上给观察者发送通知
  • 默认情况下, 事件序列操作的线程与调用.subscribe()的线程一致

没理解?

英文原文: https://medium.com/@diolor/observe-in-the-correct-thread-1939bb9bb9d2#.nn1m7lrb8</br>
翻译: hanks
注: 不是完全翻译,添加了具体例子

例子

1.主线程 / .subscribe() 线程

在 Activity的 onCreate()(主线程) 方法中添加以下代码:

Observable.just(1,2,3)
  .subscribe();

调用情况如下:

图片
图片

实验:

Observable.just(1,2,3)
            .doOnNext(new Action1<Integer>() {
                @Override public void call(Integer integer) {
                    Log.i("RxThread", "doOnNext:" + integer +", run In :" + Thread.currentThread().getName() );
                }
            })
            .subscribe(new Action1<Integer>() {
                @Override public void call(Integer integer) {
                    Log.i("RxThread", "get result:" + integer +", run In :" + Thread.currentThread().getName() );
                }
            });

输出结果:

12-06 16:14:39.225 15603-15603/com.hanks.rxsearch I/RxThread: doOnNext:1, run In :main
12-06 16:14:39.225 15603-15603/com.hanks.rxsearch I/RxThread: get result:1, run In :main
12-06 16:14:39.225 15603-15603/com.hanks.rxsearch I/RxThread: doOnNext:2, run In :main
12-06 16:14:39.225 15603-15603/com.hanks.rxsearch I/RxThread: get result:2, run In :main
12-06 16:14:39.225 15603-15603/com.hanks.rxsearch I/RxThread: doOnNext:3, run In :main
12-06 16:14:39.225 15603-15603/com.hanks.rxsearch I/RxThread: get result:3, run In :main

2. .subscribeOn()

即使你在主线程中添加下面的代码,但是整段代码将运行在 .subscribeOn()定义的线程上

Observable.just(1,2,3)
  .subscribeOn(Schedulers.newThread())
  .subscribe();
图片
图片

实验:

Observable.just(1,2,3)
           .doOnNext(new Action1<Integer>() {
               @Override public void call(Integer integer) {
                   Log.i("RxThread", "doOnNext:" + integer +", run In :" + Thread.currentThread().getName() );
               }
           })
           .subscribeOn(Schedulers.newThread())
           .subscribe(new Action1<Integer>() {
               @Override public void call(Integer integer) {
                   Log.i("RxThread", "get result:" + integer +", run In :" + Thread.currentThread().getName() );
               }
           });

输出结果:

12-06 16:13:17.717 14294-14319/com.hanks.rxsearch I/RxThread: doOnNext:1, run In :RxNewThreadScheduler-1
12-06 16:13:17.717 14294-14319/com.hanks.rxsearch I/RxThread: get result:1, run In :RxNewThreadScheduler-1
12-06 16:13:17.717 14294-14319/com.hanks.rxsearch I/RxThread: doOnNext:2, run In :RxNewThreadScheduler-1
12-06 16:13:17.717 14294-14319/com.hanks.rxsearch I/RxThread: get result:2, run In :RxNewThreadScheduler-1
12-06 16:13:17.717 14294-14319/com.hanks.rxsearch I/RxThread: doOnNext:3, run In :RxNewThreadScheduler-1
12-06 16:13:17.717 14294-14319/com.hanks.rxsearch I/RxThread: get result:3, run In :RxNewThreadScheduler-1

3. .observeOn()

加入在主线程中添加下面的代码,首先 Observable 将在 .subscribe() 的线程上创建,但是 .observeOn()方法被调用之后,代码将运行在指定的线程上:

Observable.just(1,2,3)
  .observeOn(Schedulers.newThread())
  .subscribe();
图片
图片

实验:

new Thread() {
           @Override public void run() {
               Observable.just(1, 2, 3).doOnNext(new Action1<Integer>() {
                   @Override public void call(Integer integer) {
                       Log.i("RxThread", "doOnNext:" + integer + ", run In :" + Thread.currentThread()
                               .getName());
                   }
               })
               .observeOn(AndroidSchedulers.mainThread())
               .subscribe(new Action1<Integer>() {
                   @Override public void call(Integer integer) {
                       Log.i("RxThread", "get result:" + integer + ", run In :" + Thread.currentThread()
                               .getName());
                   }
               });

           }
       }.start();

输出结果:

12-06 16:18:06.493 18584-18606/com.hanks.rxsearch I/RxThread: doOnNext:1, run In :Thread-155
12-06 16:18:06.493 18584-18606/com.hanks.rxsearch I/RxThread: doOnNext:2, run In :Thread-155
12-06 16:18:06.493 18584-18606/com.hanks.rxsearch I/RxThread: doOnNext:3, run In :Thread-155
12-06 16:18:06.521 18584-18584/com.hanks.rxsearch I/RxThread: get result:1, run In :main
12-06 16:18:06.521 18584-18584/com.hanks.rxsearch I/RxThread: get result:2, run In :main
12-06 16:18:06.521 18584-18584/com.hanks.rxsearch I/RxThread: get result:3, run In :main

3. Combined logic

由于操作可以被组合使用,于是有了下面的代码:

Observable.just(1,2,3)
  .subscribeOn(Schedulers.newThread())
  .observeOn(Schedulers.newThread())
  .subscribe();
图片
图片

实验:

new Thread() {
     @Override public void run() {
         Observable.just(1, 2, 3).doOnNext(new Action1<Integer>() {
             @Override public void call(Integer integer) {
                 Log.i("RxThread", "doOnNext:" + integer + ", run In :" + Thread.currentThread()
                         .getName());
             }
         })
         .subscribeOn(Schedulers.newThread())
         .observeOn(AndroidSchedulers.mainThread())
         .subscribe(new Action1<Integer>() {
             @Override public void call(Integer integer) {
                 Log.i("RxThread", "get result:" + integer + ", run In :" + Thread.currentThread()
                         .getName());
             }
         });
     }
 }.start();

输出结果:

12-06 16:19:53.066 20247-20274/com.hanks.rxsearch I/RxThread: doOnNext:1, run In :RxNewThreadScheduler-1
12-06 16:19:53.066 20247-20274/com.hanks.rxsearch I/RxThread: doOnNext:2, run In :RxNewThreadScheduler-1
12-06 16:19:53.066 20247-20274/com.hanks.rxsearch I/RxThread: doOnNext:3, run In :RxNewThreadScheduler-1
12-06 16:19:53.077 20247-20247/com.hanks.rxsearch I/RxThread: get result:1, run In :main
12-06 16:19:53.077 20247-20247/com.hanks.rxsearch I/RxThread: get result:2, run In :main
12-06 16:19:53.077 20247-20247/com.hanks.rxsearch I/RxThread: get result:3, run In :main

Tips / Gotchas:

1. “UI线程运行异常”

Observable.just(1,2,3)
  .subscribeOn(Schedulers.newThread())
  .subscribe(/** logic which touches ui **//); //在newThread中调用

obviously.

2. 逻辑处理放在后台(newThread)

错误姿势:

Observable.just(1,2,3)
  .subscribeOn(Schedulers.newThread())
  .observeOn(AndroidSchedulers.mainThread())
  .flatMap(/** logic which doesn't touch ui **//)
  .subscribe();

实验:

new Thread() {
       @Override public void run() {
           Observable.just("Android-Picasso", "Android-Glide", "Android-Fresco").doOnNext(new Action1<String>() {
               @Override public void call(String str) {
                   Log.i("RxThread", "doOnNext:" + str + ", run In :" + Thread.currentThread()
                           .getName());
               }
           })
           .subscribeOn(Schedulers.newThread())
           .observeOn(AndroidSchedulers.mainThread())
           .flatMap(new Func1<String, Observable<String>>() {
               @Override public Observable<String> call(String str) {
                   Log.i("RxThread", "flatMap:" + str + ", run In :" + Thread.currentThread());
                   return Observable.from(str.split("-") ); // 返回平方
               }
           })
           .subscribe(new Action1<String>() {
               @Override public void call(String str) {
                   Log.i("RxThread", "get result:" + str + ", run In :" + Thread.currentThread()
                           .getName());
               }
           });
       }
   }.start();

输出结果:

12-06 16:43:00.181 8161-8190/com.hanks.rxsearch I/RxThread: doOnNext:Android-Picasso, run In :RxNewThreadScheduler-1
12-06 16:43:00.181 8161-8190/com.hanks.rxsearch I/RxThread: doOnNext:Android-Glide, run In :RxNewThreadScheduler-1
12-06 16:43:00.181 8161-8190/com.hanks.rxsearch I/RxThread: doOnNext:Android-Fresco, run In :RxNewThreadScheduler-1
12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: flatMap:Android-Picasso, run In :Thread[main,5,main]
12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: get result:Android, run In :main
12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: get result:Picasso, run In :main
12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: flatMap:Android-Glide, run In :Thread[main,5,main]
12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: get result:Android, run In :main
12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: get result:Glide, run In :main
12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: flatMap:Android-Fresco, run In :Thread[main,5,main]
12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: get result:Android, run In :main
12-06 16:43:00.243 8161-8161/com.hanks.rxsearch I/RxThread: get result:Fresco, run In :main

正确姿势:

Observable.just(1,2,3)
  .subscribeOn(Schedulers.newThread())
  .flatMap(/** logic which doesn't touch ui **//)
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe();

第二段代码中 flatMap (或者其他逻辑处理)将运行在后台线程, 如果是在Android中,这样做不会阻塞UI,阻塞UI的话有可能导致ANR之类的异常。这跟 AsyncTask中的 doInBackground()类似,在 doInBackground()中做耗时操作

实验:

new Thread() {
       @Override public void run() {
           Observable.just("Android-Picasso", "Android-Glide", "Android-Fresco").doOnNext(new Action1<String>() {
               @Override public void call(String str) {
                   Log.i("RxThread", "doOnNext:" + str + ", run In :" + Thread.currentThread()
                           .getName());
               }
           })
           .subscribeOn(Schedulers.newThread())
           .flatMap(new Func1<String, Observable<String>>() {
               @Override public Observable<String> call(String str) {
                   Log.i("RxThread", "flatMap:" + str + ", run In :" + Thread.currentThread());
                   return Observable.from(str.split("-") ); // 返回平方
               }
           })
           .observeOn(AndroidSchedulers.mainThread())
           .subscribe(new Action1<String>() {
               @Override public void call(String str) {
                   Log.i("RxThread", "get result:" + str + ", run In :" + Thread.currentThread()
                           .getName());
               }
           });
       }
   }.start();

输出结果:

12-06 16:41:27.025 6812-6839/com.hanks.rxsearch I/RxThread: doOnNext:Android-Picasso, run In :RxNewThreadScheduler-1
12-06 16:41:27.025 6812-6839/com.hanks.rxsearch I/RxThread: flatMap:Android-Picasso, run In :Thread[RxNewThreadScheduler-1,5,main]
12-06 16:41:27.025 6812-6839/com.hanks.rxsearch I/RxThread: doOnNext:Android-Glide, run In :RxNewThreadScheduler-1
12-06 16:41:27.025 6812-6839/com.hanks.rxsearch I/RxThread: flatMap:Android-Glide, run In :Thread[RxNewThreadScheduler-1,5,main]
12-06 16:41:27.025 6812-6839/com.hanks.rxsearch I/RxThread: doOnNext:Android-Fresco, run In :RxNewThreadScheduler-1
12-06 16:41:27.025 6812-6839/com.hanks.rxsearch I/RxThread: flatMap:Android-Fresco, run In :Thread[RxNewThreadScheduler-1,5,main]
12-06 16:41:27.043 6812-6812/com.hanks.rxsearch I/RxThread: get result:Android, run In :main
12-06 16:41:27.043 6812-6812/com.hanks.rxsearch I/RxThread: get result:Picasso, run In :main
12-06 16:41:27.043 6812-6812/com.hanks.rxsearch I/RxThread: get result:Android, run In :main
12-06 16:41:27.043 6812-6812/com.hanks.rxsearch I/RxThread: get result:Glide, run In :main
12-06 16:41:27.043 6812-6812/com.hanks.rxsearch I/RxThread: get result:Android, run In :main
12-06 16:41:27.043 6812-6812/com.hanks.rxsearch I/RxThread: get result:Fresco, run In :main

3. 最早的 .subscribeOn() 生效

看下面的代码:

Observable.just(1,2,3)
  .subscribeOn(thread1)
  .subscribeOn(thread2)
  .subscribe();

Observable 的创建和 .subscribeOn() 的调用都将在 thread1 上面执行,所以没有必要多次调用 .subscribeOn(),因为只有第一次的是有用的。

实验:

new Thread() {
      @Override public void run() {
          Observable.just("Android-Picasso", "Android-Glide", "Android-Fresco").doOnNext(new Action1<String>() {
              @Override public void call(String str) {
                  Log.i("RxThread", "doOnNext:" + str + ", run In :" + Thread.currentThread()
                          .getName());
              }
          })
          .subscribeOn(Schedulers.newThread())
          .subscribeOn(Schedulers.io())
          .subscribeOn(Schedulers.computation())
          .subscribe(new Action1<String>() {
              @Override public void call(String str) {
                  Log.i("RxThread", "get result:" + str + ", run In :" + Thread.currentThread()
                          .getName());
              }
          });
      }
  }.start();

输出结果

12-06 16:51:17.581 15622-15652/com.hanks.rxsearch I/RxThread: doOnNext:Android-Picasso, run In :RxNewThreadScheduler-1
12-06 16:51:17.581 15622-15652/com.hanks.rxsearch I/RxThread: get result:Android-Picasso, run In :RxNewThreadScheduler-1
12-06 16:51:17.581 15622-15652/com.hanks.rxsearch I/RxThread: doOnNext:Android-Glide, run In :RxNewThreadScheduler-1
12-06 16:51:17.581 15622-15652/com.hanks.rxsearch I/RxThread: get result:Android-Glide, run In :RxNewThreadScheduler-1
12-06 16:51:17.581 15622-15652/com.hanks.rxsearch I/RxThread: doOnNext:Android-Fresco, run In :RxNewThreadScheduler-1
12-06 16:51:17.581 15622-15652/com.hanks.rxsearch I/RxThread: get result:Android-Fresco, run In :RxNewThreadScheduler-1

实验

new Thread() {
       @Override public void run() {
           Observable.just("Android-Picasso", "Android-Glide", "Android-Fresco").doOnNext(new Action1<String>() {
               @Override public void call(String str) {
                   Log.i("RxThread", "doOnNext:" + str + ", run In :" + Thread.currentThread()
                           .getName());
               }
           })
           .subscribeOn(Schedulers.io())
           .subscribeOn(Schedulers.newThread())
           .subscribeOn(Schedulers.computation())
           .subscribe(new Action1<String>() {
               @Override public void call(String str) {
                   Log.i("RxThread", "get result:" + str + ", run In :" + Thread.currentThread()
                           .getName());
               }
           });
       }
   }.start();

输出结果

12-06 16:52:13.378 16424-16454/com.hanks.rxsearch I/RxThread: doOnNext:Android-Picasso, run In :RxCachedThreadScheduler-2
12-06 16:52:13.379 16424-16454/com.hanks.rxsearch I/RxThread: get result:Android-Picasso, run In :RxCachedThreadScheduler-2
12-06 16:52:13.379 16424-16454/com.hanks.rxsearch I/RxThread: doOnNext:Android-Glide, run In :RxCachedThreadScheduler-2
12-06 16:52:13.379 16424-16454/com.hanks.rxsearch I/RxThread: get result:Android-Glide, run In :RxCachedThreadScheduler-2
12-06 16:52:13.379 16424-16454/com.hanks.rxsearch I/RxThread: doOnNext:Android-Fresco, run In :RxCachedThreadScheduler-2
12-06 16:52:13.379 16424-16454/com.hanks.rxsearch I/RxThread: get result:Android-Fresco, run In :RxCachedThreadScheduler-2

Android Rxjava Rxandroid

文章出处 (http://hanks.xyz)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容