asynchronous, scala

Testing futures in scala using scala test

Non-blocking external calls (web service) calls often would return a Future which will resolve to some value in the future. Testing such values in Java is pretty sad. Testing futures in java will usually involve performing a get()to resolve a future and then perform the assertions. I am pretty new to scala and was curious if I would have to do the same. Fortunately with Scala this is not the case. I will walk you thru my experience with an example and lets see how we do it in scala. In this post, we will be using scala-test library as our testing library and use FlatSpec type of testing.

Let’s start by defining our class to test. 

import scala.concurrent.{ExecutionContext, Future}
class NetworkCallExample {
implicit val ec: ExecutionContext = ExecutionContext.Implicits.global
private def getResponseFromServer(url: String): Future[Int] = {
Future {
Thread.sleep(5000)
10
}
}
// Dummy method to simulate a network call
def statusOfSomeOtherResponse(url: String): Future[Int] = {
println("Sending out request to some other url")
Future {
Thread.sleep(1000)
200
}
}
//Another dummy call
def lengthOfTheResponse(url: String): Future[Int] = {
println("Sending out request asynchronously to the server")
getResponseFromServer(url)
}
}

The above code is a simple scala class that will simulate a dummy network call by just sleeping for an arbitrary amount time.

lengthOfTheResponse(url: String): Future[Int] is a function that waits for 5 seconds and returns the number 10. 

Similarly, statusOfSomeOtherResponse(url: String): Future[Int]is another function that waits for 1 second and returns the number 200. Let’s see how we write tests for these function. 

import org.scalatest.FlatSpec
import org.scalatest.concurrent.ScalaFutures
class NetworkCallExampleTest extends FlatSpec with ScalaFutures {
it should "verify the future returns when complete" in {
val networkCallExample = new NetworkCallExample
//This will fail with a time out error
whenReady(networkCallExample.lengthOfTheResponse("testurl")) { res =>
assert(res == 10)
}
}
}

The above example looks great but will fail with the following error.

A timeout occurred waiting for a future to complete. Queried 10 times, sleeping 15 milliseconds between each query.

The default behavior of whenReady is to keep polling 10 times by waiting for 15 milliseconds in between to see if the future is resolved. If not, we get the above error. This clearly is not a consistent solution as we are depending on a finite time and polling to figure out the future result. Sure, we can configure the above whenReady method with a higher timeout and retry but there are better ways to test. Let’s see another approach.

class NetworkCallExampleTest extends AsyncFlatSpec {
it should "verify the future returns with for and yield" in {
val networkCallExample = new NetworkCallExample
for{
res <- networkCallExample.lengthOfTheResponse("testurl")
} yield {
assert(res == 10)
}
}
}

The above example will first execute the lengthOfTheResponse("testurl") resolve the future and yield the result Intinto resvariable. We then assert the result. But there is a gotcha we need to keep in mind. It is important that we use the AsyncFlatSpec instead of FlatSpec

We can also do more fluent version with multiple network calls. 

class NetworkCallExampleTest extends AsyncFlatSpec {
it should "verify the future returns with for and yield" in {
val networkCallExample = new NetworkCallExample
// More fluent version
for {
_ <- networkCallExample.lengthOfTheResponse("testurl")
status <- networkCallExample.statusOfSomeOtherResponse("some other url")
} yield {
assert(status == 200)
}
}
}

This way, we can kind of chain the futures in the order that we want to test and perform assertions accordingly. This will ensure that the futures resolve appropriately.

Let me know what you think of this and would certainly like to learn other approach to test futures.

Standard
RxJava
asynchronous, java, libraries

RxJava: A much needed programmer friendly library

Recently, I had to do a lot of concurrent-programming. To be honest, it has not been a pleasure ride. Writing java programs with concurrency in mind is not a straightforward task. Primary reason being, not many utilities come bundled with the language to ease up the concurrent task in hand. A few of them to name are Future, CompletableFuture, Executors. In my opinion, the best help java provides to us is thru CompletableFutures. We have talked about CompletableFutures in a post already.

