Wednesday, April 09, 2014
How to choose the right real-time big data platform
File under: Machine Room
Everyone seems to be doing big data nowadays. From SAP Hana, to Google’s BigQuery, Twitter’s Storm, Amazon’s Kinesis, Apache Drill, to ParStream, everyone claims to be able to deal with real-time big data and get the most out of the data you have. But if you dig a bit deeper, you quickly see that there’s a whole range of different technological approaches, applications with varying requirements, and application scenarios. I’ve discussed these different approaches already, but the real question is how do you decide which approach to take?
So what is real-time data anyway? Let’s look at some applications:
- web analytics, and other kinds of dashboards
- online ad optimization (including real-time bidding)
- monitoring and fraud detection
- churn prediction for online games or ecommerce
- optimizing devices or plants based on behavior/usage (“smartgrid”)
In most of these applications, you have to deal with evented data which comes in “in real-time”. Data is constantly changing and you usually want to consider the data over a certain time frame (“page views in the last hour”), instead of just taking all of the past data into account.
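To make "page views in the last hour" concrete, here is a minimal Python sketch of a sliding time window. The class name `SlidingWindowCounter` and the plain numeric timestamps are assumptions made for illustration. It keeps one entry per event, so it is exact, but its memory use grows with the data rate.

```python
from collections import deque


class SlidingWindowCounter:
    """Counts events that occurred within the last `window_seconds`."""

    def __init__(self, window_seconds):
        self.window_seconds = window_seconds
        self.timestamps = deque()  # event times, oldest first

    def add(self, timestamp):
        # Assumes events arrive in non-decreasing time order.
        self.timestamps.append(timestamp)

    def count(self, now):
        # Drop events that have fallen out of the window, then count the rest.
        cutoff = now - self.window_seconds
        while self.timestamps and self.timestamps[0] <= cutoff:
            self.timestamps.popleft()
        return len(self.timestamps)


page_views = SlidingWindowCounter(window_seconds=3600)
for t in (0, 600, 1800, 3500, 4000):
    page_views.add(t)

# Only events newer than 4200 - 3600 = 600 are still inside the window.
views_last_hour = page_views.count(now=4200)
```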
The four Aspects of Real-Time
In order to pick the right approach and technology, you need to understand your problem better. I think the following four aspects help to describe your requirements better.
The first aspect is scale. There are two main challenges with real-time big data: the rate at which the data comes in, and the number of different objects your data mentions. You’ve probably heard the terms velocity and volume in connection with Big Data, but I’d rather call these the data rate and the event range.
Data rate is basically the rate at which events come in. Data rate mostly dictates how much CPU power you need to process your events. It all depends on how much processing you have to do for one event, but once you go beyond a few 10k events per second, things start to get complicated, especially if you need to do disk based queries or updates.
Then there is the event range, that is, the number of distinct objects your data stream talks about. Large event ranges basically define how much memory you need (RAM or disk) to store all the information. For example, if you look at users in a social network, you will easily see a few million or even hundreds of millions if you go on long enough. In some cases, the range is unbounded, for example, because there are always more users signing up. If you consider combinations of objects (like users and items in a webshop), numbers can easily explode, too.
If you deal with either high data rate or large event ranges, but not both, you are still lucky. If you have high data rate but small event range, you can probably keep all the data in memory, and if you have large event ranges but low data rates, you can store the data in a database. The real challenge is dealing with high data rate and large event ranges.
Latency is the time it takes to compute the analysis you need, or to make a decision. Roughly, there are three different ranges:
- Results or decisions are required immediately. For applications like real-time bidding or online ad optimization, you only have a few milliseconds to compute the result because you immediately have to do something with the outcome.
- A few seconds up to minutes. For interactive use and dashboards, it’s ok if you get updated results only every few seconds or have to wait a few minutes for results.
- Minutes up to hours. This includes most kinds of reporting and longer term data analysis.
For the discussion below, we distinguish between the first case, low latency, and the other two cases, high latency.
The real question here is whether you can (or have to) react in real-time. If data comes in at 100k events per second, but there’s only a manager who will look at the aggregates once a week to adjust some business strategy, then latency is not an issue. On the other hand, if you are deploying a new landing page for your web site and you are looking for errors and a sudden lack of visitors, you need to have the results immediately. As a rule: “It ain’t real-time if you cannot react in real-time.”
Technologically, as I’ll discuss in more depth below, this distinction is a very important one because it tells you whether you can stick with a more batch processing style approach or should rather process data as it comes in in a streaming fashion.
Next up, there’s the question of what exactly you want to do with your data, that is, the complexity of the computation. For some applications like analytics, you only need to compute simpler statistics like counts, averages, or medians over certain time windows.
Real-time recommendation is much trickier, as you need to first aggregate user and item activity, but then also need to do further costly computations to arrive at item or user similarities.
For something like churn prediction (that is, predicting whether a user is about to stop playing a game or using a service), you would first do a very complex analysis in order to train a classifier on all past data, and then deploy a system which computes the required features and outputs predictions in real-time on new data.
The key distinction here is the amount of information required to process an event:
- only local information (for example, updating a counter)
- global aggregates (for example, trends)
- iterative and more complex operations (machine learning and full blown data analysis)
As with programming languages, most approaches I’ll discuss below are suited for all of these tasks in principle. But as always, some things are easier to express in one system than another.
The fourth aspect is accuracy. If you know me, you’ll probably already see where this is going, but I believe that this aspect is important and so far underrepresented. Not all applications demand exact numbers. Some certainly do, like accounting or counting retweets exactly for all users, but other applications like trending or profiling don’t. Generally speaking, exactness is usually not essential when
- information is aggregated and reduced significantly (for example, taking all user activity and only looking for the top most active ones)
- only relative scores matter (profiling different kinds of user activity)
- there’s high volatility in the scores (very high data rate)
- you compute intermediate results which are used in an already inaccurate process afterwards (like computing features for churn prediction)
- you are mostly looking at exploratory data analysis. If you plot graphs which are 300 pixels high, there’s really no need to have total accuracy.
In all those cases you can usually deal with imprecision. You don’t really need to know these scores up to the 5th digit after the decimal point.
This distinction is important because dropping the requirement for exactness opens up entirely new algorithmic possibilities to tackle a huge amount of data with significantly reduced resource requirements.
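One well-known example of trading exactness for resources is the Count-Min Sketch, which estimates per-key counts in a fixed amount of memory. The following Python sketch is only an illustration: the class and parameter names are my own choices, and it is not meant to describe how any of the products mentioned in this post work internally. Estimates can be too high when keys collide, but they are never too low.

```python
import hashlib


class CountMinSketch:
    """Approximate per-key counts in depth * width integers of memory."""

    def __init__(self, width=1024, depth=4):
        self.width = width
        self.depth = depth
        self.table = [[0] * width for _ in range(depth)]

    def _index(self, row, key):
        # One differently salted hash per row, so that a collision in one
        # row is unlikely to be repeated in the other rows.
        digest = hashlib.blake2b(
            key.encode("utf-8"),
            digest_size=8,
            salt=row.to_bytes(8, "little"),
        ).digest()
        return int.from_bytes(digest, "little") % self.width

    def add(self, key, amount=1):
        for row in range(self.depth):
            self.table[row][self._index(row, key)] += amount

    def estimate(self, key):
        # Collisions only ever add to a cell, so the minimum over all rows
        # is the tightest available upper bound on the true count.
        return min(
            self.table[row][self._index(row, key)] for row in range(self.depth)
        )


sketch = CountMinSketch()
for _ in range(100):
    sketch.add("alice")
for _ in range(3):
    sketch.add("bob")

alice_estimate = sketch.estimate("alice")
bob_estimate = sketch.estimate("bob")
```

The memory footprint depends only on `width` and `depth`, not on the number of distinct keys, which is exactly what makes this kind of algorithm attractive for large event ranges.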
So let’s talk about technology. Scale is always assumed to be high which leaves us with latency, complexity, and accuracy. The biggest difference here is latency. If you don’t require low latency, you can stick with more traditional approaches which first collect data on disk or in memory and then crunch it later, whereas low latency requires you to process the data as it comes in. I’ll discuss accuracy in the section on low latency, because it also allows for that kind of processing. Complexity is a topic of its own, discussed afterwards.
High Latency: Batch processing
If you don’t require your results within seconds, or even minutes, you’re already lucky because this means that you can stick to a batch-oriented approach.
In the simplest example, you can just write a piece of code which scans through log files to do what you want. Alternatively, you put all your data into a database and run queries on your data to compute the things you want. You can go with classical SQL databases or use something like Cassandra for pure storage, or CouchDB which can also run aggregation jobs for you.
Batch processing can be scaled effectively using something like Apache Hadoop (or one of the commercial offerings through Cloudera, Hortonworks, or MapR). You can store your log data in a distributed fashion on the cluster and then run queries in a parallel fashion to get your response times down. Technically speaking, Hadoop cannot scale just any workload, but only those where you first apply the same operation in parallel to all parts of your data (the map step) and then later merge the results (the reduce step), but a surprisingly large number of operations are covered by this approach.
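The map and reduce steps can be sketched in a few lines of plain Python. This is a single-process illustration of the programming model, not Hadoop’s API. The log format ("timestamp url") and all function names are assumptions made for the example.

```python
from collections import defaultdict


def map_step(log_line):
    # Emit (key, value) pairs for one record.
    # Assumed log format: "<timestamp> <url>".
    _timestamp, url = log_line.split()
    return [(url, 1)]


def shuffle(pairs):
    # Group all values by key; a real cluster does this across machines.
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped


def reduce_step(key, values):
    # Merge all values emitted for one key.
    return key, sum(values)


def run_job(chunks):
    # Each chunk stands in for the part of the data stored on one node.
    mapped = [
        pair for chunk in chunks for line in chunk for pair in map_step(line)
    ]
    return dict(reduce_step(key, values) for key, values in shuffle(mapped).items())


chunks = [["1 /home", "2 /about"], ["3 /home"]]
page_views = run_job(chunks)
```

Because `map_step` looks at one record at a time, the chunks could be processed on different machines without any coordination. Only the grouping and the reduce step need to bring data together.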
One of the strengths of Hadoop is that it’s very mature and has attracted many projects which built upon Hadoop. One such example is Apache Drill, a vertical database (or column-oriented database) similar to Google’s Dremel on which BigQuery is based. Vertical databases are optimized for the kind of task where you have to scan whole tables and count entries matching some criterion. Instead of storing the data by row as in traditional databases, data is stored by column. So instead of storing data as in a log file, one line per entry, one takes each field of the data and stores it together, resulting in much better IO characteristics. HP Vertica, or ParStream are other vertical databases.
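The difference between row and column storage can be illustrated with plain Python lists. This is a toy sketch with made-up field names. Real column stores add compression, indexing, and on-disk layouts that are not shown here.

```python
# Row-oriented: one record per entry, as in a log file.
rows = [
    {"url": "/home", "status": 200, "bytes": 5120},
    {"url": "/about", "status": 404, "bytes": 312},
    {"url": "/home", "status": 200, "bytes": 5090},
]


def to_columns(rows):
    # Column-oriented: one list per field.
    # Assumes every row has the same fields.
    columns = {}
    for row in rows:
        for field, value in row.items():
            columns.setdefault(field, []).append(value)
    return columns


columns = to_columns(rows)

# Counting errors by row has to touch every complete record ...
errors_by_row = sum(1 for row in rows if row["status"] == 404)

# ... while the column layout only needs to scan the "status" column.
errors_by_column = sum(1 for status in columns["status"] if status == 404)
```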
In the most basic version of batch processing, you will be processing all your data for each query. For example, if you want to compute page views from web server log data, you will go through all of the logs for the specified time range over and over again in order to collect the count. You could in principle also store intermediate results, but that’s something you have to set up and maintain yourself.
The downside of batch processing is that it becomes very costly to scale beyond a certain point. There are significant startup times for every job that you run, and disk access is a natural limit on the response times you can expect.
Some products have started to replace the disk by memory as a storage medium, most notably SAP Hana, but also GridGain, or Apache Spark, to get around the disk speed limitation. On the other hand, these systems are still essentially batch processing in nature, although turnaround times between queries can become quite small. Also, memory as main storage is pretty expensive still.
One challenge with batch approaches is that you need to make sure you clean out old data eventually, in particular if you put your data on disk. Not just because your disk will eventually be full, but also because your queries will take longer and longer to run as the size of the data and the indices grows.
Depending on your database, this might be a problem and require regular cleanup via VACUUM commands or other forms of housekeeping. In general, databases don’t like it when the data is changing a lot all the time.
Stream mining algorithms have recently also begun to be used in the context of batch processing when you are dealing with massive event ranges. Instead of adding even more computing power and storage, stream mining algorithms can help you to get approximate answers quickly.
Low Latency: Stream Processing
If, on the other hand, you decide that you need up-to-the-second results for your data, you essentially have to process your data as it comes in. That kind of approach is called stream processing.
The general approach is to have a little bit of code which processes each of the events separately. In order to speed the processing up, you can divide the stream (consistently, for example, by using a hash of some id), and then run the processing in multiple threads or even distributed over a cluster.
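Dividing the stream consistently by a hash of some id could look like the following Python sketch. The function names are hypothetical, and the "workers" are simply separate counters inside one process. In a real deployment each partition would be handled by its own thread or cluster node.

```python
import zlib
from collections import Counter


def partition_for(user_id, num_partitions):
    # crc32 gives the same value in every process, unlike Python's built-in
    # hash() for strings, which is randomized per interpreter run.
    return zlib.crc32(user_id.encode("utf-8")) % num_partitions


def process_stream(events, num_partitions=4):
    # One independent piece of state per partition.
    workers = [Counter() for _ in range(num_partitions)]
    for user_id in events:
        workers[partition_for(user_id, num_partitions)][user_id] += 1
    return workers


events = ["alice", "bob", "alice", "carol", "alice"]
workers = process_stream(events)
```

Since all events for the same id end up in the same partition, each worker can update its counters without talking to the others.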
Apache Storm is a popular framework for that which is used at Twitter and other companies requiring this kind of real-time processing. Other examples are Amazon’s Kinesis, or the streaming capabilities of MapR. These frameworks take care of the scaling on multiple cluster nodes and come with varying degrees of support for resilience and fault tolerance, for example, through checkpointing, to make sure the system can recover from failure.
These stream processing frameworks only take care of parallelizing the computational load, meaning that you need an additional storage layer to store the results in order to be able to query them. While the current state of the computation is contained in the stream processing framework, there is usually no way to access or query this information. Depending on the amount of data you are processing, this might mean you need a pretty large and high-performance storage backend.
Apache Spark, which I already mentioned in the section about in-memory batch, is an interesting new project that tries to close the gap between batch and streaming by taking a hybrid approach. Events are collected in a buffer and then processed at fixed intervals (say every few seconds) in a batch fashion. Since Spark holds the data in memory, it can process the batches fast enough to keep up with the incoming data stream.
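The idea of buffering events and processing them at fixed intervals can be sketched in plain Python as follows. This only illustrates the micro-batching principle and is not Spark’s API. The function names and the (timestamp, key) event format are assumptions for the example.

```python
from collections import Counter


def micro_batches(events, interval):
    """Group (timestamp, key) events into consecutive fixed-length intervals.

    Assumes events arrive in non-decreasing time order. Intervals without
    any events yield an empty batch.
    """
    batch, batch_end = [], None
    for timestamp, key in events:
        if batch_end is None:
            batch_end = timestamp + interval
        while timestamp >= batch_end:
            # The interval is over: hand the buffer to batch processing.
            yield batch
            batch, batch_end = [], batch_end + interval
        batch.append(key)
    if batch:
        yield batch


def process(batch):
    # Any batch-style computation can go here; this one counts keys.
    return Counter(batch)


events = [(0, "a"), (5, "b"), (12, "a"), (31, "a")]
batches = list(micro_batches(events, interval=10))
results = [process(batch) for batch in batches]
```

The latency of such a system is bounded from below by the interval length, which is the price for being able to reuse batch-style code.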
Finally, if exactness is not essential for you, stream mining based algorithms are also a very interesting alternative to stream processing, for example using our own streamdrill. The biggest advantage is when dealing with large event ranges, as stream mining algorithms allow you to bound the amount of required memory. In exchange, the results will become less accurate. That way, you can deal with much larger event ranges than with a pure stream processing approach.
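As an illustration of how memory can be bounded, here is a minimal Python sketch of the Space-Saving algorithm, a published stream mining algorithm for finding the most frequent items. I am using it purely as an example of the technique; the sketch makes no claim about how streamdrill is implemented. The class name and the linear scan for the smallest counter are simplifications.

```python
class SpaceSaving:
    """Tracks approximate counts for at most `capacity` keys."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.counts = {}

    def add(self, key):
        if key in self.counts:
            self.counts[key] += 1
        elif len(self.counts) < self.capacity:
            self.counts[key] = 1
        else:
            # Evict the key with the smallest count and let the new key
            # inherit that count plus one. Counts may overestimate, but
            # frequent keys are not lost. The linear scan keeps the sketch
            # short; real implementations use a faster data structure.
            victim = min(self.counts, key=self.counts.get)
            self.counts[key] = self.counts.pop(victim) + 1

    def top(self, n):
        ranked = sorted(self.counts.items(), key=lambda kv: kv[1], reverse=True)
        return ranked[:n]


trends = SpaceSaving(capacity=10)
for i in range(100):
    if i % 2 == 0:
        trends.add("hot")      # one key that occurs 50 times
    trends.add(f"rare-{i}")    # 100 keys that occur once each

top_key, top_count = trends.top(1)[0]
```

The stream contains 101 distinct keys, but the summary never holds more than 10 of them, and the frequent key still comes out on top.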
In summary, low latency processing generally means you need to process your data as it comes in, updating intermediate results accordingly. Such an approach can be scaled using stream processing frameworks like Storm. Newer Big Data frameworks like Spark try a hybrid approach which has lots of future potential. Stream mining algorithms like in streamdrill are an interesting alternative if exactness is not essential.
As I said above, all of the approaches discussed here provide quite powerful abstractions to do all kinds of data processing. Still, some operations are easier to express in one framework than the other, as is usually the case with software. Therefore it’s good if you have a general idea about the kind of data processing you’ll be most interested in, in order to choose a platform which matches your requirements well.
Let’s look at the three classes of computations I discussed above: local operations, global aggregates, and beyond.
All systems are naturally suitable for local operations. They are easy to parallelize. The same holds for global aggregates. In MapReduce, for example, the map step computes locally, with the reduce step then aggregating the results. Only stream processing poses slightly higher obstacles because you have to merge the different streams by hand to explicitly construct a global view.
When it comes to more complex algorithms, there are other challenges. Such algorithms often run in iterations, in order to compute some approximations to numeric optimization problems (most supervised learning algorithms), or specifically distribute information globally (like the random walk underlying PageRank).
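To show what such an iterative computation looks like, here is a small Python sketch of PageRank computed by repeated passes over the link graph. The damping factor of 0.85 and the fixed number of iterations are conventional choices that I am assuming here, and the example graph is made up.

```python
def pagerank(links, damping=0.85, iterations=50):
    """Iteratively distribute rank along links.

    `links` maps each page to the list of pages it links to. Assumes every
    linked page also appears as a key in `links`.
    """
    pages = list(links)
    rank = {page: 1.0 / len(pages) for page in pages}
    for _ in range(iterations):
        # Every page gets a small base rank (the "random jump").
        new_rank = {page: (1.0 - damping) / len(pages) for page in pages}
        for page, targets in links.items():
            if targets:
                share = damping * rank[page] / len(targets)
                for target in targets:
                    new_rank[target] += share
            else:
                # A page without outgoing links spreads its rank evenly.
                share = damping * rank[page] / len(pages)
                for target in pages:
                    new_rank[target] += share
        rank = new_rank
    return rank


links = {"a": ["b", "c"], "b": ["c"], "c": ["a"], "d": ["c"]}
rank = pagerank(links)
best_page = max(rank, key=rank.get)
```

Every iteration needs the complete result of the previous one, which is why per-job startup costs and disk round trips hurt so much for this class of algorithms.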
Systems which provide not just a query language but also more advanced constructs, including intermediate results, are better suited for such complex problems. Startup times can be drastically reduced by memory based approaches, making iteration feasible. Systems like Spark were specifically designed to allow for iterations and more complex operations beyond map and reduce.
Stream mining algorithms like in streamdrill might appear pretty basic at first, but since they are memory based and also provide storage for intermediate results using their counter algorithms (basically sums), they can also be used for more complex processing. Algorithms might need to be adapted somewhat, though. For example, we’ve implemented a real-time recommendation algorithm on top of streamdrill.
Looking at higher-level frameworks, it’s interesting to see that these fall basically into two categories: SQL-like query languages (Drill, Vertica, ParStream, StreamBase), or functional collections (Scalding, Summingbird). So depending on your application area that might also be a feature to consider.
The bottom line
If you already have terabytes of information on a Hadoop cluster, there is probably little question in your mind how to start. Unless you are looking for low latency, you should just stick with Hadoop instead of building up a parallel infrastructure. The same holds if you already have your data in Google’s or Amazon’s cloud: you should probably just stick with their offerings, BigQuery or Kinesis.
Another case is when you are just starting, and you don’t know yet what exactly you want to do with your data. In such cases, I think it’s wise to start with the simplest possible setup, that is a bunch of log files and maybe a few scripts in your favorite programming language to get started.
If you already have so much data that you cannot wait for it all to be processed, systems like Apache Drill allow good interactive use and provide a reasonably large set of functionality to have a good look at the data.
As you can imagine, I’m a big fan of stream mining frameworks like our own, streamdrill. Especially for exploratory purposes, accuracy is not essential, and you can quickly stream data through such a system to get a view of the data.
Another interesting tool for analysis is a product like Splunk, basically a search engine for log data.
Once you know what you want and have pinned down the requirements in terms of scale, latency, complexity, and exactness, you can pick the right tool and start developing.
As always, the more general the tool, the more work is required to make it work.
In summary, as with any piece of non-trivial technology, there’s no piece of technology uniformly better than all the rest. You need to know what you’re doing. I’ve tried to outline some aspects which help you to drill down on the alternatives.
Feel free to share your experience with real-time processing below in the comments!
Posted by Mikio L. Braun at Wed Apr 09 14:53:00 +0200 2014.
Monday, February 17, 2014
I don’t know whether this word exists, but mainstreamification is what’s happening to data analysis right now. Projects like Pandas or scikit-learn are open source, free, and allow anyone with some Python skills to do some serious data analysis. Projects like MLbase or Apache Mahout work to make data analysis scalable such that you can tackle those terabytes of old log data right away. Events like the Urban Data Hack, which just took place in London, show how easy it has become to do some pretty impressive stuff with data.
The general message is: Data analysis has become super easy. But has it? I think people want it to be, because they have understood what data analysis can do for them, but there is a real shortage of people who are good at it. So the usual technological solution is to write tools which empower more people to do it. And for many problems, I agree that this is how it works. You don’t need to know TCP/IP to fetch some data from the Internet because there are libraries for that, right?
For a number of reasons, I don’t think that you can “toolify” data analysis that easily. I wish you could, but from my hard-won experience with my own work and teaching people this stuff, I’d say it takes a lot of experience to do it properly and you need to know what you’re doing. Otherwise you will do stuff which breaks horribly once put into action on real data.
And I don’t write this because I don’t like the projects which exist, but because I think it is important to understand that you can’t just give a few coders new tools and expect them to produce something which works. And depending on how you want to use data analysis in your company, this might make or break your company.
So my top four reasons are:
- data analysis is so easy to get wrong
- it’s too easy to lie to yourself about it working
- it’s very hard to tell whether it could work if it doesn’t
- there is no free lunch
Let’s take these one at a time.
Data Analysis is so easy to get wrong
If you use a library to go fetch some data from the Internet, it will give you all kinds of error messages when you do something wrong. It will tell you if the host doesn’t exist or if you called the methods in the wrong order. The same is not true for most data analysis methods, because these are numerical algorithms which will produce some output even if the input data doesn’t make sense.
In a sense, Garbage In Garbage Out is even more true for data analysis. And there are so many ways to get this wrong, like discarding important information in a preprocessing step, or accidentally working on the wrong variables. The algorithms don’t care, they’ll give you a result anyway.
The main problem here is that you’ll probably not even notice it, apart from the fact that the performance of your algorithms isn’t what you expect it to be. In particular when you work with many input features, there is really no way to look at the data. You are basically just working with large tables.
This is not just hypothetical. I have experienced many situations where exactly that happened, where people were accidentally permuting all their data because they messed up reading the data from the files, or did some other non-obvious preprocessing which destroyed all the information in the data.
So you always need to be aware of what you are doing and have to mentally trace the steps to have a well-informed expectation about what should come out. It’s debugging without an error message: often you just have a gut feeling that something is quite wrong.
Sometimes the warning sign is not that the performance is bad, but that it is suspiciously good. Let’s come to that next.
It’s too easy to lie to yourself about it working
The goal in data analysis is always good performance on future, unseen data. This is quite a challenge. Usually you start working from collected data, which you hope is representative of the future data. But it is so easy to fool yourself into thinking it works.
The most important rule is that only performance measured on data which you haven’t used in any way for training is a reliable estimate of future performance. However, this rule can be violated in many, sometimes subtle ways.
The classical novice mistake is to just take the whole data set, train an SVM or some other algorithm and look at the performance you get on the data set you used for training. Obviously, it will be quite good. In fact, you can achieve perfect predictions when you just output the values you got for training (ok, if they are unambiguous) without any real learning taking place at all.
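The following Python sketch makes this concrete with a "model" that does nothing but memorize its training data. The data is random noise by construction, so there is nothing to learn. The class and function names are made up for the example.

```python
import random


def make_data(n, rng):
    # Random inputs with labels that are pure coin flips.
    return [(rng.random(), rng.choice([0, 1])) for _ in range(n)]


class Memorizer:
    """Outputs the stored training label; no real learning takes place."""

    def fit(self, data):
        self.table = {x: y for x, y in data}

    def predict(self, x):
        # Unseen inputs get a fixed default label.
        return self.table.get(x, 0)


def accuracy(model, data):
    return sum(model.predict(x) == y for x, y in data) / len(data)


rng = random.Random(0)
train, test = make_data(1000, rng), make_data(1000, rng)

model = Memorizer()
model.fit(train)

train_accuracy = accuracy(model, train)  # looks perfect
test_accuracy = accuracy(model, test)    # about as good as guessing
```

Judged on the training data, this model looks flawless. On fresh data it is no better than a coin flip, which is the only number that matters.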
But even if you split your data right, people often make the mistake of using information from the test data in the preprocessing (for example for centering, or building dictionaries, etc.). So you do the actual training only on the training data, but through the preprocessing, information from the test data has silently crept into the training as well, giving results which are much better than what you can realistically expect on real data.
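For centering, the difference between the leaky and the clean way can be shown in a few lines of Python. The numbers and helper names are made up for illustration.

```python
def mean(values):
    return sum(values) / len(values)


def center(values, offset):
    return [v - offset for v in values]


train = [1.0, 2.0, 3.0]
test = [10.0, 11.0]

# Leaky: the offset is computed from training AND test data, so the
# preprocessing has already "seen" the data you want to evaluate on.
leaky_offset = mean(train + test)

# Clean: the offset is estimated on the training data only and then
# reused unchanged for the test data.
clean_offset = mean(train)
train_centered = center(train, clean_offset)
test_centered = center(test, clean_offset)
```

The same rule applies to any preprocessing step with fitted parameters, such as scaling factors or dictionaries: fit on the training data, then apply to the test data.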
Finally, even if you do proper testing and evaluation of your method, your estimates of future performance will become optimistic as you try out many different approaches because you implicitly optimize for the test set as well. This is called multiple testing and something one has to be aware of, too.
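A small simulation in Python shows the effect. It evaluates 200 "approaches" that all just guess randomly on the same test set and then picks the best one. The test set size, the number of approaches, and the random seed are arbitrary choices for the illustration.

```python
import random

rng = random.Random(42)

# A test set with random labels: no approach can truly beat 50%.
test_labels = [rng.choice([0, 1]) for _ in range(100)]


def random_model_accuracy():
    # An "approach" that just guesses.
    predictions = [rng.choice([0, 1]) for _ in range(len(test_labels))]
    hits = sum(p == y for p, y in zip(predictions, test_labels))
    return hits / len(test_labels)


accuracies = [random_model_accuracy() for _ in range(200)]

average = sum(accuracies) / len(accuracies)  # close to chance level
best = max(accuracies)                       # looks clearly better than chance
```

Reporting `best` as the expected future performance would be far too optimistic. The selected approach has been fitted to the quirks of this particular test set and would fall back to chance level on new data.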
One can be trained to do all this properly, but if you are under the pressure to produce results, you have to resist the temptation to just run with the first thing which gives good numbers. And it helps if you’ve gone that route once and failed miserably.
And even if you did evaluate according to all secrets of the trade, the question is still whether the data you worked on was really representative of future data.
It’s very hard to tell whether it could work if it doesn’t
A different problem is that it is fundamentally difficult to know whether you can get better if your current approach doesn’t work well. The first thing you try will most likely not work, as will probably the next thing, and then you need someone with experience to tell you whether there is a chance or not.
There is really no way to automatically tell whether a certain approach works or not. The algorithms just extract whatever information fits their model and the representation of the data, but there are many, many ways to do this differently, and that’s when you need a human expert.
Over time you develop a feeling for whether a certain piece of information is contained in the data or not, and ways to make that information more prominent through some form of preprocessing.
Tools only provide you with possibilities, but you need to know how to use them.
There is no free lunch
Now you might think “but can’t we build all that into the tools?” Self-healing tools which tell you when you make mistakes and automatically find the right preprocessing? I’m not saying that it’s impossible, but these are problems which are still hard and unsolved in research.
Also, there is no universally optimal learning algorithm, as shown by the No Free Lunch Theorem: There is no algorithm which is better than all the rest for all kinds of data.
No way around learning data analysis skills
So in essence, there is no way around properly learning data analysis skills. Just like you wouldn’t just give a blowtorch to anyone, you need proper training so that you know what you’re doing and produce robust and reliable results which deliver in the real world. Unfortunately, this training is hard, as it requires familiarity with at least linear algebra and concepts of statistics and probability theory, stuff which classical coders are not that well trained in.
Still, it’s pretty awesome to have those tools around. Back when I started with my Ph.D., everyone had their own private stack of code, which is ok if you’re in research and need to implement methods from scratch anyway. So we’re definitely better off nowadays.
Posted by Mikio L. Braun at Mon Feb 17 12:33:00 +0100 2014.