cassandra, libraries, scala

Access Cassandra using phantom – a type-safe Scala library

This post is a quick tutorial on how to interact with a Cassandra database from Scala. We will do this using the phantom library, which is written in Scala and acts as a wrapper on top of the cassandra-driver (the Java driver for Cassandra).

phantom offers a concise and type-safe way to design and write Cassandra interactions in Scala. In this post, I will share my two cents for people getting started with the library.

Prerequisites

Before getting started, I would like to make a few assumptions to keep this post as concise as possible:

  1. You know what Cassandra is.
  2. You have Cassandra running locally or on a cluster for this tutorial.
  3. You have basic knowledge of Scala and sbt.

In this post, let’s write a simple database client for accessing User information. For simplicity’s sake, our User model will just have id, firstName, lastName and email as its fields. Since Cassandra is all about designing your data model around your query patterns, we have two use cases for accessing the data and hence define two tables in Cassandra.

The tables would have the following schema.

CREATE KEYSPACE user_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};

CREATE TABLE user_keyspace.user_by_first_name (
    firstname text PRIMARY KEY,
    email text,
    id uuid,
    lastname text
);

CREATE TABLE user_keyspace.user_by_id (
    id uuid PRIMARY KEY,
    email text,
    firstname text,
    lastname text
);

Make sure your database has this schema configured before proceeding further.


Step 0: Add phantom library to the dependencies

Let us start by creating a simple sbt Scala project and adding the following dependency to libraryDependencies in the build.sbt file.

"com.outworkers" % "phantom-dsl_2.12" % "2.30.0"

Step 1: Define your model class

In Cassandra, it is totally acceptable to denormalize data and have multiple tables for the same model, and phantom is designed to support this. We define one case class as the data holder and several model classes that correspond to our different use cases. Let us go ahead and define the case class and models for our User data model.

import java.util.UUID

// Base model used as DTO
case class User(id: UUID, firstName: String, lastName: String, email: String)

Now, we define the models that correspond to our tables in Cassandra. This allows phantom to construct the queries for us in a type-safe manner.

The following are the use cases for which we would need clients;

  1. To access the data using the user id. (user_by_id table)
  2. To access the data using the user’s first name. (user_by_first_name table)

We create models that reflect these tables in Cassandra and define low-level queries using phantom-dsl, so that phantom constructs the queries and we never have to write raw CQL statements in our application.

import java.util.UUID

import com.outworkers.phantom.dsl._

import scala.concurrent.Future

// Query-based model for the user_by_id table
abstract class UserById extends Table[UserById, User] {

  // Override table metadata
  override def tableName: String = "user_by_id"

  // Define the table schema
  object id extends UUIDColumn with PartitionKey
  object fName extends StringColumn {
    override def name: String = "firstName"
  }
  object lName extends StringColumn {
    override def name: String = "lastName"
  }
  object email extends StringColumn

  // Define low-level queries
  def createUserById(uuid: UUID, firstName: String, lastName: String, emailId: String): Future[ResultSet] = {
    insert
      .value(_.id, uuid)
      .value(_.fName, firstName)
      .value(_.lName, lastName)
      .value(_.email, emailId)
      .future()
  }

  def getUserById(uuid: UUID): Future[Option[User]] =
    select
      .all()
      .where(_.id eqs uuid)
      .one()
}

// Query-based model for the user_by_first_name table
abstract class UserByFirstName extends Table[UserByFirstName, User] {

  // Override table metadata
  override def tableName: String = "user_by_first_name"

  // Define the table schema
  object fName extends StringColumn with PartitionKey {
    override def name: String = "firstName"
  }
  object id extends UUIDColumn
  object lName extends StringColumn {
    override def name: String = "lastName"
  }
  object email extends StringColumn

  // Define low-level queries
  def createUserByUserName(uuid: UUID, firstName: String, lastName: String, emailId: String): Future[ResultSet] = {
    insert
      .value(_.id, uuid)
      .value(_.fName, firstName)
      .value(_.lName, lastName)
      .value(_.email, emailId)
      .future()
  }

  def getUserByFirstName(firstName: String): Future[Option[User]] =
    select
      .all()
      .where(_.fName eqs firstName)
      .one()
}
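A quick aside: we never spell out how a row is turned into our User case class, because phantom 2.x derives that extractor automatically when the column types line up with the case class fields. If you ever need to control the mapping yourself, the table exposes a fromRow method you can override; the snippet below is only a sketch of what that might look like and is not needed for this tutorial.

// Inside UserById, for illustration only – phantom normally derives this for us.
override def fromRow(row: Row): User =
  User(id(row), fName(row), lName(row), email(row))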

You can access the state of the project until this step in GitHub here.

Now that we have our models and low-level queries defined, we need an effective way to manage our session and database instances.

Step 2: Encapsulate database instances in a Provider

Since we interact with multiple tables for a single User model, phantom provides a way to encapsulate these table instances in one place and manage them for us. This way, our implementations won’t be scattered around and will all share the appropriate session object.

So, in this step, we define an encapsulation for all the database instances by extending phantom‘s Database class. This is where we create instances for the models we defined in the above step.

import com.outworkers.phantom.dsl._

// This class will encapsulate all the valid database instances
class UserDatabase(implicit cassandraConnection: CassandraConnection)
  extends Database[UserDatabase](connector = cassandraConnection) {

  object userByIdInstance extends UserById with Connector
  object userByFirstName extends UserByFirstName with Connector
}

// This trait will act as a database instance provider.
trait UserDbProvider extends DatabaseProvider[UserDatabase]

Notice that we also defined a trait extending DatabaseProvider[UserDatabase] in the above snippet. We will discuss in the next step how this trait is useful.

You can access the state of the project until this step here.

Step 3: Orchestrate your queries using DatabaseProvider

Now that you have your model and database instances in place, exposing these low-level methods directly may not be good design. What if you need some kind of data validation or data enrichment before calling them? To be very specific, in our case we need to write to two tables whenever a User record is created. To serve such purposes, we need an extra layer of abstraction around the queries we defined, and this is exactly what the DatabaseProvider trait is for.

We define our database access layer (a service, essentially) by extending the UserDbProvider trait and orchestrating our low-level queries so that the application accesses data through the right abstraction.

import java.util.UUID

import com.outworkers.phantom.dsl._

import scala.concurrent.Future

trait UserDatabaseService extends UserDbProvider {

  // Orchestrate the low-level queries appropriately.
  def createUser(uuid: UUID, firstName: String, lastName: String, email: String): Future[Option[UUID]] =
    database.userByIdInstance.createUserById(uuid, firstName, lastName, email)
      .flatMap(_ => {
        database.userByFirstName.createUserByUserName(uuid, firstName, lastName, email)
          .map(rs => if (rs.wasApplied) Some(uuid) else None)
      })

  def selectUserById(uuid: UUID): Future[Option[User]] =
    database.userByIdInstance.getUserById(uuid)

  def selectUserByFirstName(firstName: String): Future[Option[User]] =
    database.userByFirstName.getUserByFirstName(firstName)
}

You can see that we were able to combine both inserts (to user_by_id and user_by_first_name) in one method call. We could certainly have done some validation or data transformation here if we wanted to, as sketched below.
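For instance, a hypothetical (and deliberately naive) validation wrapper could live in the same service trait:

// Hypothetical example – reject obviously malformed emails before touching Cassandra.
def createUserValidated(uuid: UUID, firstName: String, lastName: String, email: String): Future[Option[UUID]] =
  if (email.contains("@")) createUser(uuid, firstName, lastName, email)
  else Future.successful(None)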

You can access the state of the project until this step here.

Step 4: Putting everything together

We are all set to use the database service we have built so far. Let’s see how this is done.

We start by creating our CassandraConnection instance. This is how we inject our Cassandra configuration into phantom and let it manage the database session for us.

implicit val cassandraConnection: CassandraConnection = {
  ContactPoint.local.keySpace("user_keyspace")
}

Here we are using Cassandra installed locally, hence ContactPoint.local. In a real-world deployment we would use ContactPoints(hosts) instead.
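For reference, a connection to a remote cluster might look roughly like the sketch below; the host names are placeholders, and you would typically read them from configuration.

// Sketch only – replace the hosts with your cluster's contact points.
implicit val cassandraConnection: CassandraConnection =
  ContactPoints(Seq("cassandra-node-1.example.com", "cassandra-node-2.example.com"))
    .keySpace("user_keyspace")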

Then we create our database encapsulation instance and the service object.

object UserDatabase extends UserDatabase

object databaseService extends UserDatabaseService {
  override def database: UserDatabase = UserDatabase
}

Now, it is as simple as calling a few methods to check whether our database interactions work.

val userPair = for {
  uuid <- databaseService.createUser(UUID.nameUUIDFromBytes("Sumanth".getBytes), "Sumanth", "Kumar", "sumanth@indywiz.com")
  userByFirstName <- databaseService.selectUserByFirstName("Sumanth")
  userById <- databaseService.selectUserById(uuid.get)
} yield (userById, userByFirstName)

val res = userPair.map {
  case (userById, userByFirstName) =>
    logger.info(s"User by Id : ${userById.get}")
    logger.info(s"User by first name: ${userByFirstName.get}")
}
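One small gotcha when running this as a quick main method: everything above is asynchronous, so the JVM can exit before the futures complete. In demo or test code you can simply block on the final future; the timeout below is an arbitrary choice.

import scala.concurrent.Await
import scala.concurrent.duration._

// Block only in demo/test code – never in production request paths.
Await.result(res, 10.seconds)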

You can find the final state of the project here.

We did have to deal with quite a few constructs bundled with phantom-dsl, like Database and DatabaseProvider. But in my opinion, this is what makes it more than just another Scala DSL library: because of these design constructs, phantom implicitly promotes good design in the applications that use it.

Hope you found my rambling useful.

cassandra

Load test Cassandra – The native way – Part 2: The How

In the previous post we talked briefly about why we need to load test a service. In this post we’ll continue where we left off and look at how to load test Cassandra. So, let’s get started with this amazingly powerful tool: cassandra-stress.


Things to keep in mind:

  • A load test should simulate the real production scenario, so it is very important to have the setup as close to production as possible. It is highly recommended to use a separate node/host in close proximity to the cluster for load testing (e.g. deploy the load test server in the same region if your deployment is in AWS).
  • Do not use a node from the cluster itself for load testing. Since cassandra-stress comes bundled with the Cassandra distribution, it is tempting to run the tool directly on one of the nodes. However, cassandra-stress is a heavy-weight process that can consume a lot of JVM resources and in turn skew your node’s performance.
  • Keep in mind that cassandra-stress is not a distributed program, so in order to test a cluster we need to make sure memory on the load-test host is not a bottleneck; I would recommend a host with at least 16 GB of memory.

How to use cassandra-stress:

Step 1: The configuration file

The configuration file is how we tell cassandra-stress to prepare the keyspace, the table and the data for the load test. We need to configure a handful of properties defining the keyspace, the table, the data distribution for the test and the queries to run.

  • keyspace – Keyspace name
  • keyspace_definition – Define the keyspace
  • table – Table name
  • table_definition – Define the table definition
  • columnspec – Column distribution specifications
  • insert – Batch ratio distribution specifications
  • queries – A list of queries you wish to run against the schema
# Keyspace name
keyspace: keyspace_to_load_test

# The CQL for creating a keyspace (optional if it already exists)
keyspace_definition: |
  CREATE KEYSPACE keyspace_to_load_test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'};

# Table name
table: table_to_load_test

# The CQL for creating a table you wish to stress (optional if it already exists)
table_definition: |
  CREATE TABLE table_to_load_test (
    id uuid,
    column1 text,
    column2 int,
    PRIMARY KEY((id), column1)
  );

### Column distribution specifications ###
columnspec:
  - name: id
    population: GAUSSIAN(1..1000000, 500000, 15)  # Normal distribution to mimic the production load
  - name: column1
    size: uniform(5..20)  # Anywhere from 5 to 20 characters
    cluster: fixed(5)     # Assuming we would have 5 distinct values per partition
  - name: column2
    size: uniform(100..500)  # Anywhere from 100 to 500

### Batch ratio distribution specifications ###
insert:
  partitions: fixed(1)  # We only touch a single partition per insert
  select: fixed(1)/5    # Touch 1/5th of the rows in the partition at any given time
  batchtype: UNLOGGED   # Unlogged batches

#
# A list of queries you wish to run against the schema
#
queries:
  queryForUseCase:
    cql: select * from table_to_load_test where id = ? and column1 = ?
    fields: samerow

Now that we have the configuration file ready, we can use it to run our load test with the cassandra-stress tool. Let’s see how to run the tool now.


Step 2: Command options

The cassandra-stress tool comes bundled with your Cassandra distribution download; you will find it in apache-cassandra-<version>/tools/bin/. You can learn more about the available options by checking out the tool’s help option. In this post, I will go through an example and show you how to run the tool.

cassandra-stress user profile=stresstest.yaml duration=4h 'ops(insert=100, queryForUseCase=1)' cl=LOCAL_QUORUM -node <nodelist separated by commas> -rate threads=450 throttle=30000/s -graph file="stress-result-4h-ratelimit-clients.html" title=Stress-test-4h -log file=result.log

Let’s go over the options I used one by one to understand what they mean. This is by no means a comprehensive explanation; I would highly recommend giving the documentation a good read to learn more about these options.

  • user – Tells cassandra-stress to run a load test against a user-specified schema.
  • profile – Path to the configuration (yaml) file.
  • duration – Duration for which the load test should run.
  • ops – Operations defined in the yaml file to be included in the load test; in our example, insert and queryForUseCase.
  • cl – Consistency level for the operations.
  • node – Nodes in the cluster.
  • rate – Number of threads and the peak ops/sec limit.
  • graph – Graphical report of the run; specify the file name and title of the report.
  • log – Log file name.

It is as simple as this. The tool will now run for the duration specified and output a detailed report on the run.

I hope you found this helpful, and I would be delighted to answer any questions regarding it.

cassandra

Load test Cassandra – The native way – Part 1: The Why

Load testing is an imperative part of the software development process. The idea is to test a feature/service in a prod-like environment with a realistically high load for an extended time frame, just to gain confidence that the service will not bail out on us during critical times. Quite logical, isn’t it? In this series, I’ll go through my brief experience load testing a schema in Cassandra. So let’s get started right away!

With micro-service architecture being the norm at almost every turn in software development, it is worth spending time talking about how to load test a micro-service. Is it going to be different from testing a monolithic service? Since we say we test against a near prod-like setup, does that mean I have to spend a whole lot and set up the same number of nodes that the prod cluster has? And what if I have some kind of auto-scaling in place? These were a few questions I had when I first had to load-test a micro-service. The answer is quite simple mathematics: extrapolation. We simulate the load against a single node and then extrapolate the result; for example, if one node comfortably sustains a given throughput, a cluster of N such nodes behind a load balancer can be roughly estimated to handle N times that, minus overhead. This, however, may not be entirely accurate, as a few things may be left out of the equation, like network bandwidth, disk I/O, etc. It is also essential to load test the load balancer to get a clear picture.

But wait! The above method works fine as long as each service has just one responsibility. How do we load test a scenario where the architecture is designed to perform only as part of a cluster? What do we do if these processes talk to each other and gossip among themselves? There are many big-data architectures like this, and one such system is Cassandra. Fortunately, a tool comes bundled with Cassandra for this very purpose: cassandra-stress.

cassandra-stress was initially developed as an internal tool by the developers of Cassandra to load test Cassandra’s internals. Later, a mode was added to the tool so that Cassandra users could test their own schemas.

I definitely wouldn’t claim that cassandra-stress is the only way to achieve this. In fact, load testing Cassandra was possible well before this tool was generally available. My online research suggested that the next most popular choice is a JMeter plugin. I chose cassandra-stress for the obvious reason that it is a native tool bundled with Cassandra and has a pretty easy learning curve.

Let’s go over how to configure your own load test using the cassandra-stress tool in another post.

cassandra

Cassandra – A shift from SQL

We have had a lot of paradigm shifts in how we think about persisting data and retrieving it as efficiently as possible. For the longest time, SQL has been our go-to solution for persisting data. SQL is definitely an awesome solution, and that is why it has survived the test of time for so long (fun fact: the first commercial RDBMS was Oracle, released in 1979 [1]) and why it is in no danger of going away in the near future.

In my opinion, SQL gave us all a near perfect, generic solution for persisting data. SQL places a lot of importance on things such as data integrity, atomicity and normalization, so the data is always in a consistent state while query performance is maintained, and it has its own ways of sorting things out with joins, foreign keys, etc. Of course, life is not always fair: this magic comes at a price when you need to scale. SQL datastores are often not horizontally scalable and require manual work and application logic to shard the data and spread the load. One other big challenge with SQL is the single point of failure, which again comes down to the inability to scale horizontally.

We were taught to think about database design this way, and hence that is what we do best. But the majority of applications do not care about the size of the data or how it is stored. On the other hand, we don’t really mind if the data is replicated in multiple locations; in fact, storage has become so cheap that we are happy to duplicate data wherever it helps.

With big data intruding into almost every domain, it does not always make sense to hold on to the SQL way of doing things. I’m in no way suggesting that SQL has no future, or that everyone who depends on big data needs to resort to the NoSQL way of doing things. The idea is to prioritize the feature set you need your datastore to have, and to be open about picking what fits. Keep in mind that using NoSQL and SQL hand in hand is not considered bad practice at all; just make sure you are not over-engineering your use case.

There are multiple options on the market right now, but we are going to talk about one particular datastore: Cassandra. Why Cassandra? Because that is the one I have the most insight into. Cassandra has now reached a very mature state, with many big shots using it as their datastore; a few worth mentioning are Netflix, Apple and SoundCloud. So, what is Cassandra all about? Cassandra is a very powerful, battle-tested datastore that provides high availability with high read and write throughput.

I did mention that letting go of a few features that relational databases provide could give you benefits such as the ability to scale horizontally, increased availability, etc. So, what is the compromise we have to make if we choose Cassandra? It is the data model.

The data model is the best lever you have for tuning your Cassandra cluster. Relational databases deal with creating logical entities called tables, and relationships among the tables using foreign keys. Since Cassandra does not have joins or relationships of any kind, the data has to be stored in a denormalized fashion. This is actually not a bad thing in Cassandra, but the catch is that we need to know the query access patterns before designing the model. If done properly, though, the performance we get out of Cassandra is phenomenal. Here is a link that talks about benchmarking a Cassandra cluster at Netflix.

I would like to get this going as a series, so I will stop talking about Cassandra now; we’ll start off with how to model your data in Cassandra in a different post.

 