Nevertheless, I feel that CompletableFutures solves a very specific problem, i.e. to chain asynchronous tasks together in a very fluid way. Ideally what we would want in a concurrent program is the ability to not worry about number of threads, a pool to manage threads whilst having the ability to compose and chain tasks together.

RxJava is one such library that solves scenarios like this with ease. Though RxJava is not designed to solve problems involving concurrency, the underlying concepts that RxJava is based on, implicitly gives us an awesome way to develop concurrent applications.

RxJava is based on the Reactive Manifesto. Reactive manifesto primarily deals with four main traits as shown in the figure below.

Reactive manifesto traits

Reactive manifesto traits

Let’s not get deep into each one of these traits. But the reason RxJava provides easy concurrent programming inherently is due its message-driven trait.

According to the reactive manifesto:

Reactive Systems rely on asynchronous message-passing to establish a boundary between components that ensures loose coupling, isolation and location transparency. This boundary also provides the means to delegate failures as messages. Employing explicit message-passing enables load management, elasticity, and flow control by shaping and monitoring the message queues in the system and applying back-pressure when necessary.

RxJava leverages this message-driven trait in all its components. Hence, we could design a seamless message publish-subscribe kind of application that inherently gives you ability to change your implementation to an asynchronous very easily. We just have to specify if the task on hand is CPU intensive work or I/O bound. RxJava internally takes care of managing the concurrency for you.

Let’s see an example on what I meant.

Observable.fromArray(1, 2, 3, 4, 5)
    .observeOn(Schedulers.computation())
    .map(integer -> {
      System.out.println("Executing cpu intensive task on thread : "+Thread.currentThread().getName());
      return cpuIntensiveTask(integer);
    })
    .observeOn(Schedulers.io())
    .map(integer -> {
      System.out.println("Executing i/o bound task on thread : "+Thread.currentThread().getName());
      return ioBoundTask(integer);
    })
    .blockingSubscribe(integer -> System.out.println("Result = "+integer));

If you see the above code, all we had to specify was to tell if it is going to be a task that is cpu-intensive or i/o-intensive. This will output the following:

Screen Shot 2018-05-20 at 12.49.08 AM

The above code is very similar to a Java stream but, yet it is very different from what streams are. RxJava provides the fluidity that java streams provide along with the power of composing multiple asynchronous tasks similar to what we know about CompletableFutures.

There is lot more to RxJava than what we have talked about in this post. Lets get more hands on in future posts. The intent of this post was to share my perspective on RxJava and why it is one of the best programmer-friendly library out there. I would encourage you to definitely try out RxJava and let me know what you feel about this awesome library.

Standard
asynchronous, java

Combining/Merging more than two CompletableFuture’s together

I recently ran into a situation where I wanted to make a bunch of asynchronous calls and gather those results, to finally construct an object based on the result. Well, in other words, I was exploring on how to resolve these CompletableFuture‘s and merge them properly.

For the sake of this post, lets take an example and work our way through. My CompletableFuture‘s would look something like this snippet.


