Friday, August 22, 2014

Big Data & Machine Learning Convergence

More Linear Algebra and Scalable Computing

I recently had two pretty interesting discussions with students here at TU Berlin which I think are representative of how big the divide between the machine learning community and the Big Data community still is.

Linear Algebra vs. Functional collections

One student is working on implementing a boosting method I wrote a few years ago on next-gen Big Data frameworks like Flink and Spark as part of his master's thesis. I chose this algorithm because the operations involved are quite simple: computing scalar products, vector differences, and norms of vectors. Probably the most complex thing is computing a cumulative sum.

These are all operations which boil down to linear algebra, and the whole algorithm is just a few lines of linear algebra pseudo-notation. I was wondering just how hard it would be to formulate this using a more “functional collection” style API.

For example, in order to compute the squared norm of a vector, you have to square each element and sum them up. In a language like C you’d do it like this:

double squaredNorm(int n, double a[]) {
  double s = 0;
  for (int i = 0; i < n; i++) {
    s += a[i] * a[i];
  }
  return s;
}

In Scala, you’d express the same thing with

def squaredNorm(a: Seq[Double]) =
	a.map(x => x*x).sum

In a way, the main challenge here consists in breaking down the for-loops into the sequence primitives provided by the language. Another example: the scalar product (the sum of products of the corresponding elements of two vectors) would become

def scalarProduct(a: Seq[Double], b: Seq[Double]) =
	a.zip(b).map(ab => ab._1 * ab._2).sum

and so on.
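
The cumulative sum mentioned above fits the same pattern. Here is a minimal sketch of how it might look with Scala's scanLeft (just an illustration, not the actual code from the thesis):

def cumulativeSum(a: Seq[Double]): Seq[Double] =
  a.scanLeft(0.0)(_ + _).tail   // drop the leading 0.0 produced by the scan

// cumulativeSum(Seq(1.0, 2.0, 3.0)) == Seq(1.0, 3.0, 6.0)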

Now, turning to a system like Flink or Spark, which provides a very similar set of operations and is able to distribute them, it should be possible to use a similar approach. However, the first surprise was that in these distributed systems there is no notion of the order of a sequence; the data is really more of a collection of things.

So if you want to compute the scalar product of two vectors, you need to extend the stored data to include the index of each entry, and then join the two sequences on that index before you can perform the map.
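
To make that concrete, here is a rough sketch of how the indexed scalar product might look on Spark's RDD API, assuming each vector is stored as (index, value) pairs (the names and setup here are my illustration, not the student's actual code):

import org.apache.spark.SparkContext._   // pair-RDD operations like join (Spark 1.x style)
import org.apache.spark.rdd.RDD

// Vectors as distributed collections of (index, value) pairs,
// since an RDD has no notion of element order.
def scalarProduct(a: RDD[(Long, Double)], b: RDD[(Long, Double)]): Double =
  a.join(b)                            // align entries by their index
   .map { case (_, (x, y)) => x * y }  // multiply corresponding elements
   .sum()                              // add everything up

The join is exactly the place where you have to trust the system to align and co-partition the two vectors sensibly.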

The student is still halfway through with this, but it has already cost a considerable amount of mental work to rethink standard operations in the new notation and, most importantly, to have faith that the underlying system is able to perform things like joining vectors such that elements are aligned in a smart fashion.

I think the main message here is that machine learners really like to think in terms of matrices and vectors, not so much in terms of databases and query languages. That's the way algorithms are described in papers, that's the way people think and are trained, and it would be tremendously helpful if there were a layer for that on top of Spark or Flink. There are already some activities in that direction, like distributed vectors in Spark or the spark-shell in Mahout, and I'm pretty interested to see how they will develop.

Big Data vs. Big Computation

The other interesting discussion was with a Ph.D. student who works on predicting properties in solid state physics using machine learning. He apparently didn't know much about Hadoop, and when I explained it to him he didn't find it appealing at all, although he is spending quite some compute time on the group's cluster.

There is a medium-sized cluster at TU Berlin for the machine learning group. It consists of about 35 nodes and hosts about 13TB of data from all kinds of research projects from the last ten or so years. But the cluster does not run Hadoop; it uses Sun's gridengine, which is now maintained by Univa. There are historical reasons for that: the current infrastructure developed over a number of years. So here is the short history of distributed computing at the lab.

Back in the early 2000s, people still had desktop PCs under their desks. At the time, most work was done on one's own computer, although I think disk space was already shared over NFS (probably mainly for backup reasons). As computing needs grew, people started to log into other people's computers (after asking whether that was ok, of course), in addition to several larger machines which were bought at the time.

That didn't work well for long. Manually finding computers with resources to spare was pretty cumbersome, and oftentimes your computer would become very noisy even though you weren't doing any work on it yourself. So the next step was to buy some rack servers and put them into a server room, still with the same centralized filesystem shared over NFS.

The next step was to keep people from logging into individual computers. Instead, gridengine was installed, which lets you submit jobs in the form of shell scripts which are executed on the cluster when there are free resources. In a way, gridengine is like YARN, but restricted to shell scripts and interactive shells. It has some more advanced capabilities, but people mostly use it to run their programs somewhere on the cluster.

Compute cluster for machine learning research.

Things have evolved a bit by now; for example, the NFS is now connected to a SAN over fibre channel, and there are different slots for interactive and batch jobs. But the structure is still the same, and it works. People use it for Matlab, native code, Python, and many other things.

I think the main reason this system still works is that the jobs run here are mostly compute intensive and not so much data intensive. Mostly the system is used to run large batches of model comparisons, testing many different variants on essentially the same data set.

Most jobs follow the same principle: they initially load the data into memory (usually not more than a few hundred MB) and then compute for minutes to hours. In the end, the resulting model and some performance numbers are written to disk. Usually the methods are pretty complex (this is ML research, after all). Contrast this with “typical” Big Data settings where you have terabytes of data and run comparatively simple analysis methods or searches on them.

The good message here is that scalable computing, in the way it's mostly required today, is not that complicated. It is less about MPI and hordes of compute workers, and more about support for managing long-running computation tasks, dealing with job dependencies, snapshotting for failures, and so on.

Big Data to Complex Methods?

The way I see it, Big Data has so far been driven mostly by the requirement to deal with huge amounts of data in a scalable fashion, while the methods were usually pretty simple (well, at least in terms of what is considered simple in machine learning research).

But eventually, more complex methods will also become relevant, such that scalable large scale computation will become more important, and possibly even a combination of both. There already exists a large body of work on large scale computation, for example from people running large scale numerical simulations in physics or meteorology, but less so from database people.

On the other hand, there is lots of potential for machine learners in opening up new possibilities to deal with vast amounts of data in an interactive fashion, something which is just plain impossible with a system like gridengine.

As these two fields converge, work has to be done to provide the right set of mechanisms and abstractions. Right now I still think there is a considerable gap which we need to close over the next few years.

Posted by Mikio L. Braun at 2014-08-22 16:21:00 +0200.

Monday, August 11, 2014

The Streaming Landscape, 2014 edition: Convergence, APIs, Data Access

A year ago, I wrote a post on the real-time big data landscape, identifying different approaches to dealing with real-time big data. As I saw it back then, there was a sort of evolution from database-based approaches (put all your data in, run queries), to stream processing (one event at a time), and finally to algorithmic approaches relying on stream mining algorithms, together with all kinds of performance “hacks” like parallelization or using memory instead of disks.

In principle, this picture is still adequate in terms of the underlying mode of data processing, that is, where you store your data, whether you process it as it comes in or in a more batch-oriented fashion later on, and so on. But there is always the question of how to build systems around these approaches, and given the amount of money which is currently being poured into the whole Big Data company landscape, quite a lot is happening in that area.

Convergence

Currently, there is a lot of convergence happening. One example is the lambda architecture, which combines batch-oriented processing with stream processing to get both low-latency results (potentially inaccurate and incomplete) and results on the full data set. Instead of scaling batch processing to a point where the latency is small enough, a parallel stream processing layer processes events as they come along, with both routes piping results into a shared database which serves them for visualization or other kinds of presentation.
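
To make the division of labor a bit more concrete, here is a toy sketch of the idea in plain Scala, with no particular framework: the same counting logic is applied to the full history (batch view) and to the most recent events (real-time view), and queries merge the two. All names are made up for illustration.

object LambdaSketch {
  type View = Map[String, Long]

  // the shared analytics logic: count events per key
  def countByKey(events: Seq[String]): View =
    events.groupBy(identity).map { case (k, v) => k -> v.size.toLong }

  // the serving layer merges the (complete but stale) batch view
  // with the (fresh but partial) real-time view
  def merge(batch: View, realtime: View): View =
    (batch.keySet ++ realtime.keySet)
      .map(k => k -> (batch.getOrElse(k, 0L) + realtime.getOrElse(k, 0L)))
      .toMap

  def main(args: Array[String]): Unit = {
    val history = Seq("a", "b", "a")   // what the batch layer has processed
    val recent  = Seq("a", "c")        // what the speed layer has seen since
    println(merge(countByKey(history), countByKey(recent)))
    // counts: a -> 3, b -> 1, c -> 1
  }
}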

Some point out that one problem with this approach is that you potentially need to have all your analytics logic in two distinct and conceptually quite different systems. But there are systems like Apache Spark, which can run the same code in batch fashion or near-streaming in micro-batches, and Twitter's Scalding, which can take the same code and run it on Hadoop or Storm.

Others, like LinkedIn's Jay Kreps, ask why you can't also use stream processing to recompute stuff in batch. Such systems can be implemented by combining a stream processing system with something like Apache Kafka, a distributed publish/subscribe event transport layer which doubles as a database for log data by retaining data for a predefined amount of time.

APIs: Functional Collections vs. Actors

These kinds of approaches make you wonder just how interchangeable streaming and map-reduce style processing really are, and whether they allow you to do the same set of operations. If you think about it, map-reduce is already very stream oriented. In classical Hadoop, the data input and output of the map and reduce stages are presented via iterators and output pipes, so that you could in principle also stream the data through. In fact, Scalding seems to be taking advantage of exactly that.

Generally, these “functional collection” style APIs seem to be becoming quite popular, as Spark and also systems like Apache Flink use that kind of approach. If you haven't seen this before, the syntax is very close to the set of operations you have in functional languages like Scala. The basic data type is a collection of objects, and you formulate your computations in terms of operations like map, filter, groupBy, and reduce, but also joins.
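
As a small, purely local illustration of the style (plain Scala collections here; the distributed APIs look almost the same, just evaluated over partitioned data; the example itself is made up):

case class Event(user: String, bytes: Long)

val events = Seq(Event("alice", 200L), Event("bob", 50L), Event("alice", 100L))

// traffic per user, ignoring small events
val trafficPerUser: Map[String, Long] =
  events
    .filter(_.bytes > 60L)                 // keep only the larger events
    .map(e => (e.user, e.bytes))           // project to (key, value) pairs
    .groupBy(_._1)                         // group by user
    .map { case (user, xs) => user -> xs.map(_._2).sum }   // reduce per group
// Map(alice -> 300)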

This raises the question of what exactly streaming analytics is. For some, streaming is any kind of approach which allows you to process data in one go, without the need to go back, and with more or less bounded resource requirements. Interestingly, this seems to lead naturally to functional collection style APIs, as illustrated by the toolz Python library, although one issue for me here is that functional collection style APIs imply that the computation ends at some point, when in reality it does not.
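
One way to picture this “one pass, bounded state” notion is a computation over an Iterator which only keeps constant-size state and emits intermediate results as it goes. A minimal sketch (my own illustration, not tied to any of the systems above):

// running mean over a (conceptually unbounded) stream: the only state kept
// is the count and the sum seen so far
def runningMean(stream: Iterator[Double]): Iterator[Double] =
  stream.scanLeft((0L, 0.0)) { case ((n, sum), x) => (n + 1, sum + x) }
        .drop(1)                              // skip the initial (0, 0.0) state
        .map { case (n, sum) => sum / n }

// runningMean(Iterator(1.0, 2.0, 3.0)).toList == List(1.0, 1.5, 2.0)

Note that the result is itself a stream of intermediate answers rather than a single return value, which is exactly the part that sits uneasily with the “call a function, get a result” flavor of the collection APIs.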

The other family of APIs uses a more actor-based approach. Stream processing systems like Apache Storm, Apache Samza, or even Akka take this route: you basically define worker nodes which take in a stream of data and output another one, and you construct systems by explicitly sending messages asynchronously between those nodes. In this setting, the online nature of the computation is much more explicit.
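
For contrast, here is what one trivial node of such a topology might look like with classic Akka actors (a sketch using the 2014-era untyped actor API; the systems above have their own notions of spouts, bolts, tasks, and so on):

import akka.actor.{Actor, ActorSystem, Props}

case object Print

// one node in the topology: consumes one event at a time, keeps local state
class Counter extends Actor {
  var counts = Map.empty[String, Long]
  def receive = {
    case Print        => println(counts)
    case word: String =>
      counts += word -> (counts.getOrElse(word, 0L) + 1)
  }
}

object ActorSketch extends App {
  val system  = ActorSystem("streaming")
  val counter = system.actorOf(Props[Counter], "counter")
  Seq("a", "b", "a").foreach(counter ! _)   // fire-and-forget, asynchronous
  counter ! Print                           // eventually prints Map(a -> 2, b -> 1)
  // (the ActorSystem keeps running until it is explicitly shut down)
}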

I personally always find actor-based approaches a bit hard to work with mentally, because you have to slice operations up into different actors just to parallelize, even when conceptually it's just one step. The functional collection style approach works much better here; however, you then have to rely on the underlying system being able to parallelize your computations well. Systems like Flink borrow ideas from query optimization in databases to attack this problem, which I think is a very promising approach.

In general, what I personally would like to see is even more convergence between the functional collection and actor-based approaches. I haven't found too much on that, but to me it seems like something which is bound to happen.

Data Input and Output

Concerning data input and output, I find it interesting that none of these approaches deals with the question of how to get at the results of your analysis. One of the key features of real-time is that you need results as the data comes in, so results have to be continuously updated. This is IMHO also not modelled well by the functional collection style APIs, which imply that the function call returns once the result is computed, and that point never comes when you process data in an online fashion.

The answer to that problem seems to be to use your highly parallelized, low-latency computation to deal with all the data, but then periodically write out results to some fast, distributed storage layer like a Redis database and use that to query the results. It's generally not possible to access a running stream processing system “from the side” to get at the state which is distributed somewhere inside it. While this approach works, it seems to me that it requires you to set up yet another distributed system just to store results.
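
A minimal sketch of that pattern in plain Scala (the store interface is hypothetical, standing in for Redis or whatever you use for serving):

import java.util.concurrent.{Executors, TimeUnit}

// stand-in for the external serving store (e.g. Redis)
trait ResultStore { def put(key: String, value: Long): Unit }

// the streaming job keeps its counters in memory; a scheduled task copies
// a snapshot out every few seconds so results can be queried without
// touching the running computation
class CountingJob(store: ResultStore) {
  @volatile private var counts = Map.empty[String, Long]

  def onEvent(key: String): Unit = synchronized {
    counts += key -> (counts.getOrElse(key, 0L) + 1)
  }

  private val scheduler = Executors.newSingleThreadScheduledExecutor()
  scheduler.scheduleAtFixedRate(new Runnable {
    def run(): Unit = counts.foreach { case (k, v) => store.put(k, v) }
  }, 5, 5, TimeUnit.SECONDS)
}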

Concerning data input, there is of course the usual coverage of all possible kinds of input: REST, UDP packets, messaging frameworks, log files, and so on. I currently find Kafka quite interesting, because it seems like a good abstraction combining a bunch of log files with a log database. You get a distributed set of log data together with the ability to go back in time and replay data. In a way, this is exactly what we had been doing with TWIMPACT when analyzing Twitter data.

Streamdrill

Which brings me back to streamdrill (you knew this was coming, right?), not so much because I need to tell you just how great it is, but because it sort of defines where I stand in this landscape myself.

So far, we've mainly focussed on the core processing engine. The question of getting the data out has been answered quite differently from the other approaches: you can directly access the results of your computation by querying the internal state via a REST interface. For getting historical data, you still need to push the data to a storage backend, though. Directly exposing the internal state of the computation is such a big departure from the other approaches that I don't see how you could easily retrofit streamdrill on top of Spark or Akka, even though it would be great to get scaling capabilities that way.

I think the most potential for improvement in streamdrill is the part where you encode the actual computation. So far, streamdrill is written and deployed as a more or less classical Jersey webapp, which means that everything is very event-driven. We're trying to separate functional modules from the REST endpoint code, but it would still take a fair understanding of Java webapps to write anything yourself (and I honestly don't see data scientists doing that). Here, a more high-level, “functional collection” style approach would definitely be better.

Posted by Mikio L. Braun at 2014-08-11 13:51:00 +0200.

Wednesday, July 02, 2014

What is Scalable Machine Learning?

Scalability has become one of those core concept slash buzzwords of Big Data. It’s all about scaling out, web scale, and so on. In principle, the idea is to be able to take one piece of code and then throw any number of computers at it to make it fast.

The terms “scalable” and “large scale” have been used in machine learning circles long before there was Big Data. There have always been certain problems which lead to large amounts of data, for example in bioinformatics, or when dealing with large numbers of text documents. So finding learning algorithms, or more generally data analysis algorithms, which can deal with very large data sets has always been a relevant question.

Interestingly, this issue of scalability was seldom solved with actual scaling out in machine learning, at least not in the Big Data sense. Part of the reason is certainly that multicore processors didn't yet exist at the scale they do today, and that the idea of “just scaling out” wasn't as pervasive as it is now.

Instead, “scalable” machine learning is almost always based on finding more efficient algorithms, and most often, approximations to the original algorithm which can be computed much more efficiently.

To illustrate this, let's search the NIPS proceedings (the annual Advances in Neural Information Processing Systems conference, NIPS for short, is one of the big ML community meetings) for papers which have the term “scalable” in the title.

Here are some examples:

  • Scalable Inference for Logistic-Normal Topic Models

    … This paper presents a partially collapsed Gibbs sampling algorithm that approaches the provably correct distribution by exploring the ideas of data augmentation …

    Partially collapsed Gibbs sampling is a kind of estimation algorithm for certain graphical models.

  • A Scalable Approach to Probabilistic Latent Space Inference of Large-Scale Networks

    … With […] an efficient stochastic variational inference algorithm, we are able to analyze real networks with over a million vertices […] on a single machine in a matter of hours …

    Stochastic variational inference is both an approximation and an estimation algorithm.

  • Scalable kernels for graphs with continuous attributes

    … In this paper, we present a class of path kernels with computational complexity $O(n^2(m + \delta^2 ))$ …

    And this algorithm has runtime quadratic in the number of data points, so it wouldn't even scale out well if you could.

Even where there is potential for scalability, it is usually something that is “embarrassingly parallel” (yep, that's a technical term), meaning that it's something like a summation which can be parallelized very easily. Still, the actual “scalability” comes from the algorithmic side.

So what do scalable ML algorithms look like? A typical example is the stochastic gradient descent (SGD) class of algorithms. These algorithms can be used, for example, to train classifiers like linear SVMs or logistic regression. One data point is considered at each iteration. The prediction error on that point is computed, and then the gradient is taken with respect to the model parameters, giving information on how to adapt these parameters slightly to make the error smaller.
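
Here is a minimal sketch of what this looks like for logistic regression (fixed learning rate, no regularization, purely to illustrate the per-point update; this is not the code of any particular system):

def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

def dot(w: Array[Double], x: Array[Double]): Double =
  w.zip(x).map(p => p._1 * p._2).sum

// one pass of SGD; labels y are 0 or 1, eta is the learning rate,
// and the entire state is the weight vector w
def sgdLogistic(data: Iterator[(Array[Double], Double)],
                dim: Int, eta: Double = 0.01): Array[Double] = {
  val w = new Array[Double](dim)
  for ((x, y) <- data) {
    val err = sigmoid(dot(w, x)) - y       // prediction error on this point
    for (i <- 0 until dim)
      w(i) -= eta * err * x(i)             // nudge the weights to reduce it
  }
  w
}

Since the data comes in as an Iterator, it can be streamed from disk; nothing but the weight vector ever has to be held in memory.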

Vowpal Wabbit is one program based on this approach, and it has a nice definition of what it considers scalable to mean in machine learning:

There are two ways to have a fast learning algorithm: (a) start with a slow algorithm and speed it up, or (b) build an intrinsically fast learning algorithm. This project is about approach (b), and it’s reached a state where it may be useful to others as a platform for research and experimentation.

So “scalable” means having a learning algorithm which can deal with any amount of data without consuming ever-growing amounts of resources like memory. For SGD-type algorithms this is the case, because all you need to store are the model parameters, usually a few tens to hundreds of thousands of double precision floating point values, so maybe a few megabytes in total. The main problem in speeding this kind of computation up is how to stream the data through fast enough.

To put it differently, not only does this kind of scalability not rely on scaling out, it’s actually not even necessary or possible to scale the computation out because the main state of the computation easily fits into main memory and computations on it cannot be distributed easily.

I know that gradient descent is often taken as an example for map-reduce and other approaches, as in this paper on the architecture of Spark, but that paper discusses a version of gradient descent where you are not taking one point at a time, but aggregating the gradient information over the whole data set before making the update to the model parameters. While this can be easily parallelized, it does not perform well in practice because the gradient information tends to average out when computed over the whole data set.
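
For contrast, a single full-batch step for the same model (reusing the sigmoid and dot helpers from the sketch above; again just an illustration) aggregates all per-point gradients before touching the weights. That summation is exactly what map-reduce parallelizes so nicely, but it yields only one update per pass over the data:

def batchGradientStep(data: Seq[(Array[Double], Double)],
                      w: Array[Double], eta: Double): Array[Double] = {
  val grad = new Array[Double](w.length)
  for ((x, y) <- data) {
    val err = sigmoid(dot(w, x)) - y
    for (i <- w.indices) grad(i) += err * x(i)   // sum gradients over the whole set
  }
  w.indices.map(i => w(i) - eta * grad(i) / data.size).toArray
}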

If you want to know more, the large scale learning challenge Sören Sonnenburg organized in 2008 still has valuable information on how to deal with massive data sets.

Of course, there are things which can be scaled well using Hadoop or Spark, in particular any kind of data preprocessing or feature extraction where you need to apply the same operation to each data point in your data set. Another area where parallelization is easy and useful is cross validation for model selection, where you usually have to train a large number of models for different parameter sets to find the combination which performs best. Even here, though, there is potential for speeding up such computations further with better algorithms, like in this paper of mine.
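
To illustrate the cross-validation point, here is a sketch of that embarrassingly parallel loop using Scala's parallel collections on a single machine (trainAndScore is a made-up stand-in for whatever learner and validation scheme you actually use):

// stand-in: train a model with these parameters and return a validation score
def trainAndScore(c: Double, gamma: Double): Double =
  scala.util.Random.nextDouble()   // replace with the real training + evaluation

val grid = for (c <- Seq(0.1, 1.0, 10.0); g <- Seq(0.01, 0.1)) yield (c, g)

val ((bestC, bestGamma), bestScore) =
  grid.par                                         // every combination is independent
      .map { case (c, g) => ((c, g), trainAndScore(c, g)) }
      .maxBy(_._2)                                 // keep the best-scoring one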

I've just scratched the surface here, but I hope you get the idea that scalability can mean quite different things. In Big Data (meaning the infrastructure side of it), what you want to compute is pretty well defined, for example some kind of aggregate over your data set, so you're left with the question of how to parallelize that computation well. In machine learning you have much more freedom, because data is noisy and there's always some leeway in how you model it, so you can often get away with computing some variation of what you originally wanted and still perform well. Often this allows you to speed up your computations significantly by decoupling them. Parallelization is important, too, but alone it won't get you very far.

Luckily, there are projects like Spark and Stratosphere/Flink which work on providing more useful abstractions beyond map and reduce to make the parallelization part easier for data scientists, but you won't get rid of the algorithmic design part any time soon.

Posted by Mikio L. Braun at 2014-07-02 17:38:00 +0200.
