Skip to content

RxJava in Android

Published: at 8 min read

Table of content

RxJava - Definition

Components of RxJava

  1. Observable: It represents a stream of data or events that can be observed. It can emit data, errors, and completion events.

  2. Observer: Subscribes to an Observable to receive data or events. It defines methods like onNext(), onError(), and onComplete() to handle emitted items, errors, and the completion event.

  3. Operators: RxJava provides a rich set of operators that allow to transform, filter, combine, or manipulate the data emitted by Observables. Examples of operators include map, filter, merge, zip, and more.

  4. Schedulers: RxJava allows to specify the threads or schedulers on which the Observable emits data and the Observer consumes it. This can help manage concurrency and threading issues.

  5. Backpressure: RxJava has mechanisms for dealing with backpressure, where an Observable produces data faster than the Observer can consume it. One can use backpressure strategies like onBackpressureBuffer or onBackpressureLatest to handle this.

  6. Subjects: Subjects are a way to bridge the gap between traditional non-reactive code and RxJava. They can act as both an Observable and an Observer, making it easier to integrate RxJava into existing code.

RxJava in Android

RxJava is widely used in Android app development for

rxjava

It can make code more readable and maintainable by representing complex asynchronous operations as a sequence of transformations and actions on observables.

Common use cases for RxJava in Android

Inside RxJava

rxjavaAndroid

Types of Observables:

  1. Observables : A stream of data that can emit zero or more items, and optionally terminates with either a completion or an error. Observable is commonly used when dealing with asynchronous data streams or events that can emit multiple items.

  2. Single : A stream that emits a single item or an error.

    • Subscribers of a Single can handle the emitted item via the onSuccess method or handle an error via the onError method.
    • Single is useful when one expects only a single result from an operation, such as making a network request and receiving a response.
  3. Maybe : Represents a stream that can emit either a single item, no item at all, or terminate with an error.

    • Subscribers of a Maybe can handle the emitted item via the onSuccess method, handle the absence of an item via the onComplete method, or handle an error via the onError method.
    • Maybe is suitable when one needs to handle situations where the result may or may not be present.
  4. Completable : Represents an asynchronous operation that either completes successfully without emitting any item or terminates with an error.

    • Subscribers of a Completable only need to handle the completion event or an error, as it does not emit any items.
    • Completable is useful when one is only interested in the completion status of an operation, rather than receiving data.
  5. Flowable : Designed to handle backpressure, where an Observable is emitting data faster than the subscriber can consume.

    • Flowable supports asynchronous data streams that can emit zero or more items, and optionally terminate with either a completion or an error.
    • It provides additional operators for dealing with backpressure and controlling the flow of data between the producer and the consumer.

Types of Observer

  1. onNext() -> the next upcoming data will be received here.
  2. onError() -> if got error in observing data
  3. onComplete() -> if all data received from Observable
  4. onSubscribe() -> Once observer got subscribed to the observable the stream of data flow starts.

Types of Operators

  1. Transformation Operators:

    • map(): Transforms the items emitted by an observable into another type.
    • flatMap(): Transforms each item emitted by an observable into another observable and flattens the results into a single observable.
    • concatMap(): Similar to flatMap(), but maintains the order of emitted items.
    • switchMap(): Emits items from the most recently active inner observable.
  2. Filtering Operators:

    • filter(): Filters items emitted by an observable based on a specified condition.
    • distinct(): Emits only distinct items, removing duplicates.
    • take(): Takes a specified number of items from the beginning of the observable.
    • skip(): Skips a specified number of items from the beginning of the observable.
    • debounce(): Emits an item only if a certain amount of time has passed without emitting another item.
  3. Combining Operators:

    • merge(): Combines multiple observables into one observable, emitting items from all of them.
    • concat(): Concatenates multiple observables, emitting items from one observable before moving to the next.
    • zip(): Combines the emissions of multiple observables into pairs or triples.
    • combineLatest(): Combines the most recent emissions from multiple observables into a single item.
  4. Utility Operators:

    • doOnNext(): Performs a side effect for each emitted item.
    • doOnError(): Performs a side effect when an error occurs.
    • doOnComplete(): Performs a side effect when the observable completes.
    • subscribeOn(): Specifies the thread on which the source observable emits items.
    • observeOn(): Specifies the thread on which the observer receives and processes items.
  5. Conditional and Boolean Operators:

    • all(): Determines if all items emitted by the observable meet a condition.
    • any(): Checks if at least one item emitted by the observable meets a condition.
    • contains(): Checks if the observable contains a specified item.
    • isEmpty(): Checks if the observable is empty.

A typical example of RxJava usage

Observable.fromArray(1, 2, 3, 4, 5) // #1
    .map { "$it (mapped)" } // #2
    .doOnNext { println("doOnNext($it)") } // #2
    .take(3) // #2
    .subscribe( // #3
        { value -> println("onNext received: $value") }, // #4
        { throwable -> println("onError received: $throwable") }, // #4
        { println("onComplete") } // #4
    )

While explaining the Key Points :

#1: An observable source —> some function or factory method that provides an initial Observable instance.

#2: A number of chained operators which manipulate the stream values and its behavior

#3: The subscription

#4: Events callbacks

Step-by-step guide with an example of how to use RxJava in an Android app

implementation 'io.reactivex.rxjava2:rxjava:2.x.y'
// Replace x and y with the appropriate version numbers
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
Observable<Integer> numbersObservable = Observable.just(1, 2, 3, 4, 5);

numbersObservable
    .subscribeOn(Schedulers.io()) // Perform operations on a background thread
    .observeOn(AndroidSchedulers.mainThread()) // Observe and update UI on the main thread
    .subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            // This is called when the subscription is established.
            // You can use the Disposable to cancel the subscription if needed.
        }

        @Override
        public void onNext(Integer value) {
            // Handle each emitted item here (e.g., update the UI)
            textView.setText(String.valueOf(value));
        }

        @Override
        public void onError(Throwable e) {
            // Handle any errors that occur during the operation
            Log.e("RxJavaExample", "Error: " + e.getMessage());
        }

        @Override
        public void onComplete() {
            // This is called when the Observable has completed its emissions
            Log.d("RxJavaExample", "Completed");
        }
    });
private Disposable disposable;

// ...

disposable = numbersObservable
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(/*...*/);

// When done with the subscription, dispose of it:
disposable.dispose();

Back points of RxJava

  1. The observables and reactive constructs can introduce some overhead in terms of memory and processing.

  2. Overusing RxJava for simple tasks can lead to over-engineering and unnecessary complexity in your codebase.

  3. RxJava can lead to resource leaks, memory leaks, especially if subscriptions are not disposed of correctly.

  4. RxJava increases the size of the APK.

Some points on RxAndroid

RxAndroid is an ->

RxJava Vs LiveData in Android

Concluding Note

RxJava uses various data structures and algorithms to manage the flow of items through the observable sequences and to handle concurrency and synchronization. It’s a complex library that leverages principles from reactive programming and functional programming to provide a flexible and powerful framework for handling asynchronous and event-driven tasks in a reactive and composable manner.

Happy Learning !

Share :
Written by:Parita Dey

Interested in Writing Blogs, showcase yourself ?

If you're passionate about technology and have insights to share, we'd love to hear from you! Fill out the form below to express your interest in writing technical blogs for us.

If you notice any issues in this blog post or have suggestions, please contact the author directly or send an email to hi@asdevs.dev.