Common usage scenarios of RxJava
This article collects common usage scenarios and examples of RxJava.
Usage Scenarios
1. Fetching data in a background thread (network requests, file access, etc.), displaying in the main thread (UI updates) [ subscribeOn() , observeOn() ]
Observable.just(1,2,3,4)
.subscribeOn(Schedulers.io()) // Specify that subscribe() occurs on the IO thread
.observeOn(AndroidSchedulers.mainThread()) // Specify that Subscriber's callback occurs on the main thread
.subscribe(new Action1<Integer>() {
@Override
public void call (Integer number){
Log.d(tag, "number:" + number);
}
});
Observable.just(1,2,3,4)
.subscribeOn(Schedulers.io()) // Specify that subscribe() occurs on the IO thread
.observeOn(AndroidSchedulers.mainThread()) // Specify that Subscriber's callback occurs on the main thread
.subscribe(new Action1<Integer>() {
@Override
public void call (Integer number){
Log.d(tag, "number:" + number);
}
});
2. Check cache before fetching data [ concat() ]
Example: First check if there is cache in memory, then check for file cache, and finally fetch from the network. If cache is found, subsequent operations will not be executed.
final Observable<String> memory = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
if (memoryCache != null) {
subscriber.onNext(memoryCache);
} else {
subscriber.onCompleted();
}
}
});
Observable<String> disk = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
String cachePref = rxPreferences.getString("cache").get();
if (!TextUtils.isEmpty(cachePref)) {
subscriber.onNext(cachePref);
} else {
subscriber.onCompleted();
}
}
});
Observable<String> network = Observable.just("network");
Observable.
concat(memory, disk, network)
.
first()
.
subscribeOn(Schedulers.newThread())
.
subscribe(s ->{
memoryCache ="memory";
System.out.
println("--------------subscribe: "+s);
});
final Observable<String> memory = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
if (memoryCache != null) {
subscriber.onNext(memoryCache);
} else {
subscriber.onCompleted();
}
}
});
Observable<String> disk = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
String cachePref = rxPreferences.getString("cache").get();
if (!TextUtils.isEmpty(cachePref)) {
subscriber.onNext(cachePref);
} else {
subscriber.onCompleted();
}
}
});
Observable<String> network = Observable.just("network");
Observable.
concat(memory, disk, network)
.
first()
.
subscribeOn(Schedulers.newThread())
.
subscribe(s ->{
memoryCache ="memory";
System.out.
println("--------------subscribe: "+s);
});
3. Wait for multiple requests to complete before proceeding, or merge the data returned from multiple requests [ merge() ]
Observable<String> observable1 = DemoUtils.createObservable1().subscribeOn(Schedulers.newThread());
Observable<String> observable2 = DemoUtils.createObservable2().subscribeOn(Schedulers.newThread());
Observable.
merge(observable1, observable2)
.
subscribeOn(Schedulers.newThread())
.
subscribe(System.out::println);
Observable<String> observable1 = DemoUtils.createObservable1().subscribeOn(Schedulers.newThread());
Observable<String> observable2 = DemoUtils.createObservable2().subscribeOn(Schedulers.newThread());
Observable.
merge(observable1, observable2)
.
subscribeOn(Schedulers.newThread())
.
subscribe(System.out::println);
4. One request depends on the data returned from another request (eliminating nested callbacks)
Example: After logging in, retrieve the message list based on the obtained token.
NetworkService.getToken("username","password")
.
flatMap(s ->NetworkService.
getMessage(s))
.
subscribe(s ->{
System.out.
println("message: "+s);
});
NetworkService.getToken("username","password")
.
flatMap(s ->NetworkService.
getMessage(s))
.
subscribe(s ->{
System.out.
println("message: "+s);
});
5. Polling requests [ schedulePeriodically() ]
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call ( final Subscriber<? super String> observer){
Schedulers.newThread().createWorker()
.schedulePeriodically(new Action0() {
@Override
public void call() {
observer.onNext(doNetworkCallAndGetStringResult());
}
}, INITIAL_DELAY, POLLING_INTERVAL, TimeUnit.MILLISECONDS);
}
}).
subscribe(new Action1<String>() {
@Override
public void call (String s){
log.d(tag, "polling….”);
}
});
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call ( final Subscriber<? super String> observer){
Schedulers.newThread().createWorker()
.schedulePeriodically(new Action0() {
@Override
public void call() {
observer.onNext(doNetworkCallAndGetStringResult());
}
}, INITIAL_DELAY, POLLING_INTERVAL, TimeUnit.MILLISECONDS);
}
}).
subscribe(new Action1<String>() {
@Override
public void call (String s){
log.d(tag, "polling….”);
}
});
6. Prevent rapid consecutive clicks on UI buttons [ throttleFirst() ]
RxView.clicks(findViewById(R.id.btn_throttle))
.
throttleFirst(1,TimeUnit.SECONDS)
.
subscribe(aVoid ->{
System.out.
println("click");
});
RxView.clicks(findViewById(R.id.btn_throttle))
.
throttleFirst(1,TimeUnit.SECONDS)
.
subscribe(aVoid ->{
System.out.
println("click");
});
7. Reactive UI
Example: When a certain checkbox is checked, automatically update the corresponding preference.
SharedPreferences preferences = PreferenceManager.getDefaultSharedPreferences(this);
RxSharedPreferences rxPreferences = RxSharedPreferences.create(preferences);
Preference<Boolean> checked = rxPreferences.getBoolean("checked", true);
CheckBox checkBox = (CheckBox) findViewById(R.id.cb_test);
RxCompoundButton.
checkedChanges(checkBox)
.
subscribe(checked.asAction());
SharedPreferences preferences = PreferenceManager.getDefaultSharedPreferences(this);
RxSharedPreferences rxPreferences = RxSharedPreferences.create(preferences);
Preference<Boolean> checked = rxPreferences.getBoolean("checked", true);
CheckBox checkBox = (CheckBox) findViewById(R.id.cb_test);
RxCompoundButton.
checkedChanges(checkBox)
.
subscribe(checked.asAction());
8. Reduce frequent requests [ debounce() ]
Example: A search box with auto-suggestion, avoiding making a suggestion for every character typed or deleted.
RxTextView.textChangeEvents(inputEditText)
.
debounce(400,TimeUnit.MILLISECONDS)
.
observeOn(AndroidSchedulers.mainThread())
.
subscribe(new Observer<TextViewTextChangeEvent>() {
@Override
public void onCompleted () {
log.d(tag, "onComplete");
}
@Override
public void onError (Throwable e){
log.d(tag, "Error");
}
@Override
public void onNext (TextViewTextChangeEvent onTextChangeEvent){
log.d(tag, format("Searching for %s", onTextChangeEvent.text().toString()));
}
});
RxTextView.textChangeEvents(inputEditText)
.
debounce(400,TimeUnit.MILLISECONDS)
.
observeOn(AndroidSchedulers.mainThread())
.
subscribe(new Observer<TextViewTextChangeEvent>() {
@Override
public void onCompleted () {
log.d(tag, "onComplete");
}
@Override
public void onError (Throwable e){
log.d(tag, "Error");
}
@Override
public void onNext (TextViewTextChangeEvent onTextChangeEvent){
log.d(tag, format("Searching for %s", onTextChangeEvent.text().toString()));
}
});
9. Timed operations [ timer() ], periodic operations [ interval() ]
Example: Output "hello world" every 2 seconds.
Observable.interval(2,TimeUnit.SECONDS)
.
subscribe(new Observer<Long>() {
@Override
public void onCompleted () {
log.d("completed");
}
@Override
public void onError (Throwable e){
log.e("error");
}
@Override
public void onNext (Long number){
log.d("hello world");
}
});
Observable.interval(2,TimeUnit.SECONDS)
.
subscribe(new Observer<Long>() {
@Override
public void onCompleted () {
log.d("completed");
}
@Override
public void onError (Throwable e){
log.e("error");
}
@Override
public void onNext (Long number){
log.d("hello world");
}
});
10. Data transformation, such as filtering out data that does not meet certain criteria [ filter() ], removing duplicates [ distinct() ], taking the first few items [ take() ], etc.
Observable.just("1","2","2","3","4","5")
.
map(Integer::parseInt)
.
filter(s ->s >1)
.
distinct()
.
take(3)
.
reduce((integer, integer2) ->integer.
intValue() +integer2.
intValue())
.
subscribe(System.out::println);
Observable.just("1","2","2","3","4","5")
.
map(Integer::parseInt)
.
filter(s ->s >1)
.
distinct()
.
take(3)
.
reduce((integer, integer2) ->integer.
intValue() +integer2.
intValue())
.
subscribe(System.out::println);
RxJava-Android-Samples
RxJava-Android-Samples:https://github.com/kaushikgopal/RxJava-Android-Samples
This repository collects some real and useful examples of using RxJava to develop Android applications, which can help understand the application scenarios of RxJava in Android development.
Comments
No comments yet. Be the first to comment!