Wednesday, April 09, 2014
Mikio's Guide To Real-Time Big Data
How to choose the right real-time big data platform
File under: machine room
Everyone seems to be doing big data nowadays. From SAP Hana, to Google’s BigQuery, Twitter’s Storm, Amazon’s Kinesis, Apache Drill, to ParStream, everyone claims to be able to deal with real-time big data and get the most out of the data you have. But if you dig a bit deeper, you quickly see that there’s a whole range of different technological approaches, applications with varying requirements, and application scenarios. I’ve discussed these different approaches already, but the real question is how do you decide which approach to take?
So what is real-time data anyway? Let’s look at some applications:
- web analytics, and other kinds of dashboards
- online ad optimization (including real-time bidding)
- monitoring and fraud detection
- churn prediction for online games or ecommerce
- optimizing devices or plants based on behavior/usage (“smartgrid”)
In most of these applications, you have to deal with evented data which comes in “in real-time”. Data is constantly changing and you usually want to consider the data over a certain time frame (“page views in the last hour”), instead of just taking all of the past data into account.
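To make the “page views in the last hour” idea concrete, here is a minimal sketch in plain Python. The class name `SlidingWindowCounter` and its methods are made up for illustration, and the sketch assumes that events arrive in time order with timestamps given in seconds.

```python
from collections import deque


class SlidingWindowCounter:
    """Counts events whose timestamps fall within the last `window_seconds`."""

    def __init__(self, window_seconds):
        self.window_seconds = window_seconds
        self.timestamps = deque()  # event times, oldest first

    def add(self, timestamp):
        # Assumption: events arrive in non-decreasing timestamp order.
        self.timestamps.append(timestamp)

    def count(self, now):
        # Drop events that have fallen out of the window ending at `now`.
        cutoff = now - self.window_seconds
        while self.timestamps and self.timestamps[0] <= cutoff:
            self.timestamps.popleft()
        return len(self.timestamps)


page_views = SlidingWindowCounter(window_seconds=3600)
for t in [0, 1200, 2400, 3000, 4000]:
    page_views.add(t)

# The event at t=0 is more than an hour old at t=4000 and is not counted.
views_last_hour = page_views.count(now=4000)
```

Note that this version keeps one entry per event, so its memory use grows with the data rate. That is fine for a toy example, but it is exactly the kind of cost the rest of this post is about.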
The four Aspects of Real-Time
In order to pick the right approach and technology, you need to understand your problem better. I think the following four aspects help to describe your requirements: scale, latency, complexity, and accuracy.
There are two main challenges with real-time big data: the rate at which the data comes in, and the number of different objects your data mentions. You’ve probably heard the terms velocity and volume in connection with Big Data, but I’d rather call these the data rate and the event range.
Data rate is basically the rate at which events come in. It mostly dictates how much CPU power you need to process your events. It all depends on how much processing you have to do per event, but once you go beyond a few tens of thousands of events per second, things start to get complicated, especially if you need to do disk-based queries or updates.
Then there is the event range, that is, the number of distinct objects your data stream talks about. The event range basically defines how much memory you need (RAM or disk) to store all the information. For example, if you look at users in a social network, you will easily see a few million or even hundreds of millions if you go on long enough. In some cases, the range is unbounded, for example, because there are always more users signing up. If you consider combinations of objects (like users and items in a webshop), numbers can easily explode, too.
If you deal with either high data rate or large event ranges, but not both, you are still lucky. If you have high data rate but small event range, you can probably keep all the data in memory, and if you have large event ranges but low data rates, you can store the data in a database. The real challenge is dealing with high data rate and large event ranges.
Latency is the time it takes to compute the analysis you need, or to make a decision. Roughly, there are three different ranges:
- Results or decisions are required immediately. For applications like real-time bidding or online ad optimization, you only have a few milliseconds to compute the result because you immediately have to do something with the outcome.
- A few seconds up to minutes. For interactive use and dashboards, it’s ok if you get new results every few seconds or have to wait a few minutes for results.
- Minutes up to hours. This includes most kinds of reporting and longer term data analysis.
For the discussion below, we distinguish between the first case, low latency, and the other two cases, high latency.
The real question here is whether you can (or have to) react in real-time. If data comes in at 100k events per second, but there’s only a manager who will look at the aggregates once a week to adjust some business strategy, then latency is not an issue. On the other hand, if you are deploying a new landing page for your web site and you are looking for errors and a sudden lack of visitors, you need to have the results immediately. As a rule, “It ain’t real-time if you cannot react in real-time.”
Technologically, as I’ll discuss in more depth below, this distinction is a very important one because it tells you whether you can stick with a more batch processing style approach or should rather process data as it comes in in a streaming fashion.
Next up, there’s the question of what exactly you want to do with your data. For some applications like analytics, you only need to compute simpler statistics like counts, averages, or medians over certain time windows.
Real-time recommendation is much trickier, as you first need to aggregate user and item activity, but then also need to do further costly computations to arrive at item or user similarities.
For something like churn prediction (that is, predicting whether a user is about to stop playing a game or using a service), you would first do a very complex analysis in order to train a classifier on all past data, and then deploy a system which computes the required features and outputs predictions in real-time on new data.
The key distinction here is the amount of information required to process an event:
- only local information (for example, updating a counter)
- global aggregates (for example, trends)
- iterative and more complex operations (machine learning and full blown data analysis)
As with programming languages, most approaches I’ll discuss below are suited for all of these tasks in principle. But as always, some things are easier to express in one system than another.
If you know me, you’ll probably already see where this is going, but I believe that this aspect is important and so far underrepresented. Not all applications demand exact numbers. Some certainly do, like accounting or counting retweets exactly for all users, but other applications like trending or profiling don’t. Generally speaking, exactness is usually not essential when
- information is aggregated and reduced significantly (for example, taking all user activity and only looking for the top most active ones)
- only relative scores matter (profiling different kinds of user activity)
- there’s high volatility in the scores (very high data rate)
- you compute intermediate results which are used in an already inaccurate process afterwards (like computing features for churn prediction)
- you are mostly looking at exploratory data analysis. If you plot graphs which are 300 pixels high, there’s really no need to have total accuracy.
In all those cases you can usually deal with imprecision. You don’t really need to know these scores up to the 5th digit after the comma.
This distinction is important because dropping the requirement for exactness opens up entirely new algorithmic possibilities to tackle a huge amount of data with significantly reduced resource requirements.
So let’s talk about technology. Scale is always assumed to be high, which leaves us with latency, complexity, and accuracy. The biggest difference here is latency. If you don’t require low latency, you can stick with more traditional approaches which first collect data on disk or in memory and then crunch it later, whereas low latency requires you to process the data as it comes in. I’ll discuss accuracy in the section on low latency, because relaxing it is another way to make that kind of processing possible. Complexity is a topic of its own, discussed afterwards.
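One well-known example of such an algorithm is the Count-Min Sketch, which counts events per object in a fixed amount of memory, no matter how many distinct objects show up. The sketch below is a minimal illustration in plain Python. The class name, the default `width` and `depth`, and the choice of SHA-256 for hashing are my assumptions for the example, not a reference implementation.

```python
import hashlib


class CountMinSketch:
    """Approximate per-key counts in a fixed-size table of depth x width cells."""

    def __init__(self, width=1000, depth=4):
        self.width = width
        self.depth = depth
        self.table = [[0] * width for _ in range(depth)]

    def _index(self, row, key):
        # One hash function per row, derived by prefixing the row number.
        digest = hashlib.sha256(f"{row}:{key}".encode("utf-8")).digest()
        return int.from_bytes(digest[:8], "big") % self.width

    def add(self, key, amount=1):
        for row in range(self.depth):
            self.table[row][self._index(row, key)] += amount

    def estimate(self, key):
        # Collisions can only inflate a cell, so the minimum over all rows
        # is the tightest estimate. It never falls below the true count.
        return min(
            self.table[row][self._index(row, key)] for row in range(self.depth)
        )


sketch = CountMinSketch()
for _ in range(100):
    sketch.add("alice")
for _ in range(3):
    sketch.add("bob")

alice_estimate = sketch.estimate("alice")
```

The estimate can be too high when different keys collide in every row, but it is never too low, and the memory footprint stays at `width * depth` counters regardless of the event range.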
High Latency: Batch processing
If you don’t require your results within seconds, or even minutes, you’re already lucky because this means that you can stick to a batch-oriented approach.
In the simplest example, you can just write a piece of code which scans through log files to do what you want. Alternatively, you put all your data into a database and run queries on your data to compute the things you want. You can go with classical SQL databases or use something like Cassandra for pure storage, or CouchDB which can also run aggregation jobs for you.
Batch processing can be scaled effectively using something like Apache Hadoop (or one of the commercial offerings through Cloudera, Hortonworks, or MapR). You can store your log data in a distributed fashion on the cluster and then run queries in a parallel fashion to get your response times down. Technically speaking, Hadoop cannot scale just any workload, but only those where you first apply the same operation in parallel to all parts of your data (the map step) and then later merge the results (the reduce step), but a surprisingly large number of operations are covered by this approach.
One of the strengths of Hadoop is that it’s very mature and has attracted many projects which build upon it. One such example is Apache Drill, a vertical database (or column-oriented database) similar to Google’s Dremel, on which BigQuery is based. Vertical databases are optimized for the kind of task where you have to scan whole tables and count entries matching some criterion. Instead of storing the data by row as in traditional databases, data is stored by column. So instead of storing data as in a log file, one line per entry, one takes each field of the data and stores its values together, resulting in much better IO characteristics. HP Vertica and ParStream are other vertical databases.
In the most basic version of batch processing, you will be processing all your data for each query. For example, if you want to compute page views from web server log data, you will go through all of the logs for the specified time range over and over again in order to collect the count. You could in principle also store intermediate results, but that’s something you have to set up and maintain yourself.
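The map and reduce steps can be sketched in a few lines of plain Python. This is not the Hadoop API, just an illustration of the idea. The log format (a timestamp and a URL per line) and the function names `map_page_views` and `merge_counts` are assumptions made for the example.

```python
from collections import defaultdict
from functools import reduce


def map_page_views(log_lines):
    """Map step: runs independently on one chunk of the log."""
    counts = defaultdict(int)
    for line in log_lines:
        _timestamp, url = line.split()
        counts[url] += 1
    return dict(counts)


def merge_counts(left, right):
    """Reduce step: merges two partial results into one."""
    merged = dict(left)
    for url, n in right.items():
        merged[url] = merged.get(url, 0) + n
    return merged


# Two chunks of the same log, as they might be stored on different nodes.
chunks = [
    ["1397040000 /home", "1397040001 /about"],
    ["1397040002 /home", "1397040003 /home"],
]

# On a cluster the map calls would run in parallel; here they run in a loop.
partials = [map_page_views(chunk) for chunk in chunks]
total = reduce(merge_counts, partials, {})
```

Because `merge_counts` does not care in which order the partial results arrive, the chunks can be processed on as many machines as you like.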
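The difference between the two layouts can be illustrated with a toy example in plain Python. The field names and records are invented for the illustration, and real column stores add compression and on-disk formats that are not shown here.

```python
# Row-oriented layout: one record per entry, as in a log file.
rows = [
    {"user": "alice", "url": "/home", "status": 200},
    {"user": "bob", "url": "/cart", "status": 500},
    {"user": "carol", "url": "/home", "status": 200},
]

# Column-oriented layout: all values of one field are stored together.
columns = {field: [row[field] for row in rows] for field in rows[0]}

# Counting server errors in the row layout touches every whole record ...
errors_from_rows = sum(1 for row in rows if row["status"] >= 500)

# ... while the column layout only has to read the "status" column.
errors_from_columns = sum(1 for status in columns["status"] if status >= 500)
```

Both queries give the same answer, but the second one never looks at the user or URL data, which is where the better IO characteristics come from once the data no longer fits in memory.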
The downside of batch processing is that it becomes very costly to scale beyond a certain point. There are significant startup times for every job that you run, and disk access is a natural limit on the response times you can expect.
Some products, most notably SAP Hana but also GridGain and Apache Spark, have started to replace disk with memory as the storage medium to get around the disk speed limitation. On the other hand, these systems are still essentially batch processing in nature, although turnaround times between queries can become quite small. Also, memory as main storage is still pretty expensive.
One challenge with batch approaches is that you need to make sure you clean out old data eventually, in particular if you put your data on disk. Not just because your disk will eventually be full, but also because your queries will take longer and longer to run as the size of the data and the indices grows.
Depending on your database, this might be a problem and require regular cleanup via VACUUM commands or other forms of housekeeping. In general, databases don’t like it when the data is changing a lot all the time.
Stream mining algorithms have recently also begun to be used in the context of batch processing when you are dealing with massive event ranges. Instead of adding even more computing power and storage, stream mining algorithms can help you to get approximate answers quickly.
Low Latency: Stream Processing
If, on the other hand, you decide that you need up-to-the-second results for your data, you essentially have to process your data as it comes in. That kind of approach is called stream processing.
The general approach is to have a little bit of code which processes each of the events separately. In order to speed the processing up, you can divide the stream (consistently, for example, by using a hash of some id), and then run the processing in multiple threads or even distributed over a cluster.
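Dividing the stream consistently can be sketched like this. The function names and the event format (a user id and an action) are assumptions for the example, and the “workers” are plain dictionaries rather than real threads or cluster nodes.

```python
import zlib


def partition_for(key, num_partitions):
    # zlib.crc32 gives the same value in every process, unlike Python's
    # built-in hash() for strings, which is randomized per process.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def process_stream(events, num_partitions):
    # One independent state per partition. In a real system each of these
    # would live in its own thread or on its own node.
    workers = [dict() for _ in range(num_partitions)]
    for user_id, _action in events:
        state = workers[partition_for(user_id, num_partitions)]
        state[user_id] = state.get(user_id, 0) + 1
    return workers


events = [
    ("alice", "click"),
    ("bob", "view"),
    ("alice", "view"),
    ("carol", "click"),
    ("alice", "click"),
]
workers = process_stream(events, num_partitions=4)
```

Since all events for one id always end up in the same partition, each worker can update its counters without coordinating with the others.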
Apache Storm is a popular framework for that which is used at Twitter and other companies requiring this kind of real-time processing. Other examples are Amazon’s Kinesis, or the streaming capabilities of MapR. These frameworks take care of the scaling on multiple cluster nodes and come with varying degrees of support for resilience and fault tolerance, for example, through checkpointing, to make sure the system can recover from failure.
These stream processing frameworks only take care of parallelizing the computational load, meaning that you need an additional storage layer to store the results in order to be able to query them. While the current state of the computation is contained in the stream processing framework, there is usually no way to access or query this information. Depending on the amount of data you are processing, this might mean you need a pretty large and high-performance storage backend.
Apache Spark, which I already mentioned in the section about in-memory batch, is an interesting new project that tries to close the gap between batch and streaming by taking a hybrid approach. Events are collected in a buffer and then processed at fixed intervals (say every few seconds) in a batch fashion. Since Spark holds the data in memory, it can process the batches fast enough to keep up with the incoming data stream.
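The buffering idea can be sketched in plain Python as follows. This is not Spark’s API, only an illustration of cutting a stream into fixed intervals. The generator name `micro_batches` is made up, and the sketch assumes time-ordered `(timestamp, value)` events.

```python
def micro_batches(events, interval):
    """Group time-ordered (timestamp, value) events into fixed-length intervals."""
    batch, batch_end = [], None
    for timestamp, value in events:
        if batch_end is None:
            # The first interval starts with the first event.
            batch_end = timestamp + interval
        while timestamp >= batch_end:
            # Close the current interval (possibly empty) and open the next.
            yield batch
            batch, batch_end = [], batch_end + interval
        batch.append(value)
    if batch:
        yield batch


events = [(0.0, "a"), (0.5, "b"), (2.1, "c"), (2.2, "d"), (7.0, "e")]

# Each batch can now be handed to ordinary batch-style code, here just len().
batch_sizes = [len(batch) for batch in micro_batches(events, interval=2)]
```

Intervals in which nothing happened show up as empty batches, so downstream code sees one result per interval.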
Finally, if exactness is not essential for you, stream mining based algorithms are also a very interesting alternative to stream processing, for example using our own streamdrill. The biggest advantage is when dealing with large event ranges, as stream mining algorithms allow you to bound the amount of required memory. In exchange, the results will become less accurate. That way, you can deal with much larger event ranges than with a pure stream processing approach.
In summary, low latency processing generally means you need to process your data as it comes in, updating intermediate results accordingly. Such an approach can be scaled using stream processing frameworks like Storm. Newer Big Data frameworks like Spark try a hybrid approach which has lots of future potential. Stream mining algorithms like those in streamdrill are an interesting alternative if exactness is not essential.
As I said above, all of the approaches discussed here provide quite powerful abstractions to do all kinds of data processing. Still, some operations are easier to express in one framework than the other, as is usually the case with software. Therefore it’s good to have a general idea about the kind of data processing you’ll be most interested in, so that you can choose a platform which matches your requirements well.
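To give an idea of how memory can be bounded, here is a minimal sketch of Space-Saving, a well-known stream mining algorithm for finding the most frequent objects. I’m using it purely as an illustration of the principle and am not claiming that this is how any particular product works internally. The class and method names are made up for the example.

```python
class SpaceSaving:
    """Tracks approximate counts for at most `capacity` distinct keys."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.counts = {}

    def add(self, key):
        if key in self.counts:
            self.counts[key] += 1
        elif len(self.counts) < self.capacity:
            self.counts[key] = 1
        else:
            # Table is full: evict the key with the smallest count. The
            # newcomer inherits that count plus one, so counts may be
            # overestimated but frequent keys are not lost.
            victim = min(self.counts, key=self.counts.get)
            self.counts[key] = self.counts.pop(victim) + 1

    def top(self, n):
        ranked = sorted(self.counts.items(), key=lambda kv: kv[1], reverse=True)
        return ranked[:n]


stream = ["a"] * 50 + ["b"] * 30 + [f"rare{i}" for i in range(20)]
tracker = SpaceSaving(capacity=3)
for key in stream:
    tracker.add(key)

most_active = tracker.top(2)
```

The stream mentions 22 distinct objects, but the tracker never holds more than three counters. The linear scan for the smallest counter keeps the example short; a real implementation would use a smarter data structure.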
Let’s look at the three classes of computations I discussed above: local operations, global aggregates, and beyond.
All systems are naturally suitable for local operations. They are easy to parallelize. The same holds for global aggregates. In MapReduce, for example, the map step computes locally, and the reduce step then aggregates the results. Only stream processing poses slightly higher obstacles because you have to merge the different streams by hand to explicitly construct a global view.
When it comes to more complex algorithms, there are other challenges. Such algorithms often run in iterations, in order to compute some approximations to numeric optimization problems (most supervised learning algorithms), or specifically distribute information globally (like the random walk underlying PageRank).
Systems which provide not just a query language but also more advanced constructs, including intermediate results, are better suited for such complex problems. Startup times can be drastically reduced by memory-based approaches, making iteration feasible. Systems like Spark were specifically designed to allow for iterations and more complex operations beyond map and reduce.
Stream mining algorithms like those in streamdrill might appear pretty basic at first, but since they are memory-based and also provide storage for intermediate results using their counter algorithms (basically sums), they can also be used for more complex processing. Algorithms might need to be adapted somewhat, though. For example, we’ve implemented a real-time recommendation algorithm on top of streamdrill.
Looking at higher-level frameworks, it’s interesting to see that these basically fall into two categories: SQL-like query languages (Drill, Vertica, ParStream, StreamBase), or functional collections (Scalding, Summingbird). So depending on your application area, that might also be a feature to consider.
The bottom line
If you already have terabytes of information on a Hadoop cluster, there is probably little question in your mind how to start. Unless you are looking for low latency, you should just stick with Hadoop instead of building up a parallel infrastructure. The same holds if you already have your data within the Google or Amazon cloud: you should probably just stick with their offerings, BigQuery or Kinesis.
Another case is when you are just starting, and you don’t know yet what exactly you want to do with your data. In such cases, I think it’s wise to start with the simplest possible setup, that is a bunch of log files and maybe a few scripts in your favorite programming language to get started.
If you already have so much data that you cannot wait for it all to be processed, systems like Apache Drill allow good interactive use and provide a reasonably large set of functionality to have a good look at the data.
As you can imagine, I’m a big fan of stream mining frameworks like our own, streamdrill. Especially for exploratory purposes, accuracy is not essential, and you can quickly stream data through such a system to get a view of the data.
Another interesting tool for analysis is a product like Splunk, basically a search engine for log data.
Once you know what you want and have pinned down the requirements in terms of scale, latency, complexity, and exactness, you can pick the right tool and start developing.
As always, the more general the tool, the more work is required to make it work.
In summary, as with any piece of non-trivial technology, there’s no piece of technology uniformly better than all the rest. You need to know what you’re doing. I’ve tried to outline some aspects which help you to drill down on the alternatives.
Feel free to share your experience with real-time processing below in the comments!
Posted by Mikio L. Braun at 2014-04-09 14:53:00 +0200