RxJava
更新日期:
前言:RxJava火了很长时间,最近特地研究了一下这个库,感觉真特么牛逼啊。简直是异步神器,有了它,妈妈再也不用担心我使用AsyncTask或者维护异步操作线程池之类的事了。
什么是RxJava?
说到RxJava,必须先说ReactiveX。Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流。Rx使用Observable序列组合异步和基于事件的程序。ReactiveX结合了观察者模式、迭代器模式和函数式编程的精华。这里有详细中文介绍:https://mcxiaoke.gitbooks.io/rxdocs/content/Intro.html 。RxJava是 ReactiveX 在JVM上的一个实现。
RxJava剖析
在我看来,RxJava是一种新的编程方法。它由多个角色共同组合操作完成。其中包括:Observer(观察者)、Observable(可观察者或者叫被观察者)、Operator(操作,在Observable内部对输入数据的处理)、Scheduler(调度者)。
- Observer,观察者。负责获取Observable处理返回的数据,并对返回的数据进行处理。
- Observable,可观察者或者叫被观察者。负责调用一个或连续多个Operator构造特定行为的Observable(Operator是函数方法,在这里需要意识到,Observable执行的动作,取决于Operator和传入的参数)。最后,调用subscribe()使自己被Observer订阅。
- Operator,操作。所有Operator都是Observable的静态函数,此方法返回Observable对象。调用Operator过程中,有的Operator传入数据或事件,有的传入带有call()方法的OnSubscribe对象。Observable利用这些传入的数据、事件、OnSubscribe做它需要做的事。在Observable处理完毕后,会将结果返回给Observer或者返回给下一个Operator。
- Scheduler,调度者。负责设置观察者和被观察者代码执行的调度器。这一点是RxJava亮度功能。也就是通过调用subscribeOn()和observeOn()来设置Observable和Observer执行的线程。RxJava自带如下几个Scheduler:Schedulers.immediate(),默认Scheduler,在当前线程运行;Schedulers.newThread(),开启新线程执行操作;Schedulers.io(),执行IO操作的线程,内部是一个无上限个数的线程池;Schedulers.computation(),执行大量计算的线程,内部是一个固定大小的线程池。另外,RxAndroid带了一个Android特性的库:AndroidSchedulers.mainThread(),Android主线程。
RxJava使用
RxJava自带了很多的Operator,我们可以使用创建、变换、过滤等多个Operator,高效地完成想要做的事。并且,结构非常清晰。
创建操作
用于创建Observable的操作
Create — 通过调用观察者的方法从头创建一个Observable
Defer — 在观察者订阅之前不创建这个Observable,为每一个观察者创建一个新的Observable
Empty/Never/Throw — 创建行为受限的特殊Observable
From — 将其它的对象或数据结构转换为Observable
Interval — 创建一个定时发射整数序列的Observable
Just — 将对象或者对象集合转换为一个会发射这些对象的Observable
Range — 创建发射指定范围的整数序列的Observable
Repeat — 创建重复发射特定的数据或数据序列的Observable
Start — 创建发射一个函数的返回值的Observable
Timer — 创建在一个指定的延迟之后发射单个数据的Observable
如上图所示,官方API支持这些创建操作。这里使用Create和Just举例。
Create
使用一个函数从头开始创建一个Observable。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 | final String bitmapUrl = "https://www.android.com/static/img/ready-for-you/hero-1024.png"; //可观察者 rx.Observable<Bitmap> observable = rx.Observable.create(new rx.Observable.OnSubscribe<Bitmap>() { @Override public void call(Subscriber<? super Bitmap> subscriber) { Request request = new Request.Builder().url(bitmapUrl).build(); try { Response response = okHttpClient.newCall(request).execute(); InputStream is = response.body().byteStream(); Bitmap bitmap = BitmapFactory.decodeStream(is); is.close(); subscriber.onNext(bitmap); subscriber.onCompleted(); } catch (IOException e) { e.printStackTrace(); subscriber.onError(e.getCause()); } } }); //观察者 Observer<Bitmap> observer = new Observer<Bitmap>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Bitmap o) { imageView.setImageBitmap(o); } }; //建立订阅关系 Subscription sp = observable.subscribe(observer); |
上述代码,是一个Create操作的完整流程,功能是获取Android官网read-for-you的图片(用到了OkHttp库获取图片资源)。订阅流程是:构造Observable—>构造Observer—>调用subscribe建立订阅关系。执行流程是:执行Observable中的OnSubscribe—>结果通知给Observer。注意:onCompleted()、onError()、onNext()都是Observer接受Observable处理结果回调的。在一切执行完毕后,调用sp.unsubscribe()解除订阅,避免内存泄露。
在上述代码中,没有显式指定Observable和Observer运行的调度器,默认不在任何特定的调度器上执行。很显然,网络获取图片数据应该在异步线程中执行,imageView设置bitmap应该在UI线程执行。正确代码如下:
1 2 3 | //建立订阅关系 Subscription sp = observable.subscribeOn(Schedulers.io()). observeOn(AndroidSchedulers.mainThread()).subscribe(observer); |
Just
创建一个发射指定值的Observable。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | rx.Observable.just("welcome", "to", "Android").subscribe(new Subscriber<String>() { @Override public void onStart() { //在开始前回调,做准备工作 super.onStart(); } @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { Log.i(TAG, s); } }); |
上述代码,用到了just操作。功能是依次打印welcome to Android。subscribe()传入的参数Subscriber是Observer的子类。另外,RxJava还提供了更简洁的实现方式,如下。
1 2 3 4 5 6 | rx.Observable.just("welcome", "to", "Android").subscribe(new Action1<String>() { @Override public void call(String s) { Log.i(TAG, s); } }); |
在上面代码中,subscribe()传入的Action1是负责处理onNext的工作。onCompleted和onError默认不做任何处理。当然,也可以传入处理onCompleted和onError的Action。代码如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | rx.Observable.just("welcome", "to", "Android").subscribe( new Action1<String>() { //处理onNext @Override public void call(String s) { Log.i(TAG, s); } }, new Action1<Throwable>() { //处理onError @Override public void call(Throwable throwable) { } }, new Action0() { //处理onCompleted @Override public void call() { } }); |
变换操作
变换操作也是RxJava的一大亮点。它能将输入的数据或事件进行变换操作,转换成另外的数据的数据或事件。RxJava自带的变换操作如下:
map( ) — 对序列的每一项都应用一个函数来变换Observable发射的数据序列
flatMap( ), concatMap( ), and flatMapIterable( ) — 将Observable发射的数据集合变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable
switchMap( ) — 将Observable发射的数据集合变换为Observables集合,然后只发射这些Observables最近发射的数据
scan( ) — 对Observable发射的每一项数据应用一个函数,然后按顺序依次发射每一个值
groupBy( ) — 将Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据
buffer( ) — 它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个
window( ) — 定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项
cast( ) — 在发射之前强制将Observable发射的所有数据转换为指定类型
变换操作能解决实际开发中的很多问题,下面以map()和groupBy()为例。
map
对Observable发射的每一项数据应用一个函数,执行变换操作。
1 2 3 4 5 6 7 8 9 10 11 12 | String[] words = new String[]{"welcome", "to", "Android"}; rx.Observable.from(words).map(new Func1<String, String>() { @Override public String call(String s) { return s.toUpperCase(); } }).subscribe(new Action1<String>() { @Override public void call(String s) { Log.i("", s); } }); |
上述代码片段使用了from和map两个Operator,功能是将welcome to Android转换成大写并打印出来。from是将String[]转换成Observable—>map的Func执行转换操作,将传入的的字符串转换成大写—>在Observer中打印输出。注意,map不仅仅只支持同类型之间的转换,任意类型都可以。比如将图片Uri转换成Bitmap,如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | String uri = "https://www.android.com/static/img/ready-for-you/hero-1024.png"; rx.Observable.just(uri).subscribeOn(AndroidSchedulers.mainThread()) .observeOn(Schedulers.io()).map(new Func1<String, Bitmap>() { @Override public Bitmap call(String s) { Request request = new Request.Builder().url(s).build(); try { Response response = okHttpClient.newCall(request).execute(); InputStream is = response.body().byteStream(); Bitmap bitmap = BitmapFactory.decodeStream(is); is.close(); return bitmap; } catch (IOException e) { e.printStackTrace(); } return null; }}) .observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Bitmap>() { @Override public void call(Bitmap bitmap) { if (bitmap != null) { imageView.setImageBitmap(bitmap); } } }); |
上述代码片段中,just在android ui线程执行,map转换uri为bitmap在Schedulers.io()线程执行,Observer在android ui线程执行。
groupBy
将一个Observable分拆为一些Observables集合,它们中的每一个发射原始Observable的一个子序列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | Integer[] items = new Integer[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; rx.Observable.from(items) .groupBy(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { //此处返回integer所属组的key return integer % 2 == 0; } }) .subscribe(new Action1<GroupedObservable<Boolean, Integer>>() { @Override public void call(GroupedObservable<Boolean, Integer> booleanIntegerGroupedObservable) { boolean key = booleanIntegerGroupedObservable.getKey(); if (key) { //当前的GroupedObservable偶数分组 booleanIntegerGroupedObservable.cache().subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("", integer + ""); } }); } else { //当前GroupedObservable为奇数分组,丢弃 booleanIntegerGroupedObservable.take(0); } } } ); |
上述代码片段使用的是groupBy操作。功能是将0-9数字分组成偶数和奇数,并且打印偶数,奇数不处理。Func1返回integer的key,groupBy根据key判断integer属于哪个组,然后将integer发送给相应的GroupedObservable。
过滤操作
过滤操作符可用于过滤和选择Observable发射的数据序列。RxJava自带如下过滤操作。
filter( ) — 过滤数据
takeLast( ) — 只发射最后的N项数据
last( ) — 只发射最后的一项数据
lastOrDefault( ) — 只发射最后的一项数据,如果Observable为空就发射默认值
takeLastBuffer( ) — 将最后的N项数据当做单个数据发射
skip( ) — 跳过开始的N项数据
skipLast( ) — 跳过最后的N项数据
take( ) — 只发射开始的N项数据
first( ) and takeFirst( ) — 只发射第一项数据,或者满足某种条件的第一项数据
firstOrDefault( ) — 只发射第一项数据,如果Observable为空就发射默认值
elementAt( ) — 发射第N项数据
elementAtOrDefault( ) — 发射第N项数据,如果Observable数据少于N项就发射默认值
sample( ) or throttleLast( ) — 定期发射Observable最近的数据
throttleFirst( ) — 定期发射Observable发射的第一项数据
throttleWithTimeout( ) or debounce( ) — 只有当Observable在指定的时间后还没有发射数据时,才发射一个数据
timeout( ) — 如果在一个指定的时间段后还没发射数据,就发射一个异常
distinct( ) — 过滤掉重复数据
distinctUntilChanged( ) — 过滤掉连续重复的数据
ofType( ) — 只发射指定类型的数据
ignoreElements( ) — 丢弃所有的正常数据,只发射错误或完成通知
下面以filter为例。
filter
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | Observable.from(items).subscribeOn(AndroidSchedulers.mainThread()) .observeOn(Schedulers.computation()) //指定过滤操作在Schedulers.computation()上执行 .filter(new Func1<Integer, Boolean>() { //执行过滤操作 @Override public Boolean call(Integer integer) { return integer % 2 == 0; } }) .observeOn(AndroidSchedulers.mainThread()) //指定ui线程执行Observer .subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Integer integer) { //处理过滤的数据 Log.i("", "" + integer); } }); |
上述代码片段过滤0-9中的偶数,并且在Observer中打印出偶数。
结合操作
顾名思义,结合操作就是组合多个Observable,RxJava有下面这些结合操作。
startWith( ) — 在数据序列的开头增加一项数据
merge( ) — 将多个Observable合并为一个
mergeDelayError( ) — 合并多个Observables,让没有错误的Observable都完成后再发射错误通知
zip( ) — 使用一个函数组合多个Observable发射的数据集合,然后再发射这个结果
and( ), then( ), and when( ) — (rxjava-joins) 通过模式和计划组合多个Observables发射的数据集合
combineLatest( ) — 当两个Observables中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果
join( ) and groupJoin( ) — 无论何时,如果一个Observable发射了一个数据项,只要在另一个Observable发射的数据项定义的时间窗口内,就将两个Observable发射的数据合并发射
switchOnNext( ) — 将一个发射Observables的Observable转换成另一个Observable,后者发射这些Observables最近发射的数据
doOnSubscribe
doOnSubscribe是Observable的另一个好用的函数(类似Subscriber的onStart()方法,但onStart()不能指定Scheduler)。它可以指定一个Action0在Operator执行之前回调,并且可以指定回调Action0的Scheduler。如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | String uri = "https://www.android.com/static/img/ready-for-you/hero-1024.png"; rx.Observable.just(uri).subscribeOn(AndroidSchedulers.mainThread()) .doOnSubscribe(new Action0() { @Override public void call() { imageView.setImageBitmap(defBitmap); } }).subscribeOn(AndroidSchedulers.mainThread()) //指定doOnSubscribe中Action0的Scheduler为UI线程 .observeOn(Schedulers.io()).map(new Func1<String, Bitmap>() { @Override public Bitmap call(String s) { Request request = new Request.Builder().url(s).build(); try { Response response = okHttpClient.newCall(request).execute(); InputStream is = response.body().byteStream(); Bitmap bitmap = BitmapFactory.decodeStream(is); is.close(); return bitmap; } catch (IOException e) { e.printStackTrace(); } return null; }}) .observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Bitmap>() { @Override public void call(Bitmap bitmap) { if (bitmap != null) { imageView.setImageBitmap(bitmap); } } }); |
上述代码片段的功能是imageView显示read-for-you图片。但在获取图片之前,在doOnSubscribe中先显示了默认的图片。
运用
github地址:https://github.com/ReactiveX/RxJava RxJava作为一种响应式编程方法,确实在异步、数据流、事件序列上表现非常优秀:简洁、高效、易用、灵活。作为一个Android开发者,我觉得完全可以将RxJava作为一个基础架构库来构建我们的应用程序。