Hadoop Magazine

An Odyssey of Cassandra

Author: Eric Lubow, CTO of SimpleReach, Datastax Cassandra MVP
Twitter: @elubow
Website: http://eric.lubow.org

Hello, my name is Karolina and I am one of the creators of Hadoop Magazine. I would like to share with you this article about Cassandra, which was originally published by Software Developer’s Journal. Contact me directly by email for more articles.

Not everyone starts out by processing hundreds of millions or even billions of events per day. In fact, most people never get to that point or even have the prospect of getting to that point. But in case you do, there are a few tools and tips to help you ingest, process, analyze, and store all that data.

Big Data Truths

One of the infrequently mentioned truths of dealing with big data is that even with the right tools, 80% of the work of building a big data system is acquiring and refining the raw data into usable data. Keeping that in mind, you want to make the other 20% (or whatever the breakdown) as easy as possible. This includes everything from the underlying architecture to the monitoring and deployment systems that support your application. Any chance you have to simplify things, take advantage of it. It will pay off in the long run.

Sometimes simplifying things means using data patterns that you may not already be comfortable with. These can range from different approaches to schema design, to datacenter layouts, to infrastructure monitoring. They also apply to data storage methodologies. Sometimes you need to store data in only one way; sometimes you need to store it in four ways, across three different data storage engines. There is no one-size-fits-all methodology that works for every scenario. Be sure to think each one through.

The other dirty little secret of building these types of applications is that even though there are a lot of best practices, they may not all work for your use-case. There will likely be a lot of trial and error as you grow and bring in more data. The more features you add, the more you’ll have to ensure your existing systems and methodologies are up to the task in their current state. Working with large datasets is an iterative process. A new feature may mean adding an entirely new method of storing the data. Be flexible and ready to make mistakes and subsequently deal with those mistakes. It will happen, so embrace it.

The Beginning

But how does one even know where to start on building these types of applications? The best advice is to begin with what you know. If you are a Ruby on Rails programmer with a background in MySQL, then start there. If you are a Django programmer with a background in Mongo, start there. Wherever your existing skills can get you going most efficiently, that's where you should begin.

The story of SimpleReach starts out like that of most companies: we built our MVP (minimum viable product) with the technologies we knew at the time and thought would be best. We started out nice and simple with 2 or 3 machines on Amazon Web Services. We put Mongo, with all the bells and whistles, on an m2.4xlarge database server with RAID'd EBS XFS drives for performance and to simplify backups. We built a front-end in Rails and jQuery and we were off to the races.

Events were coming in first at a few per minute, then a few per second. As the growth continued, the amount of data we had to ingest and process continued to increase. And even though Mongo had been quick for us to get going and keep going, it didn't afford us an easy way to deal with the volume of time series data entering our system. We also found that in order to deal with time series data the way we needed to, we had to create a lot of indexes in Mongo. Since those indexes need to fit in memory (or everything slows down), that did not turn out to be a viable option.
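
As a rough illustration (the collection and field names here are hypothetical, borrowed from the counter tables shown later), each new way of slicing the time series meant yet another compound index that had to stay resident in RAM:

// Hypothetical compound indexes for slicing time series data in the Mongo shell.
// Each query pattern tends to need its own index, and every index competes for RAM.
db.events.ensureIndex({ content_id: 1, hourstamp: 1 });
db.events.ensureIndex({ account_id: 1, hourstamp: 1 });
db.events.ensureIndex({ account_id: 1, event_type: 1, hourstamp: 1 });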

So we investigated switching over to an RDBMS (relational database management system) like PostgreSQL or MySQL. The biggest problem we found was that increments were slow. Each event would usually require a transaction consisting of multiple increments across multiple tables:

START TRANSACTION;
UPDATE social_actions SET tweets = tweets + 1
  WHERE content_id = '86b134440441e19a1da298e433f45225' AND hourstamp = 1357084800000;
UPDATE account_social_actions SET account_tweets = account_tweets + 1
  WHERE account_id = '5fd608fac762f531e42040f1' AND hourstamp = 1357084800000;
COMMIT;

The problem here is that the two statements in the transaction above only account for what needs to be done at the account and content item levels. They wouldn't handle any of our internal metrics or the regrouping of data needed to present it to the user in different ways. This put us in a bit of a bind, so back to Mongo it was. Mongo's ability to fire and forget increments and then pull them back out through a set of B-tree indexes was something we still couldn't afford to let go of.
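
For comparison, here is a rough sketch of what a fire-and-forget increment looks like in the Mongo shell (the collection and field names mirror the SQL above and are illustrative; with the unacknowledged write concern that was the default at the time, the client does not wait for a response):

// One unacknowledged $inc per counter: no transaction, no round trip to confirm.
db.social_actions.update(
  { content_id: "86b134440441e19a1da298e433f45225", hourstamp: 1357084800000 },
  { $inc: { tweets: 1 } },
  { upsert: true }
);
db.account_social_actions.update(
  { account_id: "5fd608fac762f531e42040f1", hourstamp: 1357084800000 },
  { $inc: { account_tweets: 1 } },
  { upsert: true }
);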

Transitions

This was the beginning of our journey into distributed database systems and eventually the world of Cassandra. As any good investigation would do, we lined up the major players (in our eyes) and moved forward. Describing the steps we took to compare Cassandra, HBase, Riak, and the like would make a full article by itself. So I will skip our methods for brevity and just say that we decided to go with Cassandra.

In a perfect world, the easiest transition from Mongo to Cassandra would just be to copy the data over and keep on going about our business. But we don’t live in a perfect world and a transition plan was required (and it wasn’t going to be easy). As with any other technology requirement, it had to be a seamless transition for the customer. That meant no downtime (or as little as possible) and no data loss.

We decided that for a large portion of the system, we would leave Mongo in place, considering it was the data storage backend for our Ruby on Rails ODM (Mongoid). Adding more work to the transition (moving users, accounts, settings, and other relationships) was not something we were interested in doing. When doing these kinds of transitions, you want the fewest moving parts possible at any one time.

So we started by bringing up three m1.xlarge AWS instances running Cassandra. We needed to get some of our existing data in there and see how our queries performed. We also needed to start getting live data flowing in so we could test the cluster's performance under normal load (and additionally do some stress testing).

Let The Games Begin

In order to integrate everything into our infrastructure, we had to start at our edge servers (which were written in Node.js). We tried the existing Node.js driver and found it lacking for a variety of reasons: it leaked global variables, it didn't have full Cassandra support (no composites or CQL), and it didn't support connection pooling. That's when we decided to build Helenus, our Node.js driver for Cassandra.

By building the driver, not only did we intimately learn how Cassandra sends and receives data, but we also learned quite a bit about how it works internally. That's when we first decided that our use case for Cassandra was going to be heavily based on composites.
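
To give a flavor of what the integration looks like, here is a minimal connect-and-query sketch in the spirit of the Helenus README; the host, keyspace, and query are placeholders, and the exact option names may differ between driver versions:

var helenus = require('helenus');

// Connection pooling was one of the features missing from the driver we tried first.
var pool = new helenus.ConnectionPool({
  hosts    : ['10.0.0.1:9160'],    // placeholder Cassandra node
  keyspace : 'simplereach_events', // placeholder keyspace
  timeout  : 3000
});

// Without an error listener, connection errors would be thrown.
pool.on('error', function(err){ console.error(err.name, err.message); });

pool.connect(function(err){
  if (err) { throw err; }
  // CQL support was the other big missing piece.
  pool.cql("SELECT * FROM events WHERE KEY = ?", ['some-row-key'], function(err, results){
    if (err) { throw err; }
    console.log('got %d rows', results.length);
  });
});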

Composites

When we originally envisioned our schema, we saw ourselves making heavy use of underscores as separators inside column names. We made this choice because we wanted finer grained control over our range slices. We use wide rows everywhere, and our goal was to be able to pull specific types of data with a single query. At first we tried manually composing column names:

=> (column=pageviews_data_1348983820074_c98998a0-0ac1-11e2-8aed-c51111062569, value={"user-agent":"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.4 (KHTML, like Gecko) Chrome/22.0.1229.79 Safari/537.4","user-id":"299ed0b1-3033-4263-93d8-85d35f2fb9f2","ip":"192.168.63.204"}, timestamp=1348983820074000)
=> (column=pageviews_referrer_1348983820074_c98998a0-0ac1-11e2-8aed-c51111062569, value=http://www.google.com/url?url=http%3A%2F%2Fexample.com, timestamp=1348983820074000)
=> (column=pageviews_ua_1348983820074_c98998a0-0ac1-11e2-8aed-c51111062569, value=Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.4 (KHTML, like Gecko) Chrome/22.0.1229.79 Safari/537.4, timestamp=1348983820074000)

While using underscores accomplishes the same kind of thing, it is limiting. Packing everything into a UTF8Type column name means every part of it is compared and sorted as a UTF8 string. With composite columns, however, each component is compared according to its own type (UTF8Type, LongType, etc.).

=> (column=pageviews:data:1348983820074:c98998a0-0ac1-11e2-8aed-c51111062569, value={"user-agent":"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.4 (KHTML, like Gecko) Chrome/22.0.1229.79 Safari/537.4","user-id":"299ed0b1-3033-4263-93d8-85d35f2fb9f2","ip":"192.168.63.204"}, timestamp=1348983820074000)
=> (column=pageviews:referrer:1348983820074:c98998a0-0ac1-11e2-8aed-c51111062569, value=http://www.google.com/url?url=http%3A%2F%2Fexample.com, timestamp=1348983820074000)
=> (column=pageviews:ua:1348983820074:c98998a0-0ac1-11e2-8aed-c51111062569, value=Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.4 (KHTML, like Gecko) Chrome/22.0.1229.79 Safari/537.4, timestamp=1348983820074000)

As a result, our data is now stored more like you see above. This means that we can slice out all pageview (first component, UTF8Type) referrer data (second component, UTF8Type) that exists between two timestamps (third component, LongType) for everything within a logical time period (i.e., that row). The problem with the manual underscore approach is that you don't get a type-aware sort: everything sorts as a string, so time UUIDs sort as strings rather than by time, integers sort as strings rather than numerically, and so on.

Knowing the data storage patterns in advance like this is what allowed the use of static composites. We have intentionally avoided the use of dynamic composites to simplify our application logic both on the reading side and the writing side. Although it is supported, the use of dynamic composites is frowned upon unless absolutely required.

This was just the tip of the iceberg for what composites did for our schema. From then on, every time there was a logical separation in data formats, we chose to use composites. In addition to more efficient storage, the other big bonus is that composite column names are simply easier to read than a long string of data separated by underscores.

To achieve this composite column setup, we created our column family as follows:

create column family events
  with column_type = 'Standard'
  and comparator = 'CompositeType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.LongType,org.apache.cassandra.db.marshal.UUIDType)'
  and default_validation_class = 'UTF8Type';
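
If it helps to think of it in CQL terms, the same column family can be viewed roughly as a compact storage table whose clustering columns are the composite components. This is a sketch only, and the column names below are illustrative (a real CQL3 view of a thrift column family would auto-generate names like column1 through column4):

-- Approximate CQL3 view of the column family above.
CREATE TABLE events (
  row_key    text,
  event_type text,    -- first composite component (UTF8Type), e.g. 'pageviews'
  sub_type   text,    -- second component (UTF8Type), e.g. 'referrer'
  ts         bigint,  -- third component (LongType), millisecond timestamp
  id         uuid,    -- fourth component (UUIDType)
  value      text,
  PRIMARY KEY (row_key, event_type, sub_type, ts, id)
) WITH COMPACT STORAGE;

-- The slice described above: all pageview referrer data between two timestamps in one row.
SELECT * FROM events
 WHERE row_key = 'some-row-key'
   AND event_type = 'pageviews' AND sub_type = 'referrer'
   AND ts >= 1348983000000 AND ts < 1348990000000;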

Once we did this, it became easy to see and make use of composites in other ways. For instance, if we want to store information about a blog post, the data can be laid out like the row below in order to keep things both visually separated and logically organized.

2311d5d0-8574-c25d-69e8-1e17aba15c67:
=> (column=author:ericlubow, value=, timestamp=1468943520012000)
=> (column=category:databases, value=, timestamp=1468943520012000)
=> (column=meta:published_date, value=1468943520012000, timestamp=1468943520012000)
=> (column=meta:title, value=An Odyssey of Cassandra, timestamp=1468943520012000)
=> (column=meta:url, value=http://example.com/2013/an-odyssey-of-cassandra, timestamp=1468943520012000)
=> (column=tag:cassandra, value=, timestamp=1468943520012000)
=> (column=tag:database, value=, timestamp=1468943520012000)

Tuning Tunables

Now that we had loaded our existing data into Cassandra, new data was flowing into the cluster, and all of it was organized in an efficient and visually pleasing manner, it was time to start pushing a production-level read load at the cluster to really test things out. As it turns out, there are quite a few tunable parameters in Cassandra that can potentially get you some quick wins.

The first, and arguably most important, set of knobs for Cassandra on AWS is the one that deals with connectivity between nodes. The parameter we started with was phi_convict_threshold, which we raised from its default of 8 to 10 (and eventually to 12, as shown in the configuration below). The phi_convict_threshold adjusts the sensitivity of failure detection between nodes. This is especially useful on clusters built on AWS, because transient network hiccups can make nodes appear to pop in and out of the ring during periods of network instability.

Several other parameters also lend themselves well to dealing with network transience, hinted_handoff_throttle_delay_in_ms and rpc_timeout_in_ms among them. Raising hinted_handoff_throttle_delay_in_ms to 150ms gives a recovering node more breathing room as it brings its data back to a closer-to-consistent state. rpc_timeout_in_ms gives RPC commands a little more leeway to make it through network problems. The default value is 10000ms (10 seconds); during periods of high traffic, setting it to 30000ms (30 seconds) allowed the nodes to stay alive and in the cluster more reliably.

If you are using Cassandra 1.2 or later, you can also take advantage of the per-request timeout properties. The defaults for non-truncate requests are 10000ms (10 seconds); upping them to 20000ms (20 seconds) lets you tolerate more variance in the network.

# cassandra.yaml
phi_convict_threshold: 12
hinted_handoff_throttle_delay_in_ms: 150
rpc_timeout_in_ms: 30000
read_request_timeout_in_ms: 20000
range_request_timeout_in_ms: 20000
request_timeout_in_ms: 20000
write_request_timeout_in_ms: 20000

Tuning the JVM is also a critical path to go down when working with your Cassandra cluster. There are some quick wins available here as well, just by changing the MAX_HEAP_SIZE and HEAP_NEWSIZE variables in the cassandra-env.sh file. On an AWS m1.xlarge with approximately 15GB of memory, it is safe to set MAX_HEAP_SIZE in the 7-8GB range. This gives plenty of room for the memtables and caching while leaving the OS enough memory for its own tasks. As a general rule, HEAP_NEWSIZE is set at roughly 100 megabytes per physical CPU core.

# cassandra-env.sh
MAX_HEAP_SIZE="7G"
HEAP_NEWSIZE="800M"

There are also many configuration items that can be changed at the column family level. One of the easiest ways to get some big wins is with caching. There are four settings: NONE, KEYS_ONLY, ROWS_ONLY, and ALL. Getting the cache set properly on a column family can mean the difference between a 30ms and a 250ms response time for a query. That might not seem like a lot until you put it in the context of millions of queries per day, where it can save minutes or even hours of query load over the course of a day.

Another column family level parameter is read_repair_chance (along with dclocal_read_repair_chance). This parameter tells Cassandra how often to check the other replicas in the replica set to make sure the data read from one replica matches the rest. In other words, with an RF (replication factor) of 3, when you read one copy of the data and the read_repair_chance is met, the other two replicas are checked to ensure the data is the same (after the request completes). If the data is inconsistent, a repair is enqueued for the problem range. dclocal_read_repair_chance is the same thing, except the replica checks stay within the same data center in the ring.

Another performance lever, especially if you are running a Cassandra release prior to 1.1, is the sstable_compression setting. Prior to Cassandra 1.1, column family compression was turned off by default; from 1.1 onward, SnappyCompressor is enabled by default for all newly created column families. It is not uncommon to see percentage speed gains in the double digits by enabling compression. If your column family has a read-heavy load, SnappyCompressor is likely the best choice. For a more write-heavy load (where reads are less frequent), you can save space in addition to the speed increase by using DeflateCompressor. Depending on the size of your rows, changing the chunk_length_kb compression option may yield some additional benefit; this typically pays off when the rows in your column family are fairly consistent in size.
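
As a rough sketch in the same cassandra-cli syntax used for the schema above (exact attribute names vary a little between Cassandra versions, so check against your release), the caching, read repair, and compression settings discussed here are all applied per column family:

update column family events
  with caching = 'KEYS_ONLY'
  and read_repair_chance = 0.1
  and dclocal_read_repair_chance = 0.0
  and compression_options = {sstable_compression: SnappyCompressor, chunk_length_kb: 64};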

Management Beyond Performance Tuning

As your Cassandra cluster starts to grow, there are a few things that always need to be taken into consideration. The biggest item for anyone watching systems is monitoring. Of course you need to monitor system level metrics like disk space, IO, and memory, but you shouldn't be alerting at the same levels that you would for an application server. Unless you are using SSD-backed machines, CPU will rarely be a bottleneck for any Cassandra related operations (including compactions).

There are a few metrics that your monitoring system should focus on to properly diagnose when it is time to grow the cluster. Your mileage may vary on the thresholds within each metric, but the metrics themselves are fairly consistent.

One of the first common signs that your cluster is having IO trouble is a rising await (average I/O wait) time. This is what a normal but active system looks like via "iostat -x":

Device:  rrqm/s  wrqm/s  r/s    w/s   rsec/s  wsec/s  avgrq-sz  avgqu-sz  await  svctm  %util
xvdb     0.00    0.00    10.00  1.20  340.80  102.40  39.57     0.10      9.29   4.82   5.40

When compared to the following output of "iostat -x" on an overly active system for the same device, it is easy to see that the await time is really high.

Device:  rrqm/s  wrqm/s  r/s     w/s   rsec/s    wsec/s  avgrq-sz  avgqu-sz  await    svctm  %util
xvdb     73.40   44.50   667.20  1.40  24753.60  122.40  67.40     92.58     2976.68  2.39   88.00

Making more comparisons, it's also worth noting that the reads per second are very high. There are a few potential ways of dealing with this situation. If the load is mostly reads, it may be as simple as tuning the read_repair_chance settings (as mentioned earlier), or perhaps the RF is set too high for the number of nodes (an RF of 3 on a 3-node cluster will likely be overwhelming for the drives). The more likely solution, though, is adding capacity.

It is also a good idea to keep your system on a consistent repair schedule. The current recommendation is that a repair runs on every replica at least once within every gc_grace_seconds window. gc_grace_seconds is a column family specific setting that controls how long Cassandra waits before garbage collecting tombstones (columns marked for deletion). The default is 864000 seconds (10 days), so by default a repair should run at least every 10 days.
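
In practice this usually means scheduling something like the following on each node, staggered so that the repairs don't overlap (the keyspace name and schedule here are illustrative):

# Repair only the ranges this node is primarily responsible for (-pr),
# so the work is not repeated when the same command runs on every node.
nodetool repair -pr simplereach

# Example cron entry: weekly at 02:00, comfortably inside the 10-day gc_grace window.
0 2 * * 0  nodetool repair -pr simplereach >> /var/log/cassandra/repair.log 2>&1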

Even if the application doesn't do any deletes, it is still a good idea to run repairs regularly. This helps keep data consistent across nodes when you are not doing local_quorum reads. When a read is not a quorum read, the read_repair_chance check is what kicks in; as mentioned earlier, if the read repair chance is hit, the range gets repaired.

In general, one of the best ways to keep your data consistent across nodes is to keep repairs running regularly. Every time a repair on one node finishes, another one gets started, and repairs are never run on more than one replica in a replica set at a time. The additional benefit of running repairs regularly (beyond cleaning up tombstones) is that read_repair_chance will trigger fewer repairs if your cluster is consistently repairing itself. Note that this is only a good idea if you have the spare IO cycles. If your cluster is under constant strain (i.e., IO bound), run repairs as infrequently as you can get away with until you can relieve the IO pressure and add more capacity to the cluster.

Where To Now?

Now that the cluster is up, running, and tuned, it's time to make sure you've made good decisions. One way to test your decisions up to this point is to push additional load at your infrastructure as a form of stress test. It is possible for query performance to degrade as more data is read (depending on your query pattern). In other words, if you are averaging 10 queries per second and things seem to be running well, try doubling your query load and see if your application still performs as expected. This has the added benefit of telling you whether you have room for growth or traffic spikes. If the cluster's reaction to being stressed is slower response times, then you may need to rethink how the application interacts with the database.

Although it’s a challenge to get completely comfortable when managing distributed systems, if you’ve thought through and attempted many of the things I’ve mentioned, you should be well on your way. Maintaining a Cassandra cluster is not too different from managing any other system; you should know the patterns that your system is subject to. You should know your peaks and valleys and watch your systems at those points for anything out of the ordinary.

It's incredibly important that monitoring is being done on the cluster as well. It's good to use something like Nagios for machine and OS level checks. A few checks are common to all machines; one of those is available disk space on a partition. While it's typical to have an alert fire when less than 20% of a partition is free, the alert for the partition Cassandra's data lives on should fire much earlier. It is more useful to know when less than 60% of the partition is free (in other words, when 40% of the partition is full). You will need the additional headroom for compactions and cleanups.
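
With the standard Nagios check_disk plugin, that earlier threshold might look something like this (the data path and the critical level are examples, not prescriptions):

# Warn when less than 60% of the Cassandra data partition is free,
# and go critical below 50% free. Path and thresholds are illustrative.
check_disk -w 60% -c 50% -p /var/lib/cassandra/data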

A good rule to abide by is that there should be no more than about 300G-350G of data per node. This means that the size of the partition should be a minimum of 750G. This will give plenty of space for growth, compactions, and cleanups (ie. all the necessary evils of running a Cassandra cluster).

Looking Forward

One of the best things about a maturing system is the toolset that springs up around it. As people continue to use Cassandra, there will be contributions to the community in the form of premade applications. DataStax has done this by building Cassandra integrations not only for Hadoop (Hive and Pig) but also for Solr. Netflix has also done some great things in this arena by releasing systems like Priam for token management and contributing their own Cassandra Java driver. At SimpleReach, we are contributing as well with Helenus, our Node.js driver.

As your cluster grows and Cassandra becomes more entrenched in your infrastructure, look at what's being developed and contributed by the community. There is probably at least one other person who is in the same boat as you (or has run into the same problem before). In other words, know where you can find answers about Cassandra or ask your questions. Of course you can use search engines, but IRC is the place for the latest and greatest answers in a constantly evolving environment. The channel is #cassandra on Freenode (irc.freenode.net). Everyone from the core developers to avid users (like myself) to new users hangs out there. Stop by and say hello, or just watch the channel and learn from some of the experts. Good luck.

About the author: Eric Lubow, CTO of SimpleReach, began his career building secure systems at Guardian Digital before joining Conductor.com, where he helped develop and run their ad systems. After graduating from Rutgers University with a degree in Information Systems, Eric did multiple tours with the U.S. Army including serving in Iraq as a Lieutenant and Albania as a Captain. He is a board advisor for the IronMatt.org charity for pediatric brain tumors, a mixed martial artist, motorcyclist, and seasoned skydiver. He is also a Datastax Cassandra MVP.


April 29, 2014