Tag Archives: MapReduce

Congratulations To Bryan Fink And Joseph Blomstedt

July 9, 2012

We have some good news from the Basho Engineering Team to share with the community: Bryan Fink and Joseph Blomstedt have both had papers accepted to the Eleventh ACM SIGPLAN Erlang Workshop.

Bryan’s paper, “Experience Report: Distributed Computation on Dynamo-style Distributed Storage: Riak Pipe”, details the design and internals of Riak Pipe, the distributed processing framework that forms the foundation for Riak’s MapReduce engine. Bryan is the primary author of Riak Pipe.

Joseph’s submission, “Concurrent Property-based Testing: From Prototype to Final Implementation”, is based on the work that he and the team did (and continue to do) to test and bullet-proof the resiliency of Riak. (He gave a related talk at Erlang Factory this past March.)

They will both be part of the Workshop happening September 14th in Copenhagen, Denmark.

Congratulations to Bryan and Joseph!

The Basho Team

When API Compatible Isn't

June 18, 2012

There’s no quicker way to learn the unspoken, de facto standard components of an API than to write a compatible replacement for an old implementation. I was reminded of this recently while tracking down an issue with Riak’s MapReduce APIs.

It has now been over a year since Riak Pipe was announced, and nearly nine months since it was released with Riak to take over MapReduce processing. The compatibility layer we wrote to run Riak MapReduce jobs on Riak Pipe accepts the same query specification, provides the same data to the same processing functions, and produces output formatted in the same manner as the previous system … unless you consider the number of results in each message to be part of the format.

The format of each message in the external streaming format is a list of one or more results, marked by which phase produced them. Riak Pipe took full advantage of this for its naive first pass at compatibility: it put one result in each message. The system it replaced, though, chose to always deliver the entirety of a reduce result in one message, and also chose to batch map results into chunks of 100.

Consuming these two formats correctly is no different: just accumulate all the results into bins for each phase. Where time and space are concerned, however, the manner in which that binning is done makes all the difference in the world.

To wit: in a world where all reduce results are delivered in one go and map results are delivered in batches of 100, binning with orddict:append_list/3, an O(MN)* operation (M = messages, N = results), is not horrible, because M is usually small. But, in a world where each map or reduce result is delivered as its own message, M is equal to N, meaning we’re dealing with an O(N^2) operation, 100 to N times more expensive. Growing in time and space (garbage production) at the square of the size of your input is not a solution fit for large amounts of data.
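The mechanics here are Erlang-specific, but the complexity argument is easy to see in any language. Here is a hypothetical sketch in Java (not code from any Riak client) contrasting a per-phase bin that is rebuilt on every incoming message with one that is appended to in place:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: binning streamed MapReduce results by phase.
public class BinningSketch {

    // Copy-per-message: rebuilding the phase's bin on every delivery costs time
    // proportional to everything accumulated so far, O(M*N) in total -- the same
    // shape of cost as repeatedly appending to a growing orddict value list.
    static void binByCopying(Map<Integer, List<String>> bins, int phase, List<String> results) {
        List<String> rebuilt = new ArrayList<>(bins.getOrDefault(phase, new ArrayList<String>()));
        rebuilt.addAll(results);
        bins.put(phase, rebuilt);
    }

    // Append-in-place: one mutable bin per phase, so each message costs only as
    // much as the results it carries, O(N) in total.
    static void binInPlace(Map<Integer, List<String>> bins, int phase, List<String> results) {
        List<String> bin = bins.get(phase);
        if (bin == null) {
            bin = new ArrayList<String>();
            bins.put(phase, bin);
        }
        bin.addAll(results);
    }
}

When each message carries a single result, the copy-per-message strategy is exactly the quadratic growth described above.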

A couple of quick fixes in the 1.2 release have shrunk the growth factor back down to O(N) for the Erlang Protocol Buffers client library. Similar fixes have improved the same situation in the HTTP interface as well. Or, in simpler terms, Riak’s 1.2 release speeds up non-streamed MapReduce results by leaps and bounds.

We’ve been quite happy with the Riak Pipe implementation of Riak’s MapReduce system from usability and debugging standpoints. With all of the improvements we’ve made in the last year, and the few that we have planned for the next major release after 1.2 (delivery of multiple results per message, for example), we feel confident in deprecating the previous, “legacy” system in 1.2, in preparation for removing it entirely in the following release. Transitioning exclusively to Riak Pipe for MapReduce will clean up the codebase, simplifying maintenance and making way for future growth.

-Bryan

Riak and Hadoop Word Count Example

December 1, 2011

My last post was a brief description of the integration between Riak and Hadoop MapReduce. This follow-up post will be a bit more hands-on and walk you through an example riak-hadoop MapReduce job. If you want to have a go, you need to follow these steps. I won’t cover all the details of installing everything, but I’ll point you at resources to help.

Getting Set Up

First you need both a Riak and a Hadoop install. I went with a local devrel Riak cluster and a local Hadoop install in pseudo-distributed mode. (A single node of both Hadoop and Riak installed from a package will do, if you’re pushed for time.)

The example project makes use of Riak’s Secondary Indexes (“2i”), so you will need to switch the backend on Riak to LevelDB. (NOTE: This is not a requirement for riak-hadoop, but this demo uses 2i, which does require LevelDB.) So, for each Riak node, change the storage backend from bitcask to eleveldb by editing the app.config:

{riak_kv, [
    %% Storage_backend specifies the Erlang module defining the storage
    %% mechanism that will be used on this node.
    {storage_backend, riak_kv_eleveldb_backend},
    %% ...and the rest
]},

Now start up your Riak cluster, and start up Hadoop:

for d in dev/dev*; do $d/bin/riak start; done
cd $HADOOP_HOME
bin/start-dfs.sh
bin/start-mapred.sh

Next you need to pull the riak-hadoop-wordcount sample project from GitHub. Check out the project and build it:

git clone https://github.com/russelldb/riak-hadoop-wordcount
cd riak-hadoop-wordcount
mvn clean install

A Bit About Dependencies…

Warning: Hadoop has some class loading issues. There is no class namespace isolation. The Riak Java Client depends on Jackson for JSON handling, and so does Hadoop, but on different versions (naturally). When the Riak-Hadoop driver is finished it will come with a custom classloader. Until then, however, you’ll need to replace your Hadoop lib/jackson*.jar libraries with the ones in the lib folder of this repo on your JobTracker / NameNode only. On your task nodes, you need only remove the Jackson jars from your hadoop/lib directory, since the classes in the job jar will be loaded instead. (There is an open bug about this in Hadoop’s JIRA. It has been open for 18 months, so I doubt it will be fixed anytime soon. I’m very sorry about this. I will address it in the next version of the driver.)

Loading Data

The repo includes a copy of Mark Twain’s Adventures Of Huckleberry Finn from Project Gutenberg, and a class that chunks the book into chapters and loads the chapters into Riak.

To load the data run the Bootstrap class. The easiest way is to have maven do it for you:

mvn exec:java -Dexec.mainClass="com.basho.riak.hadoop.Bootstrap" -Dexec.classpathScope=runtime

This will load the data (with an index on the Author field). Have a look at the Chapter class if you’d like to see how easy it is to store a model instance in Riak.
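For a flavour of what that looks like, here is a minimal sketch of an annotated model class along the lines of the Riak Java Client’s @RiakKey and @RiakIndex annotations. The field names and import paths here are illustrative assumptions; the real Chapter class lives in the riak-hadoop-wordcount repo.

import com.basho.riak.client.convert.RiakIndex;
import com.basho.riak.client.convert.RiakKey;

// Illustrative sketch only -- see the Chapter class in riak-hadoop-wordcount
// for the real thing. Field names (and the import paths) are assumptions.
public class Chapter {

    @RiakKey                    // used as the object's key in the wordcount bucket
    private String chapterName;

    @RiakIndex(name = "author") // stored as a secondary index on the author field
    private String author;

    private String text;        // the chapter body itself

    // A no-arg constructor keeps the JSON object mapper happy;
    // getters/setters omitted for brevity.
    public Chapter() {}
}

Storing an instance then goes through the client’s object mapping, which serializes it as JSON and writes the author index entry along with it.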

Bootstrap assumes that you are running a local devrel cluster. If your Riak install isn’t listening on the PB interface on 127.0.0.1 on port 8081 then you can specify the transport and address like this:

mvn exec:java -Dexec.mainClass="com.basho.riak.hadoop.Bootstrap" -Dexec.classpathScope=runtime -Dexec.args="[pb|http PB_HOST:PB_PORT|HTTP_URL]"

That should load one item per chapter into a bucket called wordcount. You can check that it succeeded by running the following (adjusting the URL to match your configuration):

curl http://127.0.0.1:8091/riak/wordcount?keys=stream

Package The Job Jar

If you’re running the devrel cluster locally you can just package up the jar now with:

mvn clean package

Otherwise, first edit the RiakLocations in the RiakWordCount class to point at your Riak cluster/node, e.g.

conf = RiakConfig.addLocation(conf, new RiakPBLocation("127.0.0.1", 8081));
conf = RiakConfig.addLocation(conf, new RiakPBLocation("127.0.0.1", 8082));

Then simply package the jar as before.

Run The Job

Now we’re finally ready to run the MapReduce job. Copy the jar from the previous step to your hadoop install directory and kick off the job.

cp target/riak-hadoop-wordcount-1.0-SNAPSHOT-job.jar $HADOOP_HOME
cd $HADOOP_HOME
bin/hadoop jar riak-hadoop-wordcount-1.0-SNAPSHOT-job.jar

And wait… Hadoop helpfully provides status messages as it distributes the code and orchestrates the MapReduce execution.

Inspect The Results

If all went well there will be a bucket in your Riak cluster named wordcount_out. You can confirm it is populated by listing its keys:

curl http://127.0.0.1:8091/riak/wordcount_out?keys=stream

Since the WordCountResult output class has RiakIndex annotations for both the count and word fields, you can perform 2i queries on your data. For example, this should give you an idea of the most common words in Huckleberry Finn:

curl 127.0.0.1:8091/buckets/wordcount_out/index/count_int/1000/3000

Or, if you wanted to know which "f" words Twain was partial to, run the following:

curl 127.0.0.1:8091/buckets/wordcount_out/index/word_bin/f/g

Summary

We just performed a full roundtrip MapReduce, starting with data stored in Riak, feeding it to Hadoop for the actual MapReduce processing, and then storing the results back into Riak. It was a trivial task with a small amount of data, but it illustrates the principle and the potential. Have a look at the RiakWordCount class. You can see that only a few lines of configuration and code are needed to perform a Hadoop MapReduce job with Riak data. Hopefully the riak-hadoop-wordcount repo can act as a template for some further exploration. If you have any trouble running this example, please let me know by raising a GitHub issue against the project or by jumping on the Riak Mailing List to ask some questions.

Enjoy.

Russell

Riak and Hadoop (Sitting in a tree)

November 29, 2011

It has been pointed out on occasion that Riak MapReduce isn’t real “MapReduce”, often with reference to Hadoop, which is. There are many times that Riak’s data processing pipeline is exactly what you want, but in case it isn’t, and you want to leverage existing Hadoop expertise and investment, you may now use Riak as an input/output, instead of HDFS.

This started off as a tinkering project, and it is currently released as riak-hadoop-0.2. I wouldn’t recommend it for production use today, but it is ready for exploratory work, whilst we work on some more serious integration for the future.

Input

Hadoop M/R usually gets its input from HDFS, and writes its results to HDFS. Riak-hadoop is a library that extends Hadoop’s InputFormat and OutputFormat classes so that a Riak cluster can stand in for HDFS in a Hadoop M/R job. The way this works is pretty simple. When defining your Hadoop job, you declare the InputFormat to be of type RiakInputFormat. You configure your cluster members and locations using the JobConf and a helper class called RiakConfig. Your Mapper class must also extend RiakMapper, since there are some requirements for handling eventual consistency that you must satisfy. Apart from that, you code your Map method as you would for a typical Hadoop M/R job.

Keys, for the splits

When Hadoop creates a Mapper task it assigns an InputSplit to that task. An input split is the subset of data that the Mapper will process. In Riak’s case this is a set of keys. But how do we get the keys to map over? When you configure your job, you specify a KeyLister. You can use any input to Hadoop M/R that you would use for Riak M/R: provide a list of bucket/key pairs, a 2i query, a Riak Search query, or, ill-advisedly, a bucket. The KeyLister will fetch the keys for the job and partition them into splits for the Mapper tasks. The Mapper tasks then access the data for the keys using a RiakRecordReader. The record reader is a thin wrapper around a Riak client; it fetches the data for the current key when the Hadoop framework asks.
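Putting the input side together, the wiring might look roughly like the sketch below, assembled from the class names mentioned in this post and the accompanying word-count example (RiakInputFormat, RiakConfig, RiakPBLocation, RiakMapper). The import paths, the key-lister helper, and the choice of Hadoop’s newer Job API are assumptions, so treat it as pseudocode against the real riak-hadoop API rather than copy-paste material.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
// Riak-side imports are assumptions; check riak-hadoop for the real packages.
import com.basho.riak.hadoop.RiakConfig;
import com.basho.riak.hadoop.RiakInputFormat;
import com.basho.riak.hadoop.RiakPBLocation;

public class RiakDrivenJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Tell the job where the cluster's protocol buffers interfaces live.
        conf = RiakConfig.addLocation(conf, new RiakPBLocation("127.0.0.1", 8081));
        conf = RiakConfig.addLocation(conf, new RiakPBLocation("127.0.0.1", 8082));

        // A KeyLister decides which keys become the input splits; the helper
        // and class names on the next line are hypothetical placeholders.
        // conf = RiakConfig.setKeyLister(conf, new BucketKeyLister("mybucket"));

        Job job = new Job(conf, "riak-driven-job");
        job.setInputFormatClass(RiakInputFormat.class); // Riak stands in for HDFS on input
        job.setMapperClass(MyMapper.class);             // placeholder; must extend RiakMapper
        job.setReducerClass(MyReducer.class);           // placeholder; a standard Reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.waitForCompletion(true);
    }
}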

Output

In order to output reduce results to Riak, your Reducer need only implement the standard Reducer interface. When you configure the job, just specify that you wish to use the RiakOutputFormat, and declare an output bucket as a target for results. The keys/values from your reduce will then be written to Riak as regular Riak objects. You can even specify secondary indexes, Riak metadata and Riak links on your output values, thanks to the Riak Java Client’s annotations and object mapping (courtesy of Jackson’s object mapper).
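A sketch of the output side, under the same caveats: the annotation usage follows the Riak Java Client, the index names follow Riak’s _bin/_int secondary index convention, and the commented job wiring uses hypothetical helper names.

import com.basho.riak.client.convert.RiakIndex;

// Illustrative output value class: each key/value your reduce emits is written
// to Riak as a regular object, and annotated fields become secondary indexes.
public class CountResult {

    @RiakIndex(name = "word")   // a String field becomes a word_bin index
    private String word;

    @RiakIndex(name = "count")  // an int field becomes a count_int index
    private int count;

    public CountResult() {}

    public CountResult(String word, int count) {
        this.word = word;
        this.count = count;
    }
}

// In the job setup (RiakOutputFormat is the real class; the output-bucket
// helper name below is a hypothetical placeholder):
// job.setOutputFormatClass(RiakOutputFormat.class);
// conf = RiakConfig.setOutputBucket(conf, "hadoop_results");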

Hybrid

Of course you don’t need to use Riak for both input and output. You could read from HDFS, process and store results in Riak, or read from Riak and store results in HDFS.

Why do this?

This is really a proof-of-concept integration. It should be of immediate use to anyone who already has Hadoop knowledge and a Hadoop cluster. If you’re a Riak user with no Hadoop requirements right now, I’d say don’t rush into it: setting up a Hadoop cluster is far more complex than running Riak, and maintaining it is, operationally, taxing. If, however, you already have Hadoop, adding Riak as a data source and sink is incredibly easy, and gives you a great, scalable, live database for serving reads and taking writes, while letting you leverage your existing Hadoop investment to aggregate that data.

What next?

The thinking reader might be saying “Huh? You stream the data in and out over the network, piecemeal?”. Yes, we do. Ideally we’d do a bulk, incremental replication between Riak and Hadoop (and back) and that is the plan for the next phase of work.

Summary

Riak-Hadoop enables Hadoop users to use a Riak cluster as a source and sink for Hadoop M/R jobs. This exposes the entire Hadoop toolset to Riak data (including query languages like Hive and Pig!). This is only a first pass at the integration problem, and though it is usable today, smarter sync is coming.

Please clone, build, and play with this project. Have at it. There’s a follow-up post coming soon with a look at an example word count Hadoop Map/Reduce job. If you can’t wait, just add a dependency on riak-hadoop, version 0.2, to your pom.xml and get started. Let me know how you get on, via the Riak mailing list.

Russell

Riak Pipe – the New MapReduce Power

September 19, 2011

A few months ago, I announced the opening of Riak Pipe, as well as two goals for the project. With the upcoming 1.0 release of Riak, we have achieved the first goal: new clusters will use Riak Pipe to power their MapReduce processing. Existing clusters will also be able to migrate to using Riak Pipe, with no changes needed from the client perspective.

There are a few reasons you should be excited about running your MapReduce queries on Riak Pipe. First and foremost, Riak Pipe is designed as a work distribution system, and as such, it is better able to take advantage of the parallel resources available in the cluster. One small example of how Riak Pipe achieves this is simply by splitting the “map” phase processing into two steps: fetching the object from Riak KV, and transforming it. This allows the work of each step to happen in parallel; the next input will be fetched while the transformation of the last one is in progress.

Riak Pipe also recognizes that a cluster’s resources are finite, and that sometimes it’s better to delay one pile of work in order to make progress on another. Processing phases in Riak Pipe, called fittings, provide backpressure to fittings upstream from them by means of limiting the sizes of their input queues. The upstream fittings pause their processing when the downstream queues are full, freeing up system resources (or at least not increasing their consumption) to allow those downstream processes a chance to catch up.

Input queues are another example of Riak Pipe’s parallel resource use. Inter-phase results are delivered directly from a vnode running one stage to the vnode that will process them for the next stage. Since they are not forced through a single, central process, the memory of the entire cluster can be used to move them around, instead of requiring a single machine’s memory to handle them.

The KV-object fetching stage of the new Riak Pipe MapReduce system is also much more of a well-behaved KV user. That is, the requests it makes are much more fairly balanced with respect to regular Riak KV operations (get, put, etc.). This means MapReduce on Riak Pipe should have much less impact on the performance of the rest of your Riak use.

Using Riak Pipe MapReduce is simple. Make sure that the setting {mapred_system, pipe} is in the riak_kv section of your cluster’s app.config, and then … just send MapReduce queries over HTTP or Protocol Buffers as you always have. The results should be the same. There are a few knobs you can tweak, which control batching of reduce phase evaluation, but the goal of this release was a 100% compatible implementation of the existing MapReduce functionality.

There is much more on the horizon for Riak Pipe, including more efficiency gains and exposing some of the new processing statistics it tracks, not to mention exposing more of its functionality beyond Riak KV’s MapReduce. We’re very excited about the future.

If you would like to learn more about Riak Pipe, in general, and get involved, I recommend paging through the README to get an idea of its structure, and then browsing the new Riak KV MapReduce code for some examples.

-Bryan

Why MapReduce Is Hard

April 20, 2011

At the highest level, MapReduce can be a fundamentally hard concept to grasp. In our last blog post on MapReduce, you learned how easy it is. After all, it’s just code, right?

The story gets a bit more complicated as soon as you run MapReduce in a distributed environment like Riak. Some of these caveats are just things to be aware of; others are simply in the nature of both Riak’s implementation and the ideas in MapReduce itself. Adding to that, it’s just a hard concept to grasp. I’ll be the first to admit it took me a long time to understand.

MapReduce is hard because it’s just code

Wait, didn’t I just tell you that MapReduce is easy because it’s just code? I believe I did, Bob.

Code requires knowledge of what it does. Code requires the discipline to keep it clean and easy to grasp. Code requires knowledge of the data structures involved. Code requires knowledge of how Riak treats your MapReduce functions depending on their phase in a particular MapReduce request.

Take Riak’s map functions. Depending on whether you intend to chain multiple functions in one request, they must return either a bucket/key pair (to feed a following map phase) or the data itself (for the client or a following reduce function).

One way to solve this would be to add an extra argument, based on which the map function can decide whether it returns a bucket/key pair or the data. Why would you do this? Consider a scenario where you want to extract a set of objects based on a match in one attribute and then run a second map matching another attribute. Here’s an example:

javascript
var extractByMatchingAttribute = function(value, keydata, args) {
  var doc = Riak.mapValuesJson(value)[0];
  if (doc.host.match(/basho.com/)) {
    if (args.last) {
      return [1];
    } else {
      return [[value.bucket, value.key]];
    }
  }
  return [];
}

Now you can pass in an argument object (e.g. {last: true}) to every phase in a MapReduce request. This is a valid solution, but it makes the map function painfully aware of the environment in which it’s running.

“But hey,” you wonder, “couldn’t I just make these into a single map function instead and return the value immediately?” You could, but depending on other MapReduce scenarios you could end up repeating a lot of code across a lot of functions. Once again, you could extract the shared code, ignore the duplication, or try to abstract it away, making map functions chainable through extra arguments that can then be passed to every single map phase independently.

I told you, MapReduce is hard because it’s just code, didn’t I?

MapReduce is hard because it doesn’t (yet) have a common language

Wouldn’t it be nice if we could just use a simple language to express what we want our Riak cluster to do instead? Something like the following, where we have a bucket full of log entries, every one of them represented by a JSON object, which handily includes a host attribute:

sql
LOAD log_entries FROM KEY '2011-04-08' TO KEY '2011-05-10' INTO entries
FILTER entries BY host MATCHING /basho.com/ AND
GROUP entries BY host

There, three lines of easily readable text in a purely hypothetical example, although slightly inspired by Apache Pig. “Oh hey,” you’re saying, “doesn’t that look a bit like SQL?” Maybe it does, maybe it doesn’t, but that’s not the point. More important than that, it’s easier to read.

Will we see something along these lines in Riak? No comment.

MapReduce is hard because accessing lots of data is expensive

MapReduce is the technique du jour to query and filter data in Riak. It’s intended to be used as a tool to transform and analyze a set of data. You hand in a bunch of bucket/key pairs, and Riak feeds them into your map and reduce functions. Simple enough.

It has been our experience, however, that what users want is to feed the data from a whole bucket, or a whole range of keys, into a MapReduce request. Thankfully you can do the latter using the rather nice key filters.

Still, fetching all the keys from a bucket requires a lot of resources. Key filters can certainly help to reduce that, because they fold over the list of keys before the data is loaded from disk, which is certainly faster than loading everything right away. But it still takes a lot of time when there are millions of keys stored in a Riak cluster. Not to mention the fact that every node in the cluster is involved in both fetching all objects from a bucket and running key filters.

If this is what you’re trying to achieve, you should have a look at Riak Search. It does a much better job at these reverse lookup queries (which some call secondary indexes), and you can neatly feed the results into a MapReduce request, something we’ve implemented in Riaktant, our little syslog collector and aggregator. But we’ll leave dissecting that code to another blog post.

We have some more tricks up our sleeve which should be appearing in the near future, so stay tuned. I’m sure it’ll blow your mind just like it blew mine.

MapReduce is hard because going distributed requires coordination

Whenever you fire off a MapReduce request in a Riak cluster, the node you’re requesting becomes the coordinator of this particular request. This poor node suddenly has the responsibility of sending your map request to all the relevant nodes in the cluster, using the preference list where applicable.

The coordinating node has the job of sending out the map requests and waiting for them to return, triggering the timeout if they don’t. If the map phases return bucket/key pairs and there’s another map phase to run, it starts over, sending new requests to the nodes responsible for those pairs.

Map phases in a Riak cluster always run close to the data, which keeps overhead down for both network traffic and general load in the cluster: it greatly reduces the amount of data sent across the wire, and it lets the nodes responsible for the data get some of that MapReduce action as well.

That’s a lot of work for the coordinating node, but except for the reduce part, it’s not necessarily heavy computationally. It does, however, have to track the state of all the phases sent out to the cluster, collect the results, and control behaviour in case of a failure of even a single node in the cluster.

What can you do about this? Other than keeping the initial data set for a MapReduce request as small as possible, the ball is pretty much in our court. It’s Riak’s task to make sure a MapReduce request is handled and coordinated properly.

MapReduce is hard because reduce runs recursively

Enter the brain bender called re-reduce. If you haven’t run into it, you’re in for a treat. A reduce function returns a list of aggregation results based on a list of inputs. Depending on how large the result set from the function is going to be, Riak’s MapReduce may feed the function its own first round of results again, a technique commonly called re-reduce.

In general your reduce functions should be oblivious to the kind of data they’re fed; they should work properly in both the normal and the re-reduce case. Two options here:

  • Make your reduce functions self-aware.
    They know what kind of data they get from a previous map phase, and what kind of data they return. But that means adding some sort of type checking to your reduce functions, which is cumbersome and, especially with JavaScript, neither beautiful nor easily achievable. So let’s assume I never told you about this option.
  • Make sure they always return data in the format they receive it.
    This is the way you’ll usually end up going. It’s just a lot easier to spend a little more time thinking about how to build a result on which you can re-run the same function, so that the output is either unchanged or simply aggregated again in the same fashion as before.

Here’s an example: a function that groups by an attribute. It assumes a list containing JavaScript objects with attributes as keys and numbers as values, e.g. {attribute: 1}. It simply adds up the values for every object, so that a list containing three of those example objects turns into {attribute: 3}. This makes it nicely ignorant of where the data comes from, and re-reduce will simply be fed the same data structures as the initial reduce.

The second parameter to the function is an argument you can pass to the reduce phase, telling it which particular attribute you’re interested in and making the function nicely reusable along the way.

javascript
function(values, field) {
  var result = {};
  for (var i = 0; i < values.length; i++) {
    var value = values[i];
    for (var attr in value) {
      // if a field argument was given, only aggregate that attribute
      if (field && attr !== field) {
        continue;
      }
      if (attr in result) {
        result[attr] += value[attr];
      } else {
        result[attr] = value[attr];
      }
    }
  }
  return [result];
}

Re-reduce is a weird concept to grasp, but when you just think of it as two different ways Riak will pass data to your reduce function, it’s actually not that hard. Just make sure a reduce function returns data in a form it can consume again, and you’re fine.

MapReduce is hard because debugging errors is hard

Finding out where things go wrong in a distributed MapReduce environment like Riak is hard. The only useful way you have of inspecting data is the ejsLog() function in JavaScript, and that means you’ll have to go look on every node in the cluster to find out what’s wrong or whether the data is in the format you expect it to be. The following dumps the list of objects for a map or reduce function as JSON into the file /tmp/mapreduce.

javascript
ejsLog('/tmp/mapreduce', JSON.stringify(values));

The small fix: test your MapReduce functions on a cluster with a single node before cranking it up a notch, or have a look at Sean Cribbs’ riak-qc.js framework for testing Riak MapReduce functions.

The MapReduce bottom line

MapReduce is hard, but we’re not going shopping just yet. Instead, we’re keeping a close eye on what our users are trying to achieve, improving Riak along the way. For more details on Riak’s MapReduce, check out the wiki, read a blog post on more practical MapReduce, or watch last year’s webinar on MapReduce. If you’re feeling adventurous, have a look at the examples and MapReduce code used in Riaktant. I’ll be sure to get back to you with a proper introduction on how it uses MapReduce to sift through syslog data.

Mathias

Why MapReduce is Easy

March 30, 2011

There’s something about MapReduce that makes it seem rather scary. It almost has this Big Data aura surrounding it, making it seem like it should only be used to analyze a large amount of data in a distributed fashion. It’s one of the pieces that makes Riak a pretty versatile key-value store. Feed a bunch of keys into it, and do some analytics on the objects, quite handy.

But when you narrow it down to just the basics, MapReduce is pretty simple. I’m almost 100% certain that you’ve used it, in one way or another, in an application you’ve written. So before we go all distributed, let’s break MapReduce down into something small that you can use every day. That certainly has helped me understand it much better.

For our webinar on Riak and Node.js we built a little application with Node.js and Riak Search to store and search syslog messages. It’s called Riaktant and handily converts and stores syslog messages in a way that’s friendlier for both Riak Search and MapReduce. We’ll base this on examples we used in building the application.

MapReduce is easy because it works on simple data

MapReduce loves simple data structures. Why? Because when there are no deep, nested relationships between, say, objects, distributing data for parallel processing is a breeze. But I’m getting a little ahead of myself.

Let’s take the data Riaktant stores in Riak and see how easy it is to sift through it without even having to go distributed. It uses a JavaScript library called glossy to parse a syslog message and turn it into this nice JSON data structure.

javascript
message = {
  "originalMessage": "<35>1 2011-02-14T11:10:25.137+01:00 lb1.basho.com ftpd 7003 - Client disconnected",
  "time": "2011-02-14T10:10:25.137Z",
  "severityID": 3,
  "facility": "auth",
  "version": 1,
  "prival": 35,
  "host": "lb1.basho.com",
  "facilityID": 4,
  "message": "7003 - Client disconnected",
  "severity": "err"
}

MapReduce is easy because you use it every day

I’m almost 100% certain you use MapReduce every day. If not daily, then at least once a week. Whenever you have a list of items that you loop or iterate over and transform into something else one by one, if only to extract a single attribute, there’s your map function.

Keeping with JavaScript, here’s how you’d extract the host from the above JSON, for a whole list:

javascript
messages = [message];

messages.map(function(message) {
  return message.host
})

Or, if you insist, here’s the Ruby equivalent:

ruby
messages.map do |message|
  message[:host]
end

If you must ask, here’s Python, using a list comprehension, for added functional programming sugar:

python
[message['host'] for message in messages]

There, so simple, right? Halfway there to some full-fledged MapReduce action.

MapReduce is easy because it’s just code

Before we continue, let’s add another syslog message.

javascript
message2 = {
  "originalMessage": "<35>1 2011-02-14T11:10:25.137+01:00 web2.basho.com ftpd 7003 - Client disconnected",
  "time": "2011-02-14T10:12:37.137Z",
  "severityID": 3,
  "facility": "http",
  "version": 1,
  "prival": 35,
  "host": "web2.basho.com",
  "facilityID": 4,
  "message": "7003 - Client disconnected",
  "severity": "warn"
}
messages.push(message2)

We can take the above example even further (still using JavaScript), and perform some additional operations like result sorting, for example.

javascript
messages.map(function(message) {
  return message.host
}).sort()

This gives us a nice sorted list of hosts. Coincidentally, sorting happens to be the second step in traditional MapReduce. Isn’t it nice how easily this is coming together?

The third and last step involves, you guessed it, more code. I don’t know about you, but I love things that involve code. Let’s reduce the list of hosts and count the occurrences of each host (and if this reminds you of an SQL query that involves GROUP BY, you’re right on track).

javascript
var reduce = function(total, host) {
  if (host in total) {
    total[host] += 1
  } else {
    total[host] = 1
  }
  return total
}

messages.map(function(message) {
  return message.host
}).sort().reduce(reduce, {})

There’s one tiny bit missing for this to be as close to MapReduce as we can get without going distributed. We need to slice up the list before we hand it to the map function. As JavaScript doesn’t have a built-in function to partition a list we’ll whip up our own real quick. After all, we’ve come this far.

javascript
function chunk(list, chunkSize) {
  for(var position, i = 0, chunk = -1, chunks = []; i < list.length; i++) {
    if (position = i % chunkSize) {
      chunks[chunk][position] = list[i]
    } else {
      chunk++;
      chunks[chunk] = [list[i]]
    }
  }
  return chunks;
}

It loops through the list, splitting it up into equally sized chunks, returning them neatly wrapped in a list.

Now we can chunk the initial list of messages, and boom, we have our own little MapReduce going, without magic, just code. Let’s put the new chunk function to good use.

javascript
var mapResults = [];
chunk(messages, 1).forEach(function(chunk) {
  var messages = chunk.map(function(message) {
    return message.host
  })
  mapResults = mapResults.concat(messages)
})
mapResults.sort().reduce(reduce, {})

We split up the messages into two chunks, run the map function for each chunk, collecting the results as we go. Then we sort the results and feed them into the reduce function. That’s MapReduce in eight lines of JavaScript code. Easy, right?

That’s all there is to MapReduce. You use it every day, whether you’re aware of it or not. It works nicely with simple data structures, and it’s just code.

Unfortunately, things get complicated as soon as you go distributed, for example in a Riak cluster. But we’ll save that for the next post, where we’ll examine why MapReduce is hard.

Mathias

Follow Up To Riak and Node.js Webinar

March 18, 2011

Thanks to all who attended Wednesday’s webinar on Riak (Search) and Node.js. If you couldn’t make it you can find a screencast of the webinar below. You can also check out the slides directly.

We hope we could give you a good idea of what you can use the winning combination of Riak and Node.js for, by showing you our little syslog-emulating sample application, Riaktant. We made the source code available, so if you feel like running your own syslog replacement, go right ahead and let us know how things go. Of course you can also just dig into the code and see how nicely Node.js and Riak play together.

If you want to get some practical ideas about how we utilized Riak’s MapReduce to analyze the log data, have a look at the functions used by the web interface. You can throw these right into the Node.js console and try them out yourself: riak-js, the Node.js client for Riak, accepts JavaScript functions, so you don’t have to serialize them into strings yourself.

Thanks to Joyent for providing us with SmartMachines running Riak, and for offering No.de, their great hosting service for Node.js applications, where we deployed our little app with great ease.

Sean and Mathias

Free Webinar – Riak with Node.js – March 15 @ 2PM Eastern

March 8, 2011

JavaScript is the lingua franca of the web, and many developers are starting to use node.js to power their server-side applications. Riak is a flexible, scalable database that has a JavaScript-friendly interface, including MapReduce in JavaScript and an awesome client library called riak-js. Put the two together and you have lots of possibilities!

We invite you to join us for a free webinar on Tuesday, March 15 at 2:00PM Eastern Time (UTC-4) to talk about Riak with node.js. In this webinar, we’ll discuss:

  • Getting riak-js, the Riak client for node.js, into your application
  • Storing, retrieving, manipulating key-value data
  • Issuing MapReduce queries
  • Finding data with Riak Search
  • Testing your code with the TestServer

We’ll address the above topics in addition to looking at a sample application. The presentation will last 30 to 45 minutes, with time for questions at the end. Fill in the form below if you want to get started building node.js applications on top of Riak!

Follow Up to MapReducing Big Data With Luwak Webinar

February 18, 2011

Firstly, a big thanks goes out to everyone who attended yesterday’s MapReducing Big Data With Luwak Webinar. As promised, here is the screencast (below) from the webinar. It should be quite useful for those of you who weren’t able to attend or who would like to view the content again (it’s good enough to warrant multiple views).

If you prefer slides, there is a PDF version of the presentation available here.

If you have any questions or comments (webinar-related or otherwise), leave them below and we’ll get back to you.

Enjoy!

Bryan

 

MapReducing Big Data With Riak and Luwak from Basho Technologies on Vimeo.