Common usage scenarios of RxJava

227 words
This article collects common usage scenarios and examples of RxJava.

Usage Scenarios

1. Fetching data in a background thread (network requests, file access, etc.), displaying in the main thread (UI updates) [ subscribeOn() , observeOn() ]

java
Observable.just(1,2,3,4)
 .subscribeOn(Schedulers.io()) // Specify that subscribe() occurs on the IO thread
 .observeOn(AndroidSchedulers.mainThread()) // Specify that Subscriber's callback occurs on the main thread
 .subscribe(new Action1<Integer>() {
    @Override
    public void call (Integer number){
        Log.d(tag, "number:" + number);
    }
});
Observable.just(1,2,3,4)
 .subscribeOn(Schedulers.io()) // Specify that subscribe() occurs on the IO thread
 .observeOn(AndroidSchedulers.mainThread()) // Specify that Subscriber's callback occurs on the main thread
 .subscribe(new Action1<Integer>() {
    @Override
    public void call (Integer number){
        Log.d(tag, "number:" + number);
    }
});

2. Check cache before fetching data [ concat() ]

Example: First check if there is cache in memory, then check for file cache, and finally fetch from the network. If cache is found, subsequent operations will not be executed.

java
final Observable<String> memory = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        if (memoryCache != null) {
            subscriber.onNext(memoryCache);
        } else {
            subscriber.onCompleted();
        }
    }
});
Observable<String> disk = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        String cachePref = rxPreferences.getString("cache").get();
        if (!TextUtils.isEmpty(cachePref)) {
            subscriber.onNext(cachePref);
        } else {
            subscriber.onCompleted();
        }
    }
});
Observable<String> network = Observable.just("network");

Observable.

concat(memory, disk, network)
        .

first()
        .

subscribeOn(Schedulers.newThread())
        .

subscribe(s ->{
memoryCache ="memory";
        System.out.

println("--------------subscribe: "+s);
        });
final Observable<String> memory = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        if (memoryCache != null) {
            subscriber.onNext(memoryCache);
        } else {
            subscriber.onCompleted();
        }
    }
});
Observable<String> disk = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        String cachePref = rxPreferences.getString("cache").get();
        if (!TextUtils.isEmpty(cachePref)) {
            subscriber.onNext(cachePref);
        } else {
            subscriber.onCompleted();
        }
    }
});
Observable<String> network = Observable.just("network");

Observable.

concat(memory, disk, network)
        .

first()
        .

subscribeOn(Schedulers.newThread())
        .

subscribe(s ->{
memoryCache ="memory";
        System.out.

println("--------------subscribe: "+s);
        });

3. Wait for multiple requests to complete before proceeding, or merge the data returned from multiple requests [ merge() ]

java
Observable<String> observable1 = DemoUtils.createObservable1().subscribeOn(Schedulers.newThread());
Observable<String> observable2 = DemoUtils.createObservable2().subscribeOn(Schedulers.newThread());

Observable.

merge(observable1, observable2)
        .

subscribeOn(Schedulers.newThread())
        .

subscribe(System.out::println);
Observable<String> observable1 = DemoUtils.createObservable1().subscribeOn(Schedulers.newThread());
Observable<String> observable2 = DemoUtils.createObservable2().subscribeOn(Schedulers.newThread());

Observable.

merge(observable1, observable2)
        .

subscribeOn(Schedulers.newThread())
        .

subscribe(System.out::println);

4. One request depends on the data returned from another request (eliminating nested callbacks)

Example: After logging in, retrieve the message list based on the obtained token.

java
NetworkService.getToken("username","password")
        .

flatMap(s ->NetworkService.

getMessage(s))
        .

subscribe(s ->{
        System.out.

println("message: "+s);
        });
NetworkService.getToken("username","password")
        .

flatMap(s ->NetworkService.

getMessage(s))
        .

subscribe(s ->{
        System.out.

println("message: "+s);
        });

5. Polling requests [ schedulePeriodically() ]

java
Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call ( final Subscriber<? super String> observer){

        Schedulers.newThread().createWorker()
                .schedulePeriodically(new Action0() {
                    @Override
                    public void call() {
                        observer.onNext(doNetworkCallAndGetStringResult());
                    }
                }, INITIAL_DELAY, POLLING_INTERVAL, TimeUnit.MILLISECONDS);
    }
}).

subscribe(new Action1<String>() {
    @Override
    public void call (String s){
        log.d(tag, "polling….”);
    }
});
Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call ( final Subscriber<? super String> observer){

        Schedulers.newThread().createWorker()
                .schedulePeriodically(new Action0() {
                    @Override
                    public void call() {
                        observer.onNext(doNetworkCallAndGetStringResult());
                    }
                }, INITIAL_DELAY, POLLING_INTERVAL, TimeUnit.MILLISECONDS);
    }
}).

subscribe(new Action1<String>() {
    @Override
    public void call (String s){
        log.d(tag, "polling….”);
    }
});

6. Prevent rapid consecutive clicks on UI buttons [ throttleFirst() ]

java
RxView.clicks(findViewById(R.id.btn_throttle))
        .

throttleFirst(1,TimeUnit.SECONDS)
        .

subscribe(aVoid ->{
        System.out.

println("click");
        });
RxView.clicks(findViewById(R.id.btn_throttle))
        .

throttleFirst(1,TimeUnit.SECONDS)
        .

subscribe(aVoid ->{
        System.out.

println("click");
        });

7. Reactive UI

Example: When a certain checkbox is checked, automatically update the corresponding preference.

java
SharedPreferences preferences = PreferenceManager.getDefaultSharedPreferences(this);
RxSharedPreferences rxPreferences = RxSharedPreferences.create(preferences);

Preference<Boolean> checked = rxPreferences.getBoolean("checked", true);

CheckBox checkBox = (CheckBox) findViewById(R.id.cb_test);
RxCompoundButton.

checkedChanges(checkBox)
        .

subscribe(checked.asAction());
SharedPreferences preferences = PreferenceManager.getDefaultSharedPreferences(this);
RxSharedPreferences rxPreferences = RxSharedPreferences.create(preferences);

Preference<Boolean> checked = rxPreferences.getBoolean("checked", true);

CheckBox checkBox = (CheckBox) findViewById(R.id.cb_test);
RxCompoundButton.

checkedChanges(checkBox)
        .

subscribe(checked.asAction());

8. Reduce frequent requests [ debounce() ]

Example: A search box with auto-suggestion, avoiding making a suggestion for every character typed or deleted.

java
RxTextView.textChangeEvents(inputEditText)
        .

debounce(400,TimeUnit.MILLISECONDS)
        .

observeOn(AndroidSchedulers.mainThread())
        .

subscribe(new Observer<TextViewTextChangeEvent>() {
    @Override
    public void onCompleted () {
        log.d(tag, "onComplete");
    }

    @Override
    public void onError (Throwable e){
        log.d(tag, "Error");
    }

    @Override
    public void onNext (TextViewTextChangeEvent onTextChangeEvent){
        log.d(tag, format("Searching for %s", onTextChangeEvent.text().toString()));
    }
});
RxTextView.textChangeEvents(inputEditText)
        .

debounce(400,TimeUnit.MILLISECONDS)
        .

observeOn(AndroidSchedulers.mainThread())
        .

subscribe(new Observer<TextViewTextChangeEvent>() {
    @Override
    public void onCompleted () {
        log.d(tag, "onComplete");
    }

    @Override
    public void onError (Throwable e){
        log.d(tag, "Error");
    }

    @Override
    public void onNext (TextViewTextChangeEvent onTextChangeEvent){
        log.d(tag, format("Searching for %s", onTextChangeEvent.text().toString()));
    }
});

9. Timed operations [ timer() ], periodic operations [ interval() ]

Example: Output "hello world" every 2 seconds.

java
Observable.interval(2,TimeUnit.SECONDS)  
 .

subscribe(new Observer<Long>() {
    @Override
    public void onCompleted () {
        log.d("completed");
    }

    @Override
    public void onError (Throwable e){
        log.e("error");
    }

    @Override
    public void onNext (Long number){
        log.d("hello world");
    }
});
Observable.interval(2,TimeUnit.SECONDS)  
 .

subscribe(new Observer<Long>() {
    @Override
    public void onCompleted () {
        log.d("completed");
    }

    @Override
    public void onError (Throwable e){
        log.e("error");
    }

    @Override
    public void onNext (Long number){
        log.d("hello world");
    }
});

10. Data transformation, such as filtering out data that does not meet certain criteria [ filter() ], removing duplicates [ distinct() ], taking the first few items [ take() ], etc.

java
Observable.just("1","2","2","3","4","5")
        .

map(Integer::parseInt)
        .

filter(s ->s >1)
        .

distinct()
        .

take(3)
        .

reduce((integer, integer2) ->integer.

intValue() +integer2.

intValue())
        .

subscribe(System.out::println);
Observable.just("1","2","2","3","4","5")
        .

map(Integer::parseInt)
        .

filter(s ->s >1)
        .

distinct()
        .

take(3)
        .

reduce((integer, integer2) ->integer.

intValue() +integer2.

intValue())
        .

subscribe(System.out::println);

RxJava-Android-Samples

RxJava-Android-Samples:https://github.com/kaushikgopal/RxJava-Android-Samples

This repository collects some real and useful examples of using RxJava to develop Android applications, which can help understand the application scenarios of RxJava in Android development.

Comments

Pleaseto continueComments require admin approval before being visible

No comments yet. Be the first to comment!