Common RxJava Use Cases

This article collects common RxJava use cases, each with a short example.

Use cases

1. Fetch data on a background thread (network requests, file access, etc.) and display it on the main thread (UI updates) [ subscribeOn() , observeOn() ]

java
Observable.just(1, 2, 3, 4)
        .subscribeOn(Schedulers.io()) // subscribe() happens on the IO thread
        .observeOn(AndroidSchedulers.mainThread()) // Subscriber callbacks run on the main thread
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer number) {
                Log.d(tag, "number:" + number);
            }
        });
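The same hand-off between threads can be sketched with the JDK alone, which may help when reasoning about where each step runs. This is an analogue, not RxJava: `ThreadHandOffSketch`, `loadThenDeliver`, and the thread names are hypothetical, and a single-thread executor merely stands in for the Android main thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-JDK analogue of subscribeOn(io) + observeOn(mainThread):
// produce a value on one executor, handle it on another.
class ThreadHandOffSketch {

    // Loads a value on the io executor, then processes it on the ui executor.
    // The returned string records which thread ran each step.
    static String loadThenDeliver(ExecutorService io, ExecutorService ui) {
        return CompletableFuture
                .supplyAsync(() -> "loaded on " + Thread.currentThread().getName(), io)
                .thenApplyAsync(v -> v + ", delivered on " + Thread.currentThread().getName(), ui)
                .join();
    }

    public static void main(String[] args) {
        // Assumption: named single-thread executors stand in for the IO pool and the main thread.
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-like-thread"));
        try {
            System.out.println(loadThenDeliver(io, ui));
        } finally {
            io.shutdown();
            ui.shutdown();
        }
    }
}
```

Passing the executor explicitly to `thenApplyAsync` is what pins the delivery step to the second thread, much as `observeOn()` pins the downstream callbacks.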

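2. Check the cache before fetching data [ concat() ]

Example: check the in-memory cache first, then the file cache, and only then go to the network. Once a cached value is found, the later sources are never queried.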

java
// Emits the in-memory value if there is one, otherwise completes without emitting.
final Observable<String> memory = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        if (memoryCache != null) {
            subscriber.onNext(memoryCache);
        } else {
            subscriber.onCompleted();
        }
    }
});
// Emits the value stored in preferences if there is one, otherwise completes without emitting.
Observable<String> disk = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        String cachePref = rxPreferences.getString("cache").get();
        if (!TextUtils.isEmpty(cachePref)) {
            subscriber.onNext(cachePref);
        } else {
            subscriber.onCompleted();
        }
    }
});
Observable<String> network = Observable.just("network");

// concat() subscribes to the sources in order; first() takes the first item
// emitted and unsubscribes, so the remaining sources are skipped.
Observable.concat(memory, disk, network)
        .first()
        .subscribeOn(Schedulers.newThread())
        .subscribe(s -> {
            memoryCache = "memory";
            System.out.println("--------------subscribe: " + s);
        });
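To make the short-circuit behaviour concrete without pulling in RxJava, the following plain-JDK sketch imitates what `concat(...).first()` does in this scenario: sources are consulted lazily, in order, and the first one that yields a value wins. `CacheLookupSketch`, `firstAvailable`, `source`, and the sample values are hypothetical names for illustration, not part of RxJava.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

// Plain-JDK sketch of the "memory, then disk, then network" lookup order.
// This is not RxJava; it only imitates the short-circuit of concat(...).first().
class CacheLookupSketch {

    // Records which sources were actually queried, for demonstration.
    static final List<String> queried = new ArrayList<>();

    // Asks the sources in order and stops as soon as one of them has data.
    @SafeVarargs
    static Optional<String> firstAvailable(Supplier<Optional<String>>... sources) {
        for (Supplier<Optional<String>> source : sources) {
            Optional<String> value = source.get();
            if (value.isPresent()) {
                return value;
            }
        }
        return Optional.empty();
    }

    // Builds a lazy source that logs its name when queried; null means "no data".
    static Supplier<Optional<String>> source(String name, String valueOrNull) {
        return () -> {
            queried.add(name);
            return Optional.ofNullable(valueOrNull);
        };
    }

    public static void main(String[] args) {
        Optional<String> result = firstAvailable(
                source("memory", null),             // memory cache is empty
                source("disk", "cached-on-disk"),   // disk cache has a value
                source("network", "from-network")); // never queried in this run
        System.out.println(result.get() + " via " + queried);
    }
}
```

Because each source is wrapped in a `Supplier`, the network lookup costs nothing unless both caches miss, which is the property the RxJava version relies on.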

3. Wait for several requests to finish before acting, or combine the data returned by several requests [ merge() ]

java
Observable<String> observable1 = DemoUtils.createObservable1().subscribeOn(Schedulers.newThread());
Observable<String> observable2 = DemoUtils.createObservable2().subscribeOn(Schedulers.newThread());

Observable.merge(observable1, observable2)
        .subscribeOn(Schedulers.newThread())
        .subscribe(System.out::println);
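For the "wait until every request has finished" half of this scenario, a rough JDK-only analogue is `CompletableFuture.allOf`. It is a sketch under assumptions rather than an equivalent of `merge()`: `merge()` forwards items as they arrive, whereas this version only hands over the results once both futures are done. `WaitForAllSketch` and `fetchBoth` are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Plain-JDK sketch: start two independent requests and continue only
// once both have completed. Not RxJava; an analogue for illustration.
class WaitForAllSketch {

    // Hypothetical stand-ins for two independent network requests.
    static List<String> fetchBoth() {
        CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> "one");
        CompletableFuture<String> second = CompletableFuture.supplyAsync(() -> "two");

        // allOf completes when every future has completed; join() on each
        // future is then guaranteed not to block.
        return CompletableFuture.allOf(first, second)
                .thenApply(ignored -> Arrays.asList(first.join(), second.join()))
                .join();
    }

    public static void main(String[] args) {
        System.out.println(fetchBoth());
    }
}
```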

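4. One request depends on the data returned by another (eliminating nested callbacks)

Example: after logging in, use the returned token to fetch the message list.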

java
NetworkService.getToken("username", "password")
        .flatMap(s -> NetworkService.getMessage(s))
        .subscribe(s -> {
            System.out.println("message: " + s);
        });
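The flattening that `flatMap()` performs here has a close JDK counterpart in `CompletableFuture.thenCompose`, which may be a useful mental model. In the sketch below, `DependentRequestSketch`, `getToken`, `getMessage`, and the returned strings are hypothetical stand-ins for the `NetworkService` calls, not the article's actual implementation.

```java
import java.util.concurrent.CompletableFuture;

// Plain-JDK analogue of chaining dependent requests; thenCompose plays the
// role that flatMap plays in the RxJava snippet.
class DependentRequestSketch {

    // Hypothetical stand-in for NetworkService.getToken(...).
    static CompletableFuture<String> getToken(String user, String password) {
        return CompletableFuture.supplyAsync(() -> "token-for-" + user);
    }

    // Hypothetical stand-in for NetworkService.getMessage(token).
    static CompletableFuture<String> getMessage(String token) {
        return CompletableFuture.supplyAsync(() -> "messages fetched with " + token);
    }

    // The second request starts only after the first has produced a token,
    // and the result is a single flat future rather than a nested one.
    static CompletableFuture<String> loginThenFetch(String user, String password) {
        return getToken(user, password).thenCompose(DependentRequestSketch::getMessage);
    }

    public static void main(String[] args) {
        System.out.println("message: " + loginThenFetch("username", "password").join());
    }
}
```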

5. Polling requests [ schedulePeriodically() ]

java
Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(final Subscriber<? super String> observer) {
        final Scheduler.Worker worker = Schedulers.newThread().createWorker();
        // Tie the worker to the subscriber so that unsubscribing stops the polling.
        observer.add(worker);
        worker.schedulePeriodically(new Action0() {
            @Override
            public void call() {
                observer.onNext(doNetworkCallAndGetStringResult());
            }
        }, INITIAL_DELAY, POLLING_INTERVAL, TimeUnit.MILLISECONDS);
    }
}).subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        Log.d(tag, "polling...");
    }
});

6. Prevent rapid repeated clicks on a button [ throttleFirst() ]

java
RxView.clicks(findViewById(R.id.btn_throttle))
        .throttleFirst(1, TimeUnit.SECONDS)
        .subscribe(aVoid -> {
            System.out.println("click");
        });
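The idea behind `throttleFirst()` can be sketched in a few lines of plain Java: accept the first event, then ignore everything until the window has elapsed. The class below is an illustration under assumptions, not RxJava's implementation; `ThrottleFirstSketch` and `accept` are hypothetical names, and timestamps are passed in explicitly so the behaviour does not depend on a real clock.

```java
// Plain-JDK sketch of the throttleFirst idea: deliver the first event of
// each window and drop the rest.
class ThrottleFirstSketch {
    private final long windowMillis;
    private long lastAcceptedAt;
    private boolean acceptedAny = false;

    ThrottleFirstSketch(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Returns true if the event at the given time should be delivered.
    boolean accept(long nowMillis) {
        if (!acceptedAny || nowMillis - lastAcceptedAt >= windowMillis) {
            acceptedAny = true;
            lastAcceptedAt = nowMillis;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        ThrottleFirstSketch throttle = new ThrottleFirstSketch(1000);
        long[] clickTimes = {0, 200, 900, 1000, 1500, 2100};
        for (long t : clickTimes) {
            System.out.println(t + " ms -> " + (throttle.accept(t) ? "click" : "ignored"));
        }
    }
}
```

With a 1000 ms window, the clicks at 0, 1000, and 2100 ms are delivered and the ones in between are dropped, which is the behaviour a button guard needs.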

7. Reactive UI

Example: when a checkbox is checked or unchecked, the corresponding preference is updated automatically.

java
SharedPreferences preferences = PreferenceManager.getDefaultSharedPreferences(this);
RxSharedPreferences rxPreferences = RxSharedPreferences.create(preferences);

Preference<Boolean> checked = rxPreferences.getBoolean("checked", true);

CheckBox checkBox = (CheckBox) findViewById(R.id.cb_test);
RxCompoundButton.checkedChanges(checkBox)
        .subscribe(checked.asAction());

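8. Reduce overly frequent requests [ debounce() ]

Example: a search box with autocomplete, where a suggestion lookup should not fire on every character typed or deleted.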

java
RxTextView.textChangeEvents(inputEditText)
        .debounce(400, TimeUnit.MILLISECONDS) // only pass an event on after 400 ms without further input
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<TextViewTextChangeEvent>() {
            @Override
            public void onCompleted() {
                Log.d(tag, "onComplete");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(tag, "Error");
            }

            @Override
            public void onNext(TextViewTextChangeEvent onTextChangeEvent) {
                Log.d(tag, String.format("Searching for %s", onTextChangeEvent.text().toString()));
            }
        });
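To see which keystrokes survive a debounce, it can help to replay a recorded sequence of events. The sketch below is a simplified, offline model rather than RxJava's implementation: it assumes events are given with ascending timestamps, treats a gap of at least the window as "quiet", and always emits the final event (as happens when the source completes). `DebounceSketch` and its `debounce` method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Offline model of debounce: an event is emitted only if no newer event
// follows it within the window.
class DebounceSketch {

    // timesMillis must be ascending and the same length as texts.
    static List<String> debounce(long[] timesMillis, String[] texts, long windowMillis) {
        List<String> emitted = new ArrayList<>();
        for (int i = 0; i < texts.length; i++) {
            boolean isLast = (i == texts.length - 1);
            // Emit when this is the final event or the next one arrives late enough.
            if (isLast || timesMillis[i + 1] - timesMillis[i] >= windowMillis) {
                emitted.add(texts[i]);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] times = {0, 100, 250, 900, 1000};
        String[] typed = {"r", "rx", "rxj", "rxja", "rxjava"};
        // Only "rxj" (followed by a 650 ms pause) and the final "rxjava" get through.
        System.out.println(debounce(times, typed, 400));
    }
}
```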

9. Delayed operations [ timer() ] and periodic operations [ interval() ]

Example: log "hello world" every 2 seconds.

java
Observable.interval(2, TimeUnit.SECONDS)
        .subscribe(new Observer<Long>() {
            @Override
            public void onCompleted() {
                log.d("completed");
            }

            @Override
            public void onError(Throwable e) {
                log.e("error");
            }

            @Override
            public void onNext(Long number) {
                log.d("hello world");
            }
        });
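For comparison, periodic work can also be sketched with the JDK's `ScheduledExecutorService`. This is an analogue of the `interval()` idea, not RxJava itself; `IntervalSketch` and `runPeriodically` are hypothetical names, and the demo uses a 200 ms period rather than the 2 seconds of the example so that it finishes quickly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-JDK sketch of a periodic task that stops after a fixed number of runs.
class IntervalSketch {

    // Runs the task every periodMillis until it has run `times` times,
    // then shuts the scheduler down and returns the number of runs.
    static int runPeriodically(long periodMillis, int times, Runnable task) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch remaining = new CountDownLatch(times);
        AtomicInteger runs = new AtomicInteger();
        scheduler.scheduleAtFixedRate(() -> {
            // Guard against extra ticks that fire before shutdown takes effect.
            if (remaining.getCount() > 0) {
                task.run();
                runs.incrementAndGet();
                remaining.countDown();
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        try {
            remaining.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        } finally {
            scheduler.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        int runs = runPeriodically(200, 3, () -> System.out.println("hello world"));
        System.out.println("ran " + runs + " times");
    }
}
```

Shutting the scheduler down plays the part that unsubscribing plays in RxJava: without it, the periodic task would keep the thread alive indefinitely.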

10. Data transformation, for example filtering out items that do not meet a condition [ filter() ], removing duplicates [ distinct() ], or taking the first few items [ take() ]

java
Observable.just("1", "2", "2", "3", "4", "5")
        .map(Integer::parseInt)
        .filter(s -> s > 1)
        .distinct()
        .take(3)
        .reduce((integer, integer2) -> integer + integer2)
        .subscribe(System.out::println); // prints 9 (2 + 3 + 4)
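Readers who know Java streams may find it useful to see the same pipeline written with `java.util.stream`, since the operators line up almost one to one (`take(3)` corresponds to `limit(3)`). This is an analogue for a finite, in-memory input only; `TransformSketch` and its method name are hypothetical, and unlike the RxJava `reduce` without a seed, the stream version returns 0 for empty input.

```java
import java.util.stream.Stream;

// Plain-JDK stream version of the map / filter / distinct / take / reduce chain.
class TransformSketch {

    static int sumOfFirstThreeDistinctAboveOne(String... values) {
        return Stream.of(values)
                .map(Integer::parseInt)   // "1" -> 1
                .filter(n -> n > 1)       // drop values that are not greater than 1
                .distinct()               // drop duplicates
                .limit(3)                 // keep the first three
                .reduce(0, Integer::sum); // add them up
    }

    public static void main(String[] args) {
        // 2 + 3 + 4
        System.out.println(sumOfFirstThreeDistinctAboveOne("1", "2", "2", "3", "4", "5"));
    }
}
```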

RxJava-Android-Samples

RxJava-Android-Samples: https://github.com/kaushikgopal/RxJava-Android-Samples

This repository collects practical, real-world examples of using RxJava in Android apps, and is a good way to see where RxJava fits into Android development.
