Tag Archives: Riak

Baseball Batting Average, Using Riak Map/Reduce

January 20, 2011

A few days ago, I announced a tool that I assembled last weekend, called luwak_mr. That tool extends Riak’s map/reduce functionality to “Luwak” files.

But what does that mean? What can it do?

Luwak is a tree-based block-storage library for Riak. Basically, you feed Luwak a large binary; it splits the binary into chunks and creates a tree representing how those chunks fit together. Each chunk (or “block”) is stored as a separate value in Riak, and the tree structure is stored under whatever “filename” you give it. Among other things, this allows for much more efficient access to ranges of the binary (in comparison to storing the entire binary as one value in Riak, forcing it to be read and written in its entirety).
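To get a feel for the API, here is a minimal sketch of storing a binary under a Luwak “filename” from a node’s console. It uses the same calls as the loader shown later in this post; the name <<"bigfile">> and the variable BigBinary are placeholders.

```erlang
%% Minimal sketch: store a binary under the Luwak "filename" <<"bigfile">>.
%% BigBinary is a placeholder for whatever large binary you want to store.
{ok, Client} = riak:local_client(),
{ok, File} = luwak_file:create(Client, <<"bigfile">>, dict:new()),
Stream = luwak_put_stream:start_link(Client, File, 0, 5000),
luwak_put_stream:send(Stream, BigBinary),
luwak_put_stream:close(Stream).
```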

The luwak_mr tool allows you to easily feed a chunked Luwak file into Riak’s map/reduce system. It will do this in such a way as to provide each chunk for map processing, individually. For example, if you had a Luwak file named “foo” made of ten blocks, the following map/reduce request would evaluate the “BarFun” function ten times (once for each block):

```erlang
C:mapred({modfun, luwak_mr, file, <<"foo">>},
         [{map, BarFun, none, true}]).
```

So what’s that good for?

Partitioning distributed work is where Luwak+luwak_mr really pays off. If you’re using a multi-node Riak cluster, Luwak has done the work of spreading pieces of your large binary across all of your nodes. The luwak_mr tool allows you to capitalize on that distribution by using Riak’s map/reduce system to analyze those pieces, in parallel, on the nodes where the pieces are stored.

How about a more concrete example? The common one is distributed grep, but I find that a little boring and contrived. How about something more fun … like baseball statistics. [1] [2]

I’ll use Retrosheet’s Play-by-Play Event Files as input. Specifically, I’ll use the regular season, by decade, 1950-1959. If you’d like to follow along, download “1950seve.zip” and unzip it into a directory called “1950s”.

If you look at one of those files, say “1950BOS.EVA”, you’ll see that each event is a line of comma-separated values. I’m interested in the “play” records for this computation. The first one in that file is on line 52:

```text
play,1,0,rizzp101,??,,K
```

This says that in the first inning (1), the away-team (0) player Phil Rizzuto (rizzp101) struck out (K). For the purposes of the batting average calculation, this is one at-bat, no hit.
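If you want to pull those fields apart in Erlang, splitting the line on commas is enough. This helper is purely illustrative and is not part of the post’s baseball module:

```erlang
%% Illustrative only: split one "play" record into its comma-separated fields.
%% parse_play(<<"play,1,0,rizzp101,??,,K">>) yields
%% [<<"play">>,<<"1">>,<<"0">>,<<"rizzp101">>,<<"??">>,<<>>,<<"K">>]
parse_play(Line) ->
    binary:split(Line, <<",">>, [global]).
```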

Using grep [3], I can find all of Phil’s “plays” in the 1950s like so:

```bash
$ grep -e play,.,.,rizzp101 *.EV*
1950BOS.EVA:play,1,0,rizzp101,??,,K
1950BOS.EVA:play,3,0,rizzp101,??,,53
1950BOS.EVA:play,5,0,rizzp101,??,,6
...snip (3224 lines total)...
```

What I need to do is pile these plays into two categories: those that designate an “at bat,” and those that designate a “hit.” That’s easily done with a couple of extra regular expressions and a little counting:

```bash
$ grep -E "play,.,.,rizzp101,.*,.*,(S[0-9]|D[0-9]|T[0-9]|H([^P]|$))" *.EV* | wc -l
562
$ grep -E "play,.,.,rizzp101,.*,.*,(NP|BK|CS|DI|OA|PB|WP|PO|SB|I?W|HP|SH)" *.EV* | wc -l
728
```

The result of the first grep is the number of hits (singles, doubles, triples, home runs) found (562). The result of the second grep is the number of non-at-bat plays (substitutions, base steals, walks, etc.; 728); if I subtract it from the total number of plays (3224), I get the number of at-bats (2496). Phil’s batting average is 562 (hits) / 2496 (at-bats), times 1000, or 225.
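As a quick sanity check, the same arithmetic in the Erlang shell (all numbers taken from the grep runs above):

```erlang
AtBats = 3224 - 728,          %% 2496 at-bats
round(1000 * 562 / AtBats).   %% 225
```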

Great, so now let’s parallelize. The first thing I’ll do is get the data stored in Riak. That’s as simple as attaching to any node’s console and running this function:

```erlang
load_events(Directory) ->
    true = filelib:is_dir(Directory),
    Name = iolist_to_binary(filename:basename(Directory)),
    {ok, Client} = riak:local_client(),
    {ok, LuwakFile} = luwak_file:create(Client, Name, dict:new()),
    LuwakStream = luwak_put_stream:start_link(Client, LuwakFile, 0, 5000),
    filelib:fold_files(Directory,
                       ".*.EV?",  %% only events files
                       false,     %% non-recursive
                       fun load_events_fold/2,
                       LuwakStream),
    luwak_put_stream:close(LuwakStream),
    ok.

load_events_fold(File, LuwakStream) ->
    {ok, FileData} = file:read_file(File),
    luwak_put_stream:send(LuwakStream, FileData),
    LuwakStream.
```

I’ve put this code in a module named “baseball”, so running it is as simple as:

```text
(riak@10.0.0.1) 1> baseball:load_events("/home/bryan/baseball/1950s").
```

This will create one large Luwak file (approximately 48MB) named “1950s” by concatenating all 160 event files. Default Luwak settings are for 1MB blocks, so I’ll have 48 of them linked from my tree.

Mapping those blocks is quite simple. All I have to do is count the hits and at-bats for each block. The code to do so looks like this:

```erlang
ba_map(LuwakBlock, _, PlayerId) ->
    Data = luwak_block:data(LuwakBlock),
    [count_at_bats(Data, PlayerId)].

count_at_bats(Data, PlayerId) ->
    Re = [<<"^play,.,.,">>, PlayerId, <<",.*,.*,(.*)$">>],
    case re:run(Data, iolist_to_binary(Re),
                [{capture, all_but_first, binary},
                 global, multiline, {newline, crlf}]) of
        {match, Plays} ->
            lists:foldl(fun count_at_bats_fold/2, {0,0}, Plays);
        nomatch ->
            {0, 0}
    end.

count_at_bats_fold([Event], {Hits, AtBats}) ->
    {case is_hit(Event) of
         true  -> Hits+1;
         false -> Hits
     end,
     case is_at_bat(Event) of
         true  -> AtBats+1;
         false -> AtBats
     end}.

is_hit(Event) ->
    match == re:run(Event,
                    "^("
                    "S[0-9]"     % single
                    "|D[0-9]"    % double
                    "|T[0-9]"    % triple
                    "|H([^P]|$)" % home run
                    ")",
                    [{capture, none}]).

is_at_bat(Event) ->
    nomatch == re:run(Event,
                      "^("
                      "NP"    % no-play
                      "|BK"   % balk
                      "|CS"   % caught stealing
                      "|DI"   % defensive interference
                      "|OA"   % base runner advance
                      "|PB"   % passed ball
                      "|WP"   % wild pitch
                      "|PO"   % picked off
                      "|SB"   % stole base
                      "|I?W"  % walk
                      "|HP"   % hit by pitch
                      "|SH"   % sacrifice (bunt)
                      ")",
                      [{capture, none}]).
```

When the ba_map/3 function runs on a block, it produces a one-element list containing a 2-element tuple. The first element of that tuple is the number of hits in the block, and the second is the number of at-bats. Combining them is even easier:

```erlang
ba_reduce(Counts, _) ->
    {HitList, AtBatList} = lists:unzip(Counts),
    [{lists:sum(HitList), lists:sum(AtBatList)}].
```

The ba_reduce/2 function expects a list of the tuples produced by map function evaluations. It produces a single 2-element tuple whose first element is the sum of the first elements of all of the inputs (the total hits), and whose second element is the sum of the second elements (the total at-bats).
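For example, assuming ba_reduce/2 is exported, reducing the counts from three hypothetical blocks looks like this:

```erlang
%% Hypothetical per-block {Hits, AtBats} counts from three blocks:
baseball:ba_reduce([{10, 40}, {0, 0}, {5, 25}], none).
%% => [{15,65}]
```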

These functions live in the same baseball module, so using them is simple:

```erlang
Client:mapred({modfun, luwak_mr, file, Filename},
              [{map, {modfun, baseball, ba_map}, PlayerId, false},
               {reduce, {modfun, baseball, ba_reduce}, none, true}]),
```

I’ve exposed that call as the batting_average/2 function, so finding Phil Rizzuto’s batting average in the 1950s is as simple as typing at the Riak console:

```text
(riak@10.0.0.1) 2> baseball:batting_average(<<"1950s">>, <<"rizzp101">>).
225
```
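The batting_average/2 wrapper itself isn’t shown in the post; a minimal sketch of what it might look like, assuming it runs against the local client on a Riak node, is:

```erlang
%% A sketch only; the real wrapper may differ. It runs the map/reduce shown
%% above and turns the {Hits, AtBats} total into a x1000 batting average.
batting_average(Filename, PlayerId) ->
    {ok, Client} = riak:local_client(),
    {ok, [{Hits, AtBats}]} =
        Client:mapred({modfun, luwak_mr, file, Filename},
                      [{map, {modfun, baseball, ba_map}, PlayerId, false},
                       {reduce, {modfun, baseball, ba_reduce}, none, true}]),
    round(1000 * Hits / AtBats).
```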

Tada! Parallel processing power! But, you couldn’t possibly let me get away without a micro-benchmark, could you? Here’s what I saw:

| Environment | Time |
| --- | --- |
| grep [4] | 0.060 + 0.081 + 0.074 = 0.215s (0.002 * 3 = 0.006s cached) |
| Riak, 1 node [5] | 0.307s (0.012s cached) |
| Riak, 4 nodes | 0.163s (0.024s cached) |

All of the disclaimers about micro-benchmarks apply: disc caches play games, opening and closing files takes time, this isn’t a large enough dataset to highlight the really interesting cases, etc. But, I’m fairly certain that these numbers show two things. The first is that since the Riak times aren’t orders of magnitude off of the grep times, the Riak approach is not fundamentally flawed. The second is that since the amount of time decreases with added nodes, some parallelism is being exploited.

There is, of course, at least one flaw in this specific implementation, and also several ways to improve it. Anyone want to pontificate?

-Bryan

[1] Astute readers will note that this is really distributed grep, entangled with some processing, but at least it gives more interesting input and output data.

[2] Forgive me if I botch these calculations. I stopped playing baseball when the coach was still pitching. The closest I’ve come since is intramural softball. But, it’s all numbers, and I can handle numbers. Wikipedia has been my guide, for better or worse.

[3] Okay, maybe not so much astuteness is necessary.

[4] My method for acquiring the grep stat was, admittedly, lame. Specifically, I ran the grep commands as given above, using the “time” utility (i.e. “time grep -E …”). I added the time for the three runs together, and estimated the time for the final sum and division at 0.

[5] Timing the map/reduce was simple, using timer:tc(baseball, batting_average, [<<"1950s">>, <<"rizzp101">>]).

A Deeper Look At Riak's MapReduce Enhancements

January 6, 2011

We officially released Riak 0.14 yesterday. Some of the biggest enhancements were in and around Riak’s MapReduce functionality. Here’s a more in-depth look at what you can look forward to in 0.14 if you’re into Mapping and Reducing.

Key Filtering

Performing any type of sophisticated query on a strictly key/value store is notoriously hard. Past releases of Riak were limited to MapReduce-ing over either entire buckets or a discrete set of user-supplied inputs. The problem with these approaches is that neither facilitates the kind of robust querying many applications require. For example, let’s examine an application which logs application events to a Riak bucket. Each entry is a JSON hash containing a timestamp, the user generating the event, and some information about the event. Part of our example application requires querying these log entries based on timestamp and user.

In current releases of Riak, the application would have to map over the entire bucket and examine each entry to find the relevant set. This type of query is usable when the bucket is small. But when the bucket gets bigger these types of queries begin to exhibit performance problems. Scanning the bucket and loading objects from disk only to discard them is an expensive proposition.

This is exactly the use case where key filtering can help. If the application can store meaningful data in the keys then key filtering can query just the keys and load only the objects whose keys pass the filter to be processed by the MapReduce job. For our example app we could combine the timestamp and user id to form each entry’s key like this: “1292943572.pjohnson”. Using key filters we can locate all the user entries for “pjohnson” between “12/1/2010” and “12/7/2010” and count them via MapReduce:

```javascript
{"inputs": {
    "bucket": "user_logs",
    "key_filters": [["and", [["tokenize", ".", 1],
                             ["string_to_int"],
                             ["between", 1291161600, 1291852799]],
                            [["tokenize", ".", 2],
                             ["matches", "pjohnson"]]]]},
 "query": [{"map": {
               "language": "javascript",
               "source": "function(obj) { return [1]; }"}},
           {"reduce": {
               "language": "javascript",
               "name": "Riak.reduceSum"}}]}
```

Key filtering will support boolean operators (and, or, not), URL decoding, string tokenizing, regular expressions, and various string-to-numeric conversions. Client support will initially be limited to the Java and Ruby clients (and Python support is already being attacked by the community). More clients will be added in subsequent releases.
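To make the key scheme above concrete, here is a hedged sketch of how an application might build such keys when writing log entries. The function name is made up for illustration; your application could construct the key in any language.

```erlang
%% Build a key like <<"1292943572.pjohnson">> from the current Unix time
%% and a user id. Purely illustrative; not part of Riak itself.
make_log_key(UserId) ->
    {Mega, Sec, _Micro} = os:timestamp(),
    UnixSeconds = Mega * 1000000 + Sec,
    iolist_to_binary([integer_to_list(UnixSeconds), ".", UserId]).
```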

Next Steps

MapReduce Query Planner

One of the biggest obstacles to improving Riak’s MapReduce performance was the way map functions were scheduled around the cluster. The original implementation was fairly naive and scheduled map functions around the vnodes in the order listed in the replica list for each bucket/key pair. This approach resulted in vnode hotspots, especially in smaller clusters, as many bucket/key pairs would hash to the same vnode. We also sent each bucket/key pair to be mapped in a separate Erlang message which reduced throughput on larger jobs as they wound up generating significant messaging traffic.

The new planner addresses many of these problems. Each batch of 50 bucket/key pairs is analyzed and scheduled around the cluster to maximize vnode coverage. In other words, the planner schedules many bucket/key pairs onto a common vnode in a single message. This reduces the chattiness of jobs overall and also improves throughput as the underlying map dispatch code can operate in batches rather than single values.

Segregated Javascript VM Pools

Contention for Javascript VMs in a busy cluster can be a significant source of performance problems. The contention is caused by each cluster node having a single pool of Javascript VMs for all Javascript calls: map functions, reduce functions, and pre-commit hooks.

0.14 supports three separate pools of Javascript VMs to reduce overall contention. By tweaking a few lines in your app.config file, users will be able to tailor the size of each pool to their particular needs. Does your app use MapReduce and ignore hooks? Turn the hook pool size down to zero and save yourself some CPU and memory. Do you always submit MapReduce jobs to a particular node in the cluster? You can bump up the reduce pool size on the node receiving the jobs while setting it to zero on the other nodes. This takes advantage of the fact that reduce phases aren’t distributed, letting you put resources where they are most needed in the cluster.
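Here is a hedged sketch of what that tuning might look like in app.config, assuming the riak_kv settings map_js_vm_count, reduce_js_vm_count, and hook_js_vm_count; check your release’s generated app.config for the exact names and defaults.

```erlang
%% Excerpt from app.config (Erlang terms); setting names are assumptions,
%% so verify them against your release before using.
{riak_kv, [
    {map_js_vm_count,    8},  %% VMs available to map functions
    {reduce_js_vm_count, 6},  %% VMs available to reduce functions
    {hook_js_vm_count,   0}   %% no JS pre-commit hooks, so free that pool
]}
```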

As you can see, we’ve put a lot of work into refining MapReduce in the latest release, and we’re dedicated to continuing this work in upcoming releases. If you want to get your hands dirty with MapReduce right now, check out the release notes and the documentation on the Riak wiki.

Enjoy!

The Basho Team

Riak 0.14 Released

January 1, 2011

Happy New Year, Happy Wednesday, and Happy Riak 0.14! It’s a new year and it’s time for a new version of Riak. We’ve been putting the final touches on the latest release candidate for the last few days and we are now ready to call Riak 0.14, a.k.a “Dakota,” official.

Here’s the rundown of the large improvements (for those of you who just want the release notes, stop reading and click here):

MapReduce Enhancements

As promised, we put some significant development time towards the robustness, performance, and stability of MapReduce in 0.14. There are three primary areas worth mentioning:

  1. Key Filtering – Until now, MapReduce jobs were only able to run over entire buckets or a specific set of user-supplied keys. This approach can result in some performance hiccups as your buckets and inputs grow. Key Filtering, which is new in this release, enables you to build meaningful data into your keys and then filter for a given set of keys before processing, thus focusing the inputs for the job and increasing performance. Key filtering will support boolean operators (and, or, not), URL decoding, string tokenizing, regular expressions, and various string-to-numeric conversions.
  2. MapReduce Query Planner – Scheduling functions is hard. Our approach to several components in the scheduling process in previous Riak releases was less than optimal, so we’ve done a lot of work to refine the process. The new query planner batches bucket/key pairs into sets of 50, which are then analyzed and scheduled around the cluster to maximize vnode coverage. This yielded a nice reduction in cluster chattiness while improving throughput. Win Win™.
  3. Segregated Javascript VM Pools – 0.14 will support three separate pools of Javascript VMs to reduce overall contention. Why three separate pools? For the three different JS calls: map functions, reduce functions, and pre-commit hooks. This fine-grained level of tweaking will let you better allocate resources and improve cluster performance.

This slide deck talks more about these three enhancements. And, there is a lengthier blog post coming out tomorrow dedicated to these MapReduce improvements…

Cluster and Node Debugging

The ability to monitor and debug a running Riak cluster received some substantial enhancements in 0.14. This is because Riak is now shipping with two new applications: riak_err and cluster_info. Basho hacker Scott Fritchie posted a blog back in November with an extensive overview of what these two applications will make possible when running Riak. Read that for all the details. The short version is that a) riak_err improves Riak’s runtime robustness by strictly limiting the amount of RAM that is used while processing event log messages and b) cluster_info assists troubleshooting by automatically gathering lots of environment, configuration, and runtime statistics data into a single file.

We’ve also added some new documentation to the wiki on the Command Line Tools page (at the bottom) with some more details on what cluster_info is all about and how to use it.

Windowed Merges for Bitcask

The default storage backend for Riak is Bitcask, and we are increasingly seeing users select this for their production clusters thanks to (among other things) its low, predictable latencies and high throughput. Bitcask saw numerous enhancements and bug fixes in 0.14, the most significant of which is something called “windowed merges.” Bitcask performs periodic merges over all non-active files to compact the space being occupied by old versions of stored data. In certain situations this can cause some memory and CPU spikes on the Riak node where the merge is taking place. To that end, we’ve added the ability to specify when Bitcask will perform merges. So, for instance, if you know that you typically see the lowest load on your cluster between 2 and 4 AM, you can set this time frame as your acceptable start and stop time for merges. This is set in your bitcask.app file.
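As a rough sketch, and assuming Bitcask’s merge_window setting takes either the atoms always/never or a pair of hours, the 2-to-4 AM example might be configured like this; verify the exact setting name and form against the 0.14 release notes.

```erlang
%% Excerpt: restrict Bitcask merges to a 02:00-04:00 window.
%% Setting name and value form are assumptions; check your release's docs.
{bitcask, [
    {merge_window, {2, 4}}  %% only merge between 2 AM and 4 AM
]}
```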

Other Noteworthy Enhancements

Other noteworthy enhancements include support for HTTPS and multiple HTTP IPs, packaging scripts for building debs, rpms and Solaris packages, and the ability to list buckets through the REST API. Check out the release notes for a complete list of new features and bug fixes.

Contributors for 0.14

Aside from the core Basho Devs, here is the list[1] of people (in no particular order) who contributed code, bug fixes and other bits between 0.13 and 0.14 (across all the OTP apps that come bundled with Riak):

Tuncer Ayaz, Jebu Ittiachen, Ben Black, Jesper Louis Andersen, Fernando Benavides, Magnus Klaar, Mihai Balea, Joseph Wayne Norton, Anthony Ramine, David Reid, Benjamin Nortier, Alexey Romanov, Adam Kocoloski, Juhani Rankimies, Andrew Thompson, Misha Gorodnitzky, Daniel Néri, andekar, Kostis Sagonas, Phil Pirozhkov, Benjamin Bock, Peter Lemenkov.

Thanks for your contributions! Keep them coming.

1 – If I forgot or misspelled your name, email mark@basho.com and we’ll add/fix it ASAP.

Hey, what about Riak Search?!

We’ve got a few release-related loose ends to tie up with Riak Search. But don’t worry. This release was very significant for Search, and we’re shooting to have it tagged and released next week.

So what should you do now?

We’re already hard at work on the next release. We’re calling it “Elgin.” (Bonus Riak T shirt for anyone who can find the pattern behind the naming scheme; Dakota and Elgin should be enough info to go on.) If you want to get involved with Riak, join the mailing list or come hang out in the Riak channel on IRC to get your feet wet.

Other than that, thanks for using Riak!

The Basho Team

A Short Survey For Developers

December 29, 2010

The Dev team here at Basho is in the process of prioritizing some code and new feature development. So, we wanted your opinion on it. We threw together a short, simple survey to get some feedback on where we should be spending our time.

Whether you’re running Riak in production right now or only considering it for a future app, we want your feedback. It shouldn’t take you more than three minutes and it will greatly help us over the coming months.

Let us know if you have any questions and thanks for participating.

The Basho Team

Riak Wiki Now Running On Gollum

December 28, 2010

Those of you who are familiar with the Riak Wiki may have been surprised by what you saw at wiki.basho.com starting yesterday. That’s because we just gave it a facelift. More importantly, the team at Basho has spent a fair amount of time converting the Riak Wiki to use Gollum. Gollum is a “simple, Git-powered wiki with a sweet API and local frontend” written and maintained by the awesome team at GitHub.

Those of you who are accustomed to the “old wiki” will find the same great content with a revamped design, something that more closely resembles the look and feel of basho.com and the Riak Function Contrib.

Arguably the best feature of using Gollum is that contributing to the Riak Wiki is now dead simple, and is no different than contributing code to any repo on GitHub. Riak users are, after all, developers. Why not let them work with a wiki the same way they work with Riak code itself?

This is one of several website enhancements you’ll be seeing from the team at Basho over the coming months, so stay tuned.

In the meantime, peruse the new wiki and of course, go download Riak if you haven’t already.

The Basho Team

A Few Noteworthy Contributions to Riak

December 16, 2010

The community contributions to Riak have been increasing at an exciting rate. The pull requests are starting to roll in, and I wanted to take a moment and recognize several of the many contributions we’ve received over the past months and weeks (and days).

Ripple

Anyone who uses Riak with Ruby knows about Ripple. This is Basho’s canonical Ruby driver and its development has been spearheaded by Basho hacker Sean Cribbs. Not long after Sean started developing this code, he saw a significant influx in Rubyists who were interested in using Riak with Ruby and wanted to lend a hand in the driver’s development. Sean was happy to oblige and, as a result, there are now 15 developers in addition to Sean who have contributed to Ripple in a significant way. Special recognition should also be given to Duff Omelia and Adam Hunter who have made significant contributions to the code and use it in production.

Riak-js

Francisco Treacy and the team at Widescript made it known many months ago that they were looking into Riak to power part of their application. They, along with several other community members, were experimenting with Riak and Node.js. There were a few Node clients for Riak, but they were primarily experimental and immature. Basho had plans to write one but development time was stretched and a node client was several months off.

So, they rolled their own. Francisco, along with Alexander Sicular, James Sadler, Jakub Stastny, and Rick Olson developed and released riak-js. Since its release, it has picked up a ton of users and is being used in applications all over the place. (We liked it so much we even decided to build an app on it… more on this later.)

Thanks, guys, for the node client and helping to kickstart the Riak+Node.js community.

Riak Support in Spring Data

VMware’s Spring Data project is an ambitious one, and it has huge implications for the proliferation of new database technologies in application stacks everywhere. VMware made it known that Riak was slated for integration, needing only someone to take the time to write the code to connect the two. Jon Brisbin took up the task and never looked back.

Jon’s twitter stream is essentially a running narrative of how his work on Riak developed and, as you can see, it took about a month to build support for Riak into the Grails framework, the culmination of which was the 1.0.0.M1 release of the Riak Support in Spring Data.

So, if you’re using Riak with Spring Data, you have Jon Brisbin to thank for the code that made it possible. Thanks, Jon.

Python Docs

I met Daniel Lindsley at StrangeLoop in October. Rusty Klophaus and I were helping him debug a somewhat punishing benchmarking test he was running against a three node Riak cluster on his laptop (during a Cassandra talk) using Basho’s Python client. About a month later Daniel wrote a fantastic blog post called Getting Started With Riak & Python. Though his impressions of Riak were positive on the whole, one of the main points of pain for Daniel was that the Python library had poor documentation. At the time, this was true. Though the library was quite mature as far as functionality goes, the docs had been neglected. I got in touch with Daniel, thanked him for the post, and let him know we were working on the docs. He mentioned he would take a stab at updating the docs if he had a free moment. Shortly thereafter Daniel sent over a huge pull request. He rewrote all of the Python documentation! And it’s beautiful. Check them out here.

Thanks to Daniel and the rest of the team at Pragmatic Badger, we have robust Python documentation. Thanks for the contribution.

Want to contribute to Riak? There is still much code to be written and the Riak community is a great place to work and play. Download the code, join us on IRC, or take a look at GitHub repos to get started.

Mark

Intro to Riak Webcast this Thursday

**October 22, 2012**

If you missed our last ‘Intro to Riak’ webcast, not to fear, we’re doing another. This Thursday (11am PT / 2pm ET), join Shanley, Basho’s director of product marketing, and Mark, director of community, for a 30 minute webcast introducing Riak’s architecture, use cases, user stories, operations and data model.

Register for the webcast [here](http://info.basho.com/IntroToRiakOct25.html).

[Shanley](http://twitter.com/shanley)

[Mark](http://twitter.com/#!/pharkmillups)

Webcast- Intro to Riak, Thursday January 17

**January 10, 2013**

Join us on Thursday, January 17 at 11 am PT / 2 pm ET for an intro to Riak webcast with Shanley Kane, director of product management and Mark Phillips, director of community management.

In 30 minutes, we’ll cover:

* Good and bad use cases for Riak
* User stories of note, including social, content, advertising and session storage
* Riak’s architecture and mechanisms for remaining highly available in failure conditions
* APIs, data model and client libraries
* Features for querying and searching data
* What’s new in the latest version of Riak and what’s next

[Sign up for the webcast here](http://info.basho.com/IntroToRiakJan17.html).

[Basho](http://twitter.com/basho)

From Relational to Riak (Webcast)

**January 02, 2013**

New to Riak? Thinking about using Riak instead of a relational database? Join Basho chief architect Andy Gross and director of product management Shanley Kane for an intro this Thursday (11am PT/2pm ET). In about 30 minutes, we’ll cover the basics of:

* Scalability benefits of Riak, including an examination of limitations around master/slave architectures and sharding, and what Riak does differently
* A look at the operational aspects of Riak and where they differ from relational approaches
* Riak’s data model and benefits for developers, as well as the tradeoffs and limitations of a key/value approach
* Migration considerations, including where to start when migrating existing apps
* Riak’s eventually consistent design
* Multi-site replication options in Riak

Register for the webcast [here](http://info.basho.com/RelationalToRiakJan3.html).

[Shanley](http://twitter.com/shanley)

[Andy](https://twitter.com/argv0)

Intro to Riak Webcast

October 1, 2012

New to Riak? Join us this Thursday for an intro to Riak webcast with Mark Phillips, Basho director of community, and Shanley Kane, director of product management. In this 30 minute talk we’ll cover the basics of:

  • Good and bad use cases for Riak
  • Some user stories of note
  • Riak’s architecture: consistent hashing, hinted handoff, replication, gossip protocol and more
  • APIs and client libraries
  • Features for searching and aggregating data: Riak Search, Secondary Indexes and Map Reduce
  • What’s new in the latest version of Riak

Register for the webcast here.

Shanley

Mark