Tag Archives: Riak

Boosting Riak Search Query Performance With Inline Fields

July 18, 2011

(This was originally posted on Ryan Zezeski’s working blog “Try Try Try”.)

In this post I want to give a quick overview of inline fields, a recent addition to Riak Search that lets you trade disk space for a considerable performance bump in query execution and throughput. I’m going to assume the reader is already familiar with Search. In the future I may do a Search overview; if you’d like that, ping me on Twitter.

The Goal

Recently on the Riak Users Mailing List there was a discussion about improving the performance of Search when executing intersection (i.e. AND) queries where one term has a low frequency and the other a high frequency. This poses a problem because Search needs to run through all the results on both sides in order to produce the correct result, so the query is always bounded by the highest-frequency term. This is exacerbated further by the fact that Search uses a global index, or in other words partitions the index by term. This effectively means that all results for a particular term are pulled sequentially from one node. This is opposed to a local index, or partitioning by document, which effectively allows you to parallelize the query across all nodes. There are trade-offs to either method and I don’t want to discuss them in this blog post, but it’s good to keep them in mind [1]. My goal with this post is to show how you can improve the performance of this type of query with the current version of Search [2].

What’s an “Inline” Field, Anyways?

To properly understand inline fields you need to understand the inverted index data structure [3]. As a quick refresher, the gist is that the index is a map from words to a list of document-reference/weight pairs. For each word [4] the index tells you which documents it occurs in and its “weight” in relation to that document, e.g. how many times it occurs. Search adds a little twist to this data structure by allowing an arbitrary list of properties to be tacked onto each of these pairs. For example, Search tracks the position of each occurrence of a term in a document.

Inline fields allow you to take advantage [5] of this fact and store the terms of a field directly in the inverted index entries [6]. Going back to my hypothetical query, you could mark the field with the frequently occurring term as inline and change the AND query into a query plus a filter. A filter is simply an extra argument to the Search API that uses the same syntax as a regular query but operates on the inline field. This has the potential to drop your latency dramatically because you avoid pulling the massive posting list [7] altogether.
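To make this concrete, here is a minimal sketch of what marking a field inline might look like in a Search schema (the Erlang-term schema format; treat the exact option names, in particular {inline, true}, as assumptions to verify against your Search version):

```erlang
%% tweets.schema -- hypothetical schema marking "text" as an inline field.
{schema,
 [{version, "1.1"},
  {default_field, "text"}],
 [
  %% Store the analyzed terms of "text" inline with every posting so
  %% they can be used as a cheap filter at query time.
  {field, [{name, "text"}, {inline, true}]},

  %% created_at stays a normal indexed field; we query on it directly.
  {field, [{name, "created_at"}]},

  {dynamic_field, [{name, "*"}]}
 ]}.
```

You would then load it with something like `search-cmd set-schema tweets tweets.schema` before indexing.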

WARNING: Inline fields are not free! Think carefully about what I just described and you’ll realize that this list of inline terms will be added to every single posting for that index. If your field contains many terms or you have many inline fields this could become costly in terms of disk space. As always, benchmarking with real hardware on a real production data set is recommended.
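As a purely illustrative back-of-envelope (assumed numbers, not measurements): if a tweet averages 12 distinct terms at roughly 8 bytes each, inlining the text field adds on the order of 100 bytes to every posting in the index. A term that appears in ~62K tweets then carries ~6 MB of inline baggage in its posting list, which every query touching that term has to read.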

The Corpus

I’ll be using a set of ~63K tweets that occurred in reaction to the devastating earthquake that took place in Haiti in January of 2010. The reason I chose this data-set is because it’s guaranteed to have frequently occurring terms such as “earthquake” but also low-frequency terms [7] such as the time the tweets were created.

The Rig

All benchmarks were run on a 2GHz i7 MBP with an SSD [8]. An initial run is performed to prime all systems; after that, essentially everything should be coming from FS cache, meaning I’ll mostly be testing processing time. My guess is that disk I/O would only amplify the results. I’ll be using Basho Bench and running it on the same machine as my cluster. My cluster consists of four Riak nodes (obviously, on the same machine) which I built from master [9].

If you’d like to run the benchmarks on your own hardware please see the RUN_BENCHMARKS.md file.

Naive Query

"text:earthquake"

The naive query asks for every document id [10] that includes the word “earthquake”. This should return 62805 results every time.

[Benchmark graph: naive query]

Scoped Query

"text:earthquake AND created_at:[20100113T032200 TO 20100113T032500]"

The scoped query still searches for all documents containing the term earthquake but restricts the set further to only those created in the provided three-minute time span.

[Benchmark graph: scoped query]

Scoped Query With Filtering

"created_at:[20100113T032200 TO 20100113T032500]" "text:earthquake"

This is the same as the scoped query except that earthquake is now a filter, not a query. Notice that, unlike the previous two queries, there are two strings: the first is the query, the second is the filter. You could read that in English as:

Execute the query to find all tweets created in this three-minute range, then filter that set using the inline field “text,” keeping only those that contain the term “earthquake.”
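For reference, here is a sketch of how you might issue that query-plus-filter pair against Search’s Solr-compatible HTTP interface from an Erlang shell; the index name (tweets), port, and the filter parameter are assumptions to check against your installation. The %5B/%5D/%20 sequences are just the URL-encoded brackets and spaces of the range query:

```erlang
%% Hypothetical sketch: scoped query plus inline filter over the
%% Solr-compatible interface.
inets:start(),
Q = "created_at:%5B20100113T032200%20TO%2020100113T032500%5D",
F = "text:earthquake",
URL = "http://127.0.0.1:8098/solr/tweets/select?q=" ++ Q ++
      "&filter=" ++ F ++ "&wt=json",
{ok, {{_, 200, _}, _Headers, Body}} = httpc:request(URL),
io:format("~s~n", [Body]).
```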

[Benchmark graph: scoped query with filter]

Wait One Second!

Just before I was about to consider this post wrapped up, I realized my comparison of inline vs. non-inline wasn’t quite fair. As currently implemented, when returning postings the inline field’s value is included. I’m not sure this is of any practical use outside the filtering mechanism, but it means that in the case of the naive and scoped queries the cluster is taking an additional disk and network hit by carrying all that extra baggage. A fairer comparison is to run the naive and scoped queries with no inline fields, so I adjusted my scripts and did just that.

Naive With No Inlining

[Benchmark graph: naive query, no inlining]

Scoped With No Inlining

[Benchmark graph: scoped query, no inlining]

Conclusions

In this first table I summarize the absolute values for throughput, 99.9th-percentile latency, and average latency.

| Stat        | Naive (I) | Naive | Scoped (I) | Scoped | Scoped Filter |
|-------------|-----------|-------|------------|--------|---------------|
| Thru (op/s) | 2.5       | 3.5   | 3          | 5      | 15            |
| 99.9% (ms)  | 875       | 490   | 575        | 350    | 42            |
| Avg (ms)    | 800       | 440   | 530        | 310    | 25            |

In this benchmark I don’t care so much about the absolute numbers as I do how they relate to each other. In the following table I show the performance increase of using the scoped filter query versus the other queries. For example, the scoped filter query has three times the throughput and returns in 1/12th of the time, on average, as compared to the scoped query. That is, even its closest competitor has a latency profile that is an order of magnitude worse. You may find it odd that I included the naive queries in this comparison but I wanted to show just how great the difference can be when you don’t limit your result set. Making a similar table comparing naive vs. scoped might be useful as well but I leave it as an exercise to the reader.

| Stat  | Naive (I) | Naive | Scoped (I) | Scoped |
|-------|-----------|-------|------------|--------|
| Thru  | 6x        | 4x    | 5x         | 3x     |
| 99.9% | 20x       | 11x   | 13x        | 8x     |
| Avg   | 32x       | 17x   | 21x        | 12x    |

In conclusion, I’ve done a drive-by benchmark showing that there are potentially great gains to be had by making use of inline fields. I say “potentially” because inline fields are not free, and you should take the time to understand your data-set and analyze what trade-offs you might be making by using this feature. In my example I’m inlining the text field of a twitter stream, so it would be useful to gather some statistics such as the average number of terms per tweet and the average size of each term. Armed with that info you can then estimate how many tweets you plan to store, how many results a typical query will match, and how much extra I/O overhead the inline field is going to add. Finally, run your own benchmarks on your own hardware with real data while profiling your system’s I/O, CPU, and memory usage. Doing anything else is just pissing in the wind.

Ryan

References

[1] If you’d like to know more, you could start by reading Distributed Query Processing Using Partitioned Inverted Files.

[2] Inline fields were added in 0.14.2, but my benchmarks were run against master.

[3] I like the introduction in Effect of Inverted Index Partitioning Schemes on Performance of Query Processing in Parallel Text Retrieval Systems.

[4] In search parlance a word is called a term and the entire list of terms is called the vocabulary.

[5] Or abuse, depending on your disposition.

[6] Entries in an inverted index are also called postings by some people.

[7] Or high cardinality, depending on how you want to look at it.

[8] Just like when dynoing a car, it’s constant conditions and relative improvement that matter. Once you’re out of the shop those absolute numbers don’t mean much.

[9] The exact commit is 3cd22741bed9b198dc52e4ddda43579266a85017.

[10] BTW, in this case “document” is a Riak object indexed by the Search Pre-commit hook.

An Overview Of The All New Riak Java Client

July 14, 2011

Hi. My name is Russell Brown and since March I’ve been working on the Riak Java Client (making me the lone Java developer in an Erlang shop). This past week I merged a large, backwards-compatible branch with some enhancements and long-awaited fixes and refinements. In this post I want to introduce the changes I’ve made and the motivations behind them. At Basho we firmly believe that Riak’s Java interface is on track to be among the best there is for Java developers who need a rock-solid, production-ready database, so it’s time you got to know it if you don’t already.

First, Some History

When Riak was first released, it was only equipped with an HTTP API, so it followed that the Java client was a REST client. Later a Protocol Buffers interface was added to Riak, and Kresten Krab-Thorup and the team at Trifork contributed a Protocol Buffers interface for the Java library. Later still, around version 0.14, the Trifork PB client was merged into the official Basho Riak Java Client. With this added interface, however, came a problem: both clients work well, but they don’t share any interfaces or types. When I started working for Basho in March 2011, my first task was to fix any issues with the existing clients and refactor them to a common, idiomatic interface. Some way into that task I was exposed to the rather brilliant Riak and Scala at Yammer talk given by Coda Hale and Ryan Kennedy at a Riak Meetup in San Francisco. It opened my eyes, and I’m very thankful to Coda and Ryan for sharing their expert understanding so freely. If you meet either of these two gentlemen, I urge you to buy them drinks.

A Common Interface

Having a common interface should be a no-brainer. Developers shouldn’t have to choose a low-level transport upfront and then have all their subsequent code shaped by that choice. To that end, I added a RawClient interface to the library that describes the set of operations you can perform with Riak, and I adapted each of the original clients to this interface. If all you want to do is pump data in or pull raw data out of Riak, the PB RawClient adapter is for you. There are some figures on the Riak Wiki that show it’s quite snappy. If you need to write a non-blocking client, or simply have to use the Jetty HTTP library, implementing this interface is the way to go.

There is some bad news here: I had to deprecate a lot of the original client and move that code to new packages. This will look a tad ugly in your IDE for a release or two, but it is better to make the changes than be stuck with odd packages forever. There will be a code cull of the deprecated classes before the client goes v1.0.

The next task on the list for this raw package is to move the interfaces into a separate core project/package to avoid the circular dependency issues that would arise if you created your own RawClient implementation. The RawClient solves the common/idiomatic interface problem, but it doesn’t solve the main new challenge that an eventually consistent, fault-tolerant datastore brings to the client: siblings.

Sibling Values

Before we move on, if you have the time, please take a moment to read the excellent Vector Clocks page on the Riak wiki (but make sure you come back). Thanks to vector clocks, Riak does all that it can to save you from dealing with conflicting values, but this doesn’t guarantee they won’t occur. The RawClient presents you with a vector clock and an array of sibling values, and you need to create a single, correct value to work with (and even write back to Riak as the one true value). The new, higher-level client API in the Java Client makes this easier.

Conflict Resolution

Conflict resolution is going to depend on your domain. Your data is opaque to Riak, which is why conflict resolution is a read time problem for the client. The canonical example (from the Dynamo Paper) is a shopping cart. If you have sibling shopping carts you can merge them (with a set-union operation, for example) to get a single cart with the values from all carts present. (Yes, you can re-instate a removed item, but that is far better than losing items. Ask Amazon.) Keep the idea of a shopping cart fresh in your mind for the remainder of this post as it figures in some of the examples I’ve used.

A Few Words On Domain Conversion

You use a Bucket to get key/value pairs from Riak.

Bucket b = client.createBucket(bucketName)
    .nVal(1)
    .allowSiblings(true)
    .execute();

IRiakObject fetched = b.fetch("k").execute();
b.store("k", "my new value").execute();
b.delete("k").execute();

The Bucket is a factory for RiakOperations, and a Riak Operation is a fluent builder that, when executed, calls out to Riak. “Fetch” and “Store” Riak Operations accept a Converter and ConflictResolver implementation from you so that the data Riak returns can be deserialised into a domain object and any siblings can be resolved. The library provides a Jackson-based JSONConverter that will convert the JSON payload of a Riak data item into an instance of some domain class; think of it as a bit like an ORM (but maybe without the “R”).

final Bucket carts = client.createBucket(bucketName).allowSiblings(true).execute();

final ShoppingCart cart = new ShoppingCart(userId);

cart.addItem("fixie");
cart.addItem("moleskine");

carts.store(cart).returnBody(false).retrier(DefaultRetrier.attempts(2)).execute();

Adding your own converters is trivial, and I plan to provide a Jackson XML-based one soon. Look at this test for a complete example.

Conflict Resolver

Once the data is marshalled into domain instances, your logic is run to resolve any conflicts. A trivial shopping cart example is provided in the tests here. The ConflictResolver interface has a single method that takes an array of domain instances and returns a single, resolved value.

T resolve(final Collection<T> siblings) throws UnresolvedConflictException;

It throws the checked UnresolvedConflictException if you need to bail out. Your code can catch this and make the siblings available to a user (for example) for resolution as a last resort. I am considering making this a runtime exception, and would like to hear what you think about that.

Mutation

To talk about mutation I’m going to stick with the shopping cart example. Imagine you’re creating a new cart for a visiting shopper. You create a ShoppingCart instance, add the toaster and the flambé set, and persist it. Meanwhile a network partition has occurred, and your user has already added a steak-knife set to a different cart. You’re not really creating a new value, but you weren’t to know. If you save this value, you have a conflict to be resolved at a later date. Instead, the high-level client executes a store operation as a fetch, convert, resolve siblings, apply a mutation, and then store. In the case of the shopping cart, that mutation would again be to merge the values of your new ShoppingCart with the resolved value fetched from Riak.

You provide an implementation of Mutation to any store operation. You never really know if you are creating a new value or updating an old one, so it is safer to model your write as a mutation of an existing value that results in a new value. This can be as simple as incrementing a number or adding the items in your Cart to the fetched Cart.

By default the library provides a ClobberMutator (which ignores the old value and overwrites it with the new one), but this is simply a default and not the best behaviour in most situations. It is better to provide your own Mutation implementation for a store operation. If you can model your values as logically monotonic or as transformations of existing values, then creating Mutation implementations is a lot simpler.

Noise

As your project matures, you will firm up your ConflictResolvers, Mutations, and Converters into concrete classes, and at this point adding them for each operation is a lot more typing and code noise than you need (especially if you were using anonymous classes for your Mutation/ConflictResolver/Converter).

bucket.store(o)
    .withConverter(converter)
    .withMutator(mutation)
    .withResolver(resolver)
    .r(r)
    .w(w)
    .dw(dw)
    .retrier(retrier)
    .returnBody(false)
    .execute();

The library provides the DomainBucket class as a wrapper around the Bucket. DomainBuckets are constructed with a ConflictResolver, Mutation, and Converter, and thereafter use those implementations for each operation. DomainBuckets are a convenient way to get a strongly typed view of a Bucket and only store/fetch values of that type. They are a touch of sugar that reduces noise, and I advise you to use them once your domain is established. This test illustrates the usage.

The Next Steps

That’s about it. There is a Retrier interface and a default try-3-times-with-a-short-wait implementation (if the database is fault-tolerant, the client should be too, right?), but I’m going to push that down the stack to the RawClient layer so we can add cluster awareness to the client (with load balancing and all that good stuff).

I haven’t covered querying (MapReduce and Link Walking) but I plan to in the next post (“Why Map/Reduce is easy with Java”, maybe?). I can say that it is one aspect that has hardly changed from the original client: the original versions used a fluent builder, and so does this client. The main difference is the common API and the ability to convert M/R results into Java Collections or domain-specific objects (again, thanks to Jackson). Please read the README on the repo for details and the integration tests for examples.

At the moment the code is in the master branch on GitHub. If you get the chance to work with it I’d love to hear your feedback. The Riak Mailing List is the best place to make your feelings and opinions known. There are a few wrinkles to iron out before the next release of the Java Client, and your input will shape the future direction of this code so please, don’t be shy. We are on the lookout for contributors…

And go download Riak if you haven’t already.

Russell

Leveling the Field

July 1, 2011

For most Riak users, Bitcask is the obvious right storage engine to use. It provides low latency, solid predictability, is robust in the face of crashes, and is friendly from a filesystem backup point of view. However, it has one notable limitation: total RAM use depends linearly (though via a small constant) on the total number of objects stored. For this reason, Riak users who need to store billions of entries per machine sometimes use Innostore (our wrapper around embedded InnoDB) as their storage engine instead. InnoDB is a robust and well-known storage engine, and it uses a more traditional design than Bitcask, which allows it to tolerate a higher maximum number of items stored on a given host.

However, there are a number of reasons people may wish for something other than Innostore when they find themselves in this situation. It is less comfortable to back up than Bitcask, imposes a higher minimum overhead on disk space, only performs well when heavily tuned (and given multiple spindles), and comes with a more restrictive license. For all of these reasons we have been paying close attention to LevelDB, which was recently released by Google. LevelDB’s storage architecture is more like BigTable’s memtable/sstable model than it is like either Bitcask or InnoDB. This design and implementation bring the possibility of a storage engine without Bitcask’s RAM limitation and without any of the above drawbacks of InnoDB. Our early hypothesis after reading the text and code was that LevelDB might fill an InnoDB-like role for Riak users, without some of the downsides. As some of the early bugs in LevelDB were fixed and stability improved, our hopes rose further.

In order to begin testing this possibility, we have begun to perform some simple performance comparisons between LevelDB and InnoDB using basho_bench and a few different usage patterns. All of these comparisons were performed on the exact same machine, a fairly basic 2-CPU Linux server with 4G of RAM, mid-range SATA disks, and so on — a fairly typical commodity system. Note that this set of tests is not intended to provide useful absolute numbers for either database, but rather to allow some preliminary comparisons between the two. We tried to be as fair as possible; for instance, InnoDB was given an independent disk for its journaling.

The first comparison was a sequential load into an empty database. We inserted one hundred million items with numerically-sorted keys, using fairly small values of 100 bytes per item.

The database created by this insert test was used as the starting point for all subsequent tests. Each subsequent test was run in steady-state for one hour on each of the two databases. Longer runs will be important for us to gauge stability, but an hour per test seemed like a useful starting point.

For the second comparison, we did a read-only scenario with a pareto distribution. This means that a minority of the items in the database see the vast majority of requests, so there will be relatively high churn but also a higher percentage of cache hits in a typical system.

The third comparison used exactly the same pareto pattern for key distribution, but instead of being pure reads it was a 90/10 read/write ratio.
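To make the workload concrete, here is a sketch of what a basho_bench configuration for this 90/10 pareto test could look like; the driver module name and exact generator tuples are illustrative assumptions, not the exact configs we ran:

```erlang
%% pareto_90_10.config -- hypothetical basho_bench configuration.
{mode, max}.                 % go as fast as the store allows
{duration, 60}.              % minutes; one hour per steady-state test
{concurrent, 8}.             % worker processes

%% Driver module is illustrative; substitute the engine under test.
{driver, basho_bench_driver_innostore}.

%% Pareto-distributed keys over the 100M items loaded earlier,
%% 100-byte values, and a 90/10 read/write mix.
{key_generator, {int_to_bin, {pareto_int, 100000000}}}.
{value_generator, {fixed_bin, 100}}.
{operations, [{get, 9}, {put, 1}]}.
```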

The fourth comparison was intended to see how the two systems compared to each other in a very-high-churn setting. It used the same dataset, but write-only, and in an extremely narrow pareto distribution such that nearly all writes would be within a narrow set of keys, causing a relatively small number of items to be overwritten many times.

In each of these tests, LevelDB showed a higher throughput than InnoDB and a similar or lower latency than InnoDB. Our goal in this initial round was to explore basic feasibility, and that has now been established.

This exercise has not been an attempt to provide comprehensive general-purpose benchmarks for either of these two storage engines. A number of the choices made do not represent any particular generic usage pattern but were instead made to quickly put the systems under stress and to minimize the number of variables being considered. There are certainly many scenarios where either of these two storage systems can be made to perform differently (sometimes much better) than they did here. In some earlier tests we saw InnoDB provide a narrower variance of latency (such as lower values in the 99th percentile), but we have not seen that reproduced in this set of tests. Among the other things not done in this quick set of tests: using the storage engines through Riak, deeply examining their I/O behavior, observing their stability over very long periods of time, comparing their response to different concurrency patterns, or comparing them to a wider range of embedded storage engines. All of these directions (and more) are good ideas for continued work in the future, and we will certainly do some of that.

Despite everything we haven’t yet done, this early work has validated one early hope and hypothesis. It appears that LevelDB may become a preferred choice for Riak users whose data set has massive numbers of keys and therefore is a poor match with Bitcask’s model. Performance aside, it compares favorably to InnoDB on other issues such as permissive license and operational usability. We are now going ahead with the work and continued testing needed to keep exploring this hypothesis and to improve both Riak and LevelDB in order to make their combined use an option for our customers and open source community.

Some issues still remain that we are working to resolve before LevelDB can be a first-class storage engine under Riak. One such issue that we are working on (with the LevelDB maintainers at Google) is making the LevelDB code portable to all of the same platforms that Riak is supported on. We are confident that these issues will be resolved in the very near future. Accordingly, we are moving ahead with a backend enabling Riak’s use of LevelDB.

Justin

Building and Maintaining Community and Culture at a Distributed Company

June 27, 2011

This was originally posted on themarkphillips.com. Please use the original post for all comments.

When someone asks me, “Where is Basho located?”, I usually respond with something along the lines of: “Much like Riak, we are completely distributed.” Some three years ago our team was all working out of Cambridge, MA (which is still our headquarters). Slowly but surely the team grew in size, but it quickly became apparent that requiring all employees to work in the same geographic location would result in us missing out on some talented and downright brilliant people. So we resolved to “hire where the talent is.”

As it stands right now we have physical offices in Cambridge, MA and San Francisco. The team, however, is now completely distributed; in addition to Cambridge and San Francisco (and several other CA cities), we have people in Oregon, Oklahoma (various locations), Florida, Colorado (various locations), New Jersey, North Carolina, Minnesota, Virginia (various locations), Maryland (various locations), Idaho, New York, Germany, and the UK. The latest tally put our entire team at just over thirty people.

Hiring where the talent is means we don’t sacrifice great hires for location, but it also presents various hurdles when attempting to build culture and community. Anyone who works at a startup or as part of a small team can speak to the importance of culture. It’s crucial that distributed employees feel as though they are part of a tight-knit crew. If you show up every day and your engagement with your coworkers doesn’t go much beyond a few passing phrases in a chat client, you should be doing more. The leadership at Basho made it clear many moons ago that we were going to work hard to build culture and community. Just because you’re committing code 1000 miles from your nearest colleague doesn’t mean you need to feel like they are 1000 miles away.

I spend most of my time pursuing ways to strengthen and extend the various external communities that are growing out of Basho’s open source software, but I thought it might be useful to examine what we do internally to build community and culture. As should be apparent, we’re not doing anything too crazy or innovative with the ways we connect and collaborate across states and countries. But it’s the little things that matter where culture is concerned at a distributed company, and I think we do a lot of the little things well.

Real-Time Chat

For as long as I can remember, Basho has used Jabber for real-time chat collaboration. This is where we spend most of our time conversing, and the entire company idles in one room we call “bashochat.” At any given time you can find any number of conversations happening concurrently; several developers might be chasing down a finicky bug while several others are discussing the merits of the latest cat meme. Hundreds (if not thousands) of messages fly through here daily. At times it can get a bit distracting, so signing off to focus is encouraged and done often. We also just started logging bashochat to make sure that those who are out for the day or signed off to chase a corner case can stay in the loop.

In addition to Jabber, the Client Services Team also uses Campfire as their chat software of choice (for various reasons). There’s certainly no reason why multiple chat programs can’t co-exist under the same corporate umbrella. Basho is flexible, and if it works for your team, go with it.

Skype

Interacting via Skype serves as a great complement to what happens in Jabber (even if Skype itself offers less than five nines of uptime). Everyone uses Skype at least once daily for our morning status call. We are still small enough that getting the majority of the company on the phone for a 10 minute status call is feasible, so we do it. Topics range from “What’s the current status of bug fix X for customer Y?” to “Did you get any questions at yesterday’s meetup talk that you couldn’t answer?” Video chats are also invaluable, and jumping on Skype to speak “face-to-face” for even five minutes is incredibly worthwhile and serves to reinforce the team feel (especially when a new hire is coming aboard).

Yammer

Yammer is a great piece of software, and it recently worked its way into our suite of collaboration tools. When it was first introduced to our team (around the beginning of this year) I was a bit skeptical of how well it was going to work for us. We already use Jabber quite heavily. How would the two co-exist? Since then Yammer has become the home for low-volume, high quality messages that deserve more than just a passing glance or ephemeral response. In other words, the signal to noise ratio in Yammer is much higher; links to blog posts about Riak (or our competition), results of a long running benchmark (complete with graphs), or links to a new GitHub repo are all typical of what appears on Yammer. That said, the message volume has been growing steadily over the past months, and I’m curious and interested to see how this tool evolves for us.

Quarterly Meetups

At some point you have to actually meet and physically interact with your colleagues. To this end, we’ve been doing quarterly developer meetups for about six quarters now. These are 3-5 day gatherings of the entire team where it’s business as usual, with the exception of some team-building activities scattered throughout the week. Lots of amazing ideas and moments are born at these meetups, and we all look forward to them.

Current Status

Basho is firing on all cylinders right now (fixing more bugs, writing more features, closing more deals, resolving more tickets, etc.), and I believe that our dedication to building a distributed culture and community internally has had a lot to do with it. Though Basho’s “system” is still a work in progress, in my opinion we’ve managed to build a strong internal community and culture that lends itself to heightened levels of productivity and overall happiness. We are still relatively small (right around 30, as I stated above) and making this scale will surely be a challenge. And I’m sure that the tools we use will change, too, to accommodate our needs (speaking of which, where is the Skype replacement already?).

You can’t force community and culture. It starts with how you hire and is tested every day (whether you’re working in the same physical location or not). Build (or seek out) a team that is committed to making something special across the board. Collaboration tools and processes will follow accordingly, and they should complement and enhance the way you work, not dictate it.

Mark

statebox, an eventually consistent data model for Erlang (and Riak)

May 13, 2011

This was originally posted by Bob Ippolito on May 9th on the Mochi Media Labs Blog. If you’re planning to comment, please do so on the original post.

A few weeks ago when I was on call at work I was chasing down a bug in friendwad [1] and I realized that we had made a big mistake. The data model was broken: it could only work with transactions, but we were using Riak. The original prototype was built with Mnesia, which would’ve been able to satisfy this constraint, but when it was refactored for an eventually consistent data model it just wasn’t correct anymore. Given just a little bit of concurrency, such as a popular user, it would produce inconsistent data. Soon after this discovery I found another service built on the same invalid premise, and I realized that a general solution to this problem would allow us to migrate several applications from Mnesia to Riak.

When you choose an eventually consistent data store you’re prioritizing availability and partition tolerance over consistency, but this doesn’t mean your application has to be inconsistent. What it does mean is that you have to move your conflict resolution from writes to reads. Riak does almost all of the hard work for you [2], but if it’s not acceptable to discard some writes then you will have to set allow_mult to true on your bucket(s) and handle siblings [3] from your application. In some cases, this might be trivial. For example, if you have a set and only support adding to that set, then a merge operation is just the union of those two sets.

statebox is my solution to this problem. It bundles the value with repeatable operations [4] and provides a means to automatically resolve conflicts. Usage of statebox feels much more declarative than imperative. Instead of modifying the values yourself, you provide statebox with a list of operations and it will apply them to create a new statebox. This is necessary because it may apply this operation again at a later time when resolving a conflict between siblings on read.
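Before the Riak-facing example below, it may help to see the core primitives in isolation. This is a minimal sketch based on my reading of the statebox README; verify the exact signatures against the version you use:

```erlang
%% Two "concurrent" writers start from the same empty-ordsets statebox.
New = statebox:new(fun () -> [] end),
A = statebox:modify({fun ordsets:add_element/2, [a]}, New),
B = statebox:modify({fun ordsets:add_element/2, [b]}, New),

%% On read, merge replays the queued operations, so both writes survive
%% no matter which sibling "wins".
Merged = statebox:merge([A, B]),
[a, b] = statebox:value(Merged).
```

The same kind of merge is what resolves Riak siblings in the example below.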

Design goals (and non-goals):

  • The intended use case is for data structures such as dictionaries and sets
  • Direct support for counters is not required
  • Applications must be able to control the growth of a statebox so that it does not grow indefinitely over time
  • The implementation need not support platforms other than Erlang and the data does not need to be portable to nodes that do not share code
  • It should be easy to use with Riak, but not be dependent on it (clear separation of concerns)
  • Must be comprehensively tested, mistakes at this level are very expensive
  • It is ok to require that the servers’ clocks are in sync with NTP (but the library should be aware that timestamps can be in the future or the past)

Here’s what typical statebox usage looks like for a trivial application (note: Riak metadata is not merged [5]). In this case we are storing an orddict in our statebox, and this orddict has the keys following and followers.

```erlang
-module(friends).
-export([add_friend/2, get_friends/1]).

-define(BUCKET, <<"friends">>).
-define(STATEBOX_MAX_QUEUE, 16).     %% Cap on max event queue of statebox
-define(STATEBOX_EXPIRE_MS, 300000). %% Expire events older than 5 minutes
-define(RIAK_HOST, "127.0.0.1").
-define(RIAK_PORT, 8087).

-type user_id() :: atom().
-type orddict(T) :: [T].
-type ordsets(T) :: [T].
-type friend_pair() :: {followers, ordsets(user_id())} |
                       {following, ordsets(user_id())}.

-spec add_friend(user_id(), user_id()) -> ok.
add_friend(FollowerId, FolloweeId) ->
    statebox_riak:apply_bucket_ops(
      ?BUCKET,
      [{[friend_id_to_key(FollowerId)],
        statebox_orddict:f_union(following, [FolloweeId])},
       {[friend_id_to_key(FolloweeId)],
        statebox_orddict:f_union(followers, [FollowerId])}],
      connect()).

-spec get_friends(user_id()) -> [] | orddict(friend_pair()).
get_friends(Id) ->
    statebox_riak:get_value(?BUCKET, friend_id_to_key(Id), connect()).

%% Internal API

connect() ->
    {ok, Pid} = riakc_pb_socket:start_link(?RIAK_HOST, ?RIAK_PORT),
    connect(Pid).

connect(Pid) ->
    statebox_riak:new([{riakc_pb_client, Pid},
                       {max_queue, ?STATEBOX_MAX_QUEUE},
                       {expire_ms, ?STATEBOX_EXPIRE_MS},
                       {from_values, fun statebox_orddict:from_values/1}]).

friend_id_to_key(FriendId) when is_atom(FriendId) ->
    %% NOTE: You shouldn't use atoms for this purpose, but it makes the
    %% example easier to read!
    atom_to_binary(FriendId, utf8).
```

To show how this works a bit more clearly, we’ll use the following sequence of operations:

```erlang
add_friend(alice, bob),     %% AB
add_friend(bob, alice),     %% BA
add_friend(alice, charlie). %% AC
```

Each of these add_friend calls can be broken up into four separate atomic operations, demonstrated in this pseudocode:

```erlang
%% add_friend(alice, bob)
Alice = get(alice),
put(update(Alice, following, [bob])),
Bob = get(bob),
put(update(Bob, followers, [alice])).
```

Realistically, these operations may happen with some concurrency and cause conflict. For demonstration purposes we will have AB happen concurrently with BA, and the conflict will be resolved during AC. For simplicity, I’ll only show the operations that modify the key for alice.

```erlang
AB = get(alice),                       %% AB (Timestamp: 1)
BA = get(alice),                       %% BA (Timestamp: 2)
put(update(AB, following, [bob])),     %% AB (Timestamp: 3)
put(update(BA, followers, [bob])),     %% BA (Timestamp: 4)
AC = get(alice),                       %% AC (Timestamp: 5)
put(update(AC, following, [charlie])). %% AC (Timestamp: 6)
```

Timestamp 1:

There is no data for alice in Riak yet, so statebox_riak:from_values([]) is called and we get a statebox with an empty orddict.

```erlang
Value = [],
Queue = [].
```

Timestamp 2:

There is no data for alice in Riak yet, so statebox_riak:from_values([]) is called and we get a statebox with an empty orddict.

```erlang
Value = [],
Queue = [].
```

Timestamp 3:

Put the updated AB statebox to Riak with the updated value.

```erlang
Value = [{following, [bob]}],
Queue = [{3, {fun op_union/2, following, [bob]}}].
```

Timestamp 4:

Put the updated BA statebox to Riak with the updated value. Note that this will be a sibling of the value stored by AB.

```erlang
Value = [{followers, [bob]}],
Queue = [{4, {fun op_union/2, followers, [bob]}}].
```

Timestamp 5:

Uh oh, there are two stateboxes in Riak now… so statebox_riak:from_values([AB, BA]) is called. This will apply all of the operations from both of the event queues to one of the current values, and we will get a single statebox as a result.

```erlang
Value = [{followers, [bob]},
         {following, [bob]}],
Queue = [{3, {fun op_union/2, following, [bob]}},
         {4, {fun op_union/2, followers, [bob]}}].
```

Timestamp 6:

Put the updated AC statebox to Riak. This will resolve the siblings created at Timestamp 4 by BA.

```erlang
Value = [{followers, [bob]},
         {following, [bob, charlie]}],
Queue = [{3, {fun op_union/2, following, [bob]}},
         {4, {fun op_union/2, followers, [bob]}},
         {6, {fun op_union/2, following, [charlie]}}].
```

Well, that’s about it! alice is following both bob and charlie despite the concurrency. No locks were harmed during this experiment, and we’ve arrived at eventual consistency by using statebox_riak, statebox, and Riak without having to write any conflict resolution code of our own.

Bob

And if you’re at all interested in getting paid to do stuff like this, Mochi is hiring.

References

[1] friendwad manages our social graph for Mochi Social and MochiGames. It is also evidence that naming things is a hard problem in computer science.

[2] See Basho’s articles on Why Vector Clocks are Easy and Why Vector Clocks are Hard.

[3] When multiple writes happen to the same place and they have branching history, you’ll get multiple values back on read. These are called siblings in Riak.

[4] An operation F is repeatable if and only if F(V) = F(F(V)). You could also call this an idempotent unary operation.

[5] The default conflict resolution algorithm in statebox_riak chooses metadata from one sibling arbitrarily. If you use metadata, you’ll need to come up with a clever way to merge it (such as putting it in the statebox and specifying a custom resolve_metadatas in your call to statebox_riak:new/1).

It's Time to Drop the "F" Bomb – or "Lies, Damn Lies, and NoSQL."

May 11, 2011

I’m on a plane to Goto Copenhagen from our electric post-Kill-Dash-Nine Board meeting in Washington, DC and, afterwards, an intense client meeting. I went to watch Pete Sheldon, our new Director of Sales, and Justin Sheehy at work. I finally had a chance to sit and study a proposal for the Basho product roadmap for the next year. This roadmap is both breathtakingly ambitious and oddly incremental, quotidian even.

In the next year we will solve problems endemic to distributed systems – groundbreaking work of the sort careers are surely made of – and yet at the same time, these problems seem incremental and iterative, part of an ongoing process of small improvements. They seem both astounding and inevitable.

This led me to an interesting insight — doing this is not easy.

What we are doing is like digging a canal through bedrock. We are hacking away at hard problems – problems others encountered and either died trying to solve or, mopping their brows with their handkerchiefs, threw down their shovels and went shopping. A lot of cool companies are hacking away too, so it is not like we are alone, but the honorable and the diligent are not what this post is about.

This post is about the ugly truth I have to call out.

To put it bluntly, if you are claiming the architectural challenges presented by applications with high write loads spread across multiple data centers are easy, you are lying. You do not, as Theo Schlossnagle remarked recently to us, “respect the problem.” You must respect the problem or you disrespect the necessary tradeoffs. And if you disrespect the tradeoffs, you disrespect your user. And if you disrespect your user, you are, inevitably, a liar. You say _this_ is easy. You promise free lunches. You guarantee things that turn out to be impossible. You lie.

What our technology generation is attempting is really hard. There is no easy button. You can’t play fast and loose with the laws of physics or hand-wave around critical durability issues. You can sell this stuff to your venture capitalist, but we’re not buying it.

Immutable laws are not marketing. And therefore, marketing can’t release you from the bonds of immutable laws. You can’t solve the intractable problems of distributed systems so eloquently summarized with three letters – C-A-P – by Google’s cloud architect (and Basho Board member) Dr. Eric Brewer (a man widely lauded, whose full impact on our world has not yet been reckoned) with specious claims about downloads and full consistency.

To wit:

  • Memory is not storage.
  • Trading the RDBMS world for uptime is hard. There are no half-steps. No transitional phases.
  • The geometry of a spinning disk matters for your app. You can’t escape this.
  • Your hardware RAID controller is not perfect. It screws things up and needs to be debugged.
  • Replication between two data centers is hard, let alone replication between three or 15 data centers.
  • Easily adding nodes to a cluster under load impacts performance for a period determined by the amount of data stored on the existing nodes and the load on the system…and the kind of servers you are using…and a dozen other things. It looks easy in the beginning.

These are all sensible limitations. Like the speed of light or the poor quality of network television, these are universal constants. The point is: tradeoffs can’t be solved by marketing.

To be sure, there are faster databases than Riak. But do they ship with default settings optimized for speed or optimized for safety? We *ache* to be faster. We push ourselves to be faster. We tune and optimize and push. But we will never cross the line to lose data. While it is always tempting to set our defaults to *fast* instead of *safe*, we won’t do it. We will sacrifice speed to protect your data. In fact, if you prefer speed to preserving data, *don’t use Riak*. We tell the truth even if it means losing users. We will not lie.

Which is why others who do it make me ball my fists, score my palms, and look for a heavy bag to punch. Lying about what you can do – and spreading lies about other approaches – is a blatant attempt to replace the sacrifice of hard-core engineering and ops with fear, uncertainty, and doubt – FUD.

People who claim they are “winning NoSQL” with FUD are damaging our collective chance to effect a long-overdue change to the way data is stored and distributed. This opportunity is nothing short of a quantum shift in the quality of your life if you are in development, operations, or are a founder who lives and dies by the minute-to-minute performance of your brainchild/application.

The FUD-spreaders are destroying this opportunity with their lies. They are polluting the well by focusing on false marketing – on being the loud idiot drunk – instead of solving the problem. They can screw this up with their failure. It is time for us to demand they drop the FUD – drop the “F” bomb – and stop lying about what they can do. Just tell the truth, like Basho does — admit this is a hard problem and that hardcore engineering is the answer. In fact, they should do the honorable thing and quit the field if they are not ready to invest in the work needed to solve this problem.

If we, collectively, the developer and sysadmin community running the infrastructure of the world economy, allow people to replace engineering with marketing lies, to trade coffee mugs for countless hours of debugging, and in doing so, to destroy the reputation of a new class of data storage systems before they have had a chance to gain a foothold in the technology world, we all lose.

There are many reasons why the FUD spreaders persist.

There are the smart folks who throw their hands up and cynically say that liars are by nature better marketers. But marketing need not be lies, cynically accepted.

Then there are some of us who are too busy keeping projects or businesses afloat to really dig into the facts. But we sense that we are being lied to, and so we detach, saying this is all FUD. This can’t help us. Tragically, we miss the opportunity to make a big change.

Most of us simply want to trust other developers and will believe claims that seem too good to be true. If we do this, we are in a small but serious way saying that our hard-won operational wisdom is meaningless, that anyone who has deployed a production application or contributed to an open-source project has no standing to challenge the loud-mouth making claims that up-time is easy.

Up-time is not easy. Sleeping through the night without something failing is a blessing. Do not – *do not* – let VCs and marketers mess up our opportunity to take weekends off and sleep through the night when we are on call. The database technologies of 1980 (and their modern apologists in NoSQL) should not shape the lives of technologists in 2011.

In Conclusion…

In the briefest terms, Basho won’t betray this revolution because we keep learning big lessons from our small mistakes. We are our harshest critics.

We will deliver a series of releases that allow you to tune for the entire spectrum of CAP tradeoffs – strong consistency to strong partition tolerance – while making clear the tradeoffs and costs. At the same time Riak will provide plugins for Memcache, secondary indices, and also a significant departure from existing concepts of MapReduce that allows for flexible, simple, yet massively distributed computation, and much more user-friendly error reporting and logging. (Everyone reading this understands why that last item merits inclusion on any list of great aspirations – usability can damn or drive a project.)

We will deliver these major innovations, improvements, and enhancements, and they will be hard for us and our community to build. And it will take time for us to explain it to people. And you will find bugs. And it will work better a year after it ships than it does on day one.

But we will never lie to you.

We call on others to please drop the FUD, to acknowledge the truth about the challenges we all face running today’s infrastructure, and to join us in changing the world for the better.

Tony

 

Riak Search Explained

May 9, 2011

At last month’s San Francisco Riak Meetup, Basho Developer Advocate Dan Reverri gave an extensive and action-packed presentation on Riak Search. This talk goes from a basic overview of what Riak Search is and does, up through running a Riak Search Sample application. If you’re at all interested in what Riak Search can do for your applications, this video is well worth your 35 minutes. You can also get a PDF copy of Dan’s slides here.

When you’re done with the video:

NOTE: In the presentation Dan states that Riak Search’s standard analyzer supports stemming. This is not entirely true. The standard analyzer currently does not support stemming, but Riak Search allows you to call out to Java Lucene analyzers, which *do* support stemming. Sorry for any confusion.

Enjoy.

Mark

[Video: Riak Search Explained from Basho Technologies on Vimeo]

Why MapReduce Is Hard

April 20, 2011

At the highest level, MapReduce can be a fundamentally hard concept to grasp. In our last blog post on MapReduce, you learned how easy it is. After all, it’s just code, right?

The story gets a bit more complicated as soon as you run MapReduce in a distributed environment like Riak. Some of these caveats are just things to be aware of, others are simply in the nature of both Riak’s implementation and the ideas in MapReduce itself. Adding to that, it’s just a hard concept to grasp. I’ll be the first to admit it took me a long time to understand.

MapReduce is hard because it’s just code

Wait, didn’t I just tell you that MapReduce is easy because it’s just code? I believe I did, Bob.

Code requires knowledge of what it does. Code requires the discipline to keep it clean and easy to grasp. Code requires knowledge of the data structures involved. Code requires knowledge of how Riak treats your MapReduce functions depending on their phase in a particular MapReduce request.

Take Riak’s map functions. Depending on whether you intend to chain multiple functions in one request, they must return either a bucket/key pair or the data itself, destined either for the client or for the following reduce function.

One way to solve this would be to add an extra argument, based on which the map function can decide whether it returns a bucket/key pair or the data. Why would you do this? Consider a scenario where you want to extract a set of objects based on a match in one attribute and then run a second map matching another attribute. Here’s an example:

```javascript
var extractByMatchingAttribute = function(value, keydata, args) {
  var doc = Riak.mapKeyValuesJson(value)[0];
  if (doc.host.match(/basho.com/)) {
    if (args.last) {
      return [1];
    } else {
      return [[value.bucket, value.key]];
    }
  }
  return []; // a map function must always return a list
};
```

Now you can pass an argument object (e.g. {last: true}) to every phase in a MapReduce request. This is a valid solution, but it makes the map function painfully aware of the environment in which it’s running.

“But hey,” you wonder, “couldn’t I just make these into a single map function instead and return the value immediately?” You could, but depending on other MapReduce scenarios you could end up repeating a lot of code across a lot of functions. Once again, you could extract it, ignore it, or try to abstract it away, making map functions chainable through extra arguments which can then be passed to every single map phase independently.

I told you, MapReduce is hard because it’s just code, didn’t I?

MapReduce is hard because it doesn’t (yet) have a common language

Wouldn’t it be nice if we could just use a simple language to express what we want our Riak cluster to do instead? Something like the following, where we have a bucket full of log entries, every one of them represented by a JSON object, which handily includes a host attribute:

```sql
LOAD log_entries FROM KEY '2011-04-08' TO KEY '2011-05-10' INTO entries
FILTER entries BY host MATCHING /basho.com/ AND
GROUP entries BY host
```

There, three lines of easily readable text in a purely hypothetical example, although slightly inspired by Apache Pig. “Oh hey,” you’re saying, “doesn’t that look a bit like SQL?” Maybe it does, maybe it doesn’t, but that’s not the point. More important than that, it’s easier to read.

Will we see something along these lines in Riak? No comment.

MapReduce is hard because accessing lots of data is expensive

MapReduce is the technique du jour to query and filter data in Riak. It’s intended to be used as a tool to transform and analyze a set of data. You hand in a bunch of bucket/key pairs, and Riak feeds them into your map and reduce functions. Simple enough.

It has been our experience, however, that what users want is to feed the data from a whole bucket into a MapReduce request or a whole range of them. Thankfully you can do the latter using the rather nice key filters.

Still, fetching all the keys from a bucket requires a lot of resources. Key filters can certainly help to reduce it, because they fold the list of keys before the data is loaded from disk, which is certainly faster than loading everything right away. But it still takes a lot of time when there’s millions of keys stored in a Riak cluster. Not to mention the fact that every node in the cluster is involved in both fetching all objects from a bucket and running key filters.
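To illustrate the mechanics, here is a sketch of a key-filtered MapReduce request posted from Erlang over Riak’s HTTP interface; the bucket name, key format, and choice of the built-in Riak.mapValuesJson phase are illustrative, and the key_filters spec is folded over the key list before any object is fetched:

```erlang
%% Hypothetical sketch: restrict a MapReduce job to April 2011 log keys.
inets:start(),
Body =
  "{\"inputs\": {\"bucket\": \"log_entries\","
  "\"key_filters\": [[\"starts_with\", \"2011-04\"]]},"
  "\"query\": [{\"map\": {\"language\": \"javascript\","
  "\"name\": \"Riak.mapValuesJson\"}}]}",
{ok, {{_, 200, _}, _, Result}} =
    httpc:request(post, {"http://127.0.0.1:8098/mapred",
                         [], "application/json", Body}, [], []),
io:format("~s~n", [Result]).
```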

If this is what you’re trying to achieve, you should have a look at Riak Search. It does a much better job at these reverse lookup queries (which some call secondary indexes), and you can neatly feed the results into a MapReduce request, something we’ve implemented in Riaktant, our little syslog collector and aggregator. But we’ll leave dissecting that code to another blog post.

We have some more tricks up our sleeve which should be appearing in the near future, so stay tuned. I’m sure it’ll blow your mind just like it blew mine.

MapReduce is hard because going distributed requires coordination

Whenever you fire off a MapReduce request in a Riak cluster, the node you’re requesting becomes the coordinator of this particular request. This poor node suddenly has the responsibility of sending your map request to all the relevant nodes in the cluster, using the preference list where applicable.

The coordinating node suddenly has the job of sending out the map requests and waiting for them to return, triggering the timeout if they don’t. If the map phases return bucket/key pairs and there’s another map phase to run, it starts over, sending new requests to the nodes responsible for the pairs.

Map phases in a Riak cluster always run close to the data to reduce overhead, both in network traffic and in general load on the cluster: it greatly reduces the amount of data sent across the wire, and it lets the nodes responsible for the data get some of that MapReduce action as well.

That’s a lot of work for the coordinating node, but except for the reduce part, it’s not necessarily heavy computationally. It does, however, have to track state of all the phases sent out to the cluster, collect the results and control behaviour in case of failure of just a single node in the cluster.

What can you do about this? Other than keeping the initial data set for a MapReduce request as small as possible, the ball is pretty much in our court. It’s Riak’s task to make sure a MapReduce request is handled and coordinated properly.

MapReduce is hard because reduce runs recursively

Enter the brain bender called re-reduce. If you haven’t run into it, you’re in for a treat. A reduce function returns a list of aggregation results based on a list of inputs. Depending on how large the initial result set from the function is going to be, Riak’s MapReduce will feed it the first round of results again, a technique commonly called re-reduce.

In general your reduce functions should be oblivious to the kind of data they’re fed; they should work properly in both the normal and the re-reduce case. There are two options here:

  • Make your reduce functions self-aware.
    They know what kind of data they get from a previous map phase, and what kind of data they return. But that means adding some sort of type checking to your reduce functions, which is cumbersome and, especially with JavaScript, not exactly beautiful or even reliably achievable. So let’s assume I never told you about this option.
  • Make sure they always return data in the format they receive it.
    This is the way you’ll usually end up going. It’s just a lot easier to spend a little bit more time thinking how to build a result that you can re-run the same function and the result will either be unchanged or just aggregated again in the same fashion as before.

Here’s an example: a function that groups by an attribute. It assumes a list containing JavaScript objects with attributes as keys and numbers as values, e.g. {attribute: 1}. It simply adds up the values for every object, so that a list containing three pairs of the aforementioned example turns into {attribute: 3}. This makes it nicely ignorant of where the data comes from, and re-reduce will simply be fed the same data structures as the initial reduce.

The second parameter to the function is an argument you can pass to the reduce phase, telling it which particular attribute you’re interested in, making the function nicely reusable along the way.

```javascript
function(values, field) {
  // Sum the requested attribute across all inputs. The output has
  // the same shape as the input ({field: number}), so re-reduce can
  // safely feed this function its own results.
  var result = {};
  for (var i = 0; i < values.length; i++) {
    var obj = values[i];
    if (field in obj) {
      result[field] = (result[field] || 0) + obj[field];
    }
  }
  return [result];
}
```

Re-reduce is a weird concept to grasp, but when you just think of it as two different ways Riak will pass data to your reduce function, it’s actually not that hard. Just make sure a reduce function returns data in a way it can run on again, and you’re fine.

MapReduce is hard because debugging errors is hard

Finding out where things go wrong in a distributed MapReduce environment like Riak is hard. The only useful way of inspecting data you have is by using the ejsLog() function in JavaScript, and that means you’ll have to go look on every node in the cluster to find out what’s wrong or if the data is in the format you expect it to be. The following dumps the list of objects for a map or reduce function as JSON into the log file /tmp/mapreduce.log.

```javascript
// Appends the JSON-serialized phase inputs to /tmp/mapreduce.log on
// whichever node runs this function.
ejsLog('/tmp/mapreduce.log', JSON.stringify(values));
```

The small fix: test your MapReduce functions on a cluster with a single node before cranking it up a notch, or have a look at Sean Cribbs’ riak-qc.js framework for testing Riak MapReduce functions.

The MapReduce bottom line

MapReduce is hard, but we’re not going shopping just yet. Instead, we’re keeping a close eye on what our users are trying to achieve, improving Riak along the way. For more details on Riak’s MapReduce, check out the wiki, read a blog post on more practical MapReduce, or watch last year’s webinar on MapReduce. If you’re feeling adventurous, have a look at the examples and MapReduce code used in Riaktant. I’ll be sure to get back to you with a proper introduction on how it uses MapReduce to sift through syslog data.

Mathias

Riak Core – The Coordinator

April 19, 2011

This was originally posted on Ryan Zezeski’s working blog Try Try Try.

At the end of my vnode post I asked the question Where’s the redundancy? There is none in RTS, thus far. Riak Core isn’t magic but rather a suite of tools for building distributed, highly available systems. You have to build your own redundancy. In this post I’ll talk about the coordinator and show how to implement one.

What is a Coordinator?

Logically speaking, a coordinator is just what it sounds like: its job is to coordinate incoming requests. It enforces the consistency semantics of N, R and W and performs anti-entropy services like read repair. In simpler terms, it’s responsible for distributing data across the cluster and re-syncing data when it finds conflicts. You could think of vnodes as the things that Get Shit Done (TM) and the coordinators as the other things telling them what to do and overseeing the work. They work in tandem to make sure your request is handled as well as it can be.

To be more concrete, a coordinator is a gen_fsm, and each request is handled in its own Erlang process. A coordinator communicates with the vnode instances to fulfill requests.

To wrap up, a coordinator

  • coordinates requests
  • enforces the consistency requirements
  • performs anti-entropy
  • is an Erlang process that implements the gen_fsm behavior
  • and communicates with the vnode instances to execute the request

Implementing a Coordinator

Unlike the vnode, Riak Core doesn’t define a coordinator behavior. You have to roll your own each time. I used Riak’s get and put coordinators for guidance. You’ll notice they both have a similar structure. I’m going to propose a general structure here that you can use as your guide, but remember that there’s nothing set in stone on how to write a coordinator.

Before moving forward it’s worth mentioning that you’ll want to instantiate these coordinators under a simple_one_for_one supervisor. If you’ve never heard of simple_one_for_one before then think of it as a factory for Erlang processes of the same type. An incoming request will at some point call supervisor:start_child/2 to instantiate a new FSM dedicated to handling this specific request.
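To make that concrete, here’s a minimal sketch of such a supervisor, assuming a get coordinator module named rts_get_fsm (the module name and child-spec details are illustrative, not the post’s actual code):

```erlang
%% Minimal sketch of a simple_one_for_one supervisor for the get
%% coordinator. Every request spawns a fresh, temporary FSM.
-module(rts_get_fsm_sup).
-behaviour(supervisor).

-export([start_link/0, start_get_fsm/1]).
-export([init/1]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

%% Called once per request; Args is appended to the child's
%% start_link arguments, so it ends up in the FSM's init/1.
start_get_fsm(Args) ->
    supervisor:start_child(?MODULE, Args).

init([]) ->
    GetFsm = {undefined,                     % id is ignored for simple_one_for_one
              {rts_get_fsm, start_link, []}, % module, function, base args
              temporary, 5000, worker, [rts_get_fsm]},
    {ok, {{simple_one_for_one, 10, 10}, [GetFsm]}}.
```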

init(Args) -> {ok, InitialState, SD, Timeout}

```erlang
Args = term()
InitialState = atom()
SD = term()
Timeout = integer()
```

This is actually part of the gen_fsm behavior, i.e. it’s a callback you must implement. Its job is to specify the InitialState name and its data (SD). In this case you’ll also want to specify a Timeout of 0 so that a timeout event fires immediately, kicking off the work in the InitialState, prepare.

A get coordinator for RTS is passed four arguments.

  1. ReqId: A unique id for this request.
  2. From: Who to send the reply to.
  3. Client: The name of the client entity — the entity that is writing log events to RTS.
  4. StatName: The name of the statistic the requester is interested in.

All this data will be passed as a list to init and the only work that needs to be done is to build the initial state record and tell the FSM to proceed to the prepare state.

```erlang
init([ReqId, From, Client, StatName]) ->
    SD = #state{req_id=ReqId,
                from=From,
                client=Client,
                stat_name=StatName},
    {ok, prepare, SD, 0}.
```

The write coordinator for RTS is very similar but has two additional arguments.

  1. Op: The operation to be performed, one of set, append, incr,
    incrby or sadd.
  2. Val: The value of the operation. For the incr op this is undefined.

Here is the code.

```erlang
init([ReqID, From, Client, StatName, Op, Val]) ->
    SD = #state{req_id=ReqID,
                from=From,
                client=Client,
                stat_name=StatName,
                op=Op,
                val=Val},
    {ok, prepare, SD, 0}.
```

prepare(timeout, SD0) -> {next_state, NextState, SD, Timeout}

```erlang
SD0 = SD = term()
NextState = atom()
Timeout = integer()
```

The job of prepare is to build the preference list. The preference list is the preferred set of vnodes that should participate in this request. Most of the work is actually done by riak_core_util:chash_key/1 and riak_core_apl:get_apl/3. Both the get and write coordinators do the same thing here.

  1. Calculate the index in the ring that this request falls on.
  2. From this index determine the N preferred partitions that should handle the request.

Here is the code.

```erlang
prepare(timeout, SD0=#state{client=Client,
                            stat_name=StatName}) ->
    DocIdx = riak_core_util:chash_key({list_to_binary(Client),
                                       list_to_binary(StatName)}),
    Preflist = riak_core_apl:get_apl(DocIdx, ?N, rts_stat),
    SD = SD0#state{preflist=Preflist},
    {next_state, execute, SD, 0}.
```

That the key is a two-tuple is simply a consequence of Riak Core having been extracted from Riak; some of Riak’s key-value semantics crossed over during the extraction. In the future things like this may change.

execute(timeout, SD0) -> {next_state, NextState, SD}

```erlang
SD0 = SD = term()
NextState = atom()
```

The execute state executes the request by sending commands to the vnodes in the preflist and then putting the coordinator into a waiting state. The code to do this in RTS is really simple: call the vnode command, passing it the preference list. Under the covers the vnode has been changed to use riak_core_vnode_master:command/4, which will distribute the commands across the Preflist for you. I’ll talk about this later in the post.

Here’s the code for the get coordinator.

```erlang
execute(timeout, SD0=#state{req_id=ReqId,
                            stat_name=StatName,
                            preflist=Preflist}) ->
    rts_stat_vnode:get(Preflist, ReqId, StatName),
    {next_state, waiting, SD0}.
```

The code for the write coordinator is almost identical except it’s parameterized on Op.

```erlang
execute(timeout, SD0=#state{req_id=ReqID,
                            stat_name=StatName,
                            op=Op,
                            val=undefined,
                            preflist=Preflist}) ->
    rts_stat_vnode:Op(Preflist, ReqID, StatName),
    {next_state, waiting, SD0}.
```
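The post only shows the clause where Val is undefined. Presumably a companion clause handles ops that carry a value by passing Val through to the vnode; a sketch under that assumption:

```erlang
%% Sketch (assumption, not shown in the post): ops that carry a
%% value pass Val through as an extra argument. This clause would
%% sit directly below the val=undefined clause, which must match
%% first.
execute(timeout, SD0=#state{req_id=ReqID,
                            stat_name=StatName,
                            op=Op,
                            val=Val,
                            preflist=Preflist}) ->
    rts_stat_vnode:Op(Preflist, ReqID, StatName, Val),
    {next_state, waiting, SD0}.
```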

waiting(Reply, SD0) -> Result

```erlang
Reply = {ok, ReqID} | {ok, ReqID, Val}
Result = {next_state, NextState, SD}
       | {stop, normal, SD}
NextState = atom()
SD0 = SD = term()
```

This is probably the most interesting state in the coordinator, as its job is to enforce the consistency requirements and possibly perform anti-entropy in the case of a get. The coordinator waits for replies from the various vnode instances it called in execute and stops once its requirements have been met. The typical shape of this function is to pattern match on the Reply, check the state data SD0, and then either continue waiting or stop, depending on the current state data.

The get coordinator waits for replies with the correct ReqId, increments the reply count and adds the Val to the list of Replies. If the quorum R has been met then return the Val to the requester and stop the coordinator. If the vnodes didn’t agree on the value then return all observed values. In this post I am punting on the conflict resolution and anti-entropy part of the coordinator and exposing the inconsistent state to the client application. I’ll implement them in my next post. If the quorum hasn’t been met then continue waiting for more replies.

```erlang
waiting({ok, ReqID, Val}, SD0=#state{from=From, num_r=NumR0, replies=Replies0}) ->
    NumR = NumR0 + 1,
    Replies = [Val|Replies0],
    SD = SD0#state{num_r=NumR, replies=Replies},
    if
        NumR =:= ?R ->
            Reply =
                case lists:any(different(Val), Replies) of
                    true -> Replies;
                    false -> Val
                end,
            From ! {ReqID, ok, Reply},
            {stop, normal, SD};
        true ->
            {next_state, waiting, SD}
    end.
```
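The different/1 helper used above isn’t shown in the post; presumably it just builds a predicate closed over the first value:

```erlang
%% Sketch: returns a predicate that is true for any reply B that
%% disagrees with A, so lists:any/2 detects conflicting replies.
different(A) -> fun(B) -> A =/= B end.
```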

The write coordinator has things a little easier here because all it cares about is knowing that W vnodes executed its write request.

```erlang
waiting({ok, ReqID}, SD0=#state{from=From, num_w=NumW0}) ->
    NumW = NumW0 + 1,
    SD = SD0#state{num_w=NumW},
    if
        NumW =:= ?W ->
            From ! {ReqID, ok},
            {stop, normal, SD};
        true ->
            {next_state, waiting, SD}
    end.
```

What About the Entry Coordinator?

Some of you may be wondering why I didn’t write a coordinator for the entry vnode. If you don’t remember, this vnode is responsible for matching an incoming log entry and then executing its trigger function. For example, any incoming log entry from an access log in combined logging format will cause the total_reqs stat to be incremented by one. I only want this action to occur at most once per entry; there is no notion of N. I could write a coordinator that tries to make some guarantees about its execution, but for now I’m OK with possibly dropping data occasionally.

Changes to rts.erl and rts_stat_vnode

Now that we’ve written coordinators to handle requests to RTS, we need to refactor the old rts.erl and rts_stat_vnode. The model has changed from rts calling the vnode directly to delegating the work to rts_get_fsm, which calls the various vnodes and collects the responses.

```text
Before:

rts:get ----> rts_stat_vnode:get (local)

After:
                                                       /--> stat_vnode@rts1
rts:get ----> rts_get_fsm:get ----> rts_stat_vnode:get --|--> stat_vnode@rts2
                                                       \--> stat_vnode@rts3
```

Instead of performing a synchronous request the rts:get/2 function now calls the get coordinator and then waits for a response.

```erlang
get(Client, StatName) ->
    {ok, ReqID} = rts_get_fsm:get(Client, StatName),
    wait_for_reqid(ReqID, ?TIMEOUT).
```

The write requests underwent a similar refactoring.

```erlang
do_write(Client, StatName, Op) ->
    {ok, ReqID} = rts_write_fsm:write(Client, StatName, Op),
    wait_for_reqid(ReqID, ?TIMEOUT).

do_write(Client, StatName, Op, Val) ->
    {ok, ReqID} = rts_write_fsm:write(Client, StatName, Op, Val),
    wait_for_reqid(ReqID, ?TIMEOUT).
```
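Neither snippet shows wait_for_reqid/2. A plausible sketch, assuming it simply blocks on the {ReqID, ok} and {ReqID, ok, Reply} messages the coordinators send back with From ! ...:

```erlang
%% Sketch: block the calling process until the coordinator for
%% ReqID replies, or give up after Timeout milliseconds.
wait_for_reqid(ReqID, Timeout) ->
    receive
        {ReqID, ok} -> ok;
        {ReqID, ok, Val} -> {ok, Val}
    after Timeout ->
        {error, timeout}
    end.
```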

The rts_stat_vnode was refactored to use riak_core_vnode_master:command/4, which takes a Preflist, Msg, Sender and VMaster as arguments:

  • Preflist: The list of vnodes to send the command to.
  • Msg: The command to send.
  • Sender: A value describing who sent the request, in this case the coordinator. This is used by the vnode to correctly address the reply message.
  • VMaster: The name of the vnode master for the vnode type to send this command to.

```erlang
get(Preflist, ReqID, StatName) ->
    riak_core_vnode_master:command(Preflist,
                                   {get, ReqID, StatName},
                                   {fsm, undefined, self()},
                                   ?MASTER).
```

Coordinators in Action

Talk is cheap, let’s see it in action. Towards the end of the vnode post I made the following statement:

“If you start taking down nodes you’ll find that stats start to disappear.”

One of the main objectives of the coordinator is to fix this problem. Let’s see if it worked.

Build the devrel

```bash
make
make devrel
```

Start the Cluster

```bash
for d in dev/dev*; do $d/bin/rts start; done
for d in dev/dev{2,3}; do $d/bin/rts-admin join rts1@127.0.0.1; done
```

Feed in Some Data

```bash
gunzip -c progski.access.log.gz | head -100 | ./replay --devrel progski
```

Get Some Stats

```text
./dev/dev1/bin/rts attach
(rts1@127.0.0.1)1> rts:get("progski", "total_reqs").
{ok,97}
(rts1@127.0.0.1)2> rts:get("progski", "GET").
{ok,91}
(rts1@127.0.0.1)3> rts:get("progski", "total_sent").
{ok,445972}
(rts1@127.0.0.1)4> rts:get("progski", "HEAD").
{ok,6}
(rts1@127.0.0.1)5> rts:get("progski", "PUT").
{ok,not_found}
(rts1@127.0.0.1)6> rts:get_dbg_preflist("progski", "total_reqs").
[{730750818665451459101842416358141509827966271488,
  'rts3@127.0.0.1'},
 {753586781748746817198774991869333432010090217472,
  'rts1@127.0.0.1'},
 {776422744832042175295707567380525354192214163456,
  'rts2@127.0.0.1'}]
(rts1@127.0.0.1)7> rts:get_dbg_preflist("progski", "GET").
[{274031556999544297163190906134303066185487351808,
  'rts1@127.0.0.1'},
 {296867520082839655260123481645494988367611297792,
  'rts2@127.0.0.1'},
 {319703483166135013357056057156686910549735243776,
  'rts3@127.0.0.1'}]
```

Don’t worry about what I did on lines 6 and 7 yet, I’ll explain in a second.
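The get_dbg_preflist/2 function isn’t listed in the post either; presumably it just exposes the same preference-list calculation the coordinator performs in prepare. A sketch under that assumption:

```erlang
%% Sketch: compute the preflist for a stat the same way the get
%% coordinator's prepare state does.
get_dbg_preflist(Client, StatName) ->
    DocIdx = riak_core_util:chash_key({list_to_binary(Client),
                                       list_to_binary(StatName)}),
    riak_core_apl:get_apl(DocIdx, ?N, rts_stat).
```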

Kill a Node

```text
(rts1@127.0.0.1)8> os:getpid().
"91461"
Ctrl^D
kill -9 91461
```

Verify it’s Down

```bash
$ ./dev/dev1/bin/rts ping
Node 'rts1@127.0.0.1' not responding to pings.
```

Get Stats on rts2

Your results may not exactly match mine, as it depends on which vnode instances respond first. The coordinator only cares about getting R responses.

```text
./dev/dev2/bin/rts attach
(rts2@127.0.0.1)1> rts:get("progski", "total_reqs").
{ok,97}
(rts2@127.0.0.1)2> rts:get("progski", "GET").
{ok,[not_found,91]}
(rts2@127.0.0.1)3> rts:get("progski", "total_sent").
{ok,445972}
(rts2@127.0.0.1)4> rts:get("progski", "HEAD").
{ok,[not_found,6]}
(rts2@127.0.0.1)5> rts:get("progski", "PUT").
{ok,not_found}
```

Let’s Compare the Before and After Preflist

Notice that some gets on rts2 return a single value, as before, whereas others return a list of values. This is because the preflist calculation now includes fallback vnodes. A fallback vnode is one that is not on its appropriate physical node. Since we killed rts1, its vnode requests must be routed somewhere else; that somewhere else is a fallback vnode. Since the request-reply model between the coordinator and vnode is asynchronous, the reply value will depend on which vnode instances reply first. If the instances with values reply first then you get a single value; otherwise you get a list of values. My next post will improve this behavior slightly to take advantage of the fact that we know there are still two nodes with the data, so there should be no reason to return conflicting values.

```text
(rts2@127.0.0.1)6> rts:get_dbg_preflist("progski", "total_reqs").
[{730750818665451459101842416358141509827966271488,
  'rts3@127.0.0.1'},
 {776422744832042175295707567380525354192214163456,
  'rts2@127.0.0.1'},
 {753586781748746817198774991869333432010090217472,
  'rts3@127.0.0.1'}]
(rts2@127.0.0.1)7> rts:get_dbg_preflist("progski", "GET").
[{296867520082839655260123481645494988367611297792,
  'rts2@127.0.0.1'},
 {319703483166135013357056057156686910549735243776,
  'rts3@127.0.0.1'},
 {274031556999544297163190906134303066185487351808,
  'rts2@127.0.0.1'}]
```

In both cases either rts2 or rts3 stepped in for the missing rts1. Also, in each case, one of these vnodes is going to return not_found since it’s a fallback. I added another debug function to determine which one.

```text
(rts2@127.0.0.1)8> rts:get_dbg_preflist("progski", "total_reqs", 1).
[{730750818665451459101842416358141509827966271488,
  'rts3@127.0.0.1'},
 97]
(rts2@127.0.0.1)9> rts:get_dbg_preflist("progski", "total_reqs", 2).
[{776422744832042175295707567380525354192214163456,
  'rts2@127.0.0.1'},
 97]
(rts2@127.0.0.1)10> rts:get_dbg_preflist("progski", "total_reqs", 3).
[{753586781748746817198774991869333432010090217472,
  'rts3@127.0.0.1'},
 not_found]
(rts2@127.0.0.1)11> rts:get_dbg_preflist("progski", "GET", 1).
[{296867520082839655260123481645494988367611297792,
  'rts2@127.0.0.1'},
 91]
(rts2@127.0.0.1)12> rts:get_dbg_preflist("progski", "GET", 2).
[{319703483166135013357056057156686910549735243776,
  'rts3@127.0.0.1'},
 91]
(rts2@127.0.0.1)13> rts:get_dbg_preflist("progski", "GET", 3).
[{274031556999544297163190906134303066185487351808,
  'rts2@127.0.0.1'},
 not_found]
```

Notice the fallbacks are at the end of each list. Also notice that, since we’re on rts2, total_reqs will almost always return a single value because its fallback is on another node, whereas GET has a local fallback that will be more likely to reply first.

Conflict Resolution & Read Repair

In the next post I’ll be making several enhancements to the get coordinator by performing basic conflict resolution and implementing read repair.

Ryan

Follow Up To Riak Operations Webinar

April 15, 2011

Thanks to all who attended Thursday’s webinar on Riak Operations. If you couldn’t make it you can find a screencast of the webinar below. You can also check out the slides directly.

This webinar took users through many of the different operational aspects of running a production Riak cluster. Topics covered include: basic Riak configuration, monitoring, performance, backups, and much more.

Grant