文章目录
  1. 1. 什么是RxJava?
  2. 2. RxJava剖析
  3. 3. RxJava使用
    1. 3.1. 创建操作
      1. 3.1.1. Create
      2. 3.1.2. Just
    2. 3.2. 变换操作
      1. 3.2.1. map
      2. 3.2.2. groupBy
    3. 3.3. 过滤操作
      1. 3.3.1. filter
    4. 3.4. 结合操作
    5. 3.5. doOnSubscribe
  4. 4. 运用

前言:RxJava火了很长时间,最近特地研究了一下这个库,感觉真特么牛逼啊。简直是异步神器,有了它,妈妈再也不用担心我使用AsyncTask或者维护异步操作线程池之类的事了。

什么是RxJava?

说到RxJava,必须先说ReactiveX。Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流。Rx使用Observable序列组合异步和基于事件的程序。ReactiveX结合了观察者模式、迭代器模式和函数式编程的精华。这里有详细中文介绍:https://mcxiaoke.gitbooks.io/rxdocs/content/Intro.html 。RxJava是 ReactiveX 在JVM上的一个实现。

RxJava剖析

在我看来,RxJava是一种新的编程方法。它由多个角色共同组合操作完成。其中包括:Observer(观察者)、Observable(可观察者或者叫被观察者)、Operator(操作,在Observable内部对输入数据的处理)、Scheduler(调度者)。

  • Observer,观察者。负责获取Observable处理返回的数据,并对返回的数据进行处理。
  • Observable,可观察者或者叫被观察者。负责调用一个或连续多个Operator构造特定行为的Observable(Operator是函数方法,在这里需要意识到,Observable执行的动作,取决于Operator和传入的参数)。最后,调用subscribe()使自己被Observer订阅。
  • Operator,操作。所有Operator都是Observable的静态函数,此方法返回Observable对象。调用Operator过程中,有的Operator传入数据或事件,有的传入带有call()方法的OnSubscribe对象。Observable利用这些传入的数据、事件、OnSubscribe做它需要做的事。在Observable处理完毕后,会将结果返回给Observer或者返回给下一个Operator。
  • Scheduler,调度者。负责设置观察者和被观察者代码执行的调度器。这一点是RxJava亮度功能。也就是通过调用subscribeOn()和observeOn()来设置Observable和Observer执行的线程。RxJava自带如下几个Scheduler:Schedulers.immediate(),默认Scheduler,在当前线程运行;Schedulers.newThread(),开启新线程执行操作;Schedulers.io(),执行IO操作的线程,内部是一个无上限个数的线程池;Schedulers.computation(),执行大量计算的线程,内部是一个固定大小的线程池。另外,RxAndroid带了一个Android特性的库:AndroidSchedulers.mainThread(),Android主线程。

RxJava使用

RxJava自带了很多的Operator,我们可以使用创建、变换、过滤等多个Operator,高效地完成想要做的事。并且,结构非常清晰。

创建操作

用于创建Observable的操作
Create — 通过调用观察者的方法从头创建一个Observable
Defer — 在观察者订阅之前不创建这个Observable,为每一个观察者创建一个新的Observable
Empty/Never/Throw — 创建行为受限的特殊Observable
From — 将其它的对象或数据结构转换为Observable
Interval — 创建一个定时发射整数序列的Observable
Just — 将对象或者对象集合转换为一个会发射这些对象的Observable
Range — 创建发射指定范围的整数序列的Observable
Repeat — 创建重复发射特定的数据或数据序列的Observable
Start — 创建发射一个函数的返回值的Observable
Timer — 创建在一个指定的延迟之后发射单个数据的Observable
如上图所示,官方API支持这些创建操作。这里使用Create和Just举例。

Create