ForkJoinPool myThreadPool = new ForkJoinPool(10);
CompletableFuture<Integer> myCompletableFutureInt = CompletableFuture.supplyAsync(() -> {
try {
int sleepTime = new Random().nextInt(2000);
Thread.sleep(sleepTime);
System.out.println(
"Sleeping for " + sleepTime + " in myCompletableFutureInt on thread "
+ Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
return new Random().nextInt(50);
}, myThreadPool);
CompletableFuture<BigDecimal> myCompletableFutureBigDecimal = CompletableFuture
.supplyAsync(() -> {
try {
int sleepTime = new Random().nextInt(1000);
Thread.sleep(sleepTime);
System.out.println(
"Sleeping for " + sleepTime + " in myCompletableFutureBigDecimal on thread "
+ Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
return new BigDecimal(new Random().nextDouble() * 50);
}, myThreadPool);
CompletableFuture<Long> myCompletableFutureLong = CompletableFuture.supplyAsync(() -> {
try {
int sleepTime = new Random().nextInt(1000);
Thread.sleep(sleepTime);
System.out.println(
"Sleeping for " + sleepTime + " in myCompletableFutureLong on thread "
+ Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
return new Random().nextLong();
}, myThreadPool);

First thought that occurred to me was to collect all the CompletableFuture‘s in a List and iterate over the List, do a CompletableFuture::get on each and collect these results into another List.

I hated to do this. In fact, I did not even try this, because, this was the main reason why I did not want to use a Future. I wanted a clean way to have a handle on my flow. I wanted to chain these Future‘s so I can construct the object in fluid way while handling errors. Doing a blocking call to get the result of each CompletableFuture would defeat the whole purpose of having a CompletableFuture.

Second way was to chain all these CompletableFuture‘s with a thenApply method. But, if you think about it, sure, I’ll be able to chain it properly, but that would soon become a mess. I understood it once I tried it. Here’s how the code would look if I used thenApply method to chain the CompletableFuture‘s and combine them.


CompletableFuture<CompletableFuture<CompletableFuture<Map<String, Object>>>> result = myCompletableFutureBigDecimal.thenApply(bigDecimal ->
myCompletableFutureInt.thenApply( integer ->
myCompletableFutureLong.thenApply(aLong -> {
Map<String, Object> objectHashMap = new HashMap<>();
objectHashMap.put("IntegerValue", integer);
objectHashMap.put("LongValue", aLong);
objectHashMap.put("BigDecimalValue", bigDecimal);
return objectHashMap;
})));
try {
Map<String, Object> re = result
.get(2, TimeUnit.SECONDS)
.get()
.get();
System.out.println(re);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}

Each thenApply method would return another CompletableFuture. So, every time we are in a need to add an asynchronous call, we would need the return type to have a nested CompletableFuture. Also, see how awful it would be if we would want to get the value of the result. We are doing a get() call on all the nested CompletableFuture. That is not good at all and would not be much different than the list-loop approach we talked about before.

Now, at-least with this method we were able to achieve some kind of chaining for these CompletableFuture‘s. All we now need is to make the end of the chain return one CompletableFuture that has the Map object ready for us to consume. Fortunately, there is a way to do it using the awesome thenCompose method. So, what’s so special about this thenCompose method.

The thenCompose method is like flatMap in Java streams. Like how a flatMap would take in a Stream and return another stream by flattening it out, thenCompose would flatten a CompletableFuture.

Visual representation of Java's flatMap operation

Visual representation of Java’s flatMap operation

Like the above figure illustrates, we can use thenCompose method to return a new CompletableFuture by flattening the inner CompletableFuture‘s. All we have to do is, keep flattening the CompletableFuture‘s and use thenCombine to mimic as if we were just combining two CompletableFuture‘s together.


CompletableFuture<Map<String, Object>> myObjectCompletableFuture =
myCompletableFutureBigDecimal.thenCompose(bigDecimalValue ->
myCompletableFutureInt
.thenCombine(myCompletableFutureLong,
((integerValue, longValue) -> {
Map<String, Object> objectHashMap = new HashMap<>();
objectHashMap.put("IntegerValue", integerValue);
objectHashMap.put("LongValue", longValue);
objectHashMap.put("BigDecimalValue", bigDecimalValue);
return objectHashMap;
})));

Since we have just three CompletableFuture‘s to merge, we used thenCompose to flatten-up two CompletableFutures to one and then chained the resulting flattened CompletableFuture to the third CompletableFuture, where we combine all the results to construct our Map and return it.


try {
Map<String, Object> myObjectMap = myObjectCompletableFuture.get(2, TimeUnit.SECONDS);
myObjectMap.entrySet().forEach(stringObjectEntry -> System.out
.println(stringObjectEntry.getKey() + " = " + stringObjectEntry.getValue().toString()));
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}

Once we have the final result we just call a blocking call just once. The beauty is, no matter how many CompletableFutures we chain together we just call get once to get result from the final merged CompletableFuture. Here in our case, the get() will wait for utmost two seconds to get the result or throw a TimeoutException.

Result of the resolved CompletableFuture

Result of the resolved CompletableFuture

This method allows us to chain as many number of CompletableFuture‘s together, compose them nicely and get the result in a very fluid way.

Let me know what you feel about this way of resolving CompletableFuture‘s. I would also love to know if there is any other better way of merging two or more CompletableFuture‘s together into one. Just leave a comment below and I’ll be happy to discuss further. Cheers!

Standard
asynchronous, java

CompletableFutures – Asynchronous programming made easy

Asynchronous programming is often not a very pleasant experience when it comes to Java. Java concurrency could be very challenging, there is an awesome book on this topic by Brian Goetz named “Java Concurrency in Practice“. It is quite extensive and often considered a definite read if you are working on projects involving Java or concurrency.

While there are many problems that threads, in general, could cause, my pick of the most annoying part is that when I want to chain parallel stuff together and pass down the result once I resolve a result in a pipeline kind of data-flow.

New languages like Scala, Javascript, etc., that have a good language design when it comes to writing concurrent programs. For example, Javascript provides an elegant solution for chaining asynchronous calls using promises. In my quest for something similar in Java realm, I was all excited when I first learned about java.util.concurrency.Future interface. While Futures were a good step towards a better future of Java, it did not catch up to the modern requirement of the Java community.

According to Oracle,

[1]A Future represents the result of an asynchronous computation. Methods are provided to check if the computation is complete, to wait for its completion, and to retrieve the result of the computation. The result can only be retrieved using method get when the computation has completed, blocking if necessary until it is ready. …

[1] https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Future.html

If you focus on the official documentation of Future interface, it says one disappointing thing;  we will have to wait or manually check if the Future is done by callingisDone() method on the Future. So, if you want to use a Future you would have to keep polling the future to see if the future is complete.

Example:


package completablefutureexample;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class App {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//Future example
ExecutorService executorService = Executors.newFixedThreadPool(2);
Future<Integer> future = executorService.submit(() -> {
System.out.println("Executing a task.");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new Random().nextInt(20);
});
while (!future.isDone()) {
System.out.println("Task not yet done…");
Thread.sleep(100);
}
System.out.println("The future is resolved. Value generated = " + future.get());
executorService.shutdown();
}
}

view raw

App.java

hosted with ❤ by GitHub

Also, if you think of how to chain subsequent calls, it can get really messy. We will have to check if the Future is done and handle the next Future. This is where Future interface fails to provide a fluent interface to chain calls and handling errors would open another can of worms.

Fortunately, Java 8 came to the party with a new update. Java 8 introduced a new class called CompletableFuture. CompletableFuture, in my opinion is the second best feature after Streams in Java 8. It is a type of Future which also implements CompletionStage interface. This interface would enable clean way of chaining and handling errors while writing asynchronous code.


package completablefutureexample;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class App {
public static void main(String[] args) {
CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
//Define your flow
completableFuture.thenApply((integer) -> {
int randomNumber = 0;
try {
randomNumber = generateRandomNumber();
System.out.println("Adding " + integer + " to the random number " + randomNumber);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(
"Generated random number=" + randomNumber + " from thread : " + Thread.currentThread()
.getName());
return randomNumber + integer;
}).thenApplyAsync(integer -> {
System.out.println("Got result from the first thread.");
return integer + 500;
}).thenAccept(integer -> {
System.out.println("Added 500 to the result. Terminating the completable future.");
});
//Start the processing
completableFuture.complete(10);
}
private static int generateRandomNumber() throws InterruptedException {
int number = new Random().nextInt(100);
System.out.println("Sleeping for " + number + "ms");
Thread.sleep(number);
return number;
}
}

view raw

App.java

hosted with ❤ by GitHub

Lets now walk through what the above code is doing. We are defining a CompletableFuture where we start off by defining the flow, after which we call complete() method to pass in data to the pipeline to eventually pass the result of one operation nicely down the pipeline.

If you have not already observed, you can see that there are different flavors of thenApply method. CompletableFuture has two flavors of each of such methods, where the XXXAsync methods would run the operation on a different thread (By default if you do not specify any ForkJoinPool, it uses the ForkJoinCommonPool)

CompletableFuture also provides a variety of methods to compose, combine two or more CompletableFutures. This provides a very fluent, readable and maintainable asynchronous code.

We barely scraped the surface of CompletableFuture in this post. The motive of this post was to introduce CompletableFuture and see how different it is from Future interface. It would probably make sense to dedicate a whole new post to get more deeply into the capabilities of CompletableFuture. Let see that in a different post.

Standard