Tag Archives: Riak

Why MapReduce is Easy

March 30, 2011

There’s something about MapReduce that makes it seem rather scary. It almost has this Big Data aura surrounding it, making it seem like it should only be used to analyze a large amount of data in a distributed fashion. It’s one of the pieces that makes Riak a pretty versatile key-value store. Feed a bunch of keys into it, and do some analytics on the objects, quite handy.

But when you narrow it down to just the basics, MapReduce is pretty simple. I’m almost 100% certain that you’ve used it in one way or another in an application you’ve written. So before we go all distributed, let’s break MapReduce down into something small that you can use every day. That certainly has helped me understand it much better.

For our webinar on Riak and Node.js we built a little application with Node.js and Riak Search to store and search syslog messages. It’s called Riaktant and handily converts and stores syslog messages in a way that’s friendlier for both Riak Search and MapReduce. We’ll base this on examples we used in building the application.

MapReduce is easy because it works on simple data

MapReduce loves simple data structures. Why? Because when there are no deep, nested relationships between, say, objects, distributing data for parallel processing is a breeze. But I’m getting a little ahead of myself.

Let’s take the data Riaktant stores in Riak and see how easy it is to sift through it without even having to go distributed. It uses a JavaScript library called glossy to parse a syslog message and turn it into this nice JSON data structure.

```javascript
message = {
  "originalMessage": "<35>1 2011-02-14T11:10:25.137+01:00 lb1.basho.com ftpd 7003 - Client disconnected",
  "time": "2011-02-14T10:10:25.137Z",
  "severityID": 3,
  "facility": "auth",
  "version": 1,
  "prival": 35,
  "host": "lb1.basho.com",
  "facilityID": 4,
  "message": "7003 - Client disconnected",
  "severity": "err"
}
```

MapReduce is easy because you use it every day

I’m almost 100% certain you use MapReduce every day. If not daily, then at least once a week. Whenever you have a list of items that you loop or iterate over and transform into something else one by one, if only to extract a single attribute, there’s your map function.

Keeping with JavaScript, here’s how you’d extract the host from the above JSON, for a whole list:

```javascript
messages = [message];

messages.map(function(message) {
  return message.host
})
```

Or, if you insist, here’s the Ruby equivalent:

```ruby
messages.map do |message|
  message[:host]
end
```

If you must ask, here’s Python, using a list comprehension, for added functional programming sugar:

```python
[message['host'] for message in messages]
```

There, so simple, right? Halfway there to some full-fledged MapReduce action.

MapReduce is easy because it’s just code

Before we continue, let’s add another syslog message.

```javascript
message2 = {
  "originalMessage": "<35>1 2011-02-14T11:10:25.137+01:00 web2.basho.com ftpd 7003 - Client disconnected",
  "time": "2011-02-14T10:12:37.137Z",
  "severityID": 3,
  "facility": "http",
  "version": 1,
  "prival": 35,
  "host": "web2.basho.com",
  "facilityID": 4,
  "message": "7003 - Client disconnected",
  "severity": "warn"
}

messages.push(message2)
```

We can take the above example even further (still using JavaScript), and perform some additional operations like result sorting, for example.

```javascript
messages.map(function(message) {
  return message.host
}).sort()
```

This gives us a nice sorted list of hosts. Coincidentally, sorting happens to be the second step in traditional MapReduce. Isn’t it nice how easily this is coming together?

The third and last step involves, you guessed it, more code. I don’t know about you, but I love things that involve code. Let’s reduce the list of hosts and count the occurrences of each host, (and if this reminds you of an SQL query that involves GROUP BY, you’re right on track).

```javascript
var reduce = function(total, host) {
  if (host in total) {
    total[host] += 1
  } else {
    total[host] = 1
  }
  return total
}

messages.map(function(message) {
  return message.host
}).sort().reduce(reduce, {})
```
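To make the data flow concrete, here’s the same chain again with the intermediate results spelled out for the two messages we have so far (just a sketch of what the code above produces):

```javascript
var hosts = messages.map(function(message) {
  return message.host
}).sort()
// hosts is ["lb1.basho.com", "web2.basho.com"]

var counts = hosts.reduce(reduce, {})
// counts is { "lb1.basho.com": 1, "web2.basho.com": 1 }
```

Map transforms, sort orders, reduce aggregates. That’s the whole pipeline.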

There’s one tiny bit missing for this to be as close to MapReduce as we can get without going distributed. We need to slice up the list before we hand it to the map function. As JavaScript doesn’t have a built-in function to partition a list, we’ll whip up our own real quick. After all, we’ve come this far.

```javascript
function chunk(list, chunkSize) {
  for(var position, i = 0, chunk = -1, chunks = []; i < list.length; i++) {
    if (position = i % chunkSize) {
      chunks[chunk][position] = list[i]
    } else {
      chunk++;
      chunks[chunk] = [list[i]]
    }
  }
  return chunks;
}
```

It loops through the list, splitting it up into equally sized chunks, returning them neatly wrapped in a list.
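To see chunk in action before we point it at the messages, here’s a quick example with plain numbers (just tracing the function above):

```javascript
chunk([1, 2, 3, 4, 5, 6, 7], 3)
// => [[1, 2, 3], [4, 5, 6], [7]]
// The last chunk simply holds whatever is left over.
```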

Now we can chunk the initial list of messages, and boom, we have our own little MapReduce going, without magic, just code. Let’s put the new chunk function to good use.

```javascript
var mapResults = [];
chunk(messages, 2).forEach(function(chunk) {
  var messages = chunk.map(function(message) {
    return message.host
  })
  mapResults = mapResults.concat(messages)
})
mapResults.sort().reduce(reduce, {})
```

We split up the messages into two chunks, run the map function for each chunk, collecting the results as we go. Then we sort the results and feed them into the reduce function. That’s MapReduce in eight lines of JavaScript code. Easy, right?

That’s all there is to MapReduce. You use it every day, whether you’re aware of it or not. It works nicely with simple data structures, and it’s just code.

Unfortunately, things get complicated as soon as you go distributed, for example in a Riak cluster. But we’ll save that for the next post, where we’ll examine why MapReduce is hard.

Mathias

Riak and Scala at Yammer

March 28, 2011

What’s the best way to start off the week? With an awesome presentation from some very talented engineers about building a Riak-backed service.

This video, which runs about 40 minutes, was recorded last week at the San Francisco Riak Meetup and is worth every minute of your time. Coda Hale and Ryan Kennedy of Yammer give an excellent and in-depth look at how they built “Streamie”, why Riak was the right choice, and the lessons learned in the process.

Also:

* The PDF of the slide presentation can be viewed here
* Around the five minute mark Coda references a paper called “The Declarative Imperative: Experiences and Conjectures in Distributed Logic.”
* If you are interested in talking about your Riak usage, get in touch with mark@basho.com and we’ll get the word out.

Enjoy.

Mark

There weren’t too many questions asked at the end of the presentation so we decided to cut them out of the recording in the interest of time. Apologies for this. Here they are:

  • What local storage backend are you using? Bitcask.
  • How many keys are you currently storing? Around 5 million.
  • What is the average value size? Under 10K
  • Can you share your hardware specs? Bare-metal, standard servers: 8-core, 16GB RAM, SATA drives.

Riak and Scala at Yammer from Basho Technologies on Vimeo.

Follow Up To Riak and Node.js Webinar

March 18, 2011

Thanks to all who attended Wednesday’s webinar on Riak (Search) and Node.js. If you couldn’t make it you can find a screencast of the webinar below. You can also check out the slides directly.

We hope we gave you a good idea of what you can use the winning combination of Riak and Node.js for by showing you our little syslog-emulating sample application, Riaktant. We made the source code available, so if you feel like running your own syslog replacement, go right ahead and let us know how things go. Of course you can also just dig into the code and see how nicely Node.js and Riak play together.

If you want some practical ideas about how we utilized Riak’s MapReduce to analyze the log data, have a look at the functions used by the web interface. You can throw these right into the Node.js console and try them out yourself: riak-js, the Node.js client for Riak, accepts JavaScript functions directly, so you don’t have to serialize them into strings yourself.
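For a taste of what that looks like, here’s a rough sketch of handing a JavaScript map function straight to riak-js. The chainable add/map/run interface and the 'syslog' bucket name here are assumptions from memory, not taken from Riaktant itself, so check them against the riak-js version you’re running:

```javascript
// Sketch only: assumes riak-js's chainable add/map/run interface and a
// hypothetical 'syslog' bucket holding the parsed messages.
var db = require('riak-js').getClient();

db.add('syslog')
  .map(function(value) {
    // runs inside Riak; Riak.mapValuesJson is one of Riak's built-in helpers
    var message = Riak.mapValuesJson(value)[0];
    return [message.host];
  })
  .run(function(err, results) {
    if (err) throw err;
    console.log(results);
  });
```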

Thanks to Joyent for providing us with SmartMachines running Riak, and for offering No.de, their great hosting service for Node.js applications, where we deployed our little app with great ease.

Sean and Mathias

Free Webinar – Riak with Node.js – March 15 @ 2PM Eastern

March 8, 2011

JavaScript is the lingua franca of the web, and many developers are starting to use node.js to power their server-side applications. Riak is a flexible, scalable database that has a JavaScript-friendly interface, including MapReduce in JavaScript and an awesome client library called riak-js. Put the two together and you have lots of possibilities!

We invite you to join us for a free webinar on Tuesday, March 15 at 2:00PM Eastern Time (UTC-4) to talk about Riak with node.js. In this webinar, we’ll discuss:

  • Getting riak-js, the Riak client for node.js, into your application
  • Storing, retrieving, manipulating key-value data
  • Issuing MapReduce queries
  • Finding data with Riak Search
  • Testing your code with the TestServer

We’ll address the above topics in addition to looking at a sample application. The presentation will last 30 to 45 minutes, with time for questions at the end. Fill in the form below if you want to get started building node.js applications on top of Riak!

KillDashNine March Happening on Wednesday

March 5, 2011

In February we kicked off the KillDashNine drinkup. It was a huge success (turns out we aren’t the only ones who care about durability) and, as promised, we’ll be having another drinkup this month. On Wednesday, 3/9, we will be clinking glasses and sharing data loss horror stories at Bloodhound, located at 1145 Folsom Street here in San Francisco.

This month’s chosen cocktail is the *Data Eraser*, and it’s simple to make: 2 oz Vodka, 2 oz Coffee Liqueur, 2 oz Tonic, and a dash of bitter frustration, anguish, and confusion (which is more or less how one feels when their data just disappears). And if you can’t make it, be sure to pour yourself a Data Eraser on 3/9 to take part in the festivities from wherever you happen to find yourself (or you can run your own local KillDashNine like Marten Gustafson did in Stockholm last month).

Registration details for the event are here, so be sure to RSVP if you’re planning to join us. In the meantime, spin up a few nodes of your favorite database and try your hand at terminating some processes with the help of our favorite command: _kill -9_.

Long Live Durability!

Basho


Announcing KevBurnsJr as a PHP Client Committer

February 28, 2011

We just added Kevin Burns, who goes by KevBurnsJr on Github, Twitter and the #riak IRC room on irc.freenode.net, as a committer to the Basho-supported Riak PHP Client.

Kevin has been hard at work over the past few weeks adding some great functionality to the PHP client and has even kicked off porting Ripple, Basho’s Ruby Client ODM, to PHP. Suffice it to say that we at Basho are excited about Kevin’s participation and involvement with Riak and our PHP code.

Some relevant code:

* Riak’s PHP Client
* Port of Ripple to PHP

Thanks, Kev! We are looking forward to your contributions.

Mark

MapReducing Big Data With Luwak Webinar

February 14, 2011

Basho Senior Engineer Bryan Fink has been doing some exceptional work with MapReduce and Luwak, Riak’s large-object storage interface. Recently, he wrote up two extensive blog posts on the specifics of Luwak and the powerful tool it becomes when combined with Riak’s MapReduce engine:

We’ve seen a huge amount of Luwak usage since its release and, since these blog posts, a large amount of interest in running MapReduce queries over data stored in Riak via Luwak. So, we thought what better way to spread the word than through a free Webinar?

This Thursday, February 17th at 2PM EST, Bryan will be leading the MapReducing Big Data With Luwak Webinar. The planned agenda is as follows:

  • Overview of Riak MapReduce and its typical usage
  • Gotchas and troubleshooting
  • Usage Recommendations and Best Practices
  • An Introduction to Luwak, Riak’s Large File Storage Interface
  • Luwak MapReduce in Action

Registration is now closed.

Hope to see you there.

The Basho Team


Creating a Local Riak Cluster with Vagrant and Chef

February 4, 2011

The “Riak Fast Track” has been around for at least nine months now, and lots of developers have gotten to know Riak that way, building their own local clusters from the Riak source. But there’s always been something that has bothered me about that process, namely, that the developer has to build Riak herself. Basho provides pre-built packages on downloads.basho.com for several Linux distributions, Solaris, and Mac OS/X, but these have the limitation of only letting you run one node on a machine.

I’ve been a long-time fan of Chef, the systems and configuration management tool by Opscode, especially for its wealth of community recipes and vibrant participation. It’s also incredibly easy to get started with small Chef deployments with Opscode’s Platform, which is free for up to 5 managed machines.

Anyway, as part of updating Riak’s Chef recipe last month to work with the 0.14.0 release, I discovered the easiest way to test the recipe — without incurring the costs of Amazon EC2 — was to deploy local virtual machines with Vagrant. So this blog post will be a tutorial on how to create your own local 3-node Riak cluster with Chef and Vagrant, suitable for doing the rest of the Fast Track.

Before we start, I’d like to thank Joshua Timberman and Seth Chisamore from Opscode who helped me immensely in preparing this.

Step 1: Install VirtualBox

Under the covers, Vagrant uses VirtualBox, which is a free virtualization product, originally created at Sun. Go ahead and download and install the version appropriate for your platform:

Step 2: Install Vagrant and Chef

Now that we have VirtualBox installed, let’s get Vagrant and Chef. You’ll need Ruby and Rubygems installed for this. Mac OS/X comes with these pre-installed, but they’re easy to get on most platforms.

Now that you’ve got them both installed, you need to get a virtual machine image to run Riak from. Luckily, Opscode has provided some images for us that have the 0.9.12 Chef gems preinstalled. Download the Ubuntu 10.04 image and add it to your local collection:

Step 3: Configure Local Chef

Head on over to Opscode and sign up for a free Platform account if you haven’t already. This gives you access to the cookbooks site as well as the Chef admin UI. Make sure to collect your “knife config” and “validation key” from the “Organizations” page of the admin UI, and your personal “private key” from your profile page. These help you connect your local working space to the server.

Now let’s get our Chef workspace set up. You need a directory that has specific files and subdirectories in it, also known as a “Chef repository”. Again, Opscode has made this easy on us; we can just clone their skeleton repository:

Now let’s put the canonical Opscode cookbooks (including the Riak one) in our repository:

Finally, put the Platform credentials we downloaded above inside the repository (the .pem files will be named differently for you):

Step 4: Configure Chef Server

Now we’re going to prep the Chef Server (provided by Opscode Platform) to serve out the recipes needed by our local cluster nodes. The first step is to upload the two cookbooks we need using the *knife* command-line tool, shown in the snippet below the next paragraph. I’ve left out the output since it can get long.

Then we’ll create a “role” — essentially a collection of recipes and attributes — that will represent our local cluster nodes, and call it “riak-vagrant”. Using knife role create will open your configured EDITOR (mine happens to be emacs) with the JSON representation of the role. The role will be posted to the Chef server when you save and close your editor.

The key things to note about what we’re editing in the role below are the “run list” and the “override attributes” sections. The “run list” tells what recipes to execute on a machine that receives the role. We configure iptables to run with Riak, and of course the relevant Riak recipes. The “override attributes” change default settings that come with the cookbooks. I’ve put explanations inline, but to summarize, we want to bind Riak to all network interfaces, and put it in a cluster named “vagrant” which will be used by the “riak::autoconf” recipe to automatically join our nodes together.
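To give you an idea of the shape of that role, here’s a sketch of what you might end up with in your editor, written as a JavaScript-style object for readability (knife edits the equivalent JSON). Only the riak::autoconf recipe and the “vagrant” cluster name come from the description above; every other recipe name and attribute key is a placeholder, so match them against the riak and iptables cookbooks you actually uploaded:

```javascript
// Sketch of the "riak-vagrant" role; recipe names other than riak::autoconf
// and all attribute keys below are placeholders.
var riakVagrantRole = {
  name: "riak-vagrant",
  description: "Local Riak cluster nodes provisioned by Vagrant",
  json_class: "Chef::Role",
  chef_type: "role",
  run_list: [
    "recipe[iptables]",       // placeholder: firewall rules for Riak's ports
    "recipe[riak]",           // placeholder: install and configure Riak
    "recipe[riak::autoconf]"  // from the post: auto-join nodes into a cluster
  ],
  override_attributes: {
    riak: {
      web_ip: "0.0.0.0",      // placeholder key: bind HTTP to all interfaces
      pb_ip: "0.0.0.0",       // placeholder key: bind Protocol Buffers too
      cluster_name: "vagrant" // from the post: cluster used by riak::autoconf
    }
  }
};
```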

Step 5: Setup Vagrant VM

Now that we’re ready on the Chef side of things, let’s get Vagrant going. Make three directories inside your Chef repository called dev1, dev2, and dev3, just like from the Fast Track. Change directory into dev1 and run vagrant init. This will create a Vagrantfile which you should edit to look like this one (explanations inline again):

Remember: change any place where it says ORGNAME to match your Opscode Platform organization.

Step 6: Start up dev1

Now we’re ready to see if all our preparation has paid off:

If you see lines at the end of the output like the ones above, it worked! If it doesn’t work the first time, try running vagrant provision from the command line to invoke Chef again. Let’s see if our Riak node is functional:

Awesome!

Step 7: Repeat with dev2, dev3

Now let’s get the other nodes set up. Since we’ve done the hard parts already, we just need to copy the Vagrantfile from dev1/ into the other two directories and modify them slightly.

The easiest way to describe the modifications is in a table:

| Line | dev2 | dev3 | Explanation |
|------|------|------|-------------|
| 7 | "33.33.33.12" | "33.33.33.13" | Unique IP addresses |
| 11 (last number) | 8092 | 8093 | HTTP port forwarding |
| 12 (last number) | 8082 | 8083 | PBC port forwarding |
| 40 | "riak-fast-track-2" | "riak-fast-track-3" | Unique Chef node name |
| 48 | "riak@33.33.33.12" | "riak@33.33.33.13" | Unique Riak node name |

With those modified, start up dev2 (run vagrant up inside dev2/) and watch it connect to the cluster automatically. Then repeat with dev3 and enjoy your local Riak cluster!

Conclusions

Beyond just being a demonstration of cool technology like Chef and Vagrant, you’ve now got a developer setup that is isolated and reproducible. If one of the VMs gets too messed up, you can easily recreate the whole cluster. It’s also easy to get new developers in your organization started using Riak since all they have to do is boot up some virtual machines that automatically configure themselves. This Chef configuration, slightly modified, could later be used to launch staging and production clusters on other hardware (including cloud providers). All in all, it’s a great tool to have in your toolbelt.

Sean

Fixing the Count

January 26, 2011

Many thanks to commenter Mike for taking up the challenge I offered in my last post. The flaw I was referring to was, indeed, the possibility that Luwak would split one of my records across two blocks.

I can check to see if Luwak has split any records with another simple map function:

```text
(riak@127.0.0.1)2> Fun = fun(L,O,_) ->
(riak@127.0.0.1)2>   D = luwak_block:data(L),
(riak@127.0.0.1)2>   S = re:run(D, "^([^\r]*)",
(riak@127.0.0.1)2>              [{capture, all_but_first, binary}]),
(riak@127.0.0.1)2>   P = re:run(D, "\n([^\r]*)$",
(riak@127.0.0.1)2>              [{capture, all_but_first, binary}]),
(riak@127.0.0.1)2>   [{O, S, P}]
(riak@127.0.0.1)2> end.
```

This one will return a 3-element tuple consisting of the block offset, anything before the first carriage return, and anything after the last linefeed. Running that function via map/reduce on my data, I see that it’s not only possible for Luwak to split a record across a block boundary, it’s also extremely likely:

```text
(riak@127.0.0.1)3> {ok, R} = C:mapred({modfun, luwak_mr, file, <<"1950s">>},
(riak@127.0.0.1)3>                    [{map, {qfun, Fun}, none, true}]).

(riak@127.0.0.1)4> lists:keysort(1, R).
[{0,
  {match,[<<"BAL,A,Baltimore,Orioles">>]},
  {match,[<<"play,4,0,pignj101">>]}},
 {1000000,
  {match,[<<",00,,NP">>]},
  {match,[<<"play,3,1,math">>]}},
 {2000000,
  {match,[<<"e101,??,,S7/G">>]},
  {match,[<<"play,7,1,kue">>]}},
 {3000000,
  {match,[<<"nh101,??,,4/L">>]},
  {match,[<<"start,walll101,\"Lee Walls\",1,7,">>]}},
 ...snip...
```

There are play records at the ends of the first, second, and third blocks (as well as others that I cut off above). This means that Joe Pignatano, Eddie Mathews, and Harvey Kuenn are each missing a play in their batting average calculation, since my map function only gets to operate on the data in one block at a time.

Luckily, there are pretty well-known ways to fix this trouble. The rest of this post will describe two: chunk merging and fixed-length records.

Chunk Merging

If you’ve watched Guy Steele’s recent talk about parallel programming, or read through the example luwak_mr file luwak_mr_words.erl, you already know how chunk-merging works.

The basic idea behind chunk-merging is that a map function should return information about data that it didn’t know how to handle, as well as an answer for what it did know how to handle. A second processing step (a subsequent reduce function in this case) can then match up those bits of unhandled data from all of the different map evaluations, and get answers for them as well.

I’ve updated baseball.erl to do just this. The map function now uses regexes much like those earlier in this post to produce “suffix” and “prefix” results for unhandled data at the start and end of the block. The reduce function then combines these chunks and produces additional hit:at-bat results that can be summed with the normal map output.

For example, instead of the simple count tuple a map used to produce:

```erlang
[{5, 50}]
```

The function will now produce something like:

```erlang
[{5, 50},
 {suffix, 2000000, <<"e101,??,,S7/G">>},
 {prefix, 3000000, <<"play,7,1,kue">>}]
```
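The reduce side of the merge is plain bookkeeping: collect the fragments by offset and, wherever a prefix and a suffix carry the same offset, glue them back into the record that straddled that boundary. Here’s a sketch of that pairing step, written in JavaScript for brevity since the idea is language-independent (the updated baseball.erl does the equivalent in Erlang):

```javascript
// Sketch: pair up the boundary fragments emitted by the map phase.
// A {prefix, N, Bytes} comes from the block *before* offset N; a
// {suffix, N, Bytes} comes from the block *starting at* offset N.
// Concatenated, they form the record that was split at boundary N.
function mergeChunks(results) {
  var prefixes = {}, suffixes = {}, mended = [];
  results.forEach(function(r) {
    if (r.prefix) prefixes[r.offset] = r.prefix;
    if (r.suffix) suffixes[r.offset] = r.suffix;
  });
  Object.keys(prefixes).forEach(function(offset) {
    if (offset in suffixes) {
      mended.push(prefixes[offset] + suffixes[offset]);
    }
  });
  return mended; // e.g. ["play,3,1,mathe101,??,,S7/G", ...]
}
```

Once a record is whole again, the reduce can score it exactly the way the map scores unbroken records, which is where the extra hit:at-bat results come from.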

Fixed-length Records

Another way to deal with boundary-crossing records is to avoid them entirely. If every record is exactly the same length, then it’s possible to specify a block size that is an even multiple of the record length, such that record boundaries will align with block boundaries.

I’ve added baseball_flr.erl to the baseball project to demonstrate using fixed-length records. The two fields needed from the “play” record for the batting average calculation are the player’s Retrosheet ID (the third field in the CSV format) and the play description (the sixth CSV field). The player ID is easy to handle: it’s already a fixed length of eight characters. The play description is, unfortunately, variable in length.

I’ve elected to solve the variable-length field problem with the time-honored solution of choosing a fixed length larger than the largest variation I have on record, and padding all smaller values out to that length. In this case, 50 bytes will handle the play descriptions for the 1950s. Another option would have been to truncate all play descriptions to the first two bytes, since that’s all the batting average calculation needs.

So, the file contents are no longer:

```text
play,3,1,mathe101,??,,S7/G
play,7,1,kuenh101,??,,4/L
```

but are now:

```text
mathe101S7/G..............................................
kuenh1014/L...............................................
```

(though a zero is used instead of a ‘.’ in the actual format, and there are also no line breaks).
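Written out as a tiny function, the padding amounts to nothing more than this; a sketch only, not the actual loader code in baseball_flr.erl:

```javascript
// Sketch: build one 58-byte fixed-length record
// (8-byte Retrosheet player ID + play description zero-padded to 50 bytes).
function fixedLengthRecord(playerId, play) {
  var padded = play;
  while (padded.length < 50) padded += "0"; // zeros as filler, per the text above
  return playerId + padded;                 // always 8 + 50 = 58 bytes
}

fixedLengthRecord("kuenh101", "4/L");
// => "kuenh1014/L" followed by 47 zeros (58 bytes in total)
```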

Setting up the block size is done at load time in baseball_flr:load_events/1. The map function to calculate the batting average on this format has to change the way in which it extracts each record from the block, but the analysis of the play data remains the same, and there is no need to worry about partial records. The reduce function is exactly the same as it was before learning about chunks (though the chunk-handling version would also work; it just wouldn’t find any chunks to merge).

Using this method does require reloading the data to get it in the proper format in Riak, but this format can have benefits beyond alleviating the boundary problem. Most notably, analyzing fixed-length records is usually much faster than analyzing variable-length, comma-separated records, since the record-splitter doesn’t have to search for the end of a record — it knows exactly where to find each one in advance.
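That record-splitting advantage is easy to picture: with 58-byte records you walk the block in 58-byte steps instead of scanning for delimiters. Again, a sketch rather than the code in baseball_flr.erl:

```javascript
// Sketch: walk fixed-length records in a block by offset arithmetic alone.
// RECORD_LENGTH matches the format above: 8-byte ID + 50-byte play field.
var RECORD_LENGTH = 58;

function eachRecord(block, callback) {
  for (var offset = 0; offset + RECORD_LENGTH <= block.length; offset += RECORD_LENGTH) {
    var record = block.slice(offset, offset + RECORD_LENGTH);
    callback(record.slice(0, 8), record.slice(8)); // (playerId, paddedPlay)
  }
}
```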

“Fixed”

Now that I have solutions to the boundary problems, I can correctly award Harvey Kuenn’s 1950s batting average as:

```text
(riak@127.0.0.1)8> baseball:batting_average(<<"1950s">>, <<"kuenh101">>).
284
(riak@127.0.0.1)9> baseball_flr:batting_average(<<"1950s_flr">>, <<"kuenh101">>).
284
```

instead of the incorrect value given by the old, boundary-confused code:

```text
(riak@127.0.0.1)7> baseball:batting_average(<<"1950s">>, <<"kuenh101">>).
284
```

… wait. Did I forget to reload something? Maybe I better check the counts before division. New code:

```text
(riak@127.0.0.1)20> C:mapred({modfun, luwak_mr, file, <<"1950s_flr">>},
(riak@127.0.0.1)20>          [{map, {modfun, baseball_flr, ba_map},
(riak@127.0.0.1)20>            <<"kuenh101">>, false},
(riak@127.0.0.1)20>           {reduce, {modfun, baseball_flr, ba_reduce},
(riak@127.0.0.1)20>            none, true}]).
{ok,[{1231,4322}]}
```

old code:

```text
(riak@127.0.0.1)19> C:mapred({modfun, luwak_mr, file, <<"1950s">>},
(riak@127.0.0.1)19>          [{map, {modfun, baseball, ba_map},
(riak@127.0.0.1)19>            <<"kuenh101">>, false},
(riak@127.0.0.1)19>           {reduce, {modfun, baseball, ba_reduce},
(riak@127.0.0.1)19>            none, true}]).
{ok,[{1231,4321}]}
```

Aha: 1231 hits from both, but the new code found an extra at-bat — 4322 instead of 4321. The division says 0.28482 instead of 0.28488. I introduced more error by coding bad math (truncating instead of rounding) than I did by missing a record!

This result highlights a third method of dealing with record splits: ignore them. If the data you are combing through is statistically large, a single missing record will not change your answer significantly. If completely ignoring them makes you too squeamish, consider adding a simple “unknowns” counter to your calculation, so you can compute later how far off your answer might have been.

For example, instead of returning “suffix” and “prefix” information, I might have returned a simpler “unknown” count every time a block had a broken record at one of its ends (instead of a hit:at-bat tuple, a hit:at-bat:unknowns tuple). Summing these would have given me 47, if every boundary in my 48-block file broke a record. With that, I can say that if every one of those broken records was a hit for Harvey, then his batting average might have been as high as (1231+47)/(4321+47)=0.2926. Similarly, if every one of those broken records was a non-hit at-bat for Harvey, then his batting average might have been as low as 1231/(4321+47)=0.2818.

So, three options for you: recombine split records, avoid split records, or ignore split records. Do what your data needs. Happy map/reducing!

-Bryan

Baseball Batting Average, Using Riak Map/Reduce

January 20, 2011

A few days ago, I announced a tool that I assembled last weekend, called luwak_mr. That tool extends Riak’s map/reduce functionality to “Luwak” files.

But what does that mean? What can it do?

Luwak is a tree-based block-storage library for Riak. Basically, you feed Luwak a large binary, it splits the binary into chunks, and it creates a tree representing how those chunks fit together. Each chunk (or “block”) is stored as a separate value in Riak, and the tree structure is stored under whatever “filename” you give it. Among other things, this allows for much more efficient access to ranges of the binary (in comparison to storing the entire binary as one value in Riak, forcing it to be read and written in its entirety).

The luwak_mr tool allows you to easily feed a chunked Luwak file into Riak’s map/reduce system. It will do this in such a way as to provide each chunk for map processing, individually. For example, if you had a Luwak file named “foo” made of ten blocks, the following map/reduce request would evaluate the BarFun function ten times (once for each block):

```erlang
C:mapred({modfun, luwak_mr, file, <<"foo">>},
         [{map, BarFun, none, true}]).
```

So what’s that good for?

Partitioning distributed work is the boon of Luwak+luwak_mr. If you’re using a multi-node Riak cluster, Luwak has done the work of spreading pieces of your large binary across all of your nodes. The luwak_mr tool allows you to capitalize on that distribution by using Riak’s map/reduce system to analyze those pieces, in parallel, on the nodes where the pieces are stored.

How about a more concrete example? The common one is distributed grep, but I find that a little boring and contrived. How about something more fun … like baseball statistics. [1] [2]

I’ll use Retrosheet’s Play-by-Play Event Files as input. Specifically, I’ll use the regular season, by decade, 1950-1959. If you’d like to follow along, download “1950seve.zip” and unzip it into a directory called “1950s”.

If you look at one of those files, say “1950BOS.EVA”, you’ll see that each event is a line of comma-separated values. I’m interested in the “play” records for this computation. The first one in that file is on line 52:

```text
play,1,0,rizzp101,??,,K
```

This says that in the first inning (1), the away-team (0) player Phil Rizzuto (rizzp101) struck out (K). For the purposes of the batting average calculation, this is one at-bat, no hit.
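If you want to see where those pieces live in the record, splitting the line makes it obvious (a quick illustration, separate from the Erlang code below):

```javascript
// Quick illustration: the comma-separated fields of a Retrosheet "play" record.
var fields = "play,1,0,rizzp101,??,,K".split(",");
fields[3]; // => "rizzp101"  (the batter's Retrosheet player ID)
fields[6]; // => "K"         (the event: a strikeout, i.e. an at-bat with no hit)
```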

Using grep [3], I can find all of Phil’s “plays” in the 1950s like so:

```bash
$ grep -e play,.,.,rizzp101 *.EV*
1950BOS.EVA:play,1,0,rizzp101,??,,K
1950BOS.EVA:play,3,0,rizzp101,??,,53
1950BOS.EVA:play,5,0,rizzp101,??,,6
...snip (3224 lines total)...
```

What I need to do is pile these plays into two categories: those that designate an “at bat,” and those that designate a “hit.” That’s easily done with some extra regular expressions and a little counting:

```bash
$ grep -E "play,.,.,rizzp101,.*,.*,(S[0-9]|D[0-9]|T[0-9]|H([^P]|$))" *.EV* | wc -l
562
$ grep -E "play,.,.,rizzp101,.*,.*,(NP|BK|CS|DI|OA|PB|WP|PO|SB|I?W|HP|SH)" *.EV* | wc -l
728
```

The result of the first grep is the number of hits (singles, doubles, triples, home runs) found (562). The result of the second grep is the number of non-at-bat plays (substitutions, base steals, walks, etc.; 728); if I subtract it from the total number of plays (3224), I get the number of at-bats (2496). Phil’s batting average is 562 (hits) / 2496 (at-bats) (x1000), or 225.
Great, so now let’s parallelize. The first thing I’ll do is get the data stored in Riak. That’s as simple as attaching to any node’s console and running this function:

```erlang
load_events(Directory) ->
    true = filelib:is_dir(Directory),
    Name = iolist_to_binary(filename:basename(Directory)),
    {ok, Client} = riak:local_client(),
    {ok, LuwakFile} = luwak_file:create(Client, Name, dict:new()),
    LuwakStream = luwak_put_stream:start_link(Client, LuwakFile, 0, 5000),
    filelib:fold_files(Directory,
                       ".*.EV?", %% only events files
                       false,    %% non-recursive
                       fun load_events_fold/2,
                       LuwakStream),
    luwak_put_stream:close(LuwakStream),
    ok.

load_events_fold(File, LuwakStream) ->
    {ok, FileData} = file:read_file(File),
    luwak_put_stream:send(LuwakStream, FileData),
    LuwakStream.
```

I’ve put this code in a module named “baseball”, so running it is as simple as:

```text
(riak@10.0.0.1) 1> baseball:load_events("/home/bryan/baseball/1950s").
```

This will create one large Luwak file (approximately 48MB) named “1950s” by concatenating all 160 event files. Default Luwak settings are for 1MB blocks, so I’ll have 48 of them linked from my tree.

Mapping those blocks is quite simple. All I have to do is count the hits and at-bats for each block. The code to do so looks like this:

```erlang
ba_map(LuwakBlock, _, PlayerId) ->
    Data = luwak_block:data(LuwakBlock),
    [count_at_bats(Data, PlayerId)].

count_at_bats(Data, PlayerId) ->
    Re = [<<"^play,.,.,">>, PlayerId, <<",.*,.*,(.*)$">>],
    case re:run(Data, iolist_to_binary(Re),
                [{capture, all_but_first, binary},
                 global, multiline, {newline, crlf}]) of
        {match, Plays} ->
            lists:foldl(fun count_at_bats_fold/2, {0,0}, Plays);
        nomatch ->
            {0, 0}
    end.

count_at_bats_fold([Event], {Hits, AtBats}) ->
    {case is_hit(Event) of
         true -> Hits+1;
         false -> Hits
     end,
     case is_at_bat(Event) of
         true -> AtBats+1;
         false -> AtBats
     end}.

is_hit(Event) ->
    match == re:run(Event,
                    "^("
                    "S[0-9]"     % single
                    "|D[0-9]"    % double
                    "|T[0-9]"    % triple
                    "|H([^P]|$)" % home run
                    ")",
                    [{capture, none}]).

is_at_bat(Event) ->
    nomatch == re:run(Event,
                      "^("
                      "NP"   % no-play
                      "|BK"  % balk
                      "|CS"  % caught stealing
                      "|DI"  % defensive interference
                      "|OA"  % base runner advance
                      "|PB"  % passed ball
                      "|WP"  % wild pitch
                      "|PO"  % picked off
                      "|SB"  % stole base
                      "|I?W" % walk
                      "|HP"  % hit by pitch
                      "|SH"  % sacrifice (bunt)
                      ")",
                      [{capture, none}]).
```

When the ba_map/3 function runs on a block, it produces a 2-element tuple. The first element of that tuple is the number of hits in the block, and the second is the number of at-bats. Combining them is even easier:

```erlang
ba_reduce(Counts, _) ->
    {HitList, AtBatList} = lists:unzip(Counts),
    [{lists:sum(HitList), lists:sum(AtBatList)}].
```

The ba_reduce/2 function expects a list of tuples produced by map function evaluations. It produces a single 2-element tuple whose first element is the sum of the first elements of all of the inputs (the total hits), and whose second element is the sum of the second elements (the total at-bats).

These functions live in the same baseball module, so using them is simple:

```erlang
Client:mapred({modfun, luwak_mr, file, Filename},
              [{map, {modfun, baseball, ba_map}, PlayerID, false},
               {reduce, {modfun, baseball, ba_reduce}, none, true}]),
```

I’ve exposed that call as the batting_average/2 function, so finding Phil Rizzuto’s batting average in the 1950s is as simple as typing at the Riak console:

```text
(riak@10.0.0.1) 2> baseball:batting_average(<<"1950s">>, <<"rizzp101">>).
225
```

Tada! Parallel processing power! But, you couldn’t possibly let me get away without a micro-benchmark, could you? Here’s what I saw:

| Environment | Time |
|-------------|------|
| grep [4] | 0.060 + 0.081 + 0.074 = 0.215s (0.002*3 = 0.006s cached) |
| Riak, 1 node [5] | 0.307s (0.012s cached) |
| Riak, 4 nodes | 0.163s (0.024s cached) |

All of the disclaimers about micro-benchmarks apply: disc caches play games, opening and closing files takes time, this isn’t a large enough dataset to highlight the really interesting cases, etc. But, I’m fairly certain that these numbers show two things. The first is that since the Riak times aren’t orders of magnitude off of the grep times, the Riak approach is not fundamentally flawed. The second is that since the amount of time decreases with added nodes, some parallelism is being exploited.

There is, of course, at least one flaw in this specific implementation, and also several ways to improve it. Anyone want to pontificate?

-Bryan

[1] Astute readers will note that this is really distributed grep, entangled with some processing, but at least it gives more interesting input and output data.

[2] Forgive me if I botch these calculations. I stopped playing baseball when the coach was still pitching. The closest I’ve come since is intramural softball. But, it’s all numbers, and I can handle numbers. Wikipedia has been my guide, for better or worse.

[3] Okay, maybe not so much astuteness is necessary.

[4] My method for acquiring the grep stat was, admittedly, lame. Specifically, I ran the grep commands as given above, using the “time” utility (i.e. “time grep -E …”). I added the time for the three runs together, and estimated the time for the final sum and division at 0.

[5] Timing the map/reduce was simple, using timer:tc(baseball, batting_average, [<<"1950s">>, <<"rizzp101">>]).