Tag Archives: eventual consistency

statebox, an eventually consistent data model for Erlang (and Riak)

May 13, 2011

This was originally posted by Bob Ippolito on May 9th on the Mochi Media Labs Blog. If you’re planning to comment, please do so on the original post.

A few weeks ago, when I was on call at work, I was chasing down a bug in friendwad [1] and realized that we had made a big mistake. The data model was broken: it could only work with transactions, but we were using Riak. The original prototype was built with Mnesia, which would’ve been able to satisfy this constraint, but when it was refactored for an eventually consistent data model it just wasn’t correct anymore. Given just a little bit of concurrency, such as a popular user, it would produce inconsistent data. Soon after this discovery, I found another service built on the same invalid premise, and I also realized that a general solution to this problem would allow us to migrate several applications from Mnesia to Riak.

When you choose an eventually consistent data store you’re prioritizing availability and partition tolerance over consistency, but this doesn’t mean your application has to be inconsistent. What it does mean is that you have to move your conflict resolution from writes to reads. Riak does almost all of the hard work for you [2], but if it’s not acceptable to discard some writes then you will have to set allow_mult to true on your bucket(s) and handle siblings [3] from your application. In some cases, this might be trivial. For example, if you have a set and only support adding to that set, then a merge operation is just the union of those two sets.
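As a minimal sketch of that add-only set case: the module and function names below are made up for illustration, and each sibling is assumed to be an ordsets value. Under those assumptions the whole merge is one call to ordsets:union/1.

```erlang
-module(set_merge).
-export([merge_siblings/1]).

%% Each sibling is assumed to be an ordsets value (a sorted list with no
%% duplicates). Because the application only ever adds elements, the union
%% of all siblings keeps everything that any writer stored.
-spec merge_siblings([ordsets:ordset(term())]) -> ordsets:ordset(term()).
merge_siblings(Siblings) ->
    ordsets:union(Siblings).
```

Union is commutative and associative, so the order in which the siblings come back from the read does not change the result.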

statebox is my solution to this problem. It bundles the value with repeatable operations [4] and provides a means to automatically resolve conflicts. Usage of statebox feels much more declarative than imperative. Instead of modifying the values yourself, you provide statebox with a list of operations and it will apply them to create a new statebox. This is necessary because it may apply this operation again at a later time when resolving a conflict between siblings on read.
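To make "repeatable" concrete, here is a toy sketch. It is not the statebox API: the module name, union_op/2, apply_op/2, and is_repeatable/2 are all hypothetical helpers, and an operation is assumed to be a {Fun, Args} pair that can be stored and applied again later.

```erlang
-module(repeatable_demo).
-export([union_op/2, apply_op/2, is_repeatable/2]).

%% Build an operation as a {Fun, Args} pair so it can be stored and
%% replayed later. This mirrors the idea only; it is NOT the statebox API.
union_op(Key, Values) ->
    {fun op_union/3, [Key, Values]}.

%% Apply a stored operation to a value (the value is the last argument).
apply_op({F, Args}, Value) ->
    apply(F, Args ++ [Value]).

%% An operation is repeatable for Value when applying it twice gives the
%% same result as applying it once.
is_repeatable(Op, Value) ->
    Once = apply_op(Op, Value),
    apply_op(Op, Once) =:= Once.

%% Union Values into the ordset stored under Key in an orddict.
op_union(Key, Values, Dict) ->
    New = ordsets:from_list(Values),
    orddict:update(Key, fun (Old) -> ordsets:union(Old, New) end, New, Dict).
```

A set union passes this check, while something like appending to a list does not, which is why replaying operations during conflict resolution is only safe for the former.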

Design goals (and non-goals):

  • The intended use case is for data structures such as dictionaries and sets
  • Direct support for counters is not required
  • Applications must be able to control the growth of a statebox so that it does not grow indefinitely over time
  • The implementation need not support platforms other than Erlang and the data does not need to be portable to nodes that do not share code
  • It should be easy to use with Riak, but not be dependent on it (clear separation of concerns)
  • Must be comprehensively tested; mistakes at this level are very expensive
  • It is ok to require that the servers’ clocks are in sync with NTP (but it should be aware that timestamps can be in the future or past)
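One way to meet the growth goal is to bound the event queue both by age and by length. The following is a toy sketch under stated assumptions, not statebox's own code: the queue is assumed to be a list of {TimestampMs, Op} tuples sorted oldest first, and the module and function names are hypothetical.

```erlang
-module(queue_limits).
-export([expire/3, truncate/2]).

%% Queue is assumed to be a list of {TimestampMs, Op} events, oldest first.

%% Drop events older than MaxAgeMs relative to Now. Events stamped in the
%% future (T > Now) are kept, since clocks may disagree slightly.
expire(MaxAgeMs, Now, Queue) ->
    [E || {T, _Op} = E <- Queue, T >= Now - MaxAgeMs].

%% Keep only the newest N events.
truncate(N, Queue) when is_integer(N), N >= 0 ->
    lists:nthtail(max(0, length(Queue) - N), Queue).
```

The tradeoff is that an operation dropped from the queue can no longer be replayed, so the limits should be generous enough to cover the window in which conflicting writes are likely.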

Here’s what typical statebox usage looks like for a trivial application (note: Riak metadata is not merged [5]). In this case we are storing an orddict in our statebox, and this orddict has the keys following and followers.

-module(friends).
-export([add_friend/2, get_friends/1]).

-define(BUCKET, <<"friends">>).
-define(STATEBOX_MAX_QUEUE, 16).     %% Cap on max event queue of statebox
-define(STATEBOX_EXPIRE_MS, 300000). %% Expire events older than 5 minutes
-define(RIAK_HOST, "").              %% Hostname or IP address of your Riak node
-define(RIAK_PORT, 8087).

-type user_id() :: atom().
-type orddict(T) :: [T].
-type ordsets(T) :: [T].
-type friend_pair() :: {followers, ordsets(user_id())} |
                       {following, ordsets(user_id())}.

%% Record that FollowerId follows FolloweeId by applying one union
%% operation to each user's statebox.
%% NOTE: this assumes statebox_riak:apply_bucket_ops/3, which takes a
%% bucket, a list of {Keys, Op} pairs, and the statebox_riak handle.
-spec add_friend(user_id(), user_id()) -> ok.
add_friend(FollowerId, FolloweeId) ->
    statebox_riak:apply_bucket_ops(
      ?BUCKET,
      [{[friend_id_to_key(FollowerId)],
        statebox_orddict:f_union(following, [FolloweeId])},
       {[friend_id_to_key(FolloweeId)],
        statebox_orddict:f_union(followers, [FollowerId])}],
      connect()).

-spec get_friends(user_id()) -> [] | orddict(friend_pair()).
get_friends(Id) ->
    statebox_riak:get_value(?BUCKET, friend_id_to_key(Id), connect()).

%% Internal API

%% Open a Riak connection and wrap it in a statebox_riak handle.
connect() ->
    {ok, Pid} = riakc_pb_client:start_link(?RIAK_HOST, ?RIAK_PORT),
    connect(Pid).

connect(Pid) ->
    statebox_riak:new([{riakc_pb_client, Pid},
                       {max_queue, ?STATEBOX_MAX_QUEUE},
                       {expire_ms, ?STATEBOX_EXPIRE_MS},
                       {from_values, fun statebox_orddict:from_values/1}]).

friend_id_to_key(FriendId) when is_atom(FriendId) ->
    %% NOTE: You shouldn't use atoms for this purpose, but it makes the
    %% example easier to read!
    atom_to_binary(FriendId, utf8).

To show how this works a bit more clearly, we’ll use the following sequence of operations:

add_friend(alice, bob), %% AB
add_friend(bob, alice), %% BA
add_friend(alice, charlie). %% AC

Each of these add_friend calls can be broken up into four separate atomic operations, demonstrated in this pseudocode:

%% add_friend(alice, bob)
Alice = get(alice),
put(update(Alice, following, [bob])),
Bob = get(bob),
put(update(Bob, followers, [alice])).


Realistically, these operations may happen with some concurrency and cause conflict. For demonstration purposes we will have AB happen concurrently with BA and the conflict will be resolved during AC. For simplicity, I’ll only show the operations that modify the key for alice:

AB = get(alice), %% AB (Timestamp: 1)
BA = get(alice), %% BA (Timestamp: 2)
put(update(AB, following, [bob])), %% AB (Timestamp: 3)
put(update(BA, followers, [bob])), %% BA (Timestamp: 4)
AC = get(alice), %% AC (Timestamp: 5)
put(update(AC, following, [charlie])). %% AC (Timestamp: 6)

Timestamp 1:

There is no data for alice in Riak yet, so
statebox_riak:from_values([]) is called and we get a statebox
with an empty orddict.

Value = [],
Queue = [].

Timestamp 2:

There is no data for alice in Riak yet, so
statebox_riak:from_values([]) is called and we get a statebox
with an empty orddict.

Value = [],
Queue = [].

Timestamp 3:

Put the updated AB statebox to Riak with the updated value.

Value = [{following, [bob]}],
Queue = [{3, {fun op_union/2, following, [bob]}}].

Timestamp 4:

Put the updated BA statebox to Riak with the updated value. Note
that this will be a sibling of the value stored by AB.

Value = [{followers, [bob]}],
Queue = [{4, {fun op_union/2, followers, [bob]}}].

Timestamp 5:

Uh oh, there are two stateboxes in Riak now… so
statebox_riak:from_values([AB, BA]) is called. This will apply
all of the operations from both of the event queues to one of the
current values and we will get a single statebox as a result.

Value = [{followers, [bob]},
{following, [bob]}],
Queue = [{3, {fun op_union/2, following, [bob]}},
{4, {fun op_union/2, followers, [bob]}}].

Timestamp 6:

Put the updated AC statebox to Riak. This will resolve siblings
created at Timestamp 4 by BA.

Value = [{followers, [bob]},
{following, [bob, charlie]}],
Queue = [{3, {fun op_union/2, following, [bob]}},
{4, {fun op_union/2, followers, [bob]}},
{6, {fun op_union/2, following, [charlie]}}].

Well, that’s about it! alice is following both bob and charlie despite the concurrency. No locks were harmed during this experiment, and we’ve arrived at eventual consistency by using statebox_riak, statebox, and Riak without having to write any conflict resolution code of our own.
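The timestamp walk-through above can be replayed with a toy model. This is only a sketch of the idea, not the real statebox implementation: the module toy_statebox and all of its functions are hypothetical, operations are represented as plain {Key, Set} data rather than funs, and timestamps are passed in by hand.

```erlang
-module(toy_statebox).
-export([new/0, union/2, modify/3, merge/1, value/1]).

%% A toy statebox is {Value, Queue}: Value is an orddict and Queue is an
%% ordered set of {Timestamp, Op} events. Not the real statebox module.
new() ->
    {[], []}.

%% A repeatable op: union Values into the ordset stored under Key.
union(Key, Values) ->
    {Key, ordsets:from_list(Values)}.

%% Apply Op to the value and remember it in the event queue.
modify(Timestamp, Op, {Value, Queue}) ->
    {apply_op(Op, Value), ordsets:add_element({Timestamp, Op}, Queue)}.

%% Merge siblings: start from one sibling's value, then replay every
%% queued op from all siblings. Replaying is safe because the ops are
%% repeatable.
merge([{Value0, _} | _] = Siblings) ->
    Queue = ordsets:union([Q || {_, Q} <- Siblings]),
    Value = lists:foldl(fun ({_T, Op}, V) -> apply_op(Op, V) end,
                        Value0, Queue),
    {Value, Queue}.

value({Value, _Queue}) ->
    Value.

apply_op({Key, Set}, Dict) ->
    orddict:update(Key, fun (Old) -> ordsets:union(Old, Set) end, Set, Dict).
```

Building AB with modify(3, union(following, [bob]), new()) and BA with modify(4, union(followers, [bob]), new()), merging the two, and then applying union(following, [charlie]) at timestamp 6 reproduces the final value from the walk-through.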


And if you’re at all interested in getting paid to do stuff like this, Mochi is hiring.


[1] friendwad manages our social graph for Mochi Social and MochiGames.
It is also evidence that naming things is a hard problem in
computer science.
[2] See Basho’s articles on Why Vector Clocks are Easy and
Why Vector Clocks are Hard.
[3] When multiple writes happen to the same place and they have
branching history, you’ll get multiple values back on read.
These are called siblings in Riak.
[4] An operation F is repeatable if and only if F(V) = F(F(V)).
You could also call this an idempotent unary operation.
[5] The default conflict resolution algorithm in statebox_riak
chooses metadata from one sibling arbitrarily. If you use
metadata, you’ll need to come up with a clever way to merge it
(such as putting it in the statebox and specifying a custom
resolve_metadatas in your call to statebox_riak:new/1).

From Relational to Riak (Webcast)

**January 02, 2013**

New to Riak? Thinking about using Riak instead of a relational database? Join Basho chief architect Andy Gross and director of product management Shanley Kane for an intro this Thursday (11am PT/2pm ET). In about 30 minutes, we’ll cover the basics of:

* Scalability benefits of Riak, including an examination of limitations around master/slave architectures and sharding, and what Riak does differently
* A look at the operational aspects of Riak and where they differ from relational approaches
* Riak’s data model and benefits for developers, as well as the tradeoffs and limitations of a key/value approach
* Migration considerations, including where to start when migrating existing apps
* Riak’s eventually consistent design
* Multi-site replication options in Riak

Register for the webcast [here](http://info.basho.com/RelationalToRiakJan3.html).



Planning for Eventual Consistency

May 14, 2010

You may remember that last week, we recorded a podcast with Benjamin Black all about the immense variety of databases in the NoSQL space and what your criteria should be when choosing one.

If you listened carefully, you may also remember that Benjamin and Justin Sheehy started to discuss eventual consistency. We decided to roll that into its own podcast as we thought it was a topic worthy of its own episode.

Think there are only a certain subset of databases that are “eventually consistent”? Think again. Regardless of the database you choose, eventual consistency is something you should embrace and plan for, not fear.

Listen, Learn, and Enjoy.



Toward A Consistent, Fact-based Comparison of Emerging Database Technologies

A Muddle That Slows Adoption

Basho released Riak as an open source project seven months ago and began commercial service shortly thereafter. As new entrants into the loose collection of database projects we observed two things:

  1. Widespread Confusion — the NoSQL rubric, and the decisions of most projects to self-identify under it, has created a false perception of overlap and similarity between projects differing not just technically but in approaches to licensing and distribution, leading to…
  2. Needless Competition — driving the confusion, many projects (us included, for sure) competed passionately (even acrimoniously) for attention as putative leaders of NoSQL, a fool’s errand as it turns out. One might as well claim leadership of all tools called wrenches.

The optimal use cases, architectures, and methods of software development differ so starkly even among superficially similar projects that to compete is to demonstrate a (likely pathological) lack of both self-knowledge and understanding of user needs.

This confusion and wasted energy — in other words, the market inefficiency — has been the fault of anyone who has laid claim to, or professed designs on, the NoSQL crown.

  1. Adoption suffers — Users either make decisions based on muddled information or, worse, do not make any decision whatsoever.
  2. Energy is wasted — At Basho we spent too much time from September to December answering the question posed without fail by investors and prospective users and clients: “Why will you ‘win’ NoSQL?”

With the vigor of fools, we answered this question, even though we rarely if ever encountered another project’s software in a “head-to-head” competition. (In fact, in the few cases where we have been pitted head-to-head against another project, we have won or lost so quickly that we cannot help but conclude the evaluation could have been avoided altogether.)

The investors and users merely behaved as rational (though often bemused) market actors. Having accepted the general claim that NoSQL was a monolithic category, both sought to make a bet.

Clearly what is needed is objective information presented in an environment of cooperation driven by mutual self-interest.

This information, shaped not by any one person’s necessarily imperfect understanding of the growing collection of data storage projects but rather by all the participants themselves, would go a long way to remedying the inefficiencies discussed above.

Demystification through data, not marketing claims

We have spoken to representatives of many of the emerging database projects. They have enthusiastically agreed to participate in a project to disclose data about each project. Disclosure will start with the following: a common benchmark framework and benchmarks/load tests modeling common use cases.

  1. A Common Benchmark Framework: For this collaboration to succeed, no single aspect will impact success or failure more than arriving at a common benchmark framework.

At Basho we have observed the proliferation of “microbenchmarks,” or benchmarks that do not reflect the conditions of a production environment. Benchmarks that use a small data set, that do not store to disk, that run for short (less than 12 hours) durations, do more to confuse the issue for end users than any single other factor. Participants will agree on benchmark methods, tools, applicability to use cases, and to make all benchmarks reproducible.

Compounding the confusion, benchmarks are often used to compare different use cases, or are run on different hardware, and yet are compared head-to-head as if the tests or systems were identical. We will seek to help participants run equivalent tests on the various databases, and we will not publish benchmark results that do not profile the hardware and configuration of the systems.

  2. Benchmarks That Support Use Cases: participants agree to benchmark their software under the conditions and with load tests reflective of use cases they commonly see in their user base or for which they think their software is best suited.
  3. Dissemination to third-parties: providing easy-to-find data to any party interested in posting results.
  4. Honestly exposing disagreement: Where agreement cannot be reached on any of the elements of the common benchmarking efforts, participants will respectfully expose the rationales for their divergent positions, thus still providing users with information upon which to base decisions.

There is more work to be done but all participants should begin to see the benefits: faster, better decisions by users.

We invite others to join, once we are underway. We, and our counterparts at other projects, believe this approach will go a long way to removing the inefficiencies hindering adoption of our software.

Tony and Justin

Schema Design in Riak – Introduction

March 19, 2010

One of the challenges of switching from a relational database (Oracle, MySQL, etc.) to a “NoSQL” database like Riak is understanding how to represent your data within the database. This post is the beginning of a series of entries on how to structure your data within Riak in useful ways.

Choices have consequences

There are many reasons why you might choose Riak for your database, and I’m going to explain how a few of those reasons will affect the way your data is structured and manipulated.

One oft-cited reason for choosing Riak, and other alternative databases, is the need to manage huge amounts of data, collectively called “Big Data”. If you’re storing lots of data, you’re less likely to be doing online queries across large swaths of the data. You might be doing real-time aggregation in addition to calculating longer-term information in the background or offline. You might have one system collecting the data and another processing it. You might be storing loosely-structured information like log data or ad impressions. All of these use-cases call for low ceremony, high availability for writes, and little need for robust ways of finding data — perfect for a key/value-style scheme.

Another reason one might pick Riak is for flexibility in modeling your data. Riak will store any data you tell it to in a content-agnostic way — it does not enforce tables, columns, or referential integrity. This means you can store binary files right alongside more programmer-transparent formats like JSON or XML. Using Riak as a sort of “document database” (semi-structured, mostly de-normalized data) and “attachment storage” will have different needs than the key/value-style scheme — namely, the need for efficient online-queries, conflict resolution, increased internal semantics, and robust expressions of relationships.

The third reason for choosing Riak I want to discuss is related to CAP – in that Riak prefers A (Availability) over C (Consistency). In contrast to a traditional relational database system, in which transactional semantics ensure that a datum will always be in a consistent state, Riak chooses to accept writes even if the state of the object has been changed by another client (in the case of a race-condition), or if the cluster was partitioned and the state of the object diverges. These architecture choices bring to the fore something we should have been considering all along — how should our applications deal with inconsistency? Riak lets you choose whether to let the “last one win” or to resolve the conflict in your application by automated or human-assisted means.
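The two resolution styles can be sketched in a few lines. This is a simplified model, not Riak's mechanism: siblings are assumed to be {TimestampMs, Value} pairs (Riak itself tracks causality with vector clocks), and the module and function names are hypothetical.

```erlang
-module(resolve_demo).
-export([last_write_wins/1, resolve_with/2]).

%% Siblings are modelled as {TimestampMs, Value} pairs. That is an
%% assumption of this sketch; Riak tracks causality with vector clocks.

%% "Last one wins": keep the value with the greatest timestamp and
%% silently discard the others.
last_write_wins([_ | _] = Siblings) ->
    {_T, Value} = lists:max(Siblings),
    Value.

%% Application-level resolution: fold a merge function over all values
%% so that no write is discarded.
resolve_with(MergeFun, [{_, First} | Rest]) ->
    lists:foldl(fun ({_T, V}, Acc) -> MergeFun(V, Acc) end, First, Rest).
```

With two siblings holding the sets [a] and [b], last_write_wins/1 returns only the newer set, while resolve_with/2 with ordsets:union/2 as the merge function returns both elements.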

More mindful domain modeling

What’s the moral of these three stories? When modeling your data in Riak, you need to understand better the shape of your data. You can no longer rely on normalization, foreign key constraints, secondary indexes and transactions to make decisions for you.

Questions you might ask yourself when designing your schema:

  • Will my access pattern be read-heavy, write-heavy, or balanced?
  • Which datasets churn the most? Which ones require more sophisticated conflict resolution?
  • How will I find this particular type of data? Which method is most efficient?
  • How independent/interrelated is this type of data with this other type of data? Do they belong together?
  • What is an appropriate key-scheme for this data? Should I choose my own or let Riak choose?
  • How much will I need to do online queries on this data? How quickly do I need them to return results?
  • What internal structure, if any, best suits this data?
  • Does the structure of this data promote future design modifications?
  • How resilient will the structure of the data be if requirements change? How can the change be effected without serious interruption of service?

I like to draw up my domain concepts on a pad of unlined paper or a whiteboard with boxes and arrows, then figure out how they map onto the database. Ultimately, the concepts define your application, so get those solid before you even worry about Riak.

Thinking non-relationally

Once you’ve thought carefully about the questions described above, it’s time to think about how your data will map to Riak. We’ll start from the small-scale in this post (single domain concepts) and work our way out in future installments.

Internal structure

For a single class of objects in your domain, let’s consider the structure of that data. Here’s where you’re going to decide two interrelated issues — how this class of data will be queried and how opaque its internal structure will be to Riak.

The first issue, how the data will be queried, depends partly on how easy it is to intuit the key of a desired object. For example, if your data is user profiles that are mostly private, perhaps the user’s email or login name would be appropriate for the key, which would be easy to establish when the user logs in. However, if the key is not so easy to determine, or is arbitrary, you will need map-reduce or link-walking to find it.

The second issue, how opaque the data is to Riak, is affected by how you query but also by the nature of the data you’re storing. If you need to do intricate map-reduce queries to find or manipulate the data, you’ll likely want it in a form like JSON (or an Erlang term) so your map and reduce functions can reason about the data. On the other hand, if your data is something like an image or PDF, you don’t want to shoehorn that into JSON. If you’re in the situation where you need both a form that’s opaque to Riak, and to be able to reason about it with map-reduce, have your application add relevant metadata to the object. These are created using X-Riak-Meta-* headers in HTTP or riak_object:update_metadata/2 in Erlang.
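For the HTTP case, attaching metadata comes down to prefixing each name with X-Riak-Meta-. A minimal sketch, assuming metadata is kept as a list of {Name, Value} string pairs; the module name, to_headers/1, and the example field name are hypothetical:

```erlang
-module(meta_headers).
-export([to_headers/1]).

%% Turn application metadata into X-Riak-Meta-* HTTP header pairs.
%% Names and values are plain strings in this sketch.
-spec to_headers([{string(), string()}]) -> [{string(), string()}].
to_headers(Meta) ->
    [{"X-Riak-Meta-" ++ Name, Value} || {Name, Value} <- Meta].
```

For example, to_headers([{"Content-Kind", "invoice"}]) yields a single {"X-Riak-Meta-Content-Kind", "invoice"} pair that can be sent alongside an opaque PDF body.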

Rule of thumb: if it’s an abstract datatype, use a map-reduce-friendly format like JSON; if it’s a concrete form, use its original representation. Of course, there are exceptions to every rule, so think carefully about your modeling problem.

Consistency, replication, conflict resolution

The second issue I would consider for each type of data is the access pattern and desired level of consistency. This is related to the questions above of read/write loads, churn, and conflicts.

Riak provides a few knobs you can turn at schema-design time and at request-time that relate to these issues. The first is allow_mult, or whether to allow recording of divergent versions of objects. In a write-heavy load or where clients are updating the same objects frequently, possibly at the same time, you probably want this on (true), which you can change by setting the bucket properties. The tradeoffs are that the vector clock may grow quickly and your application will need to decide how to resolve conflicts.

The second knob you can turn is the n_val, or how many replicas of each object to store, also a per-bucket setting. The default value is 3, which will work for many applications. If you need more assurance that your data is going to withstand failures, you might increase the value. If your data is non-critical or in large chunks, you might decrease the value to get greater performance. Knowing what to choose for this value will depend on an honest assessment of both the value of your data and operational concerns.

The third knob you can turn is per-request quorums. For reads, this is the R request parameter: how many replicas need to agree on the value for the read to succeed (the default is 2). For writes, there are two parameters, W and DW. W is how many replicas need to acknowledge the write request before it succeeds (default is 2). DW (durable writes) is how many replica backends need to confirm that the write finished before the entire write succeeds (default is 0). If you need greater consistency when reading or writing your data, you’ll want to increase these numbers. If you need greater performance and can sacrifice some consistency, decrease them. In any case, your R, W, and DW values must not exceed n_val if you want the request to succeed.
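A quick sanity check on quorum settings might look like the sketch below. The module and function names are hypothetical, and the sketch treats a quorum equal to n_val as valid, on the assumption that asking every replica to respond is a legitimate (if slow) request.

```erlang
-module(quorum_check).
-export([valid/4]).

%% True when every per-request quorum can be satisfied by N replicas.
%% A quorum equal to N (all replicas) is treated as valid here.
-spec valid(pos_integer(), non_neg_integer(),
            non_neg_integer(), non_neg_integer()) -> boolean().
valid(N, R, W, DW) ->
    lists:all(fun (Q) -> Q >= 0 andalso Q =< N end, [R, W, DW]).
```

With the defaults quoted above (n_val of 3, R and W of 2, DW of 0) the check passes, while asking for R = 4 against three replicas fails.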

What do these have to do with your data model? Fundamentally understanding the structure and purpose of your data will help you determine how you should turn these knobs. Some examples:

  • Log data: You’ll probably want low R and W values so that writes are accepted quickly. Because these are fire-and-forget writes, you won’t need allow_mult turned on. You might also want a low n_val, depending on how critical your data is.
  • Binary files: Your n_val is probably the most significant issue here, mostly depending on how large your files are and how many replicas of them you can tolerate (storage consumption).
  • JSON documents (abstract types): The defaults will work in most cases. Depending on how frequently the data is updated, and how many documents you update within a single conceptual operation in the application, you may want to enable allow_mult to prevent blind overwrites.

Sean Cribbs