One does not simply scale into real-time

Monday, October 10, 2011

Real-time seems to be the next big thing in big data. MapReduce has shown how to perform big analyses on huge data sets in parallel, and the next challenge seems to be to find a similar kind of approach to real-time.

When you look around the web, there are two major approaches out there which try to build something that can scale to deal with Twitter-firehose-scale amounts of data. One starts with a MapReduce framework like Hadoop and somehow finagles real-time or at least streaming capabilities onto it. The other approach starts with some event-driven “streaming” computing architecture and makes it scale on a cluster.

These are interesting and very cool projects. However, from our own experience with retweet analysis at TWIMPACT, I get the feeling that both approaches fall short of providing a definitive answer.

In short: One does not simply scale into real-time.

Real-Time Stream Analysis

So what is real-time stream analysis? (Apart from the fact that I seem to be unable to decide whether to write it with a hyphen or as one word like Donaudampfschifffahrtskapitänsmütze).

Basically, the idea is that you have some sort of event stream like Twitter messages, or Apache log data, or URL expansion requests at, which comes in at a high volume of several hundreds or even thousands of events per second. Let’s just focus on Twitter stream analysis for now.

Typical applications are to compute some statistics from the stream which summarize it in a nice fashion. For example:

  • what is the most frequently retweeted tweet?
  • what is the most frequently mentioned URL?
  • what are the most influential users (in terms of mentions/retweets)?

These are pretty basic counting tasks. Very closely linked are questions like what the score of an arbitrary tweet or URL is, or who the first few hundred most influential users are, and so on. You can also compute more complex scores based on several of these numbers. For example, our own TWIMPACT score is based on all the counts of a user’s retweets, and something like the Klout score is also based on a number of statistics (I’m only assuming here, of course).
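To make "basic counting tasks" concrete, here is a minimal sketch of exact counting over a stream of retweet events. The event layout (retweeting user, original tweet id, original author) and the sample data are made up for illustration; this is not how TWIMPACT stores its data.

```python
from collections import Counter

# Hypothetical events: (retweeting_user, original_tweet_id, original_author)
events = [
    ("alice", "t1", "carol"),
    ("bob", "t1", "carol"),
    ("dave", "t2", "erin"),
    ("alice", "t3", "carol"),
]

retweet_counts = Counter()  # tweet id -> number of retweets seen
author_counts = Counter()   # author -> number of times they were retweeted

for _user, tweet_id, author in events:
    retweet_counts[tweet_id] += 1
    author_counts[author] += 1

top_tweet = retweet_counts.most_common(1)[0]  # most frequently retweeted tweet
top_authors = author_counts.most_common(2)    # most retweeted authors
```

Note that these exact counters keep one entry per distinct tweet and author, so their memory use grows with the stream.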

Since the data arrives in real-time, you typically also want to get the results in real-time (instead of a daily report which is already hours old when you get it). Ideally, you get updated scores with each event, although I’d say everything is ok if a query always takes less than a second.

Finally, you’d probably also want to be able to look at historical data to see how a user or retweet has performed in the past and to see whether its activity is going up or going down.

Just to give you an idea of the amount of data involved: Each tweet corresponds to about 1k of data (including all metadata). If we assume that we have about 1000 tweets per second (actually it’s probably more), then we get about 86.4 million tweets per day, or about 82.4GB of new data per day (about 30TB per year).
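The back-of-the-envelope numbers above can be checked in a few lines. The sketch assumes "1k" means 1024 bytes and that GB and TB are counted in powers of 1024; the constant names are mine.

```python
TWEETS_PER_SECOND = 1000      # assumed average rate from the text
BYTES_PER_TWEET = 1024        # "about 1k" including metadata
SECONDS_PER_DAY = 24 * 60 * 60

tweets_per_day = TWEETS_PER_SECOND * SECONDS_PER_DAY
gib_per_day = tweets_per_day * BYTES_PER_TWEET / 1024**3
tib_per_year = gib_per_day * 365 / 1024

print(f"{tweets_per_day / 1e6:.1f} million tweets per day")  # 86.4
print(f"{gib_per_day:.1f} GB per day")                       # 82.4
print(f"{tib_per_year:.1f} TB per year")                     # 29.4
```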

Now let’s discuss how you would approach this problem in a database-centric fashion, and then with a stream processing framework.

Database Approach

With “database approach” I try to cover a whole range including traditional relational databases, NoSQL databases and MapReduce frameworks. The common denominator is that you basically pipe your data into the database and use the built-in queries of the database to compute your statistics. Depending on the type of NoSQL database you are using, you’ll probably have to do the analysis online to precompute all your statistics because the database doesn’t have sufficient query capabilities.

As I see it, there are two main problems with this approach: First of all, the size of your database grows at a non-trivial rate. Unless you’re Google and you’ve planned for exponential growth of your data centers anyway, you will eventually run out of space and performance. Even if you assume that the response time will increase only logarithmically in your data size, your cluster will eventually become too slow to deal with real-time data.

This will directly affect the time it will take for your queries to complete. Moreover, since queries also put quite some load on your disks, this problem will only get worse over time.

At the same time, most of the data will likely be irrelevant for your current analysis. Tweets stop being retweeted, URLs fall out of fashion. In other words, there is a huge amount of historical baggage clogging up your servers. It is true that you still need this data in order to compute historical statistics, but note that the data doesn’t change anymore, so it would make much more sense to analyse the data once and only put the results in some read-only storage.

You could of course try to keep the database size constant by periodically discarding old data. I think this is the right approach and I’ll discuss it in more detail below. Note, however, that many databases cannot really deal well with huge numbers of deletions, as they need some form of more or less disruptive garbage collection (vacuum, compaction, etc.) to actually free up the space.

Finally, one should also not forget that MapReduce doesn’t work with all kinds of problems, but only with problems which are already inherently easy to parallelize. Actually, if you look into research in parallel algorithms, you will see that almost no problems scale linearly in the number of available processors (and you will also see that many of the efficient algorithms work with shared mutable state! Ugh!)

Stream Processing

So if storing all your data on slow disks to process it later is the wrong approach, you should probably try to process the data as it comes in through some form of pipeline. This is more or less the idea behind stream processing. Two example frameworks are Storm, originally developed by BackType (recently acquired by Twitter), and S4 developed by Yahoo.

If you look closer, these are basically quite sophisticated frameworks to scale an actor-based approach to concurrency. If you’re not familiar with it, the idea is to structure some computation in terms of independent small pieces of code which do not share state and communicate with one another through messages.

Frameworks like the ones above let you define a computation in terms of a number of processing nodes which may also run on different servers, with the ability to add more parallel workers of a certain kind on the fly to scale up processing resources where necessary.
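To illustrate the actor idea on a very small scale, here is a toy, single-process sketch using threads and queues from the Python standard library. It is not the Storm or S4 API; the names `spawn`, `extract`, and `count` and the tweet layout are made up for this example. Each actor owns its state and only talks to the others through messages.

```python
import queue
import threading
from collections import Counter

STOP = object()  # sentinel message telling an actor to shut down

def spawn(inbox, handle, on_stop=lambda: None):
    """Start a thread that handles messages from its inbox one at a time."""
    def loop():
        while True:
            message = inbox.get()
            if message is STOP:
                on_stop()
                return
            handle(message)
    thread = threading.Thread(target=loop)
    thread.start()
    return thread

extract_inbox = queue.Queue()
count_inbox = queue.Queue()
counts = Counter()  # state owned by the counting actor only

def extract(tweet):
    # Forward the id of the retweeted tweet, ignore everything else.
    if "retweet_of" in tweet:
        count_inbox.put(tweet["retweet_of"])

def count(tweet_id):
    counts[tweet_id] += 1

threads = [
    # When the extractor stops, it passes the stop signal downstream.
    spawn(extract_inbox, extract, on_stop=lambda: count_inbox.put(STOP)),
    spawn(count_inbox, count),
]

for tweet in [{"retweet_of": "t1"}, {"text": "hello"}, {"retweet_of": "t1"}]:
    extract_inbox.put(tweet)
extract_inbox.put(STOP)
for thread in threads:
    thread.join()
```

The queues here are unbounded, which is exactly where the trouble discussed below comes from once messages arrive faster than they are handled.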

This approach is essentially orthogonal to the database approach. In fact, stream processing frameworks usually don’t even deal with persistence, they only focus on the computation. Therefore, there is also nothing specific to real-time stream processing in these frameworks. In essence, they deal with the question of how to split up a computation into small independent parts and how to scale such a network on a cluster.

To me, the basic problem with this approach (and with actor-based concurrency) is that it doesn’t deal well with peak volumes which surpass the computation bandwidth. In fact, given such a network, it’s not even so simple to say what the maximum throughput is. Conceptually, it is the throughput of the slowest component, but you also have to take the message routing topology into account.

Now, once more messages need to be processed than possible, message queues somewhere in the system start to fill up. This is a general problem with such systems, not specific to actor-based concurrency. The book Release It! contains some very entertaining and also frightening real-world war stories of what can go wrong with systems where you plug together components with quite different capacities.
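A tiny simulation shows how a backlog builds up. The load profile and the capacity of 1000 events per second are hypothetical numbers, and the model (one worker, one unbounded queue, one-second steps) is deliberately crude.

```python
def backlog_over_time(arrivals_per_second, capacity_per_second):
    """Return the queue length at the end of each second for one worker."""
    backlog = 0
    history = []
    for arrived in arrivals_per_second:
        backlog += arrived
        # The worker removes at most capacity_per_second events per second.
        backlog -= min(backlog, capacity_per_second)
        history.append(backlog)
    return history

# Steady 800 events/s with a five-second peak of 1500 events/s.
load = [800] * 3 + [1500] * 5 + [800] * 3
history = backlog_over_time(load, capacity_per_second=1000)
print(history)
# [0, 0, 0, 500, 1000, 1500, 2000, 2500, 2300, 2100, 1900]
```

Five seconds of peak leave a backlog that drains by only 200 events per second afterwards, so results lag behind the stream for a while even though the average load is below capacity for most of the run.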

Another problem is that actor-based concurrency tends to break a simple function which may fit on a single screen into dozens of classes, but that is another story.

In any case, the question is how you can guarantee that your system will run stably even for high peak volumes, apart from just adding more nodes? The easiest thing would be to randomly drop events (resulting in a more or less consistent subsample which at least has the same distribution as the original data stream), but is there something different you could do?
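Randomly dropping events can be sketched as follows. The function name, the keep probability of 0.1, and the synthetic stream are assumptions for illustration only.

```python
import random

def subsample(events, keep_probability, rng):
    """Yield each event independently with probability keep_probability."""
    for event in events:
        if rng.random() < keep_probability:
            yield event

rng = random.Random(42)  # fixed seed so the run is repeatable
# Synthetic stream: 60% of events refer to t1, 30% to t2, 10% to t3.
stream = ["t1"] * 6000 + ["t2"] * 3000 + ["t3"] * 1000
rng.shuffle(stream)

kept = list(subsample(stream, keep_probability=0.1, rng=rng))
share_t1 = kept.count("t1") / len(kept)
print(len(kept), round(share_t1, 2))
```

The subsample is roughly a tenth of the stream and the share of each item stays close to the original, but rare items can easily disappear completely, and the counts have to be scaled back up by the inverse of the keep probability.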

Stream Distillation

So to summarize: Putting all your data into a database is problematic because the data steadily grows and computing statistics based on the data is too slow. You also don’t really need to keep all your data at hand to have an analysis of the current state of the stream.

Stream processing, on the other hand, is a nice tool to scale your computations, but it doesn’t deal well with peak volumes, and depending on how you persist your data, you run into the same scaling issues as the database centric approach.

In other words, what we need is a way to separate the data we currently need for our analysis from the historic data and analyses, and we also need a way to limit the bandwidth of events while having as little distortion as possible on the statistics we’re interested in.

As it turns out, these kinds of problems are closely related to “frequent itemset” problems in data mining. The idea is to identify the top most frequent items in a data stream with limited resources, both in terms of memory and computation. Such methods are discussed, for example, in Chapter 4 of Rajaraman and Ullman’s upcoming book Mining of Massive Datasets.

Most of these methods work by keeping a fixed number of counters and then replacing the least frequent ones when a new type of event occurs. They come with some sort of theoretical guarantee to identify the correct set of items, or at least some bound on the error in the counts. Beyond such guarantees, you get quite good results in practice when analysing retweets, because only a fraction of the retweeted tweets is retweeted more than once, so that most of the slots are occupied by retweets which do not reoccur, and discarding them doesn’t hurt.
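As an illustration of the "fixed number of counters" idea, here is a minimal sketch in the style of the Space-Saving algorithm, one well-known method of this family. I am not claiming this is the variant used in TWIMPACT or the one described in the book; the class name, capacity, and test stream are made up.

```python
class SpaceSaving:
    """Approximate top-k counting with at most `capacity` counters."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.counts = {}  # item -> (possibly overestimated) count

    def add(self, item):
        if item in self.counts:
            self.counts[item] += 1
        elif len(self.counts) < self.capacity:
            self.counts[item] = 1
        else:
            # Replace the least frequent item. The newcomer inherits its
            # count plus one, so counts never underestimate the truth.
            victim = min(self.counts, key=self.counts.get)
            floor = self.counts.pop(victim)
            self.counts[item] = floor + 1

    def top(self, n):
        return sorted(self.counts.items(), key=lambda kv: -kv[1])[:n]

sketch = SpaceSaving(capacity=10)
for i in range(1000):
    sketch.add(f"once-{i}")  # 1000 tweets that are retweeted only once
    if i % 2 == 1:
        sketch.add("hot")    # one tweet that is retweeted 500 times

print(sketch.top(1))  # [('hot', 500)]
```

With only ten counters for 1001 distinct items, the frequently retweeted item is still found with its exact count, because the one-off items keep evicting each other. The linear scan for the minimum is fine for a sketch; a real implementation would use a smarter data structure.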


As you might have expected, this is exactly the approach we took with TWIMPACT. All the demos you can see at our site are powered by our new trending backend (codename “Trevor”), which is basically an in-memory trend database built using ideas from stream mining, together with read-only snapshots on disk for later historical analyses.

These ideas can of course also be combined with databases and stream processing, but already without a huge amount of parallelism, we’re able to process a few thousand tweets per second.

In summary: it’s not enough to just scale your database and your computational algorithms. To make your analysis framework stable against peak volumes and sustainable in terms of data growth, you also need to separate your analysis database from the historic data and add some ideas from stream mining to extract a statistically approximate subsample with capped bandwidth from your data.

Posted by Mikio L. Braun at 2011-10-10 22:15:00 +0000
