A Deeper Look At Riak's MapReduce Enhancements

January 6, 2011

We officially released Riak 0.14 yesterday. Some of the biggest enhancements were in and around Riak’s MapReduce functionality. Here’s a more in-depth look at what you can look forward to in 0.14 if you’re into Mapping and Reducing.

Key Filtering

Performing any type of sophisticated query on a strictly key/value store is notoriously hard. Past releases of Riak were limited to MapReduce-ing over either entire buckets or a discrete set of user-supplied inputs. The problem with these approaches is that neither facilitated the kind of robust querying many applications require. For example, let’s examine an application that logs application events to a Riak bucket. Each entry is a JSON hash containing a timestamp, the user generating the event, and some information about the event. Part of our example application requires querying these log entries based on timestamp and user.

In previous releases of Riak, the application would have to map over the entire bucket and examine each entry to find the relevant set. This type of query is usable when the bucket is small, but as the bucket grows these queries begin to exhibit performance problems. Scanning the bucket and loading objects from disk only to discard them is an expensive proposition.

This is exactly the use case where key filtering can help. If the application can store meaningful data in the keys, then key filtering can query just the keys and load only the objects whose keys pass the filter to be processed by the MapReduce job. For our example app we could combine the timestamp and user id to form each entry’s key like this: “1292943572.pjohnson”. Using key filters we can locate all the user entries for “pjohnson” between “12/1/2010” and “12/7/2010” and count them via MapReduce:

{"inputs": {
   "bucket": "user_logs",
   "key_filters": [["and", [["tokenize", ".", 1],
                            ["between", 1291161600, 1291852799]],
                           [["tokenize", ".", 2],
                            ["matches", "pjohnson"]]]]},
 "query": [{"map": {
              "language": "javascript",
              "source": "function(obj) { return [1]; }"}},
           {"reduce": {
              "language": "javascript",
              "name": "Riak.reduceSum"}}]}
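To make the filter logic concrete, here is a minimal Python sketch that builds the same job as a dictionary and emulates the key filter locally on a few sample keys. The helper names (`make_key`, `key_passes`, `build_job`) and the sample keys are illustrative assumptions, not part of Riak or any client library. The emulation treats `between` as inclusive and simplifies `matches` to an exact comparison, which is also an assumption.

```python
import json


def make_key(timestamp, user):
    """Build a log-entry key such as '1292943572.pjohnson'."""
    return f"{timestamp}.{user}"


def key_passes(key, start, end, user):
    """Local emulation of the filter: token 1 is a timestamp within
    [start, end] and token 2 equals the user name."""
    tokens = key.split(".")
    if len(tokens) < 2 or not tokens[0].isdigit():
        return False
    return start <= int(tokens[0]) <= end and tokens[1] == user


def build_job(bucket, start, end, user):
    """Assemble the MapReduce job shown above as a Python dictionary."""
    return {
        "inputs": {
            "bucket": bucket,
            "key_filters": [["and",
                             [["tokenize", ".", 1], ["between", start, end]],
                             [["tokenize", ".", 2], ["matches", user]]]],
        },
        "query": [
            {"map": {"language": "javascript",
                     "source": "function(obj) { return [1]; }"}},
            {"reduce": {"language": "javascript",
                        "name": "Riak.reduceSum"}},
        ],
    }


# Hypothetical sample keys: only the first falls inside the window for pjohnson.
keys = [
    make_key(1291161600, "pjohnson"),
    make_key(1292943572, "pjohnson"),
    make_key(1291500000, "asmith"),
]
selected = [k for k in keys if key_passes(k, 1291161600, 1291852799, "pjohnson")]
job = build_job("user_logs", 1291161600, 1291852799, "pjohnson")

print(selected)
print(json.dumps(job))
```

Only keys that pass the filter would have their objects loaded and handed to the map phase; the other two keys are discarded without touching their values.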

Key filtering will support boolean operators (and, or, not), url decoding, string tokenizing, regular expressions, and various string to numeric conversions. Client support will initially be limited to the Java and Ruby clients (and Python support is already being attacked by the community). More clients will be added in subsequent releases.

Next Steps

MapReduce Query Planner

One of the biggest obstacles to improving Riak’s MapReduce performance was the way map functions were scheduled around the cluster. The original implementation was fairly naive and scheduled map functions around the vnodes in the order listed in the replica list for each bucket/key pair. This approach resulted in vnode hotspots, especially in smaller clusters, as many bucket/key pairs would hash to the same vnode. We also sent each bucket/key pair to be mapped in a separate Erlang message which reduced throughput on larger jobs as they wound up generating significant messaging traffic.

The new planner addresses many of these problems. Each batch of 50 bucket/key pairs is analyzed and scheduled around the cluster to maximize vnode coverage. In other words, the planner schedules many bucket/key pairs onto a common vnode in a single message. This reduces the chattiness of jobs overall and also improves throughput as the underlying map dispatch code can operate in batches rather than single values.
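The batching idea can be sketched in a few lines of Python. This is only an illustration under stated assumptions: `pick_vnode` is a hypothetical stand-in for Riak's consistent hashing, and the sketch does not attempt the replica-aware coverage analysis the real planner performs. It only shows how grouping a batch by vnode turns many single-pair messages into a handful of batched ones.

```python
import hashlib
from collections import defaultdict

BATCH_SIZE = 50  # batch size described in the text


def pick_vnode(bucket, key, vnode_count):
    """Hypothetical placement function: hash the bucket/key pair to a vnode index."""
    digest = hashlib.sha1(f"{bucket}/{key}".encode()).digest()
    return int.from_bytes(digest[:4], "big") % vnode_count


def plan(pairs, vnode_count):
    """Return a list of (vnode, [pairs]) messages, one per vnode per batch."""
    messages = []
    for start in range(0, len(pairs), BATCH_SIZE):
        by_vnode = defaultdict(list)
        for bucket, key in pairs[start:start + BATCH_SIZE]:
            by_vnode[pick_vnode(bucket, key, vnode_count)].append((bucket, key))
        messages.extend(sorted(by_vnode.items()))
    return messages


pairs = [("user_logs", f"{n}.pjohnson") for n in range(120)]
messages = plan(pairs, vnode_count=8)
print(len(pairs), "pairs sent in", len(messages), "messages")
```

With one message per pair, 120 inputs would cost 120 messages; grouped this way the count is bounded by the number of batches times the number of vnodes.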

Segregated Javascript VM Pools

Contention for Javascript VMs in a busy cluster can be a significant source of performance problems. The contention is caused by each cluster node having a single pool of Javascript VMs for all Javascript calls: map functions, reduce functions, and pre-commit hooks.

0.14 supports three separate pools of Javascript VMs to reduce overall contention. By changing a few lines in your app.config file, you can tailor the size of each pool to your particular needs. Does your app use MapReduce and ignore hooks? Turn the hook pool size down to zero and save yourself some CPU and memory. Do you always submit MapReduce jobs to a particular node in the cluster? You can bump up the reduce pool size on the node receiving the jobs while setting it to zero on the other nodes. Because reduce phases aren’t distributed, this puts resources where they are most needed in the cluster.

As you can see, we’ve put a lot of work into refining MapReduce in the latest release, and we’re dedicated to continuing this work in upcoming releases. If you want to get your hands dirty with MapReduce right now, check out:


The Basho Team

Riak 0.14 Released

January 5, 2011

Happy New Year, Happy Wednesday, and Happy Riak 0.14! It’s a new year and it’s time for a new version of Riak. We’ve been putting the final touches on the latest release candidate for the last few days and we are now ready to call Riak 0.14, a.k.a “Dakota,” official.

Here’s the rundown of the large improvements (for those of you who just want the release notes, stop reading and click here):

MapReduce Enhancements

As promised, we put some significant development time towards the robustness, performance, and stability of MapReduce in 0.14. There are three primary areas worth mentioning:

  1. Key Filtering – Until now, MapReduce jobs were only able to run over entire buckets or a specific set of user-supplied keys. This approach can result in some performance hiccups as your buckets and inputs grow. Key Filtering, which is new in this release, enables you to build meaningful data into your keys and then filter for a given set of keys before processing, thus focusing the inputs for the job and increasing performance. Key filtering will support boolean operators (and, or, not), url decoding, string tokenizing, regular expressions, and various string to numeric conversions.
  2. MapReduce Query Planner – Scheduling functions is hard. Our approach to several components in the scheduling process in previous Riak releases was less than optimal, so we’ve done a lot of work to refine the process. The new query planner analyzes bucket/key pairs in batches of 50 and schedules each batch around the cluster to maximize vnode coverage. This yielded a nice reduction in cluster chattiness while improving throughput. Win Win™.
  3. Segregated Javascript VM Pools – 0.14 will support three separate pools of Javascript VMs to reduce overall contention. Why three separate pools? For the three different JS calls: map functions, reduce functions, and pre-commit hooks. This fine-grained level of tweaking will let you better allocate resources and improve cluster performance.

This slide deck talks more about these three enhancements. And, there is a lengthier blog post coming out tomorrow dedicated to these MapReduce improvements…

Cluster and Node Debugging

The ability to monitor and debug a running Riak cluster received some substantial enhancements in 0.14. This is because Riak is now shipping with two new applications: riak_err and cluster_info. Basho hacker Scott Fritchie posted a blog back in November with an extensive overview of what these two applications will make possible when running Riak. Read that for all the details. The short version is that a) riak_err improves Riak’s runtime robustness by strictly limiting the amount of RAM that is used while processing event log messages and b) cluster_info assists troubleshooting by automatically gathering lots of environment, configuration, and runtime statistics data into a single file.

We’ve also added some new documentation to the wiki on the Command Line Tools page (at the bottom) with some more details on what cluster_info is all about and how to use it.

Windowed Merges for Bitcask

The default storage backend for Riak is Bitcask, and we are increasingly seeing users select this for their production clusters thanks to (among other things) its low, predictable latencies and high throughput. Bitcask saw numerous enhancements and bug fixes in 0.14, the most significant of which is something called “windowed merges.” Bitcask performs periodic merges over all non-active files to compact the space being occupied by old versions of stored data. In certain situations this can cause some memory and CPU spikes on the Riak node where the merge is taking place. To that end, we’ve added the ability to specify when Bitcask will perform merges. So, for instance, if you know that you typically see the lowest load on your cluster between 2 and 4 AM, you can set this time frame as your acceptable start and stop time for merges. This is set in your bitcask.app file.
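As a rough illustration of what a merge window means, the following Python sketch decides whether a given hour falls inside a configured start/stop window. It is not Bitcask's implementation; the function name and the choice to treat both endpoints as inclusive are assumptions made for the example.

```python
def in_merge_window(hour, start, end):
    """Return True if `hour` (0-23) lies within the window [start, end].

    Windows may wrap past midnight, e.g. start=22, end=4.
    Inclusive endpoints are an assumption of this sketch.
    """
    if start <= end:
        return start <= hour <= end
    return hour >= start or hour <= end


# A window covering the quiet period mentioned above (2 to 4 AM).
for hour in (1, 3, 12):
    print(hour, in_merge_window(hour, 2, 4))
```

The wrap-around branch matters for windows such as 10 PM to 4 AM, where the start hour is numerically larger than the end hour.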

Other Noteworthy Enhancements

Other noteworthy enhancements include support for HTTPS and multiple HTTP IPs, packaging scripts for building debs, rpms and Solaris packages, and the ability to list buckets through the REST API. Check out the release notes for a complete list of new features and bug fixes.

Contributors for 0.14

Aside from the core Basho Devs, here is the list[1] of people (in no particular order) who contributed code, bug fixes and other bits between 0.13 and 0.14 (across all the OTP apps that come bundled with Riak):

Tuncer Ayaz, Jebu Ittiachen, Ben Black, Jesper Louis Andersen, Fernando Benavides, Magnus Klaar, Mihai Balea, Joseph Wayne Norton, Anthony Ramine, David Reid, Benjamin Nortier, Alexey Romanov, Adam Kocoloski, Juhani Rankimies, Andrew Thompson, Misha Gorodnitzky, Daniel Néri, andekar, Kostis Sagonas, Phil Pirozhkov, Benjamin Bock, Peter Lemenkov.

Thanks for your contributions! Keep them coming.

1 – If I forgot or misspelled your name, email mark@basho.com and we’ll add/fix it ASAP.

Hey, what about Riak Search?!

We’ve got a few release-related loose ends to tie up with Riak Search. But don’t worry. This release was very significant for Search, and we’re shooting to have it tagged and released next week.

So what should you do now?

We’re already hard at work on the next release. We’re calling it “Elgin.” (Bonus Riak T shirt for anyone who can find the pattern behind the naming scheme; Dakota and Elgin should be enough info to go on.) If you want to get involved with Riak, join the mailing list or come hang out in the Riak channel on IRC to get your feet wet.

Other than that, thanks for using Riak!

The Basho Team

Free Webinar – Schema Design for Riak – Dec 7 at 2PM Eastern

December 1, 2010

Moving applications to Riak involves a number of changes from the status quo of RDBMS systems, one of which is taking greater control over your schema design. You’ll have questions like: How do you structure data when you don’t have tables and foreign keys? When should you denormalize, add links, or create MapReduce queries? Where will Riak be a natural fit and where will it be challenging?

We invite you to join us for a free webinar on Tuesday, December 7 at 2:00PM Eastern Time to talk about Schema Design for Riak. We’ll discuss:

  • Freeing yourself of the architectural constraints of the “relational” mindset
  • Gaining a fuller understanding of your existing schema and its queries
  • Strategies and patterns for structuring your data in Riak
  • Tradeoffs of various solutions

We’ll address the above topics and more as we design a new Riak-powered schema for a web application currently powered by MySQL. The presentation will last 30 to 45 minutes, with time for questions at the end.

If you missed the previous version of this webinar in July, here’s your chance to see it! We’ll also use a different example this time, so even if you attended last time, you’ll probably learn something new.

Fill in the form below if you want to get started building applications on top of Riak!

Sorry, registration is closed! Video of the presentation will be posted on Vimeo after the webinar has ended.

The Basho Team

Free Webinar – Riak with Rails – August 5 at 2PM Eastern

July 29, 2010

Ruby on Rails is a powerful web framework that focuses on developer productivity. Riak is a friendly key value store that is simple, flexible and scalable. Put them together and you have lots of exciting possibilities!

We invite you to join us for a free webinar on Thursday, August 5 at 2:00PM Eastern Time (UTC-4) to talk about Riak with Rails. In this hands-on webinar, we’ll discuss:

  • Setting up a new Rails 3 project for Riak
  • Storing, retrieving, manipulating key-value data from Ruby
  • Issuing map-reduce queries
  • Creating rich document models with Ripple
  • Using Riak as a distributed cache and session store

The presentation will last 30 to 45 minutes, with time for questions at the end. Fill in the form below if you want to get started building Rails applications on top of Riak!

Sorry, registration is closed.

The Basho Team

Free Webinar – MapReduce Querying in Riak – July 22 at 2PM

July 15, 2010

Map-Reduce is a flexible and powerful alternative to declarative query languages like SQL that takes advantage of Riak’s distributed architecture. However, it requires a whole new way of thinking about how to collect, process, and report your data, and is tightly coupled to how your data is stored in Riak.

We invite you to join us for a free webinar on Thursday, July 22 at 2:00PM Eastern Time (UTC-4) to talk about Map-Reduce Querying in Riak. We’ll discuss:

  • How Riak’s Map-Reduce differs from other systems and query languages
  • How to construct and submit Map-Reduce queries
  • Filtering, extracting, transforming, aggregating, and sorting data
  • Understanding the efficiency of various types of queries
  • Building and deploying reusable Map-Reduce function libraries

We’ll cover the above topics in conjunction with practical examples from sample applications. The presentation will last 30 to 45 minutes, with time for questions at the end.

Fill in the form below if you want to get started building applications with Map/Reduce on top of Riak!

Sorry, registration has closed!

The Basho Team

Webinar Recap – MapReduce Querying in Riak

July 7, 2010

Thank you to all who attended the webinar last Thursday. It was a great turnout with awesome engagement. Like before, we’re recapping the questions below for everyone’s sake (in no particular order).

Q: Say I want to perform two-fold link walking but don’t want to keep the “walk-through” results, including the initial one. Can I do something to keep only the last result?

In a MapReduce query, you can specify any number of phases to keep or ignore using the “keep” parameter on the phase. Usually you only want to keep the final phase. If you’re using the link-walker resource, it’ll return results from any phases whose specs end in “1”. See the REST API wiki page for more information on link-walking.
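As a hedged sketch of the “keep” parameter, the Python below assembles a two-step link walk in which only the final phase's results are returned. The `phase` helper is hypothetical, and the `hb`/`first` input is just an illustrative bucket/key pair.

```python
import json


def phase(kind, keep, **spec):
    """Build one phase spec (e.g. 'link' or 'map') with an explicit keep flag."""
    body = dict(spec)
    body["keep"] = keep
    return {kind: body}


job = {
    "inputs": [["hb", "first"]],
    "query": [
        phase("link", keep=False),  # first hop: results discarded
        phase("link", keep=False),  # second hop: results discarded
        phase("map", keep=True, language="javascript",
              source="function(v) { return [v]; }"),  # only this is returned
    ],
}

print(json.dumps(job, indent=2))
```

Setting `keep` to true on an intermediate phase would add that phase's results to the response alongside the final ones.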

Q: Will Riak Search work along with MapReduce, for example, to avoid queries over an entire bucket? Will there be a webinar about Riak Search?

Yes, we intend to have this feature in the Generally Available release of Riak Search. We will definitely have a webinar about Riak Search close to its public release.

Q: Are there still problems with executing “qfun” functions from Erlang during MapReduce?

“qfun” phases (that use anonymous Erlang functions) will work on a one-node cluster, but not across a multi-node cluster. You can use them in development but it’s best to switch to a compiled module function or Javascript function when moving to production.

Q: Although streams weren’t mentioned, do you have any recommendations on when to use streaming map/reduce versus normal map/reduce?

Streaming MapReduce sends results back as they get produced from the last phase, in a multipart/mixed format. To invoke this, add ?chunked=true to the URL when you submit the job. Streaming might be appropriate when you expect the result set to be very large and have constructed your application such that incomplete results are useful to it. For example, in an AJAX web application, it might make sense to send some results to the browser before the entire query is complete.
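A small Python sketch of how a client might form the two URLs follows. The host and port match the local examples used elsewhere on this page, the helper name is hypothetical, and no request is actually sent.

```python
from urllib.parse import urlencode, urlunsplit


def mapred_url(host="localhost", port=8098, streaming=False):
    """Build the MapReduce endpoint URL, adding chunked=true for streaming."""
    query = urlencode({"chunked": "true"}) if streaming else ""
    return urlunsplit(("http", f"{host}:{port}", "/mapred", query, ""))


normal_url = mapred_url()
streaming_url = mapred_url(streaming=True)
print(normal_url)
print(streaming_url)
```

A client that uses the streaming URL has to be prepared to parse a multipart/mixed body and handle each part as it arrives.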

Q: Which way is faster: storing a lot of links or storing the target keys in the value as a list? Are there any limits to the maximum number of links on a key?

How the links are stored will likely not have a huge impact on performance. If you choose to store a key list in a document, both methods would work. There are two relevant operations that would be performed with the key list document (updating and traversal).

The update process would involve retrieving the list, adding a value, and saving the list. If you are using the REST interface you will need to be aware of limitations in the number of allowed headers and the allowed header length. Mochiweb restricts the number of allowed headers to 1000. Header length is limited to 8192 characters. This imposes an upper limit for the number of Links that can be set through the REST interface.
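To get a feel for the header-length limit, here is a back-of-the-envelope Python sketch that counts how many Link entries fit into a single 8192-character header value. The entry format follows the curl examples on this page; the bucket, key, and tag names are made up, and counting only the header value (not the header name) against the limit is an assumption.

```python
MAX_HEADER_LENGTH = 8192  # limit quoted in the text


def link_entry(bucket, key, tag):
    """Format one Link header entry in the style used by the REST interface."""
    return f'</riak/{bucket}/{key}>; riaktag="{tag}"'


def links_that_fit(entries, limit=MAX_HEADER_LENGTH):
    """Count how many entries fit in one header value, joined by ', '."""
    count = 0
    length = 0
    for entry in entries:
        extra = len(entry) + (2 if count else 0)  # account for the separator
        if length + extra > limit:
            break
        length += extra
        count += 1
    return count


entries = [link_entry("hb", f"key{n:05d}", "foo") for n in range(1000)]
entry_length = len(entries[0])
fit = links_that_fit(entries)
print(entry_length, fit)
```

With these 34-character entries, 227 links fit in one header value; longer bucket names, keys, or tags lower that number accordingly.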

The best method for updating a key list would be to write a post-commit hook that performs the update. This avoids the need to access the key list using the REST interface, so header limitations are no longer a concern. However, the post-commit hook could become a bottleneck in your update path if the number of links grows large.

Traversal involves retrieving the key list document, collecting the related keys, and outputting a bucket/key list to be used in subsequent map phases. A built-in function is provided to process links. If you were to store keys in the value you would need to write a custom function to parse the keys and generate a bucket/key list.

Q: What’s the benefit of passing an arg to a map or reduce phase? Couldn’t you just send the function body with the arg value filled in? Can I pass in a list of args or an arbitrary number of args?

When you have a lot of queries that are similar but with minor differences, you might be able to generalize a map or reduce function so that it can vary based on the ‘arg’ parameter. Then you could store that function in a built-ins library (see the question below) so it’s preloaded rather than evaluated at query-time. The arg parameter can be any valid JSON value.
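As an illustration, the Python sketch below reuses one generalized map function for two different queries by varying only the “arg” value. The Javascript source, the `user` field inside each stored entry, and the `user_logs` bucket are assumptions made for the example.

```python
import json

# One generalized map function; its third parameter receives the phase's "arg".
MAP_SOURCE = (
    "function(v, keyData, arg) {"
    " var entry = JSON.parse(v.values[0].data);"
    " return entry.user === arg.user ? [entry] : []; }"
)


def job_for_user(user):
    """Build a job whose map phase is parameterized through 'arg'."""
    return {
        "inputs": "user_logs",
        "query": [
            {"map": {"language": "javascript",
                     "source": MAP_SOURCE,
                     "arg": {"user": user}}},
        ],
    }


job_a = job_for_user("pjohnson")
job_b = job_for_user("asmith")
print(json.dumps(job_a["query"][0]["map"]["arg"]))
print(json.dumps(job_b["query"][0]["map"]["arg"]))
```

Because the function body is identical in both jobs, it is a natural candidate for a preloaded, named function.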

Q: What’s the behavior if the map function is missing from one or more nodes but present on others?

The entire query will fail. It’s best to make sure, perhaps via automated deployment, that all of your functions are available on all nodes. Alternatively, you can store Javascript functions directly in Riak and use them in a phase with “bucket” and “key” instead of “source” or “name”.

Q: If there are 2 map phases, for example, then does that mean that both phases will be run back to back on each individual node and *then* it’s all sent back for reduce? Or is there some back and forth between phases?

It’s more like a pipeline: one phase feeds the next. All results from one phase are sent back to the coordinating node, which then initiates the subsequent phase once all participating nodes have replied.

Q: Would it be possible to send a function which acts as both a map predicate and an updater?

In general we don’t recommend modifying objects as part of a MapReduce job because it can add latency to the request. However, you may be able to implement this with a map function in Erlang. Erlang MapReduce functions have full access to Riak including being able to read and write data.

%% Inside your own Erlang module
map_predicate_with_update(Value, _KeyData, _Arg) ->
    case predicate(Value) of
        true -> [update_passed_value(Value)];
        _ -> []
    end.

update_passed_value(Value) ->
    {ok, C} = riak:local_client(),
    %% modify your object here, store with C:put
    Value. %% placeholder: return whatever the map phase should emit

This could come in handy for large updates instead of having to pull each object, update it and store it.

Q: Are Erlang named functions or JS named functions more performant? Which are faster — JS or Erlang functions?

There is a slight overhead when encoding the Riak object to JSON but otherwise the performance is comparable.

Q: Is there a way to use namespacing to define named Javascript functions? In other words, if I had a bunch of app-specific functions, what’s the best way to handle that?

Yes, check out the built-in Javascript MapReduce functions for an example.

Q: Can you specify how data is distributed among the cluster?

In short, no. Riak consistently hashes keys to determine where in the cluster data is located. This article explains how data is replicated and distributed throughout the cluster. In most production situations, your data will be evenly distributed.

Q: What is the reason for the nested list of inputs to a MapReduce query?

The nested list lets you specify multiple keys as inputs to your query, rather than a single bucket name or key. From the Erlang client, inputs are expressed as lists of tuples (fixed-length arrays) which have length of 2 (for bucket/key) or 3 (bucket/key/key-specific-data). Since JSON has no tuple type, we have to express the inputs as arrays of length 2 or 3 within an array.
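A short Python sketch of that translation: tuples of length 2 or 3 become JSON arrays nested inside an outer array. The function name and sample inputs are illustrative only.

```python
import json


def encode_inputs(inputs):
    """Turn (bucket, key) or (bucket, key, key_data) tuples into nested lists."""
    encoded = []
    for item in inputs:
        if len(item) not in (2, 3):
            raise ValueError("each input needs 2 or 3 elements: %r" % (item,))
        encoded.append(list(item))
    return encoded


inputs = [("hb", "first"), ("hb", "second", {"weight": 2})]
encoded = encode_inputs(inputs)
print(json.dumps({"inputs": encoded}))
```

The optional third element travels with its bucket/key pair and is handed to the map function as key-specific data.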

Q: Is there a syntax requirement of JSON for Riak?

JSON is only required for the MapReduce query when submitted via HTTP; the objects you store can be in any format that your application will understand. JSON also happens to be a convenient format for MapReduce processing because it is accessible to both Erlang and Javascript. However, it is fairly common for Erlang-native applications to store data in Riak as serialized Erlang datatypes.

Q: Is there any significance to the name of file for how Riak finds the saved functions? I assume you can leave other languages in the same folder and it would be ignored as long as language is set to javascript? Additionally, is it possible/does it make sense to combine all your languages into a single folder?

Riak only looks for “*.js” files in the js_source_dir folder (see Configuration Files on the wiki). Erlang modules that contain map and reduce functions need to be on the code path, which could be completely separate from where the Javascript files are located.

Q: Would you point us to any best practices around matrix computations in Riak? I don’t see any references to matrix in the riak wiki…

We don’t have any specific support for matrix computations. We encourage you to find an appropriate Javascript or Erlang library to support your application.

Dan and Sean

Link Walking By Example

February 24, 2010

Riak has a notion of “links” as part of the metadata of its objects. We talk about traversing, or “walking”, links, but what do the queries for doing so actually look like?

Let’s put four objects in Riak:

  1. hb/first will link to hb/second and hb/third
  2. hb/second will link to hb/fourth
  3. hb/third will also link to hb/fourth
  4. hb/fourth doesn’t link anywhere

$ curl -X PUT -H "content-type: text/plain" \
  -H "Link: </riak/hb/second>; riaktag=\"foo\", </riak/hb/third>; riaktag=\"bar\"" \
  http://localhost:8098/riak/hb/first --data "hello"

$ curl -X PUT -H "content-type: text/plain" \
  -H "Link: </riak/hb/fourth>; riaktag=\"foo\"" \
  http://localhost:8098/riak/hb/second --data "the second"

$ curl -X PUT -H "content-type: text/plain" \
  -H "Link: </riak/hb/fourth>; riaktag=\"foo\"" \
  http://localhost:8098/riak/hb/third --data "the third"

$ curl -X PUT -H "content-type: text/plain" \
  http://localhost:8098/riak/hb/fourth --data "the fourth"

Now, say we wanted to start at hb/first, and follow all of its outbound links. The easiest way to do this is with the link-walker URL syntax:

$ curl http://localhost:8098/riak/hb/first/_,_,_

The response will be a multipart/mixed body with two parts: the hb/second object in one, and the hb/third object in the other:

Content-Type: multipart/mixed; boundary=3ai6VRl4aLli3dKw8tG9unUeznT

X-Riak-Vclock: a85hYGBgzGDKBVIsTKLLozOYEhnzWBn+H/h5hC8LAA==
Location: /riak/hb/third
Content-Type: text/plain
Link: </riak/hb>; rel="up", </riak/hb/fourth>; riaktag="foo"
Etag: 5Fs0VskZWx7Y25tf1oQsvS
Last-Modified: Wed, 24 Feb 2010 15:25:51 GMT

the third
Location: /riak/hb/second
Content-Type: text/plain
Link: </riak/hb>; rel="up", </riak/hb/fourth>; riaktag="foo"
Etag: 2ZKEJ2gaT57NT7xhLDPCQz
Last-Modified: Wed, 24 Feb 2010 15:24:11 GMT

the second


It’s also possible to express the same query in map-reduce, directly:

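$ curl -X POST -H "content-type:application/json" \
  http://localhost:8098/mapred --data @-
{"inputs":[["hb","first"]],"query":[{"link":{}},{"map":{"language":"javascript","source":"function(v) { return [v]; }"}}]}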

That’s the exact same query. The content type of the response is different. It’s now a JSON array with two elements: a JSON encoding of the hb/second object, and a JSON encoding of the hb/third object. (Pretty-printed here, for clarity.)

[
    {
        "bucket": "hb",
        "key": "second",
        "vclock": "a85hYGBgzGDKBVIsLEHbN2YwJTLmsTLMPvDzCF8WAA==",
        "values": [
            {
                "metadata": {
                    "Links": [
                        ["hb", "fourth", "foo"]
                    ],
                    "X-Riak-VTag": "2ZKEJ2gaT57NT7xhLDPCQz",
                    "content-type": "text/plain",
                    "X-Riak-Last-Modified": "Wed, 24 Feb 2010 15:24:11 GMT",
                    "X-Riak-Meta": []
                },
                "data": "the second"
            }
        ]
    },
    {
        "bucket": "hb",
        "key": "third",
        "vclock": "a85hYGBgzGDKBVIsTKLLozOYEhnzWBn+H/h5hC8LAA==",
        "values": [
            {
                "metadata": {
                    "Links": [
                        ["hb", "fourth", "foo"]
                    ],
                    "X-Riak-VTag": "5Fs0VskZWx7Y25tf1oQsvS",
                    "content-type": "text/plain",
                    "X-Riak-Last-Modified": "Wed, 24 Feb 2010 15:25:51 GMT",
                    "X-Riak-Meta": []
                },
                "data": "the third"
            }
        ]
    }
]

Another interesting query is “follow only links that are tagged foo.” For that, just add a tag field to the link phase spec:

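$ curl -X POST -H "content-type:application/json" \
  http://localhost:8098/mapred --data @-
{"inputs":[["hb","first"]],"query":[{"link":{"tag":"foo"}},{"map":{"language":"javascript","source":"function(v) { return [v]; }"}}]}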

Here you should get a JSON array with one element: a JSON encoding of the hb/second object. The link to the hb/third object was tagged bar, so that link was not followed. The equivalent URL syntax is:

$ curl http://localhost:8098/riak/hb/first/_,foo,_

It’s also possible to filter links by bucket by adding a bucket field to the link phase spec, or by replacing the first underscore with a bucket name in the URL format. But, all of our example links point to the same bucket, so hb is the only interesting setting here.
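The URL form is regular enough to generate programmatically. Below is a minimal Python sketch; the function name is hypothetical, each step is a (bucket, tag, keep) triple with `None` standing for the `_` wildcard, and encoding an explicit "don't keep" as `0` is an assumption (the text above only confirms that `1` means keep).

```python
def link_walk_url(base, bucket, key, steps):
    """Build a link-walking URL from a list of (bucket, tag, keep) steps.

    None in any position becomes the '_' wildcard. keep=True is written
    as '1'; writing keep=False as '0' is an assumption of this sketch.
    """
    segments = []
    for step_bucket, tag, keep in steps:
        keep_part = "_" if keep is None else ("1" if keep else "0")
        segments.append(",".join([step_bucket or "_", tag or "_", keep_part]))
    return f"{base}/riak/{bucket}/{key}/" + "/".join(segments)


base = "http://localhost:8098"
all_links = link_walk_url(base, "hb", "first", [(None, None, None)])
foo_links = link_walk_url(base, "hb", "first", [(None, "foo", None)])
hb_only = link_walk_url(base, "hb", "first", [("hb", None, None)])
print(all_links)
print(foo_links)
print(hb_only)
```

Passing more than one step in the list produces a chained walk, with one comma-separated segment per step.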

Link phases may also be chained together (or put after other phases if those phases produce bucket/key lists). For example, we could follow the links all the way from hb/first to hb/fourth with:

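$ curl -X POST -H "content-type:application/json" \
  http://localhost:8098/mapred --data @-
{"inputs":[["hb","first"]],"query":[{"link":{}},{"link":{}},{"map":{"language":"javascript","source":"function(v) { return [v]; }"}}]}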

(Notice the added link phase.) If you run that, you’ll find that you get two copies of the hb/fourth object in the response. This is because we didn’t bother uniquifying the results of the link extraction, and both hb/second and hb/third link to hb/fourth. A reduce phase is fairly easy to add:

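$ curl -X POST -H "content-type:application/json" \
  http://localhost:8098/mapred --data @-
{"inputs":[["hb","first"]],"query":[{"link":{}},{"link":{}},{"reduce":{"language":"erlang","module":"riak_mapreduce","function":"reduce_set_union"}},{"map":{"language":"javascript","source":"function(v) { return [v]; }"}}]}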

The resource handling the URL link-walking format does just this:

$ curl http://localhost:8098/riak/hb/first/_,_,_/_,_,_

That should get you just one copy of the hb/fourth object.
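To see why the duplicates appear and how a uniquifying step removes them, here is a small Python simulation of the four example objects. It models only the link metadata in memory and is not how Riak itself executes link phases; all names are illustrative.

```python
# In-memory model of the example objects: (bucket, key) -> [(bucket, key, tag)]
LINKS = {
    ("hb", "first"): [("hb", "second", "foo"), ("hb", "third", "bar")],
    ("hb", "second"): [("hb", "fourth", "foo")],
    ("hb", "third"): [("hb", "fourth", "foo")],
    ("hb", "fourth"): [],
}


def link_phase(inputs, tag=None):
    """Follow the outbound links of every input, optionally filtering by tag."""
    outputs = []
    for obj in inputs:
        for bucket, key, link_tag in LINKS[obj]:
            if tag is None or tag == link_tag:
                outputs.append((bucket, key))
    return outputs


def unique(pairs):
    """Drop duplicate bucket/key pairs while preserving order."""
    return list(dict.fromkeys(pairs))


start = [("hb", "first")]
one_step = link_phase(start)
foo_only = link_phase(start, tag="foo")
two_steps = link_phase(one_step)
deduplicated = unique(two_steps)

print(one_step)
print(foo_only)
print(two_steps)
print(deduplicated)
```

The two-step walk reaches hb/fourth once through hb/second and once through hb/third, so it shows up twice until the duplicates are removed.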

So why choose either map/reduce or URL-syntax? The advantage of URL syntax is that if you’re starting from just one object, and just want to get the objects at the ends of the links, and you can handle multipart/mixed encoding, then URL syntax is much simpler and more compact. Map/reduce with link phases should be your choice if you want to start from multiple objects at once, or you want to get some processed or aggregated form of the objects, or you want the result to be JSON-encoded.

Riak version 0.8 note: In Riak 0.8, the format of the result of ‘link’ map/reduce phases was not able to be transformed into JSON. This meant both that it was not possible to put a Javascript reduce phase right after a link phase, and also that it was not possible to end an HTTP map/reduce query with a link phase. Those issues have been resolved in the tip of the source repository, and will be part of the 0.9 release.