使用一个函数从头开始创建一个Observable。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
final String bitmapUrl = "https://www.android.com/static/img/ready-for-you/hero-1024.png";
//可观察者
rx.Observable<Bitmap> observable = rx.Observable.create(new rx.Observable.OnSubscribe<Bitmap>() {
    @Override
    public void call(Subscriber<? super Bitmap> subscriber) {
        Request request = new Request.Builder().url(bitmapUrl).build();
        try {
            Response response = okHttpClient.newCall(request).execute();
            InputStream is = response.body().byteStream();
            Bitmap bitmap = BitmapFactory.decodeStream(is);
            is.close();

            subscriber.onNext(bitmap);
            subscriber.onCompleted();
        } catch (IOException e) {
            e.printStackTrace();
            subscriber.onError(e.getCause());
        }
    }
});
//观察者
Observer<Bitmap> observer = new Observer<Bitmap>() {
    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onNext(Bitmap o) {
        imageView.setImageBitmap(o);
    }
};
//建立订阅关系
Subscription sp = observable.subscribe(observer);

上述代码,是一个Create操作的完整流程,功能是获取Android官网read-for-you的图片(用到了OkHttp库获取图片资源)。订阅流程是:构造Observable—>构造Observer—>调用subscribe建立订阅关系。执行流程是:执行Observable中的OnSubscribe—>结果通知给Observer。注意:onCompleted()、onError()、onNext()都是Observer接受Observable处理结果回调的。在一切执行完毕后,调用sp.unsubscribe()解除订阅,避免内存泄露。
在上述代码中,没有显式指定Observable和Observer运行的调度器,默认不在任何特定的调度器上执行。很显然,网络获取图片数据应该在异步线程中执行,imageView设置bitmap应该在UI线程执行。正确代码如下:

1
2
3
//建立订阅关系
Subscription sp = observable.subscribeOn(Schedulers.io()).
        observeOn(AndroidSchedulers.mainThread()).subscribe(observer);

Just

创建一个发射指定值的Observable。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
rx.Observable.just("welcome", "to", "Android").subscribe(new Subscriber<String>() {
    @Override
    public void onStart() { //在开始前回调,做准备工作
    	super.onStart();
    }
    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, s);
    }
});

上述代码,用到了just操作。功能是依次打印welcome to Android。subscribe()传入的参数Subscriber是Observer的子类。另外,RxJava还提供了更简洁的实现方式,如下。

1
2
3
4
5
6
rx.Observable.just("welcome", "to", "Android").subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        Log.i(TAG, s);
    }
});

在上面代码中,subscribe()传入的Action1是负责处理onNext的工作。onCompleted和onError默认不做任何处理。当然,也可以传入处理onCompleted和onError的Action。代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
rx.Observable.just("welcome", "to", "Android").subscribe(
        new Action1<String>() { //处理onNext
            @Override
            public void call(String s) {
                Log.i(TAG, s);
            }
        }, new Action1<Throwable>() { //处理onError
            @Override
            public void call(Throwable throwable) {
            }
        }, new Action0() {  //处理onCompleted
            @Override
            public void call() {
            }
        });

变换操作

变换操作也是RxJava的一大亮点。它能将输入的数据或事件进行变换操作,转换成另外的数据的数据或事件。RxJava自带的变换操作如下:
map( ) — 对序列的每一项都应用一个函数来变换Observable发射的数据序列
flatMap( ), concatMap( ), and flatMapIterable( ) — 将Observable发射的数据集合变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable
switchMap( ) — 将Observable发射的数据集合变换为Observables集合,然后只发射这些Observables最近发射的数据
scan( ) — 对Observable发射的每一项数据应用一个函数,然后按顺序依次发射每一个值
groupBy( ) — 将Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据
buffer( ) — 它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个
window( ) — 定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项
cast( ) — 在发射之前强制将Observable发射的所有数据转换为指定类型
变换操作能解决实际开发中的很多问题,下面以map()和groupBy()为例。

map

对Observable发射的每一项数据应用一个函数,执行变换操作。

1
2
3
4
5
6
7
8
9
10
11
12
String[] words = new String[]{"welcome", "to", "Android"};
rx.Observable.from(words).map(new Func1<String, String>() {
    @Override
    public String call(String s) {
        return s.toUpperCase();
    }
}).subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        Log.i("", s);
    }
});

