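Common RxJava Use Cases
This article collects common RxJava use cases, each with an example. The examples use the RxJava 1.x API (Action1, Observable.OnSubscribe, onCompleted()).
Use cases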
1. Fetch data on a background thread (network requests, file access, etc.) and display it on the main thread (UI updates) [ subscribeOn() , observeOn() ]
java
Observable.just(1, 2, 3, 4)
        .subscribeOn(Schedulers.io()) // subscribe() runs on an IO thread
        .observeOn(AndroidSchedulers.mainThread()) // the Subscriber's callbacks run on the main thread
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer number) {
                Log.d(tag, "number:" + number);
            }
        });
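The thread hand-off that subscribeOn() and observeOn() describe can also be pictured with plain JDK executors. The following is only a sketch under assumptions, not how RxJava implements its schedulers: the class name ThreadSwitchSketch and the thread names "io-thread" and "ui-thread" are made up for illustration, and a single-thread executor stands in for the Android main thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadSwitchSketch {
    // Produces a value on a background executor and consumes it on a second
    // single-thread executor that stands in for the main thread.
    // Returns {producer thread name, consumer thread name}.
    static String[] run() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui-thread"));
        try {
            return CompletableFuture
                    // roughly the role of subscribeOn(): where the work happens
                    .supplyAsync(() -> Thread.currentThread().getName(), io)
                    // roughly the role of observeOn(): where the result is handled
                    .thenApplyAsync(producer ->
                            new String[] {producer, Thread.currentThread().getName()}, ui)
                    .join();
        } finally {
            io.shutdown();
            ui.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("produced on " + threads[0] + ", consumed on " + threads[1]);
    }
}
```

The point of the sketch is only that the producing code and the consuming code never share a thread, which is what keeps slow I/O off the UI thread.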
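2. Check the cache before fetching data [ concat() ]
Example: check the in-memory cache first, then the file cache, and finally fetch from the network. Once a cached value is found, the later steps are not executed.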
java
final Observable<String> memory = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        if (memoryCache != null) {
            subscriber.onNext(memoryCache);
        }
        // Always complete, so that concat() can move on to the next source
        subscriber.onCompleted();
    }
});
Observable<String> disk = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        String cachePref = rxPreferences.getString("cache").get();
        if (!TextUtils.isEmpty(cachePref)) {
            subscriber.onNext(cachePref);
        }
        subscriber.onCompleted();
    }
});
Observable<String> network = Observable.just("network");
// concat() subscribes to the sources in order; first() takes the first value
// emitted and unsubscribes, so the remaining sources are never queried.
Observable.concat(memory, disk, network)
        .first()
        .subscribeOn(Schedulers.newThread())
        .subscribe(s -> {
            memoryCache = "memory";
            System.out.println("--------------subscribe: " + s);
        });
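The "first source that has a value wins, later sources are never touched" behaviour of concat() followed by first() can be checked without RxJava. Below is a minimal JDK-only sketch of that lookup order; the class CacheFirstSketch, its helper names and the sample values are hypothetical, and a lazily evaluated Stream of Suppliers replaces the Observables.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

class CacheFirstSketch {
    // Records which sources were actually queried, to make the short-circuit visible.
    static final List<String> queried = new ArrayList<>();

    // A source that yields `value`, or nothing when `value` is null.
    static Supplier<Optional<String>> source(String name, String value) {
        return () -> {
            queried.add(name);
            return Optional.ofNullable(value);
        };
    }

    // Tries each source in order and returns the first value found.
    static String firstAvailable(List<Supplier<Optional<String>>> sources) {
        return sources.stream()
                .map(Supplier::get)            // evaluated lazily, one source at a time
                .filter(Optional::isPresent)
                .map(Optional::get)
                .findFirst()                   // stops pulling once a value is found
                .orElseThrow(() -> new IllegalStateException("no source produced a value"));
    }

    // Memory, then disk, then network; the network source always has a value.
    static String lookup(String memoryValue, String diskValue) {
        queried.clear();
        return firstAvailable(Arrays.asList(
                source("memory", memoryValue),
                source("disk", diskValue),
                source("network", "network")));
    }

    public static void main(String[] args) {
        System.out.println(lookup(null, "disk value")); // prints "disk value"
        System.out.println(queried);                    // prints [memory, disk]
    }
}
```

With an empty memory cache and a populated disk cache, the network source is never called, which mirrors what first() achieves by unsubscribing after the first item.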
3. Wait for several requests to complete before acting, or combine the data returned by several requests [ merge() ]
java
Observable<String> observable1 = DemoUtils.createObservable1().subscribeOn(Schedulers.newThread());
Observable<String> observable2 = DemoUtils.createObservable2().subscribeOn(Schedulers.newThread());
Observable.merge(observable1, observable2)
        .subscribeOn(Schedulers.newThread())
        .subscribe(System.out::println);
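Note that merge() forwards items as they arrive and signals completion only after every source has completed, so the "wait for all" part of this use case corresponds to the completion callback. For the simpler case of two single-result requests, the same idea can be sketched with JDK futures. This is an analogy under assumptions rather than RxJava code: the class WaitForAllSketch and the two request methods are hypothetical stand-ins for DemoUtils.createObservable1() and createObservable2().

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class WaitForAllSketch {
    // Hypothetical requests, each producing a single result asynchronously.
    static CompletableFuture<String> request1() {
        return CompletableFuture.supplyAsync(() -> "result1");
    }

    static CompletableFuture<String> request2() {
        return CompletableFuture.supplyAsync(() -> "result2");
    }

    // Starts both requests concurrently and continues only when both have finished.
    static List<String> fetchBoth() {
        CompletableFuture<String> first = request1();
        CompletableFuture<String> second = request2();
        return first.thenCombine(second, (a, b) -> Arrays.asList(a, b)).join();
    }

    public static void main(String[] args) {
        System.out.println(fetchBoth()); // prints [result1, result2]
    }
}
```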
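4. One request depends on data returned by another (eliminating nested callbacks) [ flatMap() ]
Example: after logging in, use the returned token to fetch the message list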
java
NetworkService.getToken("username", "password")
        .flatMap(token -> NetworkService.getMessage(token))
        .subscribe(message -> System.out.println("message: " + message));
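The chaining above, where the second request is built from the first request's result, has a close JDK analogue in CompletableFuture.thenCompose(). The sketch below is an illustration only: DependentRequestSketch and its two methods are hypothetical stand-ins for NetworkService, and the returned strings are invented.

```java
import java.util.concurrent.CompletableFuture;

class DependentRequestSketch {
    // Hypothetical stand-in for NetworkService.getToken().
    static CompletableFuture<String> getToken(String user, String password) {
        return CompletableFuture.supplyAsync(() -> "token-for-" + user);
    }

    // Hypothetical stand-in for NetworkService.getMessage().
    static CompletableFuture<String> getMessage(String token) {
        return CompletableFuture.supplyAsync(() -> "messages fetched with " + token);
    }

    // The second request starts only after the first one has delivered its token,
    // and the chain stays flat instead of nesting one callback inside another.
    static String loginThenFetch(String user, String password) {
        return getToken(user, password)
                .thenCompose(DependentRequestSketch::getMessage)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(loginThenFetch("username", "password"));
        // prints "messages fetched with token-for-username"
    }
}
```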
5. Polling requests [ schedulePeriodically() ]
java
Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(final Subscriber<? super String> observer) {
        Scheduler.Worker worker = Schedulers.newThread().createWorker();
        observer.add(worker); // stop polling once the subscriber unsubscribes
        worker.schedulePeriodically(new Action0() {
            @Override
            public void call() {
                observer.onNext(doNetworkCallAndGetStringResult());
            }
        }, INITIAL_DELAY, POLLING_INTERVAL, TimeUnit.MILLISECONDS);
    }
}).subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        Log.d(tag, "polling...");
    }
});
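Periodic scheduling of this kind, including the need to cancel it when nobody is listening any more, can be sketched with the JDK's ScheduledExecutorService. This is an assumption-laden illustration rather than a replacement for the RxJava code: PollingSketch is a hypothetical class, and the counter increment stands in for the network call.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PollingSketch {
    // Polls at a fixed period until at least `times` polls have run, then cancels
    // the schedule. Returns the number of polls that were actually made.
    static int poll(int times, long periodMs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger polls = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(times);
        scheduler.scheduleAtFixedRate(() -> {
            polls.incrementAndGet(); // stands in for the network call
            done.countDown();
        }, 0, periodMs, TimeUnit.MILLISECONDS);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow(); // plays the role of unsubscribing
        }
        return polls.get();
    }

    public static void main(String[] args) {
        System.out.println("polls made: " + poll(3, 10));
    }
}
```

Without the shutdownNow() call the scheduler thread would keep running, which is the same leak a periodic worker causes when it is never unsubscribed.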
6. Prevent rapid repeated clicks on a button [ throttleFirst() ]
java
RxView.clicks(findViewById(R.id.btn_throttle))
        .throttleFirst(1, TimeUnit.SECONDS)
        .subscribe(aVoid -> System.out.println("click"));
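To make the filtering rule concrete: throttleFirst() lets one event through and then ignores everything until the window has elapsed. The JDK-only sketch below applies that rule to a list of click timestamps. ThrottleFirstSketch and the sample timestamps are hypothetical, and explicit timestamps replace the real clock so the result is deterministic.

```java
import java.util.ArrayList;
import java.util.List;

class ThrottleFirstSketch {
    // Returns the click timestamps (in ms, ascending) that survive the throttle:
    // a click is accepted only if the window opened by the previously accepted
    // click has already elapsed.
    static List<Long> throttleFirst(long[] clickTimesMs, long windowMs) {
        List<Long> accepted = new ArrayList<>();
        long windowEnd = Long.MIN_VALUE; // no window is open before the first click
        for (long t : clickTimesMs) {
            if (t >= windowEnd) {
                accepted.add(t);
                windowEnd = t + windowMs;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        long[] clicks = {0, 200, 900, 1100, 1500, 2300};
        System.out.println(throttleFirst(clicks, 1000)); // prints [0, 1100, 2300]
    }
}
```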
7. Reactive UI
Example: when a checkbox is checked or unchecked, the corresponding preference is updated automatically
java
SharedPreferences preferences = PreferenceManager.getDefaultSharedPreferences(this);
RxSharedPreferences rxPreferences = RxSharedPreferences.create(preferences);
Preference<Boolean> checked = rxPreferences.getBoolean("checked", true);
CheckBox checkBox = (CheckBox) findViewById(R.id.cb_test);
RxCompoundButton.checkedChanges(checkBox)
        .subscribe(checked.asAction());
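8. Reduce overly frequent requests [ debounce() ]
Example: a search box with autocomplete, where a suggestion lookup should not be triggered for every character typed or deleted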
java
RxTextView.textChangeEvents(inputEditText)
        .debounce(400, TimeUnit.MILLISECONDS) // emit only after 400 ms without further input
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<TextViewTextChangeEvent>() {
            @Override
            public void onCompleted() {
                Log.d(tag, "onComplete");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(tag, "Error");
            }

            @Override
            public void onNext(TextViewTextChangeEvent onTextChangeEvent) {
                Log.d(tag, String.format("Searching for %s", onTextChangeEvent.text().toString()));
            }
        });
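The effect of debounce() on a burst of keystrokes can be worked through by hand: a text change is passed on only if no further change follows within the quiet period. The following JDK-only sketch applies that rule to timestamped inputs. DebounceSketch and the sample data are hypothetical, and the sketch assumes the last input is always followed by a long enough pause.

```java
import java.util.ArrayList;
import java.util.List;

class DebounceSketch {
    // Given inputs and their timestamps (ms, ascending), returns the inputs that
    // are followed by at least `quietMs` of silence. The final input is assumed
    // to be followed by a long pause, so it is always kept.
    static List<String> debounce(long[] timesMs, String[] texts, long quietMs) {
        List<String> emitted = new ArrayList<>();
        for (int i = 0; i < texts.length; i++) {
            boolean isLast = i == texts.length - 1;
            if (isLast || timesMs[i + 1] - timesMs[i] >= quietMs) {
                emitted.add(texts[i]);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] times = {0, 100, 250, 900, 1000, 1100};
        String[] texts = {"r", "rx", "rxj", "rxja", "rxjav", "rxjava"};
        System.out.println(debounce(times, texts, 400)); // prints [rxj, rxjava]
    }
}
```

Six keystrokes therefore produce two lookups instead of six, one for each pause in typing.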
9. Delayed operations [ timer() ] and periodic operations [ interval() ]
Example: log "hello world" every 2 seconds
java
Observable.interval(2, TimeUnit.SECONDS)
        .subscribe(new Observer<Long>() {
            @Override
            public void onCompleted() {
                Log.d(tag, "completed");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(tag, "error");
            }

            @Override
            public void onNext(Long number) {
                Log.d(tag, "hello world");
            }
        });
10. Data transformation, for example filtering out items that do not meet a condition [ filter() ], removing duplicates [ distinct() ], or taking the first few items [ take() ]
java
Observable.just("1", "2", "2", "3", "4", "5")
        .map(Integer::parseInt)    // 1, 2, 2, 3, 4, 5
        .filter(n -> n > 1)        // 2, 2, 3, 4, 5
        .distinct()                // 2, 3, 4, 5
        .take(3)                   // 2, 3, 4
        .reduce((a, b) -> a + b)   // 9
        .subscribe(System.out::println);
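The same pipeline can be run with java.util.stream to check the arithmetic without an RxJava dependency. This is a sketch for verification only: TransformSketch is a hypothetical class, and limit() is used as the Stream counterpart of take().

```java
import java.util.stream.Stream;

class TransformSketch {
    // Parses the strings, keeps values greater than 1, drops duplicates,
    // keeps the first three remaining values and sums them.
    static int transform() {
        return Stream.of("1", "2", "2", "3", "4", "5")
                .map(Integer::parseInt)
                .filter(n -> n > 1)
                .distinct()
                .limit(3)
                .reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println(transform()); // prints 9 (2 + 3 + 4)
    }
}
```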
RxJava-Android-Samples
RxJava-Android-Samples: https://github.com/kaushikgopal/RxJava-Android-Samples
This repository collects real-world, practical examples of using RxJava in Android apps, and is a good way to see where RxJava fits in Android development.