Tag Archives: Riak

Riak Pipe – the New MapReduce Power

September 19, 2011

A few months ago, I announced the opening of Riak Pipe, as well as two goals for the project. With the upcoming 1.0 release of Riak, we have achieved the first goal: new clusters will use Riak Pipe to power their MapReduce processing. Existing clusters will also be able to migrate to using Riak Pipe, with no changes needed from the client perspective.

There are a few reasons you should be excited about running your MapReduce queries on Riak Pipe. First and foremost, Riak Pipe is designed as a work distribution system, and as such, it is better able to take advantage of the parallel resources available in the cluster. One small example of how Riak Pipe achieves this is simply by splitting the “map” phase processing into two steps: fetching the object from Riak KV, and transforming it. This allows the work of each step to happen in parallel; the next input will be fetched while the transformation of the last one is in progress.

Riak Pipe also recognizes that a cluster’s resources are finite, and that sometimes it’s better to delay one pile of work in order to make progress on another. Processing phases in Riak Pipe, called fittings, provide backpressure to fittings upstream from them by means of limiting the sizes of their input queues. The upstream fittings pause their processing when the downstream queues are full, freeing up system resources (or at least not increasing their consumption) to allow those downstream processes a chance to catch up.

Input queues are another example of Riak Pipe’s parallel resource use. Inter-phase results are delivered directly from a vnode running one stage to the vnode that will process them for the next stage. Since they are not forced through a single, central process, the memory of the entire cluster can be used to move them around, instead of requiring a single machine’s memory to handle them.

The KV-object fetching stage of the new Riak Pipe MapReduce system is also much more of a well-behaved KV user. That is, the requests it makes are much more fairly balanced with respect to regular Riak KV operations (get, put, etc.). This means MapReduce on Riak Pipe should have much less impact on the performance of the rest of your Riak use.

Using Riak Pipe MapReduce is simple. Make sure that the setting {mapred_system, pipe} is in the riak_kv section of your cluster’s app.config, and then … just send MapReduce queries over HTTP or Protocol Buffers as you always have. The results should be the same. There are a few knobs you can tweak, which control batching of reduce phase evaluation, but the goal of this release was a 100% compatible implementation of the existing MapReduce functionality.
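For reference, here is a minimal sketch of what that might look like in app.config; only the relevant riak_kv entry is shown, and the rest of your existing configuration stays as it is:

```erlang
%% app.config (sketch) -- only the riak_kv section is shown
[
 {riak_kv, [
   %% route MapReduce queries through Riak Pipe
   {mapred_system, pipe}
 ]}
].
```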

There is much more on the horizon for Riak Pipe, including more efficiency gains and exposing some of the new processing statistics it tracks, not to mention exposing more of its functionality beyond Riak KV’s MapReduce. We’re very excited about the future.

If you would like to learn more about Riak Pipe, in general, and get involved, I recommend paging through the README to get an idea of its structure, and then browsing the new Riak KV MapReduce code for some examples.

-Bryan

Secondary Indexes in Riak

September 14, 2011

Developers building an application on Riak typically have a love/hate relationship with Riak’s simple key/value-based approach to storing data. It’s great that anyone can grok the basics (3 simple operations, get/put/delete) quickly. It’s convenient that you can store anything imaginable as an object’s value: an integer, a blob of JSON data, an image, an MP3. And the distributed, scalable, failure-tolerant properties that a key/value storage model enables can be a lifesaver depending on your use case.

But things get much less rosy when faced with the challenge of representing alternate keys, one-to-many relationships, or many-to-many relationships in Riak. Historically, Riak has shifted these responsibilities to the application developer. The developer is forced to either find a way to fit their data into a key/value model, or to adopt a polyglot storage strategy, maintaining data in one system and relationships in another.

This adds complexity and technical risk, as the developer is burdened with writing additional bookkeeping code and/or learning and maintaining multiple systems.

That’s why we’re so happy about Secondary Indexes. Secondary Indexes are the first step toward solving these challenges, lifting the burden from the backs of developers, and enabling more complex data modeling in Riak. And the best part is that it ships in our 1.0 release, just a few weeks from now.

How Do Secondary Indexes Work?

Update: Secondary Indexes use the new style HTTP API. See the Riak Wiki for more details.

From an application developer’s perspective, Secondary Indexes allow you to tag a Riak object with some index metadata, and later retrieve the object by querying the index, rather than the object’s primary key.

For example, let’s say you want to store a user object, accessible by username, twitter handle, or email address. You might pick the username as the primary key, while indexing the twitter handle and email address. Below is a curl command to accomplish this through the HTTP interface of a local Riak node:

```bash
curl -X POST \
  -H 'x-riak-index-twitter_bin: rustyio' \
  -H 'x-riak-index-email_bin: rusty@basho.com' \
  -d '...user data...' \
  http://localhost:8098/buckets/users/keys/rustyk
```

Previously, there was no simple way to access an object by anything other than the primary key, the username. The developer would be forced to “roll their own indexes.” With Secondary Indexes enabled, however, you can easily retrieve the data by querying the user’s twitter handle:

Query the twitter handle…

```bash
curl localhost:8098/buckets/users/index/twitter_bin/rustyio
```

Response…

```text
{"keys":["rustyk"]}
```

Or the user’s email address:

Query the email address…

```bash
curl localhost:8098/buckets/users/index/email_bin/rusty@basho.com
```

Response…

```text
{"keys":["rustyk"]}
```

You can change an object’s indexes by simply writing the object again with the updated index information. For example, to add an index on Github handle:

```bash
curl -X POST \
  -H 'x-riak-index-twitter_bin: rustyio' \
  -H 'x-riak-index-email_bin: rusty@basho.com' \
  -H 'x-riak-index-github_bin: rustyio' \
  -d '...user data...' \
  http://localhost:8098/buckets/users/keys/rustyk
```

That’s all there is to it, but that’s enough to represent a variety of different relationships within Riak.

Above is an example of assigning an alternate key to an object. But imagine that instead of a twitter_bin field, our object had an employer_bin field that matched the primary key for an object in our employers bucket. We can now look up users by their employer.

Or imagine a role_bin field that matched the primary key for an object in our security_roles bucket. This allows us to look up all users that are assigned to a specific security role in the system.
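For instance, assuming each user object had been written with an x-riak-index-employer_bin header whose value is the employer’s key (say, a hypothetical basho), the lookup by employer uses the same query form shown above:

```bash
curl localhost:8098/buckets/users/index/employer_bin/basho
```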

Design Decisions

Secondary Indexes maintain Riak’s distributed, scalable, and failure-tolerant nature by avoiding the need for a pre-defined schema, which would be shared state. Indexes are declared on a per-object basis, and the index type (binary or integer) is determined by the field’s suffix.

Indexing is real-time and atomic; the results show up in queries immediately after the write operation completes, and all indexing occurs on the partition where the object lives, so the object and its indexes stay in sync. Indexes can be stored and queried via the HTTP interface or the Protocol Buffers interface. Additionally, index results can feed directly into a Map/Reduce operation. And our Enterprise customers will be happy to know that Secondary Indexing plays well with multi data center replication.
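As a sketch of how an index query can feed a MapReduce job over HTTP: the JSON layout below reflects my understanding of the 1.0 interface, so check the wiki for the authoritative format before relying on it.

```bash
curl -X POST http://localhost:8098/mapred \
  -H 'Content-Type: application/json' \
  -d '{
        "inputs": {"bucket": "users", "index": "twitter_bin", "key": "rustyio"},
        "query": [{"map": {"language": "javascript", "name": "Riak.mapValuesJson"}}]
      }'
```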

Indexes are declared as metadata, rather than as part of an object’s value, in order to preserve Riak’s view that the value of your object is an opaque document. An object can have an unlimited number of index fields of any size (dependent upon system resources, of course). We have stress tested with 1,000 index fields, though we expect most applications won’t need nearly that many. Indexes do contribute to the base size of the object, and they also take up their own disk space, but the overhead for each additional index entry is minimal: the vector clock information (and other metadata) is stored in the object, not in the index entry. Additionally, the LevelDB backend (and, likely, most index-capable backends) supports prefix compression, further shrinking index size.

This initial release does have some important limitations. Only single index queries are supported, and only for exact matches or range queries. The result order is undefined, and pagination is not supported. While this offers less in the way of ad-hoc querying than other datastores, it is a solid 80% solution that allows us to focus future energy where users and customers need it most. (Trust me, we have many plans and prototypes of potential features. Building something is easy, building the right thing is harder.)
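To make the two supported query forms concrete, here is a small sketch using a hypothetical integer index, age_int, which is not part of the example above; exact matches take one value in the URL and range queries take a start and an end:

```bash
# exact match on a hypothetical integer index
curl localhost:8098/buckets/users/index/age_int/35

# range query: everything from 21 to 35 inclusive
curl localhost:8098/buckets/users/index/age_int/21/35
```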

Behind The Scenes

What is happening behind the scenes? A lot, actually.

At write time, the system pulls the index fields from the incoming object, parses and validates the fields, updates the object with the newly parsed fields, and then continues with the write operation. The replicas of the object are sent to virtual nodes where the object and its indexes are persisted to disk.

At query time, the system first calculates what we call a “covering” set of partitions. The system looks at how many replicas of our data are stored and determines the minimum number of partitions that it must examine to retrieve a full set of results, accounting for any offline nodes. By default, Riak is configured to store 3 replicas of all objects, so the system can generate a full result set if it reads from one-third of the system’s partitions, as long as it chooses the right set of partitions. The query is then broadcast to the selected partitions, which read the index data, generate a list of keys, and send them back to the coordinating node.

Storing index data is very different from storing key/value data: in general, any database that stores indexes on disk would prefer to store the index in a contiguous block and in the desired order, getting as near to the final result set as possible. This minimizes disk movement and other work during a query, and provides faster read operations. The challenge is that index values rarely enter the system in the right order, so the database must do some shuffling at write time. Most databases delay this shuffling: they write to disk in a slightly sub-optimal format, then go back and “fix things up” at a later point in time.

None of Riak’s existing key/value-oriented backends were a good fit for index data; they all focused on fast key/value access. During the development of Secondary Indexes we explored other options. Coincidentally, the Basho team had already begun work to adapt LevelDB–a low-level storage library from Google–as a storage engine for Riak KV. LevelDB stores data in a defined order, exactly what Secondary Indexes needed, and it is actually versatile enough to manage both the index data AND the object’s value. Plus, it is very RAM friendly. You can learn more about LevelDB from this page on Google Code.

Want To Know More?

If you want to learn more about Secondary Indexes, you can read the slides from my talk at OSCON Data 2011: Querying Riak Just Got Easier. Alternatively, you can watch the video.

You can grab a pre-release version of Riak Version 1.0 on the Basho downloads site to try the examples above. Remember to change the storage backend to riak_kv_eleveldb_backend!

Finally, keep an eye out for documentation that will land on the newly re-organized Basho Wiki within the next two weeks.

Rusty

A Preview Of Cluster Membership In Riak 1.0

September 9, 2011

Being a distributed company, we make a lot of videos at Basho that are intended for internal consumption and used to educate everyone on new features, functionality, etc. Every once in a while someone makes a video that’s so valuable it’s hard not to share it with the greater community. This is one of those.

This screencast is a bit on the long side, but it’s entirely worth it. Basho Software Engineer Joe Blomstedt put it together to educate all of Basho on the new cluster membership code, features, and functionality coming in the Riak 1.0 release (due out at the end of the month). We aim to make Riak as simple as possible to operate at scale, and the choices we make and the code we write around cluster membership form the crux of this simplicity.

At the end of this you’ll have a better idea of what Riak’s cluster membership is all about, its major components, how it works in production, new commands that are present in Riak 1.0, and much, much more.

And, if you want to dig deeper into what Riak and cluster membership is all about, start here:

* Download Riak 1.0 Pre-release 1
* Riak Core on GitHub
* Where To Start With Riak Core
* Join the Riak Mailing List

It should be noted again that this was intended for internal consumption at Basho, so Joe’s tone and language reflect that in a few sections.

Enjoy, and thanks for being a part of Riak.

The Basho Team

Riak at Formspring

August 8, 2011

Several weeks back at the San Francisco Riak Meetup, Tim Bart of Formspring delivered a great talk to a packed house all about how he and his team are using Riak for two new features they were rolling out to their more than 24 million users. I’m happy to report that the video is online, edited, and ready for viewing.

This one runs just over 28 minutes and is an excellent overview of what Formspring is all about, why they chose Riak, and lessons learned (both good and bad) when developing against Riak. Tim also touches on what plans they have for Riak in the future and, as any thorough user of a given technology should, makes some suggestions for functionality he and his team would like to see in Riak.

A few notes:

  • At just over a minute into the talk, Tim references the “Riak and Scala at Yammer” talk.
  • There were several questions at the end of the talk but there was a problem with the microphone so we had to cut them off. If you have any questions, leave them in the comments and Tim will answer them when he has the opportunity. (Thanks, Tim!)

Enjoy.

Mark

What's New In Riak's Python Client?

August 4, 2011

There’s a whole slew of new and noteworthy features in today’s release of the Python client for Riak, and I thought it’d be a good idea for us to sit down and look at a bunch of them so I can add more detail to what is already in the release notes.

Test Server

Ripple has had an in-memory test server for a while now, and I thought the Python client should have something similar too. By the way, a lot of the features here draw heavy inspiration from Ripple in general, so credit where credit is due.

The basic idea is that instead of using a locally installed Riak instance with file system storage you use one that stores data in memory instead. This is not only faster than storing everything on disk, it makes it much easier to just wipe all the data and start from scratch, without having to restart the service. In short, this is a neat way to integrate Riak into your test suite.

All the test server requires is a local Riak installation; it uses that installation’s libraries and steals a few files from it to build a second Riak installation in a temporary directory. Let’s look at an example:

```python
from riak.test_server import TestServer

server = TestServer()
server.prepare()
server.start()
```

This will start a Riak instance in the background, with the Python part interacting with it through the Erlang console. That allows you to do things like wiping all data to have a minty fresh and empty Riak installation for the next test run:

```python
server.recycle()
```

The TestServer class has a default location where it looks for a Riak installation, but the path can be anywhere you put a Riak build made from an official release archive. Just point it to that Riak installation’s bin directory, and you’re good to go:

```python
server = TestServer(bin_dir="/usr/local/riak/0.14.2/bin")
```

You can also overwrite the default settings used to generate the app.config file for the in-memory Riak instance. Just specify a keyword pointing to a dictionary for every section in the app.config like so:

```python
server = TestServer(riak_core={"web_port": 8080})
```

By default the test server listens on ports 9000 (HTTP) and 9001 (Protocol buffers), so make sure you adapt your test code accordingly.
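So, as a small sketch (the parameter names follow the 1.x RiakClient constructor and are worth double-checking against your client version), pointing a client at the test server is just a matter of overriding the port:

```python
import riak
from riak.test_server import TestServer

server = TestServer()
server.prepare()
server.start()

# talk to the test server over HTTP (default test port 9000)
client = riak.RiakClient(port=9000)

# or over Protocol Buffers (default test port 9001)
pb_client = riak.RiakClient(port=9001, transport_class=riak.RiakPbcTransport)
```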

Using Riak Search’s Solr-compatible HTTP Interface

One of the nice things about Riak Search is its Solr-compatible HTTP interface. Until now, you were only able to use Riak Search through MapReduce. New in release 1.3 of the Python client is support for directly indexing and querying documents using Riak Search’s HTTP interface.

The upside is that you can use Riak Search from a Python app as a scalable full-text search engine without having to store documents in Riak KV for them to be indexed.

The interface is as simple as it is straightforward: we’ve added a new method to the RiakClient class called solr() that returns a small façade object. That in turn allows you to interact with the Solr interface, e.g. to add documents to the index:

```python
client.solr().add("superheroes",
                  {"id": "hulk",
                   "name": "hulk",
                   "skill": "Hulksmash!"})
```

You just specify an index and a document, which must contain a key-value pair for the id, and that’s it.

The beauty of using the Solr interface is that you can use all the available parameters for sorting, limiting result sets, and setting default fields to query on, without having to do that with a set of reduce functions.

```python
client.solr().query("superheroes", "hulk", df="name")
```

Be sure to check our documentation for the full set of supported parameters. Just pass in a set of keyword arguments for all valid parameters.

Something else that’s new on the search integration front is the ability to programmatically enable and disable indexing on a bucket by installing or removing the relevant pre-commit hook.

```python
bucket = client.bucket("superheroes")
if not bucket.search_enabled():
    bucket.enable_search()
```

Storing Large Files With Luwak

When building Riagi, the application showcased in the recent webinar, I missed Luwak support in the Python client. Luwak is Riak’s way of storing large files, chunked into smaller bits and stored across your Riak cluster. So we added it. The API consists of three simple functions, store_file, get_file, and delete_file.

```python
client.store_file("hulk", "hulk.jpg")

client.get_file("hulk")

client.delete_file("hulk")
```

Connection Caching for Protocol Buffers and HTTP

Thanks to the fine folks at Formspring, the Python client now sports easier ways to reuse protocol buffer and even HTTP connections, and to make their use more efficient. All of them are useful if you’re doing lots of requests or want to reuse connections across several requests, e.g. in the context of a single web request.

Here’s a summary of the new transports added in this release; all of them accept the same parameters as the original transport classes for HTTP and PBC (a short usage sketch follows the list):

  • riak.transports.pbc.RiakPbcCachedTransport
    A cache that reuses a set of protocol buffer connections. You can bound the number of connections kept in the cache by specifying a maxsize attribute when creating the object.
  • riak.transports.http.RiakHttpReuseTransport
    This transport makes reusing HTTP connections more efficient by setting SO_REUSEADDR on the underlying TCP socket. That allows the TCP stack to reuse connections before the TIME_WAIT state has passed.
  • riak.transports.http.RiakHttpPoolTransport
    Use the urllib3 connection pool to pool connections to the same host.
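Here is the promised usage sketch; it assumes the new transport classes plug into RiakClient via the existing transport_class keyword, so treat the exact wiring as an assumption rather than gospel:

```python
import riak
from riak.transports.pbc import RiakPbcCachedTransport

# reuse a bounded pool of protocol buffer connections across requests
client = riak.RiakClient(port=8087,
                         transport_class=RiakPbcCachedTransport)

bucket = client.bucket("superheroes")
print(bucket.get("hulk").get_data())
```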

We’re always looking for contributors to the Python client, so keep those pull requests coming!

Mathias

Follow Up To Riak And Python Webinar

August 3, 2011

Thanks to everyone who attended yesterday’s webinar on Riak and Python. If you missed the webinar, we’ve got you covered. Find a screencast of the webinar below, or check it out directly on Vimeo. Sorry that it’s missing the questions I answered at the end. I recapped the questions in written form below to make up for it.

We made the slides available for your viewing pleasure as well. But most importantly, have a look at our Python library for Riak, which got a lot of feature love lately, and at the source code for Riagi, the sample Django application. It utilizes Riak for session and file storage, and Riak Search for storing user and image metadata.

Thanks to dotCloud for providing the hosting for Riagi. A little birdie told us they’re working on supporting Riak as part of their official stack soon.

The Python client for Riak wouldn’t exist without the community, so we want to say thank you for your contributions, and we’re always ready for more pull requests!

Keep an eye out for a new release of the Python client this week, including several of the new features shown in the webinar!

The two questions asked at the end were:

  • Is there a Scala client for Riak? Links relevant to the answer in the video: Outdated Scala Library, a more recent fork, and our official Java client.
  • Is Protocol Buffers more efficient than using HTTP? Answer is detailed in the webinar video.

Basho and Riak Invade OSCON Next Week

July 22, 2011

This year’s OSCON has lots of chances for you to hear about Riak. And, the Basho team will be presenting some great new Riak features as well. OSCON Data is a special track of the conference, dedicated to all things open and data-oriented – sure to be a hotbed of cool new open source projects and updates on more established platforms like Riak.

Be sure to check out the following sessions:

Querying Riak Just Got Easier – Introducing Secondary Indices

Consistency or Bust: Breaking a Riak Cluster

Also, be sure to be on the lookout for Basho community manager Mark Phillips. He’s always up for hearing about how people are using Riak and directing the curious to the right resources. Oh, and he always has some cool Riak schwag on hand.

See you there!

Introducing Lager – A New Logging Framework for Erlang/OTP

July 20, 2011

Hi. My name is Andrew Thompson and I’ve been working on Riak at Basho since March. I’ve been focused primarily on various technical debt issues within Riak since starting. The largest project I’ve undertaken is Lager, a new logging framework for Erlang/OTP. Lager is actually my second logging framework for Erlang, and I’ve used my previous experience to make this one even better. I think Lager has some compelling features not seen in other Erlang logging frameworks, and I’ll go over the highlights in this post.

So, why write another log framework for Erlang? There are already several: error_logger itself, SASL, log4erl, and riak_err, to name a few. One of the key goals was to make logging friendlier, both to the end user and to the sysadmin. Lager tries very hard to hide the traditional “giant error tuple of doom” from the user (unless they go looking for it). For example, here’s what happens when a gen_server dies with a badmatch error (with SASL running):

```text
=ERROR REPORT==== 19-Jul-2011::17:50:10 ===
** Generic server crash terminating
** Last message in was badmatch
** When Server state == {}
** Reason for termination ==
** {{badmatch,{}},
[{crash,handle_call,3},
{gen_server,handle_msg,5},
{proc_lib,init_p_do_apply,3}]}

=CRASH REPORT==== 19-Jul-2011::17:50:10 ===
crasher:
initial call: crash:init/1
pid:
registered_name: crash
exception exit: {{badmatch,{}},
[{crash,handle_call,3},
{gen_server,handle_msg,5},
{proc_lib,init_p_do_apply,3}]}
in function gen_server:terminate/6
ancestors: []
messages: []
links: []
dictionary: []
trap_exit: false
status: running
heap_size: 377
stack_size: 24
reductions: 127
neighbours:
```

A little scary, no? Conversely, here’s what Lager displays when that happens:

```text
2011-07-19 17:51:21 [error] gen_server crash terminated with reason: no match of right hand value {} in crash:handle_call/3
2011-07-19 17:51:22 [error] CRASH REPORT Process crash with 0 neighbours crashed with reason: no match of right hand value {} in crash:handle_call/3
```

A little more readable, eh? Now, there are times when all that extra information is useful, and Lager doesn’t throw it away. Instead, it has a “crash log” where those messages go, and you’re free to dig through this file for any additional information you might need. Lager also borrows heavily from riak_err, so that printing large crash messages is safe. (I actually found a bug in riak_err, so Lager is even safer.)

Now, those were messages coming out of error_logger, which is fine for legacy or library code, but Lager also has its own logging API that you can use. It’s actually implemented via a parse_transform so that Lager can capture the current module, function, line number and pid for inclusion in the log message. All this is done automatically, and the logging call in the code looks like this:

```erlang
lager:error("oh no!")
lager:warning("~s, ~s and ~s, oh my!", [lions, tigers, bears])
```

Which will be displayed like:

```text
2011-07-19 18:02:02 [error] @test2:start:8 oh no!
2011-07-19 18:02:02 [warning] @test2:start:9 lions, tigers and bears, oh my!
```

Note that you can easily see where the error occurred just by glancing at the line. Also notice that you don’t need to stick a newline on the end of the log message. Lager automatically (and happily) does that for you.

Why did I use a parse transform? I was originally going to use the traditional macro approach, capturing ?MODULE and ?LINE, but I had a talk with Steve Vinoski, who also has some prior experience with Erlang logging, and he suggested a parse transform. A parse transform is handy in a couple of different ways: we can do some compile-time calculations, we can capture the current function name, and in some ways it’s more flexible than a macro. Of course, Lager could also easily be extended to provide more traditional logging macros as well.

Now, Lager is architected much like error_logger in that it’s a gen_event with multiple handlers installed. Right now, there are only two provided: a console handler and a file handler. The file backend supports multiple files at different levels, so you can have a log of only errors and above, as well as a log that also includes informational messages. The loglevels are adjustable at runtime, so you can turn the logging up/down for a particular backend:

```erlang
lager:set_loglevel(lager_console_backend, debug)
lager:set_loglevel(lager_file_backend, "error.log", warning)
```

The first call would tell the console to display all messages at the debug level and above, and the second tells the file backend to set the “error.log” file to log any messages warning and above. You can of course set the defaults in Riak’s app.config.
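For illustration, here is a sketch of what the lager section of app.config might look like; the exact option format should be checked against the Lager README:

```erlang
{lager, [
  {handlers, [
    %% console gets info and above
    {lager_console_backend, info},
    %% one file for errors only, another that also captures info messages
    {lager_file_backend, [
      {"error.log", error},
      {"console.log", info}
    ]}
  ]}
]}
```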

Lager keeps track of which backends are consuming which levels and will very efficiently discard messages that would not be logged anywhere (they aren’t even sent to the gen_event). This means that if you have no backends consuming debug messages, you can log a million debug messages in less than half a second; they’re effectively free. Therefore you can add lots of debug messages to your code and not have to worry that they’re slowing things down if you’re not looking at them.

Lager also plays well with log rotation. You simply move or delete the log file and Lager will notice and reopen the file or recreate it. This means it will work out of the box with tools like logrotate or newsyslog. It also handles situations like out of disk space or permission errors gracefully and when the situation is resolved it will resume logging.

Some further enhancements I plan to make are:

  • Internal log rotation by size and time
  • Syslog (remote and local) backends
  • Ability to filter messages by module/pid (think enabling debug messages for just a single module instead of globally)

Needless to say, I’m pretty excited about releasing this code. Lager should be merged into Riak mainline sometime this week, once the integration work has been reviewed. That means it will be part of the next major release as well. Please let me know what you think. As usual, patches or suggestions are welcomed and encouraged.

Andrew

Boosting Riak Search Query Performance With Inline Fields

July 18, 2011

(This was originally posted on Ryan Zezeski’s working blog, “Try Try Try”.)

In this post I want to give a quick overview of inline fields, a recent addition to Riak Search that allows you to trade off disk space for a considerable performance bump in query execution and throughput. I’m going to assume the reader is already familiar with Search. In the future I may do a Search overview. If you would like that then ping me on twitter.

The Goal

Recently on the Riak Users Mailing List there was a discussion about improving the performance of Search when executing intersection (i.e. AND) queries where one term has a low frequency and the other has a high frequency. This can pose a problem because Search needs to run through all the results on both sides in order to provide the correct result. Therefore, the query is always bounded by the highest-frequency term. This is exacerbated further by the fact that Search uses a global index, or in other words partitions the index by term. This effectively means that all results for a particular term are pulled sequentially from one node. This is opposed to a local index, or partitioning by document, which effectively allows you to parallelize the query across all nodes. There are trade-offs for either method and I don’t want to discuss them in this blog post. However, it’s good to keep in mind [1]. My goal with this post is to show how you can improve the performance of this type of query with the current version of Search [2].

What’s an “Inline” Field, Anyways?

To properly understand inline fields you need to understand the inverted index data structure [3]. As a quick refresher, the gist is that the index is a map from words to a list of document reference/weight pairs. For each word [4] the index tells you in which documents it occurs and its “weight” in relation to that document, e.g. how many times it occurs. Search adds a little twist to this data structure by allowing an arbitrary list of properties to be tacked onto each of these pairs. For example, Search tracks the position of each occurrence of a term in a document.

Inline fields allow you to take advantage [5] of this fact and store the terms of a field directly in the inverted index entries [6]. Going back to my hypothetical query, you could mark the field with the frequently occurring term as inline and change the AND query to a query and a filter. A filter is simply an extra argument to the Search API that uses the same syntax as a regular query but makes use of the inline field. This has the potential to drop your latency dramatically as you avoid pulling the massive posting [7] altogether.

WARNING: Inline fields are not free! Think carefully about what I just described and you’ll realize that this list of inline terms will be added to every single posting for that index. If your field contains many terms or you have many inline fields this could become costly in terms of disk space. As always, benchmarking with real hardware on a real production data set is recommended.

The Corpus

I’ll be using a set of ~63K tweets that occurred in reaction to the devastating earthquake that took place in Haiti in January of 2010. The reason I chose this data-set is because it’s guaranteed to have frequently occurring terms such as “earthquake” but also has low-occurring terms [7] such as the time the tweets were created.

The Rig

All benchmarks were run on a 2GHz i7 MBP with an SSD [8]. An initial run is performed to prime all systems. Essentially, everything should be coming from FS cache, meaning I’ll mostly be testing processing time. My guess is disk I/O would only amplify the results. I’ll be using Basho Bench and running it on the same machine as my cluster. My cluster consists of four Riak nodes (obviously, on the same machine) which I built from master [9].

If you’d like to run the benchmarks on your own hardware please see the RUN_BENCHMARKS.md file.

Naive Query

"text:earthquake"

The naive query asks for every document id [10] that includes the word earthquake. This should return 62805 results every time.

[Benchmark graph: Naive]

Scoped Query

"text:earthquake AND created_at:[20100113T032200 TO 20100113T032500]"

The scoped query still searches for all documents with the term earthquake but restricts this set further to only those that were created in the provided three minute time span.

[Benchmark graph: Scoped]

Scoped Query With Filtering

"created_at:[20100113T032200 TO 20100113T032500]" "text:earthquake"

This is the same as the scoped query except earthquake is now a filter, not a query. Notice that, unlike the previous two queries, there are two strings. The first is the query; the second is the filter. You could read that in English as:

Execute the query to find all tweets created in this three minute range. Then filter that set using the inline field “text” where it contains the term “earthquake.”

[Benchmark graph: Scoped & Filter]

Wait One Second!

Just before I was about to consider this post wrapped up, I realized my comparison of inline vs. non-inline wasn’t quite fair. As currently implemented, when returning postings the inline field’s value is included. I’m not sure if this is of any practical use outside the filtering mechanism, but it means that in the case of the naive and scoped queries the cluster is taking an additional disk and network hit by carrying all that extra baggage. A fairer comparison would be to run the naive and scoped queries with no inline fields. I adjusted my scripts and did just that.

Naive With No Inlining

[Benchmark graph: Naive, no inlining]

Scoped With No Inlining

[Benchmark graph: Scoped, no inlining]

Conclusions

In this first table I summarize the absolute values for throughput, 99.9th percentile and average latencies.

| Stat | Naive (I) | Naive | Scoped (I) | Scoped | Scoped Filter |
|------|-----------|-------|------------|--------|---------------|
| Thru (op/s) | 2.5 | 3.5 | 3 | 5 | 15 |
| 99.9% (ms) | 875 | 490 | 575 | 350 | 42 |
| Avg (ms) | 800 | 440 | 530 | 310 | 25 |

In this benchmark I don’t care so much about the absolute numbers as I do how they relate to each other. In the following table I show the performance increase of using the scoped filter query versus the other queries. For example, the scoped filter query has three times the throughput and returns in 1/12th of the time, on average, as compared to the scoped query. That is, even its closest competitor has a latency profile that is an order of magnitude worse. You may find it odd that I included the naive queries in this comparison but I wanted to show just how great the difference can be when you don’t limit your result set. Making a similar table comparing naive vs. scoped might be useful as well but I leave it as an exercise to the reader.

| Stat | Naive (I) | Naive | Scoped (I) | Scoped |
|------|-----------|-------|------------|--------|
| Thru | 6x | 4x | 5x | 3x |
| 99.9% | 20x | 11x | 13x | 8x |
| Avg | 32x | 17x | 21x | 12x |

In conclusion, I’ve done a drive-by benchmark showing that there are potentially great gains to be had by making use of inline fields. I say “potentially” because inline fields are not free and you should take the time to understand your data-set and analyze what trade-offs you might be making by using this feature. In my example I’m inlining the text field of a twitter stream, so it would be useful to gather some statistics such as: what is the average number of terms per tweet, and what is the average size of each term? Armed with that info you then might determine how many tweets you plan to store, how many results a typical query will match, and how much extra I/O overhead that inline field is going to add. Finally, run your own benchmarks on your own hardware with real data while profiling your system’s I/O, CPU, and memory usage. Doing anything else is just pissing in the wind.

Ryan

References

1: If you’d like to know more you could start by reading Distributed Query Processing Using Partitioned Inverted Files.

2: Inline fields were added in 0.14.2, but my benchmarks were run against master.

3: I like the introduction in Effect of Inverted Index Partitioning Schemes on Performance of Query Processing in Parallel Text Retrieval Systems.

4: In search parlance a word is called a term and the entire list of terms is called the vocabulary.

5: Or abuse, depending on your disposition.

6: Entries in an inverted index are also called postings by some people.

7: Or high cardinality, depending on how you want to look at it.

8: Just like when dynoing a car it’s constant conditions and relative improvement that matter. Once you’re out of the shop those absolute numbers don’t mean much.

9: The exact commit is 3cd22741bed9b198dc52e4ddda43579266a85017.

10: BTW, in this case “document” is a Riak object indexed by the Search Pre-commit hook.

An Overview Of The All New Riak Java Client

July 14, 2011

Hi. My name is Russell Brown and since March, I’ve been working on the Riak Java Client (making me the lone Java developer in an Erlang shop). This past week I merged a large, backwards-compatible branch with some enhancements and long-awaited fixes and refinements. In this post I want to introduce the changes I’ve made and the motivations behind them. At Basho we firmly believe that Riak’s Java interface is on track to be among the best there is for Java developers who need a rock solid, production-ready database, so it’s time you get to know it if you don’t already.

First, Some History

When Riak was first released, it was only equipped with an HTTP API, so it followed that the Java client was a REST client. Later a Protocol Buffers interface was added to Riak, and Kresten Krab-Thorup and the team at Trifork contributed a Protocol Buffers interface for the Java library. Later still, around version 0.14, the Trifork PB Client was merged into the official Basho Riak Java Client. With this added interface, however, came a problem: both clients work well, but they don’t share any interfaces or types. I started working for Basho in March 2011, and my first task was to fix any issues with the existing clients and refactor them to a common, idiomatic interface. Some way into that task I was exposed to the rather brilliant Riak and Scala at Yammer talk given by Coda Hale and Ryan Kennedy at a Riak Meetup in San Francisco. This opened my eyes, and I’m very thankful to Coda and Ryan for sharing their expert understanding so freely. If you meet either of these two gentlemen, I urge you to buy them drinks.

A Common Interface

Having a common interface should be a no-brainer. Developers shouldn’t have to choose a low-level transport up front and then have all their subsequent code shaped by that choice. To that end, I added a RawClient interface to the library that describes the set of operations you can perform with Riak. I also adapted each of the original clients to this interface. If all you want to do is pump data in, or pull raw data out of Riak, the PB RawClient adapter is for you. There are some figures on the Riak Wiki that show it’s quite snappy. If you need to write a non-blocking client, or simply have to use the Jetty HTTP library, implementing this interface is the way to go.

There is some bad news here: I had to deprecate a lot of the original client and move that code to new packages. This will look a tad ugly in your IDE for a release or two, but it is better to make the changes than be stuck with odd packages forever. There will be a code cull of the deprecated classes before the client goes v1.0.

The next task on the list for this raw package is to move the interfaces into a separate core project/package to avoid any circular dependency issues that will arise if you create your own RawClient implementation. The RawClient solves the common/idiomatic interface problem, but it doesn’t solve the main new challenge that an eventually consistent, fault-tolerant datastore brings to the client: siblings.

Sibling Values

Before we move on, if you have the time please take a moment to read the excellent Vector Clocks page on the Riak wiki (but make sure you come back). Thanks to Vector Clocks Riak does all that it can to save you from dealing with conflicting values, but this doesn’t guarantee they won’t occur. The RawClient presents you with a Vector Clock and an array of sibling values, and you need to create a single, correct value to work with (and even write back to Riak as the one true value.) The new, higher-level client API in the Java Client makes this easier.

Conflict Resolution

Conflict resolution is going to depend on your domain. Your data is opaque to Riak, which is why conflict resolution is a read-time problem for the client. The canonical example (from the Dynamo Paper) is a shopping cart. If you have sibling shopping carts you can merge them (with a set-union operation, for example) to get a single cart with the values from all carts present. (Yes, you can reinstate a removed item, but that is far better than losing items. Ask Amazon.) Keep the idea of a shopping cart fresh in your mind for the remainder of this post, as it figures in some of the examples I’ve used.

A Few Words On Domain Conversion

You use a Bucket to get key/values pairs from Riak.

Bucket b = client.createBucket(bucketName)
    .nVal(1)
    .allowSiblings(true)
    .execute();

IRiakObject fetched = b.fetch("k").execute();
b.store("k", "my new value").execute();
b.delete("k").execute();

The Bucket is a factory for RiakOperations, and a Riak Operation is a fluent builder that, when executed, calls out to Riak. “Fetch” and “Store” Riak Operations accept a Converter and a ConflictResolver implementation from you, so that the data Riak returns can be deserialised into a domain object and any siblings can be resolved. The library provides a Jackson-based JSONConverter that will convert the JSON payload of a Riak data item into an instance of some domain class; think of it as a bit like an ORM (but maybe without the “R”).

final Bucket carts = client.createBucket(bucketName).allowSiblings(true).execute();

final ShoppingCart cart = new ShoppingCart(userId);

cart.addItem("fixie");
cart.addItem("moleskine");

carts.store(cart).returnBody(false).retrier(DefaultRetrier.attempts(2)).execute();

Adding your own converters is trivial and I plan to provide a Jackson XML based one soon. Look at this test for a complete example.

Conflict Resolver

Once the data is marshalled into domain instances, your logic is run to resolve any conflicts. A trivial shopping cart example is provided in the tests here. The ConflictResolver interface has a single method that takes an array of domain instances and returns a single, resolved value.

T resolve(final Collection<T> siblings) throws UnresolvedConflictException;

It throws the checked UnresolvedConflictException if you need to bail out. Your code can catch this and make the siblings available to a user (for example) for resolution as a last resort. I am considering making this a runtime exception, and would like to hear what you think about that.
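To make that concrete, here is a minimal sketch of a resolver for the shopping cart example. The ShoppingCart accessors (getUserId, getItems, addItems) are assumed helpers for illustration, not part of the library, and the riak client imports are omitted:

```java
import java.util.Collection;

// Sketch only: merges sibling carts with a set-union of their items.
public class MergeCartResolver implements ConflictResolver<ShoppingCart> {

    public ShoppingCart resolve(final Collection<ShoppingCart> siblings)
            throws UnresolvedConflictException {
        ShoppingCart merged = null;
        for (ShoppingCart sibling : siblings) {
            if (merged == null) {
                merged = new ShoppingCart(sibling.getUserId());
            }
            merged.addItems(sibling.getItems()); // union keeps every item ever added
        }
        return merged; // null only if there were no siblings at all
    }
}
```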

Mutation

To talk about mutation I’m going to stick with the shopping cart example. Imagine you’re creating a new cart for a visiting shopper. You create a ShoppingCart instance, add the toaster and the flambé set, and persist it. Meanwhile a network partition occurred and your user had already added a steak knife set to a different cart. You’re not really creating a new value, but you weren’t to know. If you save this value you have a conflict to be resolved at a later date. Instead, the high-level client executes a store operation as a fetch, convert, resolve siblings, apply a mutation, and then store. In the case of the shopping cart that mutation would again be to merge the values of your new ShoppingCart with the resolved value fetched from Riak.

You provide an implementation of Mutation to any store operation. You never really know if you are creating a new value or updating an old one, so it is safer to model your write as a mutation to an existing value that results in a new value. This can be as simple as incrementing a number or adding the items in your Cart to the fetched Cart.

By default the library provides a ClobberMutator (it ignores the old value and overwrites it with a new one) but this is simply a default behaviour and not the best in most situations. It is better to provide your own Mutation implementation on a store operation. If you can model your values as logically monotonic or as transformations to existing values, then creating mutation implementations is a lot simpler.
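Here is a rough sketch of what such a Mutation might look like for the cart; it assumes the Mutation interface boils down to a single apply(original) method returning the new value, and it reuses the hypothetical ShoppingCart helpers from the resolver sketch above:

```java
// Sketch only: a merge-style alternative to the default ClobberMutator.
public class MergeCartMutation implements Mutation<ShoppingCart> {

    private final ShoppingCart newCart;

    public MergeCartMutation(ShoppingCart newCart) {
        this.newCart = newCart;
    }

    public ShoppingCart apply(ShoppingCart original) {
        if (original == null) {
            // nothing stored yet, so "mutating" simply means creating
            return newCart;
        }
        original.addItems(newCart.getItems()); // add our items to whatever is already there
        return original;
    }
}
```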

Noise

As your project matures, you will firm up your ConflictResolvers, Mutations, and Converters into concrete classes, and at this point adding them for each operation is a lot more typing and code noise than you need (especially if you were using anonymous classes for your Mutation/ConflictResolver/Converter).

bucket.store(o)
    .withConverter(converter)
    .withMutator(mutation)
    .withResolver(resolver)
    .r(r)
    .w(w)
    .dw(dw)
    .retrier(retrier)
    .returnBody(false)
.execute();

The library provides the DomainBucket class as a wrapper around the Bucket. DomainBuckets are constructed with a ConflictResolver, Mutation, and Converter and thereafter use those implementations for each operation. DomainBuckets are a convenient way to get a strongly typed view of a Bucket and only store/fetch values of that type. They are a touch of sugar that reduce noise and I advise you use them once your domain is established. This test illustrates the usage.

The Next Steps

That’s about it. There is a Retrier interface and a default try-3-times-with-a-short-wait implementation (if the database is fault-tolerant, the client should be too, right?), but I’m going to push that down the stack to the RawClient layer so we can add cluster awareness to the client (with load balancing and all that good stuff).

I haven’t covered querying (MapReduce and Link Walking) but I plan to in the next post (“Why Map/Reduce is easy with Java”, maybe?). I can say that is one aspect that has hardly changed from the original client. The original versions used a fluent builder and so does this client. The main difference is the common API and the ability to convert M/R results into Java Collections or domain specific objects (again, thanks to Jackson). Please read the README on the repo for details and the integration tests for examples.

At the moment the code is in the master branch on GitHub. If you get the chance to work with it I’d love to hear your feedback. The Riak Mailing List is the best place to make your feelings and opinions known. There are a few wrinkles to iron out before the next release of the Java Client, and your input will shape the future direction of this code so please, don’t be shy. We are on the lookout for contributors…

And go download Riak if you haven’t already.

Russell