上述代码片段使用了from和map两个Operator,功能是将welcome to Android转换成大写并打印出来。from是将String[]转换成Observable—>map的Func执行转换操作,将传入的的字符串转换成大写—>在Observer中打印输出。注意,map不仅仅只支持同类型之间的转换,任意类型都可以。比如将图片Uri转换成Bitmap,如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
String uri = "https://www.android.com/static/img/ready-for-you/hero-1024.png";
rx.Observable.just(uri).subscribeOn(AndroidSchedulers.mainThread())
        .observeOn(Schedulers.io()).map(new Func1<String, Bitmap>() {
            @Override
            public Bitmap call(String s) {
                Request request = new Request.Builder().url(s).build();
                try {
                    Response response = okHttpClient.newCall(request).execute();
                    InputStream is = response.body().byteStream();
                    Bitmap bitmap = BitmapFactory.decodeStream(is);
                    is.close();

                    return bitmap;
                } catch (IOException e) {
                    e.printStackTrace();
                }
                return null;
            }})
        .observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Bitmap>() {
            @Override
            public void call(Bitmap bitmap) {
                if (bitmap != null) {
                    imageView.setImageBitmap(bitmap);
                }
            }
        });

上述代码片段中,just在android ui线程执行,map转换uri为bitmap在Schedulers.io()线程执行,Observer在android ui线程执行。

groupBy

将一个Observable分拆为一些Observables集合,它们中的每一个发射原始Observable的一个子序列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Integer[] items = new Integer[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
rx.Observable.from(items)
        .groupBy(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) { //此处返回integer所属组的key
                return integer % 2 == 0;
            }
        })
        .subscribe(new Action1<GroupedObservable<Boolean, Integer>>() {
                       @Override
                       public void call(GroupedObservable<Boolean, Integer> booleanIntegerGroupedObservable) {
                           boolean key = booleanIntegerGroupedObservable.getKey();
                           if (key) {
                               //当前的GroupedObservable偶数分组
                               booleanIntegerGroupedObservable.cache().subscribe(new Action1<Integer>() {
                                   @Override
                                   public void call(Integer integer) {
                                       Log.d("", integer + "");
                                   }
                               });
                           } else {
                               //当前GroupedObservable为奇数分组,丢弃
                               booleanIntegerGroupedObservable.take(0);
                           }
                       }
                   }
        );

上述代码片段使用的是groupBy操作。功能是将0-9数字分组成偶数和奇数,并且打印偶数,奇数不处理。Func1返回integer的key,groupBy根据key判断integer属于哪个组,然后将integer发送给相应的GroupedObservable。

过滤操作

过滤操作符可用于过滤和选择Observable发射的数据序列。RxJava自带如下过滤操作。
filter( ) — 过滤数据
takeLast( ) — 只发射最后的N项数据
last( ) — 只发射最后的一项数据
lastOrDefault( ) — 只发射最后的一项数据,如果Observable为空就发射默认值
takeLastBuffer( ) — 将最后的N项数据当做单个数据发射
skip( ) — 跳过开始的N项数据
skipLast( ) — 跳过最后的N项数据
take( ) — 只发射开始的N项数据
first( ) and takeFirst( ) — 只发射第一项数据,或者满足某种条件的第一项数据
firstOrDefault( ) — 只发射第一项数据,如果Observable为空就发射默认值
elementAt( ) — 发射第N项数据
elementAtOrDefault( ) — 发射第N项数据,如果Observable数据少于N项就发射默认值
sample( ) or throttleLast( ) — 定期发射Observable最近的数据
throttleFirst( ) — 定期发射Observable发射的第一项数据
throttleWithTimeout( ) or debounce( ) — 只有当Observable在指定的时间后还没有发射数据时,才发射一个数据
timeout( ) — 如果在一个指定的时间段后还没发射数据,就发射一个异常
distinct( ) — 过滤掉重复数据
distinctUntilChanged( ) — 过滤掉连续重复的数据
ofType( ) — 只发射指定类型的数据
ignoreElements( ) — 丢弃所有的正常数据,只发射错误或完成通知
下面以filter为例。

filter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Observable.from(items).subscribeOn(AndroidSchedulers.mainThread())
        .observeOn(Schedulers.computation()) //指定过滤操作在Schedulers.computation()上执行
        .filter(new Func1<Integer, Boolean>() { //执行过滤操作
            @Override
            public Boolean call(Integer integer) {
                return integer % 2 == 0;
            }
        })
        .observeOn(AndroidSchedulers.mainThread()) //指定ui线程执行Observer
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onNext(Integer integer) { //处理过滤的数据
                Log.i("", "" + integer);
            }
        });

上述代码片段过滤0-9中的偶数,并且在Observer中打印出偶数。

结合操作

顾名思义,结合操作就是组合多个Observable,RxJava有下面这些结合操作。
startWith( ) — 在数据序列的开头增加一项数据
merge( ) — 将多个Observable合并为一个
mergeDelayError( ) — 合并多个Observables,让没有错误的Observable都完成后再发射错误通知
zip( ) — 使用一个函数组合多个Observable发射的数据集合,然后再发射这个结果
and( ), then( ), and when( ) — (rxjava-joins) 通过模式和计划组合多个Observables发射的数据集合
combineLatest( ) — 当两个Observables中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果
join( ) and groupJoin( ) — 无论何时,如果一个Observable发射了一个数据项,只要在另一个Observable发射的数据项定义的时间窗口内,就将两个Observable发射的数据合并发射
switchOnNext( ) — 将一个发射Observables的Observable转换成另一个Observable,后者发射这些Observables最近发射的数据

doOnSubscribe

doOnSubscribe是Observable的另一个好用的函数(类似Subscriber的onStart()方法,但onStart()不能指定Scheduler)。它可以指定一个Action0在Operator执行之前回调,并且可以指定回调Action0的Scheduler。如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
String uri = "https://www.android.com/static/img/ready-for-you/hero-1024.png";
rx.Observable.just(uri).subscribeOn(AndroidSchedulers.mainThread())
        .doOnSubscribe(new Action0() {
            @Override
            public void call() {
                imageView.setImageBitmap(defBitmap);
            }
        }).subscribeOn(AndroidSchedulers.mainThread()) //指定doOnSubscribe中Action0的Scheduler为UI线程
        .observeOn(Schedulers.io()).map(new Func1<String, Bitmap>() {
            @Override
            public Bitmap call(String s) {
                Request request = new Request.Builder().url(s).build();
                try {
                    Response response = okHttpClient.newCall(request).execute();
                    InputStream is = response.body().byteStream();
                    Bitmap bitmap = BitmapFactory.decodeStream(is);
                    is.close();

                    return bitmap;
                } catch (IOException e) {
                    e.printStackTrace();
                }
                return null;
            }})
        .observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Bitmap>() {
            @Override
            public void call(Bitmap bitmap) {
                if (bitmap != null) {
                    imageView.setImageBitmap(bitmap);
                }
            }
        });

上述代码片段的功能是imageView显示read-for-you图片。但在获取图片之前,在doOnSubscribe中先显示了默认的图片。

运用

github地址:https://github.com/ReactiveX/RxJava RxJava作为一种响应式编程方法,确实在异步、数据流、事件序列上表现非常优秀:简洁、高效、易用、灵活。作为一个Android开发者,我觉得完全可以将RxJava作为一个基础架构库来构建我们的应用程序。

文章目录
  1. 1. 什么是RxJava?
  2. 2. RxJava剖析
  3. 3. RxJava使用
    1. 3.1. 创建操作
      1. 3.1.1. Create
      2. 3.1.2. Just
    2. 3.2. 变换操作
      1. 3.2.1. map
      2. 3.2.2. groupBy
    3. 3.3. 过滤操作
      1. 3.3.1. filter
    4. 3.4. 结合操作
    5. 3.5. doOnSubscribe
  4. 4. 运用