
RxJava: Hot Observables

A hot observable multicasts: it pushes each emission to all of its subscribers at once instead of creating a separate stream for each subscriber. Let me elaborate on that a little. With a regular (cold) Observable, each subscriber that subscribes to the observable triggers its own subscription and consumes every item until the completion event is sent. Every additional subscriber starts consuming from the beginning again.


// Source values 1..10, doubled by the shared upstream map()
List<Integer> integerList = IntStream.range(1, 11).boxed().collect(Collectors.toList());
Observable<Integer> integerObservable = Observable.fromIterable(integerList)
    .map(integer -> integer * 2);
// Each subscribe() call below replays the whole source for that subscriber
Disposable subscribe1 = integerObservable
    .map(integer -> integer * 10)
    .subscribe(integer -> System.out.println("From first subscriber: " + integer));
Disposable subscribe2 = integerObservable
    .map(integer -> integer * 100)
    .subscribe(integer -> System.out.println("From second subscriber: " + integer));

The output of the above code would be:

From first subscriber: 20
From first subscriber: 40
From first subscriber: 60
From first subscriber: 80
From first subscriber: 100
From first subscriber: 120
From first subscriber: 140
From first subscriber: 160
From first subscriber: 180
From first subscriber: 200
From second subscriber: 200
From second subscriber: 400
From second subscriber: 600
From second subscriber: 800
From second subscriber: 1000
From second subscriber: 1200
From second subscriber: 1400
From second subscriber: 1600
From second subscriber: 1800
From second subscriber: 2000

When there is more than one Observer, the default behavior is to create a separate stream for each one. That is not always what we want. When the same emissions should be shared among all Observers, RxJava offers hot Observables as a specific way to do it. Let’s see how that is done.


// Define your publisher
ConnectableObservable<Integer> integerObservable = Observable.fromIterable(integerList)
    .map(integer -> integer * 2)
    .publish();
// Define your subscribers
Disposable subscribe1 = integerObservable
    .map(integer -> integer * 10)
    .subscribe(integer -> System.out.println("From first subscriber: " + integer));
Disposable subscribe2 = integerObservable
    .map(integer -> integer * 100)
    .subscribe(integer -> System.out.println("From second subscriber: " + integer));
// Start publishing simultaneously
integerObservable.connect();

The above example shows how to define a hot observable. ConnectableObservable is a special type of Observable that sends each event to all of its subscribers together. First, define your publisher by calling the publish() method. Then define your subscribers as you would with a normal Observable; nothing is emitted yet at this point. Once the subscribers are in place, call the connect() method on the ConnectableObservable to start pushing each item to all of them.

Let’s see the output of the above code snippet:


From first subscriber: 20
From second subscriber: 200
From first subscriber: 40
From second subscriber: 400
From first subscriber: 60
From second subscriber: 600
From first subscriber: 80
From second subscriber: 800
From first subscriber: 100
From second subscriber: 1000
From first subscriber: 120
From second subscriber: 1200
From first subscriber: 140
From second subscriber: 1400
From first subscriber: 160
From second subscriber: 1600
From first subscriber: 180
From second subscriber: 1800
From first subscriber: 200
From second subscriber: 2000

ConnectableObservable will force emissions from the source to become hot, pushing a single stream of emissions to all Observers at the same time rather than giving a separate stream to each Observer.
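To make the difference between the two outputs concrete, here is a minimal plain-JDK model of the two delivery styles. It is only a sketch of the idea, not how RxJava implements Observable or ConnectableObservable; the class names ColdSource and ConnectableSource and the three-item source are assumptions made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Plain-JDK model of cold vs. hot delivery; NOT RxJava's implementation.
final class ColdVsHotSketch {

    /** Cold: every subscribe() call replays the whole source for that one observer. */
    static final class ColdSource {
        private final List<Integer> items;

        ColdSource(List<Integer> items) {
            this.items = items;
        }

        void subscribe(Consumer<Integer> observer) {
            for (Integer item : items) {
                observer.accept(item);
            }
        }
    }

    /** Hot: observers only register; connect() walks the source once and fans out each item. */
    static final class ConnectableSource {
        private final List<Integer> items;
        private final List<Consumer<Integer>> observers = new ArrayList<>();

        ConnectableSource(List<Integer> items) {
            this.items = items;
        }

        void subscribe(Consumer<Integer> observer) {
            observers.add(observer); // nothing is emitted yet
        }

        void connect() {
            for (Integer item : items) {
                for (Consumer<Integer> observer : observers) {
                    observer.accept(item);
                }
            }
        }
    }

    static List<String> runCold() {
        List<String> log = new ArrayList<>();
        ColdSource source = new ColdSource(Arrays.asList(2, 4, 6));
        source.subscribe(i -> log.add("first:" + i * 10));
        source.subscribe(i -> log.add("second:" + i * 100));
        return log;
    }

    static List<String> runHot() {
        List<String> log = new ArrayList<>();
        ConnectableSource source = new ConnectableSource(Arrays.asList(2, 4, 6));
        source.subscribe(i -> log.add("first:" + i * 10));
        source.subscribe(i -> log.add("second:" + i * 100));
        source.connect(); // single pass over the source, shared by both observers
        return log;
    }

    public static void main(String[] args) {
        System.out.println("cold: " + runCold());
        System.out.println("hot:  " + runHot());
    }
}
```

In the cold run the log holds all of the first observer's values before any of the second's, while in the hot run the two interleave item by item, which matches the two outputs shown earlier in this post.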

 


RxJava: A much needed programmer friendly library

Recently, I had to do a lot of concurrent programming. To be honest, it has not been a pleasant ride. Writing Java programs with concurrency in mind is not a straightforward task, primarily because few utilities come bundled with the language to ease the work. The ones that do include Future, CompletableFuture, and Executors. In my opinion, the best help Java gives us is CompletableFuture. We have already talked about CompletableFuture in an earlier post.

Nevertheless, I feel that CompletableFuture solves a very specific problem: chaining asynchronous tasks together in a fluid way. Ideally, what we want in a concurrent program is to compose and chain tasks without having to worry about the number of threads or about managing a thread pool.

RxJava is one library that handles scenarios like this with ease. Although RxJava was not designed specifically to solve concurrency problems, the concepts it is built on implicitly give us an excellent way to develop concurrent applications.

RxJava is based on the ideas of the Reactive Manifesto. The manifesto deals with four main traits (responsive, resilient, elastic, and message-driven), as shown in the figure below.

Reactive manifesto traits


Let’s not go deep into each of these traits. The reason RxJava makes concurrent programming easy is its message-driven trait.

According to the reactive manifesto:

Reactive Systems rely on asynchronous message-passing to establish a boundary between components that ensures loose coupling, isolation and location transparency. This boundary also provides the means to delegate failures as messages. Employing explicit message-passing enables load management, elasticity, and flow control by shaping and monitoring the message queues in the system and applying back-pressure when necessary.

RxJava leverages this message-driven trait in all its components. Hence, we can design a publish-subscribe style application whose implementation can be switched to an asynchronous one very easily. We just have to specify whether the task at hand is CPU-intensive or I/O-bound by choosing a scheduler. RxJava internally takes care of managing the threads for you.

Let’s see an example of what I mean.

Observable.fromArray(1, 2, 3, 4, 5)
    .observeOn(Schedulers.computation())
    .map(integer -> {
      System.out.println("Executing cpu intensive task on thread : "+Thread.currentThread().getName());
      return cpuIntensiveTask(integer);
    })
    .observeOn(Schedulers.io())
    .map(integer -> {
      System.out.println("Executing i/o bound task on thread : "+Thread.currentThread().getName());
      return ioBoundTask(integer);
    })
    .blockingSubscribe(integer -> System.out.println("Result = "+integer));

In the above code, all we had to specify was whether each task is CPU-intensive or I/O-bound, via observeOn(Schedulers.computation()) and observeOn(Schedulers.io()). This will output the following:

[Screenshot: console output of the example above]

The above code looks very similar to a Java stream, yet it is quite different from what streams are. RxJava provides the fluency of Java streams along with the power to compose multiple asynchronous tasks, similar to what CompletableFuture offers.
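For comparison, here is a rough sketch of what a similar two-stage pipeline might look like with CompletableFuture and explicitly managed executors. The bodies of cpuIntensiveTask and ioBoundTask are hypothetical stand-ins (the real ones are not shown in this post), and the pool choices are my assumption of a reasonable analogue to Schedulers.computation() and Schedulers.io(), not a description of RxJava internals.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

final class CompletableFuturePipelineSketch {

    // Hypothetical stand-ins for the helpers used in the RxJava example.
    static int cpuIntensiveTask(int n) {
        return n * n;
    }

    static String ioBoundTask(int n) {
        return "stored-" + n;
    }

    static List<String> run(List<Integer> inputs) {
        // Assumption: a fixed pool sized to the core count for CPU work,
        // and a cached pool for blocking I/O work.
        ExecutorService cpuPool =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        ExecutorService ioPool = Executors.newCachedThreadPool();
        try {
            // Stage 1 runs on cpuPool, stage 2 hops over to ioPool.
            List<CompletableFuture<String>> futures = inputs.stream()
                .map(n -> CompletableFuture
                    .supplyAsync(() -> cpuIntensiveTask(n), cpuPool)
                    .thenApplyAsync(CompletableFuturePipelineSketch::ioBoundTask, ioPool))
                .collect(Collectors.toList());
            // join() in input order, so results keep the order of the inputs.
            return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
        } finally {
            // Unlike the RxJava version, we own the pools and must shut them down.
            cpuPool.shutdown();
            ioPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(1, 2, 3, 4, 5)));
    }
}
```

The chaining itself reads fine, but creating, sizing, and shutting down the pools is left to us, and that is exactly the bookkeeping the scheduler calls in the RxJava version take off our hands.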

There is a lot more to RxJava than what we have talked about in this post. Let’s get more hands-on in future posts. The intent of this post was to share my perspective on RxJava and why it is one of the most programmer-friendly libraries out there. I would encourage you to try RxJava and let me know what you think of this awesome library.
