Tag Archives: eventual consistency

Relational to Riak, Part 1: High Availability

January 10, 2013

This is the first in a series of blog posts that gives a high-level overview of the benefits and tradeoffs of Riak versus traditional relational databases. If this is relevant to your projects or applications, register for our “From Relational to Riak” webcast on January 24.

One of the biggest differences between Riak and relational systems is the focus on availability and how the underlying architecture deals with failure modes.

Most relational databases leverage a master/slave architecture to replicate data. This approach usually means the master coordinates all write operations, working with the slave nodes to update data. If the master node fails, the database will reject write operations until the failure is resolved – often involving failover or leader election – to maintain correctness. This can result in a window of write unavailability.

[Figure: Master/Slave Systems]

Conversely, Riak uses a masterless system with no single point of failure, meaning any node can serve read or write requests. If a node experiences an outage, other nodes can continue to accept read and write requests. Additionally, if a node fails or becomes unreachable from the rest of the cluster due to a network partition, a neighboring node will take over responsibilities for the unavailable node. Once that node becomes available again, the neighboring node hands off any updates it accepted on the node's behalf through a process called “hinted handoff.” This is another way that Riak maintains availability and resilience even in the face of serious failures.

[Figure: Riak Masterless]
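To make the handoff idea concrete, here is a deliberately tiny, single-process Ruby model. It is a sketch under simplifying assumptions, not how Riak implements it: Node and ToyCluster are hypothetical names, each key has exactly one owner, and the first live node acts as the fallback, whereas real Riak chooses fallbacks from the ring's preference list and keeps N replicas of each key.

```ruby
# Toy, single-process model of hinted handoff. Node and ToyCluster are
# hypothetical names for illustration; they are not Riak APIs.
class Node
  attr_reader :name, :data, :hints
  attr_accessor :up

  def initialize(name)
    @name  = name
    @up    = true
    @data  = {}  # key => value this node is responsible for
    @hints = {}  # owner name => { key => value } held for a down node
  end
end

class ToyCluster
  def initialize(names)
    @nodes = names.map { |n| Node.new(n) }
  end

  def node(name)
    @nodes.find { |n| n.name == name }
  end

  # Write to the owning node if it is up; otherwise the first live node
  # accepts the write and remembers ("hints") whose data it really is.
  def write(owner_name, key, value)
    owner = node(owner_name)
    if owner.up
      owner.data[key] = value
    else
      fallback = @nodes.find(&:up)
      (fallback.hints[owner.name] ||= {})[key] = value
    end
  end

  # When the owner comes back, every fallback hands off what it held.
  def recover(owner_name)
    owner = node(owner_name)
    owner.up = true
    @nodes.each do |n|
      handed_off = n.hints.delete(owner.name)
      owner.data.merge!(handed_off) if handed_off
    end
  end
end

cluster = ToyCluster.new(%w[a b c])
cluster.node("b").up = false
cluster.write("b", "cart:42", ["book"])  # accepted by fallback node "a"
cluster.recover("b")
p cluster.node("b").data                 # node "b" now holds "cart:42"
```

The write never fails while "b" is down; the cost is that the data lives temporarily on a node that does not own it.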

Because Riak allows reads and writes even when multiple nodes are unavailable, and uses an eventually consistent design to maintain availability, in rare cases different replicas may contain different versions of an object. This can occur if multiple clients update the same piece of data at the same time or if nodes are down or lagging. These conflicts happen a statistically small portion of the time, but are important to know about. Riak has a number of mechanisms for detecting and resolving these conflicts when they occur. For more on how Riak achieves availability and the tradeoffs involved, see our documentation on the subject.

For many use cases today, high availability and fault tolerance are critical to the user experience and the company’s revenue. Unavailability has a negative impact on your revenue, damages user trust and leads to a poor user experience. For use cases such as online retail, shopping carts, advertising, social and mobile platforms or anything with critical data needs, high availability is key and Riak may be the right choice.

Sign up for the webcast here or read our whitepaper on moving from relational to Riak.

Basho

Rolling with Eventual Consistency

September 18, 2012

In a previous post I wrote about the different mindset that a software engineer should have when building for a key-value database as opposed to a relational database. When working with a relational database, you describe the model first and then query the data later. With a key-value database, you focus first on what you want the result of the query to look like, and then work backward toward a model.

As an example, I described averaging some value:

“In a SQL database, we can just call average() on that column, and it will compute the answer at query time. In a key-value store, we can add logic in the application layer to catch the object before it enters Riak, fetch the average value and number of included elements from Riak, compute the new rolling average, and save that answer back in Riak.”

Some questions came in about what that would specifically look like in code. The rest of this post will explore a solution that takes into account the distributed, eventually consistent nature of Riak.

Average the Scenario

Say we have a DataPoint object. An object of this type can be created, but for simplicity let us say that it is never changed or deleted. Every DataPoint has a value property which is an integer. In our application, we sometimes want the average of all of the value properties for all of the DataPoint objects. Oh, and let’s say we have trillions of DataPoint objects.

Notice that according to my previous definition, this usage scenario is ad-hoc because we don’t know exactly when the application needs the average, but it is not dynamic because we know that we will be querying for the average of the value property, and not the average of some property defined at query time.

Using the client library ripple, we might define the object as follows:

class DataPoint
  include Ripple::Document
  property :value, Integer, :presence => true
end

The ripple library stores objects as JSON behind the scenes. The following request sent to Riak:

curl http://127.0.0.1:8091/buckets/data_points/keys/somekey

might return something like:

{"value":23,"_type":"DataPoint"}

Pretty simple so far. Now how should we calculate the average? The naïve solution would be to read all of the DataPoint objects out, read their value property, and average them. This might work for small sets of objects, but remember we have trillions of them. Fetching all of that data for one simple query is not a realistic solution.

We know that we want the average at the application level. Riak is really good at serving single objects. So instead of querying for the average, we should simply fetch the average as its own data object in Riak. In ripple this might look like:

class Statistic
  include Ripple::Document
  property :average, Float, :presence => true
end

Now we just have to hook the two together. We can add a hook to the DataPoint object so that every time it is saved, it updates the Statistic object.

class DataPoint
  include Ripple::Document
  property :value, Integer, :presence => true
  after_save :update_statistics

  # Fold this point's value into the single pre-computed Statistic object
  def update_statistics
    id = 'data_point_document'
    statistic = Statistic.find(id) || Statistic.new
    statistic.key = id
    statistic.update_with self.value
  end
end

We need to define some new properties and the update_with method for the Statistic object.

class Statistic
  include Ripple::Document
  property :average, Float, :presence => true
  property :count, Integer, :presence => true

  # Fold one new value into the stored rolling average
  def update_with(value)
    self.average ||= 0.0  # a brand-new Statistic starts empty
    self.count   ||= 0
    self.average = (self.average * self.count + value) / (self.count + 1)
    self.count = self.count + 1
    self.save
  end
end

By storing the additional information of the number of objects in the count property, we have enough information to reconstruct a rolling average.
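As a quick sanity check of that update rule, the plain-Ruby sketch below (no Riak or ripple involved; rolling_update and the sample values are made up for illustration) folds values in one at a time while keeping only the average and the count, then compares the result with an ordinary batch average.

```ruby
# Plain-Ruby check of the update rule used by Statistic#update_with:
# fold values in one at a time, keeping only (average, count).
def rolling_update(average, count, value)
  [(average * count + value) / (count + 1).to_f, count + 1]
end

values = [23, 7, 12, 40]  # made-up DataPoint values
average, count = 0.0, 0
values.each { |v| average, count = rolling_update(average, count, v) }

batch_average = values.sum / values.size.to_f
puts average        # 20.5
puts batch_average  # 20.5
```

Over a very long stream, repeated floating-point division can let the rolling figure drift slightly from an exact batch average; storing a running sum alongside the count is one way to avoid that.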

Fast Answers

Now whenever we want the average value, we simply fetch the one Riak object:

Statistic.find('data_point_document').average

Our answer comes back really fast, because it has been pre-computed.

But Wait!

The example above works great for one single-threaded application. To preserve the distributed and fault-tolerant features that Riak provides for us, we need to do a little more work.

Consider the following scenario. One copy of the application, let’s call it ClientA, is talking to the Riak cluster. Another, ClientB, is also talking to the cluster. ClientA and ClientB both want to add a DataPoint object. After saving their respective DataPoint objects, they both fetch the Statistic object, which currently has average property set to 10.0, and compute a new average. ClientA thinks the new average property is 12.0. But ClientB added a different value, so it thinks the new average property is 9.0. Both save to the Riak cluster, and we have a classic race condition. Which client wins? It doesn’t matter, because both answers are wrong. Both fail to take into account the DataPoint object handled by the other client.
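To put numbers on the race, the sketch below assumes ClientA added a value of 20 and ClientB a value of 5; those inputs are hypothetical, chosen only because they reproduce the 12.0 and 9.0 above. Both clients start from the same stored Statistic, so each result misses the other's point.

```ruby
# Replays the race with hypothetical inputs: ClientA adds 20, ClientB adds 5.
def add_point(average, count, value)
  (average * count + value) / (count + 1)
end

stored_average, stored_count = 10.0, 4  # what both clients fetched

client_a = add_point(stored_average, stored_count, 20)  # ClientA's view
client_b = add_point(stored_average, stored_count, 5)   # ClientB's view
correct  = (stored_average * stored_count + 20 + 5) / (stored_count + 2)

puts client_a          # 12.0
puts client_b          # 9.0
puts correct.round(2)  # 10.83
```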

To get the correct answer, let’s separate the clients. Both clients can save to the same object, but we will partition the data within the object. Previously the JSON object looked like:

{"average":10.0,"count":4,"_type":"Statistic"}

We want it to look more like:

{"client_data":{"ClientA":{"average":10.0,"count":4},"ClientB":{"average":10.0,"count":4}},"_type":"Statistic"}

The data model is more complicated, but we now have enough information to compute the correct answer. We will change the model so that ClientA only changes the “ClientA” portion of the data object, and ClientB only changes the “ClientB” portion. When we ask for the answer, the application computes the average across all clients.

The Statistic object now looks something like this:

class Statistic
  include Ripple::Document
  property :client_data, Hash, :presence => true

  def update_with(value)
    self.reload
    self.client_data ||= {}
    statistic = self.client_data[Client.id] || {'average' => 0.0, 'count' => 0}
    statistic['average'] = (statistic['average'] * statistic['count'] + value) / (statistic['count'] + 1)
    statistic['count']   = (statistic['count'] + 1)
    self.client_data[Client.id] = statistic
    self.save
  end

  def count
    self.client_data.map{|h| h[1]['count']}.inject(0, &:+)
  end

  def average
    self.client_data.map{|h| h[1]['count'] * h[1]['average']}.inject(0, &:+).to_f / self.count
  end
end

The method Client.id in the above can be set in a couple of different ways. In this case, we set it using an environment variable passed in at runtime. We rely on a single thread per application to ensure that each client has a single, consistent client identifier, and that it always writes to its own portion of the data object.
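A minimal sketch of such a Client.id, assuming the variable is named CLIENT to match the rake CLIENT=someclient invocation mentioned further down; the module shown here is hypothetical and may differ from the one in the repo. Memoizing the value means the identifier cannot change for the life of the process.

```ruby
# Hypothetical Client module: the post only says Client.id is read from an
# environment variable. The name CLIENT is an assumption chosen to match
# the `rake CLIENT=someclient` invocation.
module Client
  def self.id
    # Read once and memoize, so one process always uses one identifier
    @id ||= ENV.fetch("CLIENT") { raise KeyError, "set CLIENT to an id unique to this process" }
  end
end
```

Each process would then be started with its own identifier, for example CLIENT=worker-1.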

We also make sure that we always fetch the object before we write to it, so that this client is always working with the latest copy of the object and not a stale one that was updated somewhere else in the application.

But Wait [Again]!

We solved part of the problem by keeping each client out of the other’s business, but we still have a race condition when the clients try to save the same Statistic object to the cluster. If Riak is operating on a last-write-wins principle, then whichever client has the later timestamp is going to overwrite the other. That gives us the wrong answer, and that won’t do.

We can rely on Riak’s vector clocks to solve this problem. First, we set the allow_mult property on the bucket for Statistic objects so that Riak will store both copies of an object when two come in during a race condition.
Statistic.bucket.allow_mult = true

For reasons outside of the scope of this post, we also want to make sure that we have PR and PW quorums greater than N/2, that is, a majority of the N primary replicas. This helps ensure that our answer isn’t misread from a fallback node when we reload the Statistic object. N is 3 by default, so the following does the trick by setting PR and PW both to 2.
Statistic.bucket.props = Statistic.bucket.props.merge(:pr => 2, :pw => 2)

When ripple reads an object, it gets a vector clock. When it saves the object, it sends back the same vector clock. Every time Riak changes an object, it updates the vector clock in a way that preserves the clock’s lineage. So if Riak receives an object whose vector clock does not descend from the one it currently has, it knows it has a collision and, with allow_mult enabled, saves both values as siblings.
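Here is a simplified Ruby model of that comparison. It assumes a vector clock is just a Hash of actor => counter; Riak's real clocks also carry timestamps and get pruned, and the actor names used here are illustrative.

```ruby
# Simplified vector clocks: a Hash of actor => counter.
def increment(clock, actor)
  clock.merge(actor => clock.fetch(actor, 0) + 1)
end

# true if clock `a` has seen every event that clock `b` has seen
def descends?(a, b)
  b.all? { |actor, counter| a.fetch(actor, 0) >= counter }
end

# Neither clock descends from the other: the writes were concurrent
def siblings?(a, b)
  !descends?(a, b) && !descends?(b, a)
end

base   = { "ClientA" => 1 }             # clock both clients read
from_a = increment(base, "ClientA")     # ClientA saves
from_b = increment(base, "ClientB")     # ClientB saves from the same base

puts descends?(from_a, base)   # true: an ordinary update replaces the old value
puts siblings?(from_a, from_b) # true: concurrent writes, so both are kept
```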

Our race condition plays out. We now have two sibling objects in Riak stored under the same key: one is up-to-date for ClientA, and the other is up-to-date for ClientB. The truth is somewhere in between.

The next time ripple reads the Statistic object, it gets back both objects with a new vector clock for the set. We now have to resolve the conflict to find the true answer. We know that ClientA’s true answer is going to have the highest count property for “ClientA”, and we know that ClientB’s true answer is going to have the highest count property for “ClientB”. Since we know that only one copy of a given client was writing at a time [single-threaded], we know that only the client data with the highest count is correct.

We can find the true answer by merging the siblings and comparing the data one client at a time. For each client, we always take the value with the highest count property and throw away the smaller counts, which represent older values.

When ripple saves back the object, it sends the new vector clock as well so that Riak knows to replace the old siblings with this new resolved object. We handle this with the on_conflict method in ripple.

class Statistic
  include Ripple::Document
  property :client_data, Hash, :presence => true

  def update_with(value)
    self.reload
    self.client_data ||= {}
    statistic = self.client_data[Client.id] || {'average' => 0.0, 'count' => 0}
    statistic['average'] = (statistic['average'] * statistic['count'] + value).to_f / (statistic['count'] + 1)
    statistic['count']   = (statistic['count'] + 1)
    self.client_data[Client.id] = statistic
    self.save
  end

  def count
    self.client_data.map{|h| h[1]['count']}.inject(0, &:+)
  end

  def average
    self.client_data.map{|h| h[1]['count'] * h[1]['average']}.inject(0, &:+).to_f / self.count
  end

  on_conflict do |siblings, c|
    resolved = {}
    siblings.reject!{|s| s.client_data == nil}
    siblings.each do |sibling|
      resolved.merge! sibling.client_data do |client_id, resolved_value, sibling_value|
        resolved_value['count'] > sibling_value['count'] ? resolved_value : sibling_value
      end
    end
    self.client_data = resolved
  end
end
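Because the conflict handler's logic doesn't depend on Riak, one way to test it in isolation is to pull the merge into a plain function. This is a sketch: merge_client_data is a hypothetical helper and the sibling contents are made up. It uses >= rather than >, which only matters for equal counts; with a single writer per client those entries should be identical anyway.

```ruby
# The on_conflict merge as a plain function: for each client id, keep the
# entry with the highest count. nil siblings are skipped, as in the original.
def merge_client_data(siblings)
  siblings.compact.each_with_object({}) do |data, resolved|
    resolved.merge!(data) do |_client_id, mine, theirs|
      mine["count"] >= theirs["count"] ? mine : theirs
    end
  end
end

sibling_1 = { "ClientA" => { "average" => 12.0, "count" => 5 },
              "ClientB" => { "average" => 8.0,  "count" => 2 } }
sibling_2 = { "ClientA" => { "average" => 10.0, "count" => 4 },
              "ClientB" => { "average" => 9.0,  "count" => 3 } }

merged = merge_client_data([sibling_1, sibling_2])
p merged
# Order does not matter, which is what lets replicas converge:
puts merged == merge_client_data([sibling_2, sibling_1])  # true
```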

Voila! We now have a rolling average that gracefully handles race conditions caused by the distributed nature of a Riak cluster.

A working example of this solution is available in my riak-rolling-average repo. As usual, the good stuff is in the tests. You can run the tests with bundle exec rake CLIENT=someclient where someclient is unique to that Ruby thread.

Note that test_example.rb runs five clients concurrently by shelling out to five rake tasks. Each rake task loads in a pre-defined chunk of data from the data file. This causes plenty of race conditions, which is what we want to simulate. We still get the correct answer.

The Path Not Traveled

If you aren’t familiar with my previous advice on approaching key-value architecture, you might be tempted to solve a use case like the average described above using Riak’s map/reduce functionality. There are several reasons why the solution above would be preferred, but let us play the devil’s advocate and explore the map/reduce solution.

You can view the entire code in the repo linked below, but if we solve this using JavaScript map/reduce functions, then we can extract the data from each DataPoint object in the following map phase:

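function(riakObject){
  // Parse the stored JSON document and emit [value, 1] so the reduce
  // phase can weight each DataPoint by a count of one.
  var data = JSON.parse(riakObject.values[0].data);
  if (typeof data.value === "number") {
    return [[data.value, 1]];
  } else {
    return [];
  }
}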

Then compute the average by combining the results together in the following reduce phase:

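function(values){
  // Each input is [average, count]: [value, 1] from the map phase, or an
  // earlier output of this same function when Riak re-reduces.
  var sum = 0.0;
  var count = 0;
  for (var i = 0; i < values.length; i++) {
    var value = values[i];
    if (value) {
      sum = sum + (value[0] * value[1]);
      count = count + value[1];
    }
  }
  if (count > 0) {
    return [[(sum / count), count]];
  } else {
    return [];
  }
}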
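Riak may call a reduce function again on its own earlier output (a re-reduce), which is why this one returns [average, count] rather than a bare average. The Ruby port below is only an illustration of that property, not code that runs inside Riak: it checks that reducing everything at once and reducing in two stages give the same answer for some made-up values.

```ruby
# Ruby port of the JavaScript reduce phase, for illustration only.
# Each input is [average, count]; the map phase emits [value, 1].
def reduce_phase(values)
  sum = 0.0
  count = 0
  values.compact.each do |avg, n|
    sum += avg * n
    count += n
  end
  count > 0 ? [[sum / count, count]] : []
end

mapped = [[23, 1], [7, 1], [12, 1], [40, 1]]  # made-up map output
all_at_once = reduce_phase(mapped)
in_stages   = reduce_phase(reduce_phase(mapped[0, 2]) + reduce_phase(mapped[2, 2]))

p all_at_once  # [[20.5, 4]]
p in_stages    # [[20.5, 4]]
```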

If you look in the mapreduce branch of the riak-rolling-average repo, you will find the code for this example. On my local laptop, which I admit is not optimized for this type of operation, it takes about 3 seconds to fetch the answer with map/reduce searching over 5000 DataPoint objects. [A compiled Erlang map/reduce function would perform much better.] It only took a few milliseconds to fetch the answer using the pre-computing code.

Even if I did have a more powerful cluster on which to run this map/reduce, recall that we have trillions of DataPoint objects. Each object must be fetched from the key-value store and pulled into the JavaScript virtual machine for processing. This is essentially equivalent to the naïve approach described earlier, but performed on the server instead of the application client. If multiple users initiate the same map/reduce, contention for resources would quickly overwhelm the cluster’s hardware. In practice, we generally reserve Riak’s map/reduce for data migrations and offline analysis, although there are exceptions to that guideline.

Conceptually, the map/reduce solution might be simpler to architect, and it might seem more intuitive to people from an RDBMS background. Admittedly, the recommended solution above uses more resources during writes, lets the object grow in size and complexity as the number of clients and vector clock entries increases, and generally requires a more complex model; however, it also provides a much more performant answer and is better suited to the eventually consistent, distributed nature of Riak.

Other Issues

Note that I did not address bootstrapping the average when DataPoint objects already exist, nor handling deletes or updates to existing DataPoint objects.

Happy Coding

Some of the ways we need to think about architecture problems when we write applications for Riak might not be intuitive at first, because the same problems are already solved by convention in the RDBMS world. Some of this shift in mindset is necessitated by the key-value nature of Riak. As the rolling average example here demonstrates, some concerns that we need to address in the application architecture are brought about by the distributed, eventually consistent nature of Riak as well. This shift in mindset is a tradeoff that you can choose to make for the sake of high availability, fault tolerance, horizontal scaling, and other nice features that Riak provides. If you enjoy learning new things to take advantage of new capabilities, then I’d wager you will enjoy developing applications with Riak.

Casey

Related

  • A working example of this code is available in my riak-rolling-average repo. Check out the mapreduce branch for the production-unfriendly version using Riak’s map/reduce.
  • Vector Clocks are Easy
  • Vector Clocks are Hard
  • Meangirls provides serializable data types for eventually consistent systems, similar to but more comprehensive than my example.
  • Hanover also provides eventually consistent data types.
  • The Statistic object is an eventually consistent data type, an example of a CRDT: Convergent or Commutative Replicated Data Type. Read the comprehensive research paper on CRDT.
  • Bryce Kerley speaking about CRDTs at SRCFringe Edinburgh 2012
  • I put some hours into a gem ripple-statistics which adds this and other simple calculations to the ripple library. Note the limitations of the current code there.

The Second Edition of Riak Handbook Available Now for Download



CAMBRIDGE, MA – June 1, 2012 – Basho Technologies today announced the immediate availability of the second edition of Riak Handbook. The significantly updated Riak Handbook includes more than 43 pages of new content covering many of the latest feature enhancements to Riak, Basho’s industry-leading, open-source, distributed database. Riak Handbook is authored by former Basho developer and advocate, Mathias Meyer.

Riak Handbook is a comprehensive, hands-on guide to Riak. The initial release of Riak Handbook focused on the driving forces behind Riak, including Amazon Dynamo, eventual consistency and CAP Theorem. Through a collection of examples and code, Mathias’ Riak Handbook explores the mechanics of Riak, such as storing and retrieving data, indexing, searching and querying data, and sheds a light on Riak in production. The updated handbook expands on previously covered key concepts and introduces new capabilities, including the following:

  • An overview of Riak Control, a new Web-based operations management tool
  • Full coverage on pre- and post-commit hooks, including JavaScript and Erlang examples
  • An entirely new section on deploying Erlang code in a Riak cluster
  • Additional details on secondary indexes
  • Insight into load balancing Riak nodes
  • An introduction to network node planning
  • An introduction to Riak CS, including its Amazon S3 API compatibility

The updated Riak Handbook includes an entirely new section dedicated to popular use cases and is full of examples and code from real-time usage scenarios.

Mathias Meyer is an experienced software developer, consultant and coach from Berlin, Germany. He has worked with database technology leaders such as Sybase and Oracle. He entered into the world of NoSQL in 2008 and joined Basho Technologies in 2010.

About Basho Technologies
Basho Technologies is the leader in highly-available, distributed database technologies used to power scalable, data-intensive Web, mobile, and e-commerce applications and large cloud computing platforms. Basho customers, including fast-growing Web businesses and large Fortune 500 enterprises, use Riak to implement content delivery platforms and global session stores, to aggregate large amounts of data for logging, search, and analytics, to manage, store and stream unstructured data, and to build scalable cloud computing platforms.

Riak is available open source for download at http://wiki.basho.com/Riak.html. Riak EnterpriseDS is available with advanced replication, services and 24/7 support. Riak CS enables multi-tenant object storage with advanced reporting and an Amazon S3 compatible API. For more information visit www.basho.com or follow us on Twitter at www.twitter.com/basho.

Why Your Riak Cluster Should Have At Least Five Nodes

April 26, 2012

Here at Basho we want to make sure that your Riak implementations are set up from the beginning to succeed. While you can use the Riak Fast Track to quickly set up a 3-node dev/test environment, we recommend that all production deployments use a minimum of 5 nodes, ensuring you benefit from the architectural principles that underpin Riak’s availability, fault-tolerance and scaling properties.

TL;DR: Deployments of five nodes or greater will provide a foundation for the best performance and growth as the cluster expands. Since Riak scales linearly with the addition of more nodes, users find improved performance, reliability, and throughput with larger clusters. Smaller deployments can compromise the fault-tolerance of the system: with a “sane” replication requirement for availability (we default to three copies), node failures in smaller clusters mean that replication requirements may not be met. This can result in degraded performance and risk of data loss. Additionally, clusters smaller than five nodes mean that with a sane replication requirement of 3, a high percentage (75-100% of the nodes) will need to respond to each request, putting undue load on the cluster that may degrade performance.

Let’s take a closer look at three- and four-node clusters.

Performance and Fault Tolerance Concerns in a 3-Node Cluster

To ensure that the cluster is always available to respond to read and write requests, Basho recommends a “sane default” for data replication: three copies of the data on three different nodes. The default configuration of Riak requires four nodes at minimum to ensure no single node holds more than one copy of any particular piece of data. (In future versions of Riak we’ll be able to guarantee that each replica is living on a separate physical node. At this point it’s almost at 100%, but we won’t tell you it’s guaranteed until it is.) While it is possible to change the settings to ensure that the three replicas are on distinct nodes in a three-node cluster, you still run into issues of replica placement during a node failure or network partition.

In the event of node failure or a network partition in a three-node cluster, the default requirement for replication remains three but there are only two nodes available to service requests. This will result in degraded performance and carries a risk of data loss.

Performance and Fault Tolerance Concerns in a 4-Node Cluster

With a requirement of three replicas, any one request for a particular piece of data from a 4-node cluster will require a response from 75 – 100% of the nodes in the cluster, which may result in degraded performance. In the event of node failure or a network partition in a 4-node cluster, you are back to the issues we outline above.

What if I want to change the replication default?

If using a different data replication number is right for your implementation, just make sure to use a cluster of at least N + 2 nodes, where N is the number of replicas, for the reasons outlined above.
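For the arithmetic behind the 75 to 100% figures, here is a small Ruby sketch. It assumes every replica lands on a distinct node (which, as noted above, is not yet guaranteed), so requests for one key involve n_val of the cluster's nodes; share_per_request and minimum_nodes are hypothetical helper names.

```ruby
# Percentage of the cluster involved in requests for one key, assuming
# all n_val replicas sit on distinct nodes.
def share_per_request(n_val, nodes)
  (100.0 * n_val / nodes).round(1)
end

# The "N + 2" rule of thumb from this post.
def minimum_nodes(n_val)
  n_val + 2
end

(3..6).each do |nodes|
  puts "#{nodes} nodes: #{share_per_request(3, nodes)}% of the cluster per request"
end
# 3 nodes: 100.0%, 4 nodes: 75.0%, 5 nodes: 60.0%, 6 nodes: 50.0%
puts minimum_nodes(3)  # 5
```

As nodes are added, the share keeps falling, which is the load-spreading effect the five-node recommendation relies on.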

Going With 5 Nodes

As you add nodes to a Riak cluster that starts with 5 nodes, the percentage of the cluster required to service each request goes down. Riak scales linearly and predictably from this point on. When a node is taken out of service or fails, the number of nodes remaining is large enough to protect you from data loss.

So do your development and testing with smaller clusters, but when it comes to production, start with five nodes.

Happy scaling.

Shanley

How Eventual is Eventual Consistency?

March 2, 2012

The second BashoChats Meetup was held last week at BashoWest. The office was packed with area developers and our two speakers, Ted Nyman and Peter Bailis, each delivered exceptional talks. Our awesome videographer Matt Fisher finished editing Peter’s talk first, and it’s so good that we didn’t see any reason to keep it from you while he put the final touches on Ted’s.

Peter is a graduate student in the much-heralded Berkeley CS department. Suffice it to say that we were honored to have him at BashoChats. He and some colleagues have been working on something called Probabilistically Bounded Staleness for Practical Partial Quorums (PBS). In short, PBS aims to define just how eventual “eventual consistency” is, and their research produced some fascinating findings that should affect how people view and deploy distributed databases like Riak, Cassandra and Voldemort.
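For a flavor of the kind of question PBS answers, here is its simplest ingredient as a Ruby sketch: the chance that a partial read quorum of R replicas misses every replica in a write quorum of W, out of N, when both quorums are chosen uniformly at random. This ignores the timing side of the model (how the chance of a stale read shrinks as time passes after a write), which the full PBS work also covers.

```ruby
# Binomial coefficient C(n, k); returns 0 when k is out of range.
def choose(n, k)
  return 0 if k < 0 || k > n
  (1..k).reduce(1) { |acc, i| acc * (n - k + i) / i }
end

# Chance that a read quorum of r replicas shares no replica with a write
# quorum of w replicas (out of n): C(n - w, r) / C(n, r).
def stale_read_probability(n, r, w)
  choose(n - w, r).to_f / choose(n, r)
end

puts stale_read_probability(3, 1, 1).round(3)  # 0.667
puts stale_read_probability(3, 1, 2).round(3)  # 0.333
puts stale_read_probability(3, 2, 2)           # 0.0: R + W > N always overlaps
```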

This talk, the subject matter, and the presenter are all fantastic. Watch it twice and tell three friends about it. (The PDF version of the slides is here for any interested parties.)

We’ll have Ted Nyman’s talk up next week. In the meantime, join BashoChats so you can be a part of the next event.

Mark

New Riak Handbook Available Now for Download


Former Basho Developer Advocate Mathias Meyer authors a comprehensive, hands-on guide to Riak.

CAMBRIDGE, MA – January 17, 2012 – Basho Technologies, the leader in highly-available, distributed data store technologies, today announced that former Basho developer advocate Mathias Meyer has completed Riak Handbook, a comprehensive, hands-on guide to Riak, Basho’s industry-leading, open source, distributed database.

Riak Handbook begins by exploring the driving forces behind Riak, including Amazon Dynamo, eventual consistency and CAP Theorem. Through a collection of examples and code, Mathias’ Riak Handbook walks through Riak’s many features in detail, including the following capabilities:

  • How to store and retrieve data in Riak
  • Analyze data with MapReduce using JavaScript and Erlang
  • Build and search full-text indexes with Riak Search
  • Index and query data using secondary indexes
  • Model data for eventual consistency
  • Scale to multi-node clusters in less than five minutes
  • Operate Riak in production
  • Handle failures in your application

Mathias Meyer is an experienced software developer, consultant and coach from Berlin, Germany. He has worked with database technology leaders such as Sybase and Oracle. He entered into the world of NoSQL in 2008 and worked at Basho Technologies from 2010 to 2011.

“We are excited that Mathias took on the endeavor to build a comprehensive book all about Riak,” said John Hornbeck, Vice President of Client Services, Basho Technologies. “Our customers and community will benefit from having a single source that covers everything from setting up Riak, to scaling out quickly, to operating and maintaining Riak. We have already seen strong customer interest in Riak Handbook, including many seeking site licenses to outfit their entire teams.”

Riak Handbook is available for purchase at riakhandbook.com. Single editions are available at $29/download. Site licenses are available for organizations implementing Riak for only $249.

About Basho Technologies
Basho Technologies is the leader in highly-available, distributed data store technologies used to power scalable, data-intensive Web, mobile and e-commerce applications. Our flagship product, Riak, frees customer applications from the performance, scalability, and availability constraints of traditional databases while reducing overall storage and support costs by up to 80%. Basho customers, including fast-growing Web businesses and large Fortune 500 enterprises, use Riak to implement global session stores, to aggregate large amounts of data for logging, search, and analytics, and to manage, store and stream unstructured data.

Riak is available open source for download at basho.com/resources/downloads. Riak EnterpriseDS is available with advanced replication, services and 24/7 support. For more information visit basho.com or follow us on Twitter at www.twitter.com/basho.

Basho Technologies is based in Cambridge, MA, and maintains regional offices in San Francisco, CA and Reston, VA.

statebox, an eventually consistent data model for Erlang (and Riak)

May 13, 2011

This was originally posted by Bob Ippolito on May 9th on the Mochi Media Labs Blog. If you’re planning to comment, please do so on the original post.

A few weeks ago when I was on call at work I was chasing down a bug in friendwad [1] and I realized that we had made a big mistake. The data model was broken: it could only work with transactions, but we were using Riak. The original prototype was built with Mnesia, which would’ve been able to satisfy this constraint, but when it was refactored for an eventually consistent data model it just wasn’t correct anymore. Given just a little bit of concurrency, such as a popular user, it would produce inconsistent data. Soon after this discovery, I found another service built with the same invalid premise, and I also realized that a general solution to this problem would allow us to migrate several applications from Mnesia to Riak.

When you choose an eventually consistent data store you’re prioritizing availability and partition tolerance over consistency, but this doesn’t mean your application has to be inconsistent. What it does mean is that you have to move your conflict resolution from writes to reads. Riak does almost all of the hard work for you [2], but if it’s not acceptable to discard some writes then you will have to set allow_mult to true on your bucket(s) and handle siblings [3] from your application. In some cases, this might be trivial. For example, if you have a set and only support adding to that set, then a merge operation is just the union of those two sets.
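A sketch of that add-only case in Ruby, using the standard library's Set (the function name and sibling values are illustrative):

```ruby
require "set"

# Resolve siblings of an add-only set by taking their union: every add
# survives and the order siblings arrive in does not matter.
def resolve_set_siblings(siblings)
  siblings.reduce(Set.new) { |merged, sibling| merged | sibling }
end

sibling_a = Set["bob", "charlie"]
sibling_b = Set["bob", "dave"]
puts resolve_set_siblings([sibling_a, sibling_b]).to_a.sort.inspect
# ["bob", "charlie", "dave"]
```

A plain union cannot express removals: an element deleted in one sibling would come back from any other sibling that still holds it, which is why more than a union is needed once removals are allowed.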

statebox is my solution to this problem. It bundles the value with repeatable operations [4] and provides a means to automatically resolve conflicts. Usage of statebox feels much more declarative than imperative. Instead of modifying the values yourself, you provide statebox with a list of operations and it will apply them to create a new statebox. This is necessary because it may apply this operation again at a later time when resolving a conflict between siblings on read.

Design goals (and non-goals):

  • The intended use case is for data structures such as dictionaries and sets
  • Direct support for counters is not required
  • Applications must be able to control the growth of a statebox so that it does not grow indefinitely over time
  • The implementation need not support platforms other than Erlang and the data does not need to be portable to nodes that do not share code
  • It should be easy to use with Riak, but not be dependent on it (clear separation of concerns)
  • Must be comprehensively tested, mistakes at this level are very expensive
  • It is ok to require that the servers’ clocks are in sync with NTP (but it should be aware that timestamps can be in the future or past)
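The growth-control goal can be sketched in plain Erlang. This is a hypothetical helper, not statebox's API. It assumes each queued event is a {TimestampMs, Op} pair, with the queue sorted oldest first. It roughly mirrors the max_queue and expire_ms options that appear in the example later in this post.

```erlang
-module(queue_limits).
-export([limit/4]).

%% Hypothetical sketch: bound an event queue by age and by length.
%% Queue is a list of {TimestampMs, Op} pairs, sorted oldest first.
-spec limit([{integer(), term()}], non_neg_integer(), non_neg_integer(),
            integer()) -> [{integer(), term()}].
limit(Queue, MaxQueue, ExpireMs, NowMs) ->
    %% Drop events older than ExpireMs relative to NowMs.
    Fresh = [E || {T, _Op} = E <- Queue, T >= NowMs - ExpireMs],
    %% Then keep only the newest MaxQueue of the remaining events.
    Drop = max(0, length(Fresh) - MaxQueue),
    lists:nthtail(Drop, Fresh).
```

The catch is that an operation dropped from every sibling's queue can no longer be replayed during a merge. The retention window therefore presumably needs to be longer than siblings are expected to stay unresolved.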

Here’s what typical statebox usage looks like for a trivial application (note: Riak metadata is not merged [5]). In this case we are storing an orddict in our statebox, and this orddict has the keys following and followers.

```erlang
-module(friends).
-export([add_friend/2, get_friends/1]).

-define(BUCKET, <<"friends">>).
-define(STATEBOX_MAX_QUEUE, 16).     %% Cap on max event queue of statebox
-define(STATEBOX_EXPIRE_MS, 300000). %% Expire events older than 5 minutes
-define(RIAK_HOST, "127.0.0.1").
-define(RIAK_PORT, 8087).

-type user_id() :: atom().
-type orddict(T) :: [T].
-type ordsets(T) :: [T].
-type friend_pair() :: {followers, ordsets(user_id())} |
                       {following, ordsets(user_id())}.

-spec add_friend(user_id(), user_id()) -> ok.
add_friend(FollowerId, FolloweeId) ->
    statebox_riak:apply_bucket_ops(
      ?BUCKET,
      [{[friend_id_to_key(FollowerId)],
        statebox_orddict:f_union(following, [FolloweeId])},
       {[friend_id_to_key(FolloweeId)],
        statebox_orddict:f_union(followers, [FollowerId])}],
      connect()).

-spec get_friends(user_id()) -> [] | orddict(friend_pair()).
get_friends(Id) ->
    statebox_riak:get_value(?BUCKET, friend_id_to_key(Id), connect()).

%% Internal API

connect() ->
    {ok, Pid} = riakc_pb_client:start_link(?RIAK_HOST, ?RIAK_PORT),
    connect(Pid).

connect(Pid) ->
    statebox_riak:new([{riakc_pb_client, Pid},
                       {max_queue, ?STATEBOX_MAX_QUEUE},
                       {expire_ms, ?STATEBOX_EXPIRE_MS},
                       {from_values, fun statebox_orddict:from_values/1}]).

friend_id_to_key(FriendId) when is_atom(FriendId) ->
    %% NOTE: You shouldn't use atoms for this purpose, but it makes the
    %% example easier to read!
    atom_to_binary(FriendId, utf8).
```

To show how this works a bit more clearly, we’ll use the following sequence of operations:

```erlang
add_friend(alice, bob),     %% AB
add_friend(bob, alice),     %% BA
add_friend(alice, charlie). %% AC
```

Each of these add_friend calls can be broken up into four separate atomic operations, demonstrated in this pseudocode:

```erlang
%% add_friend(alice, bob)
Alice = get(alice),
put(update(Alice, following, [bob])),
Bob = get(bob),
put(update(Bob, followers, [alice])).
```

Realistically, these operations may happen with some concurrency and cause conflicts. For demonstration purposes, we will have AB happen concurrently with BA, and the conflict will be resolved during AC. For simplicity, I’ll only show the operations that modify the key for alice.

```erlang
AB = get(alice),                        %% AB (Timestamp: 1)
BA = get(alice),                        %% BA (Timestamp: 2)
put(update(AB, following, [bob])),      %% AB (Timestamp: 3)
put(update(BA, followers, [bob])),      %% BA (Timestamp: 4)
AC = get(alice),                        %% AC (Timestamp: 5)
put(update(AC, following, [charlie])).  %% AC (Timestamp: 6)
```
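To make the sibling behaviour in this interleaving concrete, here is a deliberately crude in-memory stand-in for a bucket with allow_mult set to true. It is not Riak's API. The module toy_store, its functions, and the single per-key version counter are all hypothetical. The counter takes the place of Riak's vector clocks.

```erlang
-module(toy_store).
-export([new/0, read/2, write/4]).

%% Crude in-memory stand-in for a bucket with allow_mult = true.
%% Each key maps to {Version, Siblings}; one counter per key replaces
%% Riak's vector clocks.

%% An empty store.
new() -> #{}.

%% Returns {Version, Siblings}; a key that was never written reads as
%% version 0 with no values.
read(Key, Store) ->
    maps:get(Key, Store, {0, []}).

%% A write based on the latest version replaces every sibling; a write
%% based on an older read is kept next to the existing values instead.
write(Key, ReadVersion, Value, Store) ->
    {Version, Siblings} = read(Key, Store),
    NewSiblings = case ReadVersion of
                      Version -> [Value];
                      _Stale -> Siblings ++ [Value]
                  end,
    Store#{Key => {Version + 1, NewSiblings}}.
```

If you replay the sequence above against this store, AB and BA both read version 0. BA's write is therefore based on a stale read and becomes a sibling of AB's. AC then reads both siblings, and its write replaces them.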

Timestamp 1:

There is no data for alice in Riak yet, so statebox_riak:from_values([]) is called and we get a statebox with an empty orddict.

```erlang
Value = [],
Queue = [].
```

Timestamp 2:

There is no data for alice in Riak yet, so statebox_riak:from_values([]) is called and we get a statebox with an empty orddict.

```erlang
Value = [],
Queue = [].
```

Timestamp 3:

Put the updated AB statebox to Riak with the updated value.

```erlang
Value = [{following, [bob]}],
Queue = [{3, {fun op_union/2, following, [bob]}}].
```

Timestamp 4:

Put the updated BA statebox to Riak with the updated value. Note that this will be a sibling of the value stored by AB.

```erlang
Value = [{followers, [bob]}],
Queue = [{4, {fun op_union/2, followers, [bob]}}].
```

Timestamp 5:

Uh oh, there are two stateboxes in Riak now… so statebox_riak:from_values([AB, BA]) is called. This will apply all of the operations from both of the event queues to one of the current values, and we will get a single statebox as a result.

```erlang
Value = [{followers, [bob]},
         {following, [bob]}],
Queue = [{3, {fun op_union/2, following, [bob]}},
         {4, {fun op_union/2, followers, [bob]}}].
```
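The merge at Timestamp 5 can be imitated with a toy model. It is not statebox's implementation. The module toy_merge, the {Value, Queue} tuple, and the {union, Key, Items} operation format are all assumptions made for this sketch. The queues shown in this walkthrough hold funs such as op_union/2, whereas this sketch uses plain data so the queues can be sorted and de-duplicated. Because every operation is repeatable, replaying operations that the chosen value already reflects is harmless.

```erlang
-module(toy_merge).
-export([apply_op/2, merge/1]).

%% Toy model only. A box is {Value, Queue}: Value is an orddict mapping
%% keys to ordsets, and Queue is a list of {Timestamp, Op} where each Op
%% is plain data of the form {union, Key, Items}.

%% Union Items into the ordset stored under Key (creating it if absent).
%% Applying the same op twice gives the same result as applying it once.
apply_op({union, Key, Items}, Dict) ->
    Add = ordsets:from_list(Items),
    orddict:update(Key, fun(Old) -> ordsets:union(Old, Add) end, Add, Dict).

%% Keep the value of the first box and replay every queued op from all
%% boxes on top of it, oldest first, with exact duplicates removed.
merge([{Value, _Queue} | _] = Boxes) ->
    Queue = lists:usort(lists:append([Q || {_, Q} <- Boxes])),
    Merged = lists:foldl(fun({_T, Op}, Acc) -> apply_op(Op, Acc) end,
                         Value, Queue),
    {Merged, Queue}.
```

Merging [AB, BA] or [BA, AB] yields the same value as Timestamp 5. Applying {union, following, [charlie]} to that value afterwards gives the following list [bob, charlie] seen at Timestamp 6.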

Timestamp 6:

Put the updated AC statebox to Riak. This will resolve the siblings created at Timestamp 4 by BA.

```erlang
Value = [{followers, [bob]},
         {following, [bob, charlie]}],
Queue = [{3, {fun op_union/2, following, [bob]}},
         {4, {fun op_union/2, followers, [bob]}},
         {6, {fun op_union/2, following, [charlie]}}].
```

Well, that’s about it! alice is following both bob and charlie despite the concurrency. No locks were harmed during this experiment, and we’ve arrived at eventual consistency by using statebox_riak, statebox, and Riak without having to write any conflict resolution code of our own.

Bob

And if you’re at all interested in getting paid to do stuff like this, Mochi is hiring.

References

[1] friendwad manages our social graph for Mochi Social and MochiGames. It is also evidence that naming things is a hard problem in computer science.
[2] See Basho’s articles “Why Vector Clocks are Easy” and “Why Vector Clocks are Hard.”
[3] When multiple writes happen to the same place and they have branching history, you’ll get multiple values back on read. These are called siblings in Riak.
[4] An operation F is repeatable if and only if F(V) = F(F(V)) for every value V. You could also call this an idempotent unary operation.
[5] The default conflict resolution algorithm in statebox_riak chooses metadata from one sibling arbitrarily. If you use metadata, you’ll need to come up with a clever way to merge it (such as putting it in the statebox and specifying a custom resolve_metadatas in your call to statebox_riak:new/1).

From Relational to Riak (Webcast)

January 2, 2013

New to Riak? Thinking about using Riak instead of a relational database? Join Basho chief architect Andy Gross and director of product management Shanley Kane for an intro this Thursday (11am PT/2pm ET). In about 30 minutes, we’ll cover the basics of:

* Scalability benefits of Riak, including an examination of limitations around master/slave architectures and sharding, and what Riak does differently
* A look at the operational aspects of Riak and where they differ from relational approaches
* Riak’s data model and benefits for developers, as well as the tradeoffs and limitations of a key/value approach
* Migration considerations, including where to start when migrating existing apps
* Riak’s eventually consistent design
* Multi-site replication options in Riak

Register for the webcast [here](http://info.basho.com/RelationalToRiakJan3.html).

[Shanley](http://twitter.com/shanley)

[Andy](https://twitter.com/argv0)

Planning for Eventual Consistency

May 14, 2010

You may remember that last week, we recorded a podcast with Benjamin Black all about the immense variety of databases in the NoSQL space and what your criteria should be when choosing one.

If you listened carefully, you may also remember that Benjamin and Justin Sheehy started to discuss eventual consistency. We decided to roll that into its own podcast as we thought it was a topic worthy of its own episode.

Think only a certain subset of databases is “eventually consistent”? Think again. Regardless of the database you choose, eventual consistency is something you should embrace and plan for, not fear.

Listen, Learn, and Enjoy.

Mark

Toward A Consistent, Fact-based Comparison of Emerging Database Technologies

A Muddle That Slows Adoption

Basho released Riak as an open source project seven months ago and began commercial service shortly thereafter. As new entrants into the loose collection of database projects, we observed two things:

  1. Widespread Confusion — the NoSQL rubric, and the decisions of most projects to self-identify under it, has created a false perception of overlap and similarity between projects differing not just technically but in approaches to licensing and distribution, leading to…
  2. Needless Competition — driving the confusion, many projects (us included, for sure) competed passionately (even acrimoniously) for attention as putative leaders of NoSQL, a fool’s errand as it turns out. One might as well claim leadership of all tools called wrenches.

The optimal use cases, architectures, and methods of software development differ so starkly, even among superficially similar projects, that to compete is to demonstrate a (likely pathological) lack of both an understanding of user needs and self-knowledge.

This confusion and wasted energy — in other words, the market inefficiency — has been the fault of anyone who has laid claim to, or professed designs on, the NoSQL crown.

  1. Adoption suffers — Users either make decisions based on muddled information or, worse, do not make any decision whatsoever.
  2. Energy is wasted — At Basho we spent too much time from September to December answering the question posed without fail by investors and prospective users and clients: “Why will you ‘win’ NoSQL?”

With the vigor of fools, we answered this question, even though we rarely if ever encountered another project’s software in a “head-to-head” competition. (In fact, in the few cases where we have been pitted head-to-head against another project, we have won or lost so quickly that we cannot help but conclude the evaluation could have been avoided altogether.)

The investors and users merely behaved as rational (though often bemused) market actors. Having accepted the general claim that NoSQL was a monolithic category, both sought to make a bet.

Clearly what is needed is objective information presented in an environment of cooperation driven by mutual self-interest.

This information, shaped not by any one person’s necessarily imperfect understanding of the growing collection of data storage projects but rather by all the participants themselves, would go a long way to remedying the inefficiencies discussed above.

Demystification through data, not marketing claims

We have spoken to representatives of many of the emerging database projects. They have enthusiastically agreed to participate in a project to disclose data about each project. Disclosure will start with the following: a common benchmark framework and benchmarks/load tests modeling common use cases.

  1. A Common Benchmark Framework: for this collaboration to succeed, no single aspect will impact success or failure more than arriving at a common benchmark framework.

At Basho we have observed the proliferation of “microbenchmarks,” or benchmarks that do not reflect the conditions of a production environment. Benchmarks that use a small data set, that do not store to disk, or that run for short durations (less than 12 hours) do more to confuse the issue for end users than any other single factor. Participants will agree on benchmark methods, tools, and applicability to use cases, and will make all benchmarks reproducible.

Compounding the confusion is the practice of running benchmarks for different use cases or on different hardware and then comparing them head-to-head as if the tests or systems were identical. We will seek to help participants run equivalent tests on the various databases, and we will not publish benchmark results that do not profile the hardware and configuration of the systems.

  2. Benchmarks That Support Use Cases: participants agree to benchmark their software under the conditions and with load tests reflective of use cases they commonly see in their user base or for which they think their software is best suited.
  3. Dissemination to third parties: providing easy-to-find data to any party interested in posting results.
  4. Honestly exposing disagreement: where agreement cannot be reached on any of the elements of the common benchmarking efforts, participants will respectfully expose the rationales for their divergent positions, thus still providing users with information upon which to base decisions.

There is more work to be done, but all participants should begin to see the benefits: faster, better decisions by users.

We invite others to join, once we are underway. We, and our counterparts at other projects, believe this approach will go a long way to removing the inefficiencies hindering adoption of our software.

Tony and Justin