Table of contents
- RxJava - Definition
- Components of RxJava
- RxJava in Android
- Common use cases for RxJava in Android
- Inside RxJava
- A typical example of RxJava usage
- Drawbacks of RxJava
- Some points on RxAndroid
- RxJava Vs LiveData in Android
- Concluding Note
RxJava - Definition
- RxJava is a popular library for reactive programming in Java.
- It’s a part of the larger ReactiveX (Reactive Extensions) family, which includes similar libraries for various programming languages.
- RxJava is designed for writing asynchronous and event-driven code using observable sequences.
Components of RxJava
- Observable: Represents a stream of data or events that can be observed. It can emit items, an error, or a completion event.
- Observer: Subscribes to an Observable to receive its events. It defines onNext(), onError(), and onComplete() to handle emitted items, an error, and the completion event.
- Operators: RxJava provides a rich set of operators that let you transform, filter, and combine the data emitted by Observables. Examples include map, filter, merge, and zip.
- Schedulers: RxJava lets you specify the schedulers (threads or thread pools) on which the Observable does its work and on which the Observer consumes the results. This helps manage concurrency and threading issues.
- Backpressure: RxJava has mechanisms for dealing with backpressure, where a source produces data faster than its consumer can process it. In RxJava 2 these mechanisms belong to Flowable, with strategies such as onBackpressureBuffer and onBackpressureLatest.
- Subjects: Subjects are a way to bridge the gap between traditional non-reactive code and RxJava. A Subject acts as both an Observable and an Observer, which makes it easier to integrate RxJava into existing code.
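To make the Subject idea concrete, here is a minimal sketch assuming RxJava 2 (the `io.reactivex` packages used later in this article). The `ClickListener` interface and the class name are hypothetical, invented only to stand in for callback-style code; they are not part of any library.

```java
import io.reactivex.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.List;

public class SubjectBridgeSketch {

    /** Hypothetical callback-style API, invented for this example. */
    interface ClickListener {
        void onClick(String buttonId);
    }

    static List<String> run() {
        PublishSubject<String> clicks = PublishSubject.create();
        List<String> received = new ArrayList<>();

        // Observable side: downstream code subscribes like to any other stream.
        clicks.map(id -> "clicked:" + id)
              .subscribe(received::add);

        // Observer side: the non-reactive callback pushes events into the subject.
        ClickListener listener = clicks::onNext;
        listener.onClick("ok");
        listener.onClick("cancel");
        clicks.onComplete();

        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [clicked:ok, clicked:cancel]
    }
}
```

A PublishSubject only forwards items emitted after subscription, which is why the subscriber is attached before the listener fires.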
RxJava in Android
RxJava is widely used in Android app development for
- handling asynchronous tasks,
- managing UI events, and
- simplifying the handling of complex event chains.
It can make code more readable and maintainable by representing complex asynchronous operations as a sequence of transformations and actions on observables.
Common use cases for RxJava in Android
- Network Operations: RxJava can be used to make network requests with libraries like Retrofit. API calls are defined as observables, and responses and errors are handled in a reactive way.
- UI Event Handling: RxJava can react to UI events such as button clicks, text input changes, or other user interactions. This can make UI code more responsive and easier to manage.
- Background Processing: RxJava can handle background tasks, such as file downloads, database operations, or image processing, by using schedulers to offload work from the main thread and avoid blocking the UI.
- Data Transformation and Filtering: RxJava’s operators let you transform and filter data in a declarative manner. This is especially useful when dealing with data streams from sensors, databases, or other sources.
- Reactive UI: RxJava can be used to create reactive UIs in which components update automatically when the underlying data changes. This can simplify UI updates and keep the app’s state in sync.
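As an illustration of the UI event handling case, the sketch below debounces text input before it would trigger a search. It assumes RxJava 2. The PublishSubject stands in for a real text-change listener, and a TestScheduler supplies virtual time so the example runs deterministically; in an app you would normally leave the scheduler argument out. The class name and the 300 ms window are illustrative choices, not recommendations from the article.

```java
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class SearchDebounceSketch {

    static List<String> run() {
        TestScheduler scheduler = new TestScheduler();          // virtual clock
        PublishSubject<String> textChanges = PublishSubject.create();
        List<String> queries = new ArrayList<>();

        textChanges
            .debounce(300, TimeUnit.MILLISECONDS, scheduler)    // wait for a pause in typing
            .distinctUntilChanged()                             // skip repeated queries
            .subscribe(queries::add);

        // The user types quickly: each keystroke arrives before the window elapses.
        textChanges.onNext("r");
        scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
        textChanges.onNext("rx");
        scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
        textChanges.onNext("rxj");
        scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);    // pause: "rxj" is emitted

        textChanges.onNext("rxjava");
        scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);    // pause: "rxjava" is emitted

        return queries;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [rxj, rxjava]
    }
}
```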
Inside RxJava
Types of Observables:
- Observable: A stream that can emit zero or more items and optionally terminates with either a completion or an error. Observable is commonly used for asynchronous data streams or events that can emit multiple items.
- Single: A stream that emits exactly one item or an error.
  - Subscribers of a Single handle the emitted item via the onSuccess method or an error via the onError method.
  - Single is useful when only one result is expected from an operation, such as making a network request and receiving a response.
- Maybe: A stream that emits a single item, completes without emitting any item, or terminates with an error.
  - Subscribers of a Maybe handle the emitted item via the onSuccess method, the absence of an item via the onComplete method, or an error via the onError method.
  - Maybe is suitable when the result may or may not be present.
- Completable: An asynchronous operation that either completes successfully without emitting any item or terminates with an error.
  - Subscribers of a Completable only handle the completion event or an error, as it does not emit any items.
  - Completable is useful when only the completion status of an operation matters, not its data.
- Flowable: Designed to handle backpressure, where a source emits data faster than the subscriber can consume it.
  - Like Observable, Flowable emits zero or more items and optionally terminates with either a completion or an error.
  - It provides additional operators for dealing with backpressure and controlling the flow of data between the producer and the consumer.
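The callback differences between these types can be sketched as follows, assuming RxJava 2. The class and method names are made up for the example. The second method uses RxJava's TestSubscriber with an initial request of 3 to show how a Flowable only delivers as many items as the subscriber has requested.

```java
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.Single;
import io.reactivex.subscribers.TestSubscriber;
import java.util.ArrayList;
import java.util.List;

public class SourceTypesSketch {

    static List<String> callbacks() {
        List<String> log = new ArrayList<>();

        // Single: exactly one item or an error.
        Single.just("response").subscribe(
            item -> log.add("Single onSuccess: " + item),
            error -> log.add("Single onError"));

        // Maybe: one item, no item (onComplete), or an error.
        Maybe.<String>empty().subscribe(
            item -> log.add("Maybe onSuccess: " + item),
            error -> log.add("Maybe onError"),
            () -> log.add("Maybe onComplete"));

        // Completable: no items, only completion or an error.
        Completable.fromAction(() -> log.add("work done")).subscribe(
            () -> log.add("Completable onComplete"),
            error -> log.add("Completable onError"));

        return log;
    }

    static List<Integer> requestDriven() {
        // The subscriber initially requests 3 items, so only 3 are delivered.
        TestSubscriber<Integer> subscriber = Flowable.range(1, 10).test(3);
        // Requesting 2 more lets items 4 and 5 through.
        subscriber.request(2);
        return new ArrayList<>(subscriber.values());
    }

    public static void main(String[] args) {
        System.out.println(callbacks());
        System.out.println(requestDriven()); // [1, 2, 3, 4, 5]
    }
}
```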
Observer callbacks
- onSubscribe() -> called once when the observer subscribes, before any items arrive; it receives the Disposable that can cancel the subscription.
- onNext() -> called for each item emitted by the Observable.
- onError() -> called if the stream fails; no further events follow.
- onComplete() -> called once the Observable has emitted all of its items.
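The order of these callbacks can be checked with a small sketch, assuming RxJava 2. The class and helper names are hypothetical. It records each callback for a stream that completes and for one that fails after its first item.

```java
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import java.util.ArrayList;
import java.util.List;

public class ObserverOrderSketch {

    /** Subscribes to the source and records every callback in order. */
    static List<String> observe(Observable<Integer> source) {
        List<String> events = new ArrayList<>();
        source.subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                events.add("onSubscribe");
            }

            @Override
            public void onNext(Integer value) {
                events.add("onNext(" + value + ")");
            }

            @Override
            public void onError(Throwable e) {
                events.add("onError(" + e.getMessage() + ")");
            }

            @Override
            public void onComplete() {
                events.add("onComplete");
            }
        });
        return events;
    }

    static List<String> completed() {
        return observe(Observable.just(1, 2));
    }

    static List<String> failed() {
        // Emits 1, then terminates with an error instead of completing.
        return observe(Observable.just(1)
            .concatWith(Observable.<Integer>error(new IllegalStateException("boom"))));
    }

    public static void main(String[] args) {
        System.out.println(completed()); // [onSubscribe, onNext(1), onNext(2), onComplete]
        System.out.println(failed());    // [onSubscribe, onNext(1), onError(boom)]
    }
}
```

A stream ends with either onError or onComplete, never both.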
Types of Operators
- Transformation Operators:
  - map(): Transforms each item emitted by an observable into another value or type.
  - flatMap(): Maps each item to an inner observable and merges the results into a single observable; items from different inner observables may interleave.
  - concatMap(): Similar to flatMap(), but subscribes to the inner observables one at a time, so the output keeps the order of the source items.
  - switchMap(): Maps each item to an inner observable and emits only from the most recent one, disposing of the previous inner observable when a new source item arrives.
- Filtering Operators:
  - filter(): Emits only the items that satisfy a specified condition.
  - distinct(): Emits only distinct items, removing duplicates.
  - take(): Emits only the first n items, then completes.
  - skip(): Skips the first n items and emits the rest.
  - debounce(): Emits an item only after a given amount of time has passed without the source emitting another item.
- Combining Operators:
  - merge(): Combines multiple observables into one observable, emitting items from all of them as they arrive.
  - concat(): Concatenates multiple observables, emitting all items from one observable before moving to the next.
  - zip(): Combines the emissions of multiple observables by position (first with first, second with second) using a combiner function.
  - combineLatest(): Whenever any source emits, combines the most recent item from each source into a single item, once every source has emitted at least once.
- Utility Operators:
  - doOnNext(): Performs a side effect for each emitted item.
  - doOnError(): Performs a side effect when an error occurs.
  - doOnComplete(): Performs a side effect when the observable completes.
  - subscribeOn(): Specifies the scheduler on which the source observable is subscribed to and, by default, emits its items.
  - observeOn(): Specifies the scheduler on which downstream operators and the observer receive items.
- Conditional and Boolean Operators (each returns a Single<Boolean>):
  - all(): Determines whether all items emitted by the observable meet a condition.
  - any(): Checks whether at least one item emitted by the observable meets a condition.
  - contains(): Checks whether the observable emits a specified item.
  - isEmpty(): Checks whether the observable completes without emitting any item.
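A few of these operators are shown in the sketch below, assuming RxJava 2. Class and method names are invented for the example, and the blocking calls (blockingGet, blockingFirst) are used only so that each method can return a plain value; they are not something to call on Android's main thread.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.List;

public class OperatorSketch {

    // filter -> map -> take: keep even numbers, label them, stop after two.
    static List<String> transformAndFilter() {
        return Observable.range(1, 6)
            .filter(n -> n % 2 == 0)
            .map(n -> "item-" + n)
            .take(2)
            .toList()
            .blockingGet();
    }

    // zip pairs items by position; the unmatched "c" is dropped.
    static List<String> zipped() {
        Observable<String> names = Observable.just("a", "b", "c");
        Observable<Integer> scores = Observable.just(1, 2);
        return Observable.zip(names, scores, (name, score) -> name + "=" + score)
            .toList()
            .blockingGet();
    }

    // any() yields a Single<Boolean>.
    static boolean anyAboveThree() {
        return Observable.just(1, 2, 5)
            .any(n -> n > 3)
            .blockingGet();
    }

    // subscribeOn moves the source's work onto an io() worker thread.
    static String workerThreadName() {
        return Observable.fromCallable(() -> Thread.currentThread().getName())
            .subscribeOn(Schedulers.io())
            .blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println(transformAndFilter()); // [item-2, item-4]
        System.out.println(zipped());             // [a=1, b=2]
        System.out.println(anyAboveThree());      // true
        System.out.println(workerThreadName());
    }
}
```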
A typical example of RxJava usage
    Observable.fromArray(1, 2, 3, 4, 5) // #1
        .map { "$it (mapped)" } // #2
        .doOnNext { println("doOnNext($it)") } // #2
        .take(3) // #2
        .subscribe( // #3
            { value -> println("onNext received: $value") }, // #4
            { throwable -> println("onError received: $throwable") }, // #4
            { println("onComplete") } // #4
        )
Key points:
#1: An observable source: a function or factory method that provides the initial Observable instance.
#2: A chain of operators that manipulate the values in the stream and its behavior.
#3: The subscription, which starts the flow of items.
#4: The event callbacks for onNext, onError, and onComplete.
Step-by-step guide with an example of how to use RxJava in an Android app
- Add RxJava and RxAndroid to your Android project:
    implementation 'io.reactivex.rxjava2:rxjava:2.x.y'
    implementation 'io.reactivex.rxjava2:rxandroid:2.x.y' // provides AndroidSchedulers
    // Replace x and y with the appropriate version numbers
- Import RxJava Classes:
    import io.reactivex.Observable;
    import io.reactivex.Observer;
    import io.reactivex.android.schedulers.AndroidSchedulers;
    import io.reactivex.disposables.Disposable;
    import io.reactivex.schedulers.Schedulers;
- Create and Subscribe to an Observable:
    Observable<Integer> numbersObservable = Observable.just(1, 2, 3, 4, 5);
    numbersObservable
        .subscribeOn(Schedulers.io()) // Perform operations on a background thread
        .observeOn(AndroidSchedulers.mainThread()) // Observe and update UI on the main thread
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                // This is called when the subscription is established.
                // You can use the Disposable to cancel the subscription if needed.
            }

            @Override
            public void onNext(Integer value) {
                // Handle each emitted item here (e.g., update the UI)
                textView.setText(String.valueOf(value));
            }

            @Override
            public void onError(Throwable e) {
                // Handle any errors that occur during the operation
                Log.e("RxJavaExample", "Error: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                // This is called when the Observable has completed its emissions
                Log.d("RxJavaExample", "Completed");
            }
        });
- Remember to dispose: Dispose of the subscription when it is no longer needed, for example in onDestroy(). Failing to dispose can lead to memory leaks. The lambda overloads of subscribe() return a Disposable directly; when you pass a full Observer, subscribe() returns void, so keep the Disposable handed to onSubscribe() instead.
    private Disposable disposable;
    // ...
    disposable = numbersObservable
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(
            value -> textView.setText(String.valueOf(value)),
            error -> Log.e("RxJavaExample", "Error: " + error.getMessage()));
    // When done with the subscription, dispose of it:
    disposable.dispose();
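When a screen holds several subscriptions, one common way to manage them is RxJava's CompositeDisposable, which collects Disposables and clears them in one call. The sketch below assumes RxJava 2 and uses a PublishSubject as a stand-in source so it runs without Android. The class name is hypothetical, and the comment about onDestroy() is a suggestion rather than something the code enforces.

```java
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.List;

public class DisposeSketch {

    static List<Integer> run() {
        CompositeDisposable disposables = new CompositeDisposable();
        PublishSubject<Integer> source = PublishSubject.create();
        List<Integer> received = new ArrayList<>();

        // The lambda overload of subscribe() returns a Disposable to track.
        disposables.add(source.subscribe(received::add));

        source.onNext(1);
        source.onNext(2);

        // Typically called from a lifecycle callback such as onDestroy().
        disposables.clear();

        // The subscription is gone, so this item is not delivered.
        source.onNext(3);

        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [1, 2]
    }
}
```

clear() disposes the current subscriptions but leaves the container reusable, whereas dispose() on the container also disposes anything added to it later.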
Drawbacks of RxJava
- Observables and other reactive constructs introduce some overhead in memory and processing.
- Overusing RxJava for simple tasks can lead to over-engineering and unnecessary complexity in your codebase.
- RxJava can cause resource and memory leaks, especially if subscriptions are not disposed of correctly.
- Adding RxJava increases the size of the APK.
Some points on RxAndroid
RxAndroid is an Android-specific extension of the RxJava library.
- It is designed to make it easier to work with RxJava in Android applications.
- It provides Android-specific schedulers and utilities, such as AndroidSchedulers.mainThread(), that help integrate RxJava with Android’s main thread (UI thread) and its asynchronous programming model.
RxJava Vs LiveData in Android
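- LiveData is a data holder that exposes its latest value, while RxJava models a stream of every emitted item, with a rich set of operators for transforming it.
- LiveData is a lifecycle-aware observable data holder class. RxJava is not lifecycle-aware, so subscriptions must be disposed of manually.
- LiveData always delivers values to its observers on the main thread, while RxJava lets you choose where work runs and where results are delivered with subscribeOn() and observeOn().
- RxJava can be used for the app’s business logic (network calls, data manipulation, and anything else that happens in and beyond the Repository) and LiveData for the presentation layer. This gives transformation and stream capabilities for the business logic and lifecycle-aware operation for the UI.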
Concluding Note
RxJava manages the flow of items through observable sequences and handles concurrency and synchronization on your behalf. It is a complex library that draws on principles from reactive and functional programming to provide a flexible, composable framework for asynchronous and event-driven tasks.
Happy Learning !