Tag Archives: Luwak

Follow Up to MapReducing Big Data With Luwak Webinar

February 18, 2011

Firstly, a big thanks goes out to everyone who attended yesterday’s MapReducing Big Data With Luwak Webinar. As promised, here is the screencast (below) from the webinar. It should be quite useful for those of you who weren’t able to attend or who would like to view the content again (it’s good enough to warrant multiple views).

If you prefer slides, there is a PDF version of the presentation available here.

If you have any questions or comments (webinar-related or otherwise), leave them below and we’ll get back to you.

Enjoy!

Bryan

 

MapReducing Big Data With Riak and Luwak from Basho Technologies on Vimeo.

MapReducing Big Data With Luwak Webinar

February 14, 2011

Basho Senior Engineer Bryan Fink has been doing some exceptional work with MapReduce and Luwak, Riak’s large-object storage interface. Recently, he wrote up two extensive blog posts on the specifics of Luwak and the powerful tool it becomes when combined with Riak’s MapReduce engine.

We’ve seen a huge amount of Luwak usage since its release and, since these blog posts, a large amount of interest in running MapReduce queries over data stored in Riak via Luwak. So, we thought what better way to spread the word than through a free Webinar?

This Thursday, February 17th at 2PM EST, Bryan will be leading the MapReducing Big Data With Luwak Webinar. The planned agenda is as follows:

  • Overview of Riak MapReduce and its typical usage
  • Gotchas and troubleshooting
  • Usage Recommendations and Best Practices
  • An Introduction to Luwak, Riak’s Large File Storage Interface
  • Luwak MapReduce in Action

Registration is now closed.

Hope to see you there.

The Basho Team

 

Fixing the Count

January 26, 2011

Many thanks to commenter Mike for taking up the challenge I offered in my last post. The flaw I was referring to was, indeed, the possibility that Luwak would split one of my records across two blocks.

I can check to see if Luwak has split any records with another simple map function:

```text
(riak@127.0.0.1)2> Fun = fun(L,O,_) ->
(riak@127.0.0.1)2>     D = luwak_block:data(L),
(riak@127.0.0.1)2>     S = re:run(D, "^([^\r]*)",
(riak@127.0.0.1)2>                [{capture, all_but_first, binary}]),
(riak@127.0.0.1)2>     P = re:run(D, "\n([^\r]*)$",
(riak@127.0.0.1)2>                [{capture, all_but_first, binary}]),
(riak@127.0.0.1)2>     [{O, S, P}]
(riak@127.0.0.1)2> end.
```

This one will return a 3-element tuple consisting of the block offset, anything before the first carriage return, and anything after the last linefeed. Running that function via map/reduce on my data, I see that it’s not only possible for Luwak to split a record across a block boundary, it’s also extremely likely:

```text
(riak@127.0.0.1)3> {ok, R} = C:mapred({modfun, luwak_mr, file, <<"1950s">>},
(riak@127.0.0.1)3>                    [{map, {qfun, Fun}, none, true}]).

(riak@127.0.0.1)4> lists:keysort(1, R).
[{0,
  {match,[<<"BAL,A,Baltimore,Orioles">>]},
  {match,[<<"play,4,0,pignj101">>]}},
 {1000000,
  {match,[<<",00,,NP">>]},
  {match,[<<"play,3,1,math">>]}},
 {2000000,
  {match,[<<"e101,??,,S7/G">>]},
  {match,[<<"play,7,1,kue">>]}},
 {3000000,
  {match,[<<"nh101,??,,4/L">>]},
  {match,[<<"start,walll101,\"Lee Walls\",1,7,">>]}},
 …snip…
```

There are partial play records at the ends of the first, second, and third blocks (as well as at others that I cut off above). This means that Joe Pignatano, Eddie Mathews, and Harvey Kuenn are each missing a play in their batting average calculation, since my map function only gets to operate on the data in one block at a time.

Luckily, there are pretty well-known ways to fix this trouble. The rest of this post will describe two: chunk merging and fixed-length records.

Chunk Merging

If you’ve watched Guy Steele’s recent talk about parallel programming, or read through the example luwak_mr file luwak_mr_words.erl, you already know how chunk-merging works.

The basic idea behind chunk-merging is that a map function should return information about data that it didn’t know how to handle, as well as an answer for what it did know how to handle. A second processing step (a subsequent reduce function in this case) can then match up those bits of unhandled data from all of the different map evaluations, and get answers for them as well.

I’ve updated baseball.erl to do just this. The map function now uses regexes much like those earlier in this post to produce “suffix” and “prefix” results for unhandled data at the start and end of the block. The reduce function then combines these chunks and produces additional hit:at-bat results that can be summed with the normal map output.

For example, instead of the simple count tuple a map used to produce:

```erlang
[{5, 50}]
```

The function will now produce something like:

```erlang
[{5, 50},
 {suffix, 2000000, <<"e101,??,,S7/G">>},
 {prefix, 3000000, <<"play,7,1,kue">>}]
```
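To make the merging step concrete, here is a minimal sketch of what such a reduce could look like. This is a sketch, not the code in baseball.erl: it reuses the count_at_bats/2 helper from the original batting-average post (below), and assumes the player ID is supplied as the reduce phase’s static argument and that prefix/suffix fragments that meet at the same block boundary carry the same offset, as in the example above.

```erlang
%% Sketch of a chunk-merging reduce. Each map emits {Hits, AtBats} tuples
%% plus {prefix, Offset, Head} / {suffix, Offset, Tail} fragments, keyed so
%% that matching pairs share the same offset.
ba_reduce_chunks(Values, PlayerId) ->
    Counts   = [C || {_, _} = C <- Values],
    Prefixes = [{O, B} || {prefix, O, B} <- Values],
    Suffixes = [{O, B} || {suffix, O, B} <- Values],
    %% re-join fragments that meet at the same block boundary and count them
    Joined = [count_at_bats(<<Head/binary, Tail/binary>>, PlayerId)
              || {O, Head} <- Prefixes, {O2, Tail} <- Suffixes, O =:= O2],
    %% pass unmatched fragments through so a later reduce pass can pair them
    %% (the file's very first and very last fragments never pair, and are
    %% simply ignored)
    UnmatchedP = [{prefix, O, B} || {O, B} <- Prefixes,
                                    not lists:keymember(O, 1, Suffixes)],
    UnmatchedS = [{suffix, O, B} || {O, B} <- Suffixes,
                                    not lists:keymember(O, 1, Prefixes)],
    {H, A} = lists:foldl(fun({Hs, As}, {HAcc, AAcc}) -> {HAcc + Hs, AAcc + As} end,
                         {0, 0}, Counts ++ Joined),
    [{H, A} | UnmatchedP ++ UnmatchedS].
```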

Fixed-length Records

Another way to deal with boundary-crossing records is to avoid them entirely. If every record is exactly the same length, then it’s possible to specify a block size that is an even multiple of the record length, such that record boundaries will align with block boundaries.

I’ve added baseball_flr.erl to the baseball project to demonstrate using fixed-length records. The two fields needed from the “play” record for the batting average calculation are the player’s Retrosheet ID (the third field in the CSV format) and the play description (the sixth CSV field). The player ID is easy to handle: it’s already a fixed length of eight characters. The play description is, unfortunately, variable in length.

I’ve elected to solve the variable-length field problem with the time-honored solution of choosing a fixed length larger than the largest variation I have on record, and padding all smaller values out to that length. In this case, 50 bytes will handle the play descriptions for the 1950s. Another option would have been to truncate all play descriptions to the first two bytes, since that’s all the batting average calculation needs.

So, the file contents are no longer:

```text
play,3,1,mathe101,??,,S7/G
play,7,1,kuenh101,??,,4/L
```

but are now:

```text
mathe101S7/G..............................................
kuenh1014/L...............................................
```

(though a zero is used instead of a ‘.’ in the actual format, and there are also no line breaks).
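For illustration, building one of those padded records might look like this. This is a sketch, not code from baseball_flr.erl; the 8-byte ID, the 50-byte description width, and the “0” padding character are taken from the description above.

```erlang
%% Sketch: build a fixed-length record from a player ID (always 8 bytes in
%% the Retrosheet data) and a play description padded with "0" to 50 bytes.
fixed_record(PlayerId, Desc) when byte_size(PlayerId) =:= 8,
                                  byte_size(Desc) =< 50 ->
    Pad = binary:copy(<<"0">>, 50 - byte_size(Desc)),
    <<PlayerId/binary, Desc/binary, Pad/binary>>.
```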

Setting up the block size is done at load time in baseball_flr:load_events/1. The map function to calculate the batting average on this format has to change the way in which it extracts each record from the block, but the analysis of the play data remains the same, and there is no need to worry about partial records. The reduce function is exactly the same as it was before learning about chunks (though the chunk-handling version would also work; it just wouldn’t find any chunks to merge).

Using this method does require reloading the data to get it in the proper format in Riak, but this format can have benefits beyond alleviating the boundary problem. Most notably, analyzing fixed-length records is usually much faster than analyzing variable-length, comma-separated records, since the record-splitter doesn’t have to search for the end of a record — it knows exactly where to find each one in advance.
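As a rough sketch of that kind of record-splitter (again assuming 58-byte records: 8 bytes of player ID plus the 50-byte padded description; the real extraction lives in baseball_flr.erl and may differ):

```erlang
%% Sketch: walk a block of fixed-length records, accumulating {Hits, AtBats}
%% for one player. Reuses is_hit/1 and is_at_bat/1 from the original
%% batting-average post below; the 8+50-byte layout is an assumption.
count_fixed(<<Id:8/binary, Desc:50/binary, Rest/binary>>, PlayerId, {Hits, AtBats})
  when Id =:= PlayerId ->
    count_fixed(Rest, PlayerId,
                {case is_hit(Desc) of true -> Hits + 1; false -> Hits end,
                 case is_at_bat(Desc) of true -> AtBats + 1; false -> AtBats end});
count_fixed(<<_Id:8/binary, _Desc:50/binary, Rest/binary>>, PlayerId, Acc) ->
    count_fixed(Rest, PlayerId, Acc);
count_fixed(<<>>, _PlayerId, Acc) ->
    Acc.
```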

“Fixed”

Now that I have solutions to the boundary problems, I can correctly compute Harvey Kuenn’s 1950s batting average:

```text
(riak@127.0.0.1)8> baseball:batting_average(<<"1950s">>, <<"kuenh101">>).
284
(riak@127.0.0.1)9> baseball_flr:batting_average(<<"1950s_flr">>, <<"kuenh101">>).
284
```

instead of the incorrect value given by the old, boundary-confused code:

```text
(riak@127.0.0.1)7> baseball:batting_average(<<"1950s">>, <<"kuenh101">>).
284
```

… wait. Did I forget to reload something? Maybe I better check the counts before division. New code:

```text
(riak@127.0.0.1)20> C:mapred({modfun, luwak_mr, file, <<"1950s_flr">>},
(riak@127.0.0.1)20>          [{map, {modfun, baseball_flr, ba_map},
(riak@127.0.0.1)20>                <<"kuenh101">>, false},
(riak@127.0.0.1)20>           {reduce, {modfun, baseball_flr, ba_reduce},
(riak@127.0.0.1)20>                none, true}]).
{ok,[{1231,4322}]}
```

Old code:

```text
(riak@127.0.0.1)19> C:mapred({modfun, luwak_mr, file, <<"1950s">>},
(riak@127.0.0.1)19>          [{map, {modfun, baseball, ba_map},
(riak@127.0.0.1)19>                <<"kuenh101">>, false},
(riak@127.0.0.1)19>           {reduce, {modfun, baseball, ba_reduce},
(riak@127.0.0.1)19>                none, true}]).
{ok,[{1231,4321}]}
```

Aha: 1231 hits from both, but the new code found an extra at-bat — 4322 instead of 4321. The division says 0.28482 instead of 0.28488. I introduced more error by coding bad math (truncating instead of rounding) than I did by missing a record!

This result highlights a third method of dealing with record splits: ignore them. If the data you are combing through is statistically large, a single missing record will not change your answer significantly. If completely ignoring them makes you too squeamish, consider adding a simple “unknowns” counter to your calculation, so you can compute later how far off your answer might have been.

For example, instead of returning “suffix” and “prefix” information, I might have returned a simpler “unknown” count every time a block had a broken record at one of its ends (instead of a hit:at-bat tuple, a hit:at-bat:unknowns tuple). Summing these would have given me 47, if every boundary in my 48-block file broke a record. With that, I can say that if every one of those broken records was a hit for Harvey, then his batting average might have been as high as (1231+47)/(4321+47)=0.2926. Similarly, if every one of those broken records was a non-hit at-bat for Harvey, then his batting average might have been as low as 1231/(4321+47)=0.2818.
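A sketch of that bookkeeping (hypothetical, not part of the posted code): each map evaluation emits a 3-tuple, and the reduce simply sums all three columns.

```erlang
%% Hypothetical "just count the unknowns" variant: each map evaluation
%% returns {Hits, AtBats, Unknowns}, where Unknowns is how many broken
%% record fragments sat at that block's edges; the reduce sums them.
ba_reduce_unknowns(Counts, _) ->
    [lists:foldl(fun({H, A, U}, {HAcc, AAcc, UAcc}) ->
                         {HAcc + H, AAcc + A, UAcc + U}
                 end, {0, 0, 0}, Counts)].
```

The high and low bounds above then come from treating every unknown as a hit or as a hitless at-bat, respectively.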

So, three options for you: recombine split records, avoid split records, or ignore split records. Do what your data needs. Happy map/reducing!

-Bryan

Baseball Batting Average, Using Riak Map/Reduce

January 20, 2011

A few days ago, I announced a tool that I assembled last weekend, called luwak_mr. That tool extends Riak’s map/reduce functionality to “Luwak” files.

But what does that mean? What can it do?

Luwak is a tree-based block-storage library for Riak. Basically, you feed Luwak a large binary, and it splits the binary into chunks and creates a tree representing how those chunks fit together. Each chunk (or “block”) is stored as a separate value in Riak, and the tree structure is stored under whatever “filename” you give it. Among other things, this allows for much more efficient access to ranges of the binary (in comparison to storing the entire binary as one value in Riak, forcing it to be read and written in its entirety).
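As a tiny illustration, storing a binary under a Luwak “filename” from a node’s console looks roughly like this. It uses the same calls the loader later in this post uses; the name <<"demo">> is arbitrary.

```erlang
%% Rough sketch: stream a binary into Luwak under the name <<"demo">>.
{ok, Client} = riak:local_client(),
{ok, File} = luwak_file:create(Client, <<"demo">>, dict:new()),
Stream = luwak_put_stream:start_link(Client, File, 0, 5000),
luwak_put_stream:send(Stream, <<"a large binary would go here">>),
luwak_put_stream:close(Stream).
```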

The luwak_mr tool allows you to easily feed a chunked Luwak file into Riak’s map/reduce system, in such a way that each chunk is provided for map processing individually. For example, if you had a Luwak file named “foo” made of ten blocks, the following map/reduce request would evaluate the BarFun function ten times (once for each block):

```erlang
C:mapred({modfun, luwak_mr, file, <<"foo">>},
         [{map, BarFun, none, true}]).
```
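BarFun here stands in for any map function you like; a block-level map fun receives the block, its byte offset within the file, and the phase’s static argument. A trivial (hypothetical) example that just reports each block’s size:

```erlang
%% Hypothetical BarFun: emit {Offset, BlockSize} for every Luwak block.
BarFun = fun(Block, Offset, _Arg) ->
                 [{Offset, byte_size(luwak_block:data(Block))}]
         end.
```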

So what’s that good for?

Partitioning distributed work is the boon of Luwak+luwak_mr. If you’re using a multi-node Riak cluster, Luwak has done the work of spreading pieces of your large binary across all of your nodes. The luwak_mr tool allows you to capitalize on that distribution by using Riak’s map/reduce system to analyze those pieces, in parallel, on the nodes where the pieces are stored.

How about a more concrete example? The common one is distributed grep, but I find that a little boring and contrived. How about something more fun … like baseball statistics? [1] [2]

I’ll use Retrosheet’s Play-by-Play Event Files as input. Specifically, I’ll use the regular-season files, by decade, for 1950-1959. If you’d like to follow along, download “1950seve.zip” and unzip it to a directory called “1950s”.

If you look at one of those files, say “1950BOS.EVA”, you’ll see that each event is a line of comma-separated values. I’m interested in the “play” records for this computation. The first one in that file is on line 52:

```text
play,1,0,rizzp101,??,,K
```

This says that in the first inning (1), the away (0) player “Phil Rizzuto” (rizzp101), struck out (K). For the purposes of the batting average calculation, this is one at-bat, no hit.
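If you want to eyeball the fields, splitting the line on commas from an Erlang shell works. This snippet is just for illustration: counting the fields after the leading “play” tag, the third is the player ID and the sixth is the event description.

```text
1> binary:split(<<"play,1,0,rizzp101,??,,K">>, <<",">>, [global]).
[<<"play">>,<<"1">>,<<"0">>,<<"rizzp101">>,<<"??">>,<<>>,<<"K">>]
```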

Using grep [3], I can find all of Phil’s “plays” in the 1950s like so:

```bash
$ grep -e play,.,.,rizzp101 *.EV*
1950BOS.EVA:play,1,0,rizzp101,??,,K
1950BOS.EVA:play,3,0,rizzp101,??,,53
1950BOS.EVA:play,5,0,rizzp101,??,,6
…snip (3224 lines total)…
```

What I need to do is pile these plays into two categories: those that designate an “at bat,” and those that designate a “hit.” That’s easily done with some extra regular expressions and a little counting:

```bash
$ grep -E "play,.,.,rizzp101,.*,.*,(S[0-9]|D[0-9]|T[0-9]|H([^P]|$))" *.EV* | wc -l
562
$ grep -E "play,.,.,rizzp101,.*,.*,(NP|BK|CS|DI|OA|PB|WP|PO|SB|I?W|HP|SH)" *.EV* | wc -l
728
```

The result of the first grep is the number of hits (singles, doubles, triples, home runs) found (562). The result of the second grep is the number of non-at-bat plays (substitutions, stolen bases, walks, etc.; 728); if I subtract it from the total number of plays (3224), I get the number of at-bats (2496). Phil’s batting average is 562 (hits) / 2496 (at-bats) (×1000), or 225.

Great, so now let’s parallelize. The first thing I’ll do is get the data stored in Riak. That’s as simple as attaching to any node’s console and running this function:

```erlang
load_events(Directory) ->
    true = filelib:is_dir(Directory),
    Name = iolist_to_binary(filename:basename(Directory)),
    {ok, Client} = riak:local_client(),
    {ok, LuwakFile} = luwak_file:create(Client, Name, dict:new()),
    LuwakStream = luwak_put_stream:start_link(Client, LuwakFile, 0, 5000),
    filelib:fold_files(Directory,
                       ".*.EV?",  %% only event files
                       false,     %% non-recursive
                       fun load_events_fold/2,
                       LuwakStream),
    luwak_put_stream:close(LuwakStream),
    ok.

load_events_fold(File, LuwakStream) ->
    {ok, FileData} = file:read_file(File),
    luwak_put_stream:send(LuwakStream, FileData),
    LuwakStream.
```

I’ve put this code in a module named “baseball”, so running it is as simple as:

```text
(riak@10.0.0.1) 1> baseball:load_events("/home/bryan/baseball/1950s").
```

This will create one large Luwak file (approximately 48MB) named “1950s” by concatenating all 160 event files. Default Luwak settings are for 1MB blocks, so I’ll have 48 of them linked from my tree.

Mapping those blocks is quite simple. All I have to do is count the hits and at-bats for each block. The code to do so looks like this:

“`erlang
ba_map(LuwakBlock, _, PlayerId) ->
Data = luwak_block:data(LuwakBlock),
[count_at_bats(Data, PlayerId)].

count_at_bats(Data, PlayerId) ->
Re = [<<"^play,.,.,">>,PlayerId,<<",.*,.*,(.*)$">>], %”>>],
case re:run(Data, iolist_to_binary(Re),
[{capture, all_but_first, binary},
global, multiline, {newline, crlf}]) of
{match, Plays} ->
lists:foldl(fun count_at_bats_fold/2, {0,0}, Plays);
nomatch ->
{0, 0}
end.

count_at_bats_fold([Event], {Hits, AtBats}) ->
{case is_hit(Event) of
true -> Hits+1;
false -> Hits
end,
case is_at_bat(Event) of
true -> AtBats+1;
false -> AtBats
end}.

is_hit(Event) ->
match == re:run(Event,
“^(”
“S[0-9]” % single
“|D[0-9]” % double
“|T[0-9]” % triple
“|H([^P]|$)” % home run
“)”,
[{capture, none}]).

is_at_bat(Event) ->
nomatch == re:run(Event,
“^(”
“NP” % no-play
“|BK” % balk
“|CS” % caught stealing
“|DI” % defensive interference
“|OA” % base runner advance
“|PB” % passed ball
“|WP” % wild pitch
“|PO” % picked off
“|SB” % stole base
“|I?W” % walk
“|HP” % hit by pitch
“|SH” % sacrifice (but)
“)”,
[{capture, none}]).
“`

When the ba_map/3 function runs on a block, it produces a single 2-element tuple (wrapped in a list, as Riak map functions return lists of results). The first element of that tuple is the number of hits in the block, and the second is the number of at-bats. Combining them is even easier:

```erlang
ba_reduce(Counts, _) ->
    {HitList, AtBatList} = lists:unzip(Counts),
    [{lists:sum(HitList), lists:sum(AtBatList)}].
```

The ba_reduce/2 function expects a list of tuples produced by map function evaluations. It produces a single 2-element tuple whose first element is the sum of the first elements of all of the inputs (the total hits), and whose second element is the sum of the second elements (the total at-bats).

These functions live in the same baseball module, so using them is simple:

```erlang
Client:mapred({modfun, luwak_mr, file, Filename},
              [{map, {modfun, baseball, ba_map}, PlayerID, false},
               {reduce, {modfun, baseball, ba_reduce}, none, true}]),
```

I’ve exposed that call as the batting_average/2 function, so finding Phil Rizzuto’s batting average in the 1950s is as simple as typing at the Riak console:

```text
(riak@10.0.0.1) 2> baseball:batting_average(<<"1950s">>, <<"rizzp101">>).
225
```
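For reference, the batting_average/2 wrapper looks roughly like this. This is a sketch rather than the exact baseball.erl code; note the truncating integer division, which the follow-up post above revisits.

```erlang
%% Sketch of batting_average/2: run the map/reduce above and turn the
%% {Hits, AtBats} total into a (truncated) x1000 batting average.
batting_average(Filename, PlayerID) ->
    {ok, Client} = riak:local_client(),
    {ok, [{Hits, AtBats}]} =
        Client:mapred({modfun, luwak_mr, file, Filename},
                      [{map, {modfun, baseball, ba_map}, PlayerID, false},
                       {reduce, {modfun, baseball, ba_reduce}, none, true}]),
    1000 * Hits div AtBats.
```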

Tada! Parallel processing power! But, you couldn’t possibly let me get away without a micro-benchmark, could you? Here’s what I saw:

| Environment | Time |
| --- | --- |
| grep [4] | 0.060 + 0.081 + 0.074 = 0.215s (0.002*3 = 0.006s cached) |
| Riak, 1 node [5] | 0.307s (0.012s cached) |
| Riak, 4 nodes | 0.163s (0.024s cached) |

All of the disclaimers about micro-benchmarks apply: disc caches play games, opening and closing files takes time, this isn’t a large enough dataset to highlight the really interesting cases, etc. But, I’m fairly certain that these numbers show two things. The first is that since the Riak times aren’t orders of magnitude off of the grep times, the Riak approach is not fundamentally flawed. The second is that since the amount of time decreases with added nodes, some parallelism is being exploited.

There is, of course, at least one flaw in this specific implementation, though, and also several ways to improve it. Anyone want to pontificate?

-Bryan

[1] Astute readers will note that this is really distributed grep, entangled with some processing, but at least it gives more interesting input and output data.

[2] Forgive me if I botch these calculations. I stopped playing baseball when the coach was still pitching. The closest I’ve come since is intramural softball. But, it’s all numbers, and I can handle numbers. Wikipedia has been my guide, for better or worse.

[3] Okay, maybe not so much astuteness is necessary.

[4] My method for acquiring the grep stat was, admittedly, lame. Specifically, I ran the grep commands as given above, using the “time” utility (i.e. “time grep -E …”). I added the time for the three runs together, and estimated the time for the final sum and division at 0.

[5] Timing the map/reduce was simple, using timer:tc(baseball, batting_average, [<<"1950s">>, <<"rizzp101">>]).