Tag Archives: availability

Riak in Production – Retail and eCommerce Stories

January 29, 2013

This is the first in a series of blog posts covering the benefits Riak offers to developers and operators of retail and eCommerce platforms. To learn more, join our “Retail on Riak” webcast on Friday, February 8th.

As retailers grow, they have to store more and more data, and traditional relational databases aren’t always the best option. Retailers want to scale easily, without the operational burden of manual sharding, while business requirements demand that their data is always available for reads and writes. Riak is a highly available, low-latency distributed database based on architectural principles from Amazon, designed so retailers can serve product data quickly, maintain “always on” shopping experiences even under failure conditions, and rapidly grow to meet peak loads.

Retailers of all sizes have chosen Riak to power parts of their business, including:

  • Best Buy: Best Buy is North America’s top specialty retailer of consumer electronics, personal computers, entertainment software, and appliances. Riak has been an integral part of Best Buy’s push to re-platform its eCommerce site. For more info, check out Best Buy’s talk from our 2012 developer conference, RICON.
  • ideeli: ideeli is one of the fastest growing retailers, with over 5 million members and more than 1,000 brand partners. They use Riak to serve HTML documents and user-specific products. ideeli chose Riak to power their event-based shopping experience because Riak serves user information at low latency and provides ease of use and scale for ideeli’s operations team. Check out the complete case study for more details.
  • Copious: Copious is a social commerce marketplace that makes it easy for people to buy and sell the things they love. They currently store all registered accounts in Riak as well as the tokens that make it possible for users to authenticate with Copious via their Facebook or Twitter accounts. They chose to use Riak for their social login functionality because of its operational simplicity, which allows them to easily scale up without sharding and provides the high availability required for a smooth user experience. For more details, check out the complete Copious story on our blog.

For more information about the benefits of Riak for retailers and the retailers already using it, register for our “Retail on Riak” webcast on February 8th!

Basho

Riak on Engine Yard

January 25, 2013

Today we’re excited to introduce early access of Riak on Engine Yard! You can also learn more on the Engine Yard blog. With Riak on Engine Yard, you can deploy a Riak cluster as simply as defining some configuration values and clicking “Add Cluster.”

A common theme in several of our recent blog posts has been Basho’s focus on ease of deployment. We excel at building highly available, low-latency distributed systems. Engine Yard’s strengths lie in providing a hardened and secure Platform as a Service where you can manage your entire platform while retaining control of the environment. In addition, Engine Yard is well known for its contributions to the Ruby, PHP, and Node.js communities. The introduction of Riak on Engine Yard further validates customer demand for reliable and easy to use cloud solutions.

If you were at RICON 2012, you were probably one of the many who attended a talk entitled “Riak in the Cloud.” If you were unable to attend, you missed an amazing session in which Ines Sombra and Michael Broadhead from Engine Yard spoke about their experiences with Riak and deploying it in the cloud. It’s great to see the distributed systems lessons discussed there translated into reality.

We look forward to seeing what the Basho community builds using Riak on Engine Yard. Get started now with 500 hours for free on their platform.

Basho

Riak for Retail and eCommerce Platforms

January 22, 2013

Traditionally, most retailers have used relational databases to manage their platforms and eCommerce sites. However, with the rapid growth of data and business requirements for high availability and scale, more retailers are looking at non-relational solutions like Riak.

Riak is a masterless, distributed database that provides retailers with high read and write availability, fault-tolerance and the ability to grow with low operational cost. Architectural, operational and development benefits for retailers include:

  • “Always On” Shopping Experience: Based on architectural principles from Amazon, Riak is designed to favor data availability, even in the event of hardware failure or network partition. For retailers, failure to accept additions to a shopping cart, or to serve product information quickly, has a direct and negative impact on revenue. Riak is architected to ensure the system can always accept writes and serve reads at low latency.
  • Resilient Infrastructure: At scale, hardware malfunction, network partition, and other failure modes are inevitable. Riak provides a number of mechanisms to ensure that retail infrastructure is resilient to failure. Data is replicated automatically within the cluster so nodes can go down but the system still responds to requests. This ensures read and write availability, even in serious failure conditions.
  • Low-Latency Data Storage: Many retailers now operate online and mobile experiences with an API or data services platform. In order to provide a fast and available experience to end users, Riak is designed to serve predictable, low-latency requests as part of a service-oriented infrastructure and is accessible via HTTP API, protocol buffers, or Riak’s many client libraries (a brief client sketch follows this list).
  • Scale to Peak Loads with Low Operational Cost: During major holidays and other periods of peak load, retailers may have to significantly increase their database capacity quickly. When new nodes are added, Riak automatically distributes data evenly to naturally prevent hot spots in the database, and yields a near-linear increase in performance and throughput when capacity is added.
  • Global Data Locality and Redundancy: Riak Enterprise’s multi-site replication allows replication of data to multiple data centers, providing both a global data footprint and the ability to survive datacenter failure.
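
As a rough illustration of the API access mentioned above, here is a minimal sketch using the official Erlang client (riak-erlang-client). The bucket, key, and JSON payload are illustrative only, and the host/port assume a local node with default settings.

```erlang
%% Minimal sketch with the riak-erlang-client (riakc); names and payload
%% are illustrative, not taken from any real deployment.
{ok, Pid} = riakc_pb_socket:start_link("127.0.0.1", 8087),

%% Store a product document.
Obj = riakc_obj:new(<<"products">>, <<"sku-12345">>,
                    <<"{\"name\":\"TV\",\"price\":499}">>,
                    "application/json"),
ok = riakc_pb_socket:put(Pid, Obj),

%% Read it back from any node in the cluster.
{ok, Fetched} = riakc_pb_socket:get(Pid, <<"products">>, <<"sku-12345">>),
Value = riakc_obj:get_value(Fetched).
```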

Top retailers using Riak include Best Buy and ideeli. Best Buy selected Riak as an integral part of its push to re-platform its eCommerce site. For more information about how Best Buy is using Riak, check out this video.

ideeli uses Riak to serve HTML documents and user-specific products. ideeli chose Riak to provide its highly available, event-based shopping experience – Riak gives them the ability to serve user information at low latency and provides ease of use and scale to ideeli’s operations team. For more information on ideeli’s use of Riak, check out the complete case study.

Common use cases for Riak in the retail/eCommerce space include:

  • Shopping Carts: Riak’s “always on” capabilities ensure carts can always accept writes.
  • Product Catalogs: Riak is well suited for storing rapidly growing content that needs to be served at low latency.
  • API Platforms: Riak’s flexible, schemaless design allows for rapid application development.
  • Mobile Applications: Riak’s low-latency, always-available small object storage makes it ideal for powering mobile experiences across platforms.

To help retailers evaluate and adopt Riak, we’ve published a technical overview: “Retail on Riak: A Technical Introduction.” We discuss more in-depth information on modeling applications for common use cases, switching from a relational architecture, querying, multi-site replication and more.

Basho

Relational to Riak, Part 1 – High Availability

January 10, 2013

This is the first in a series of blog posts providing a high-level overview of the benefits and tradeoffs of Riak versus traditional relational databases. If this is relevant to your projects or applications, register for our “From Relational to Riak” webcast on January 24.

One of the biggest differences between Riak and relational systems is the focus on availability and how the underlying architecture deals with failure modes.

Most relational databases leverage a master/slave architecture to replicate data. This approach usually means the master coordinates all write operations, working with the slave nodes to update data. If the master node fails, the database will reject write operations until the failure is resolved – often involving failover or leader election – to maintain correctness. This can result in a window of write unavailability.

Master Slave Systems

Conversely, Riak uses a masterless system with no single point of failure, meaning any node can serve read or write requests. If a node experiences an outage, other nodes can continue to accept read and write requests. Additionally, if a node fails or becomes unavailable to the rest of the cluster due to a network partition, a neighboring node will take over responsibilities for the unavailable node. Once that node becomes available again, the neighboring node will hand off any updates it accepted through a process called “hinted handoff.” This is another way that Riak maintains availability and resilience even in the face of serious failure.

Riak Masterless

Because Riak accepts reads and writes even when multiple nodes are unavailable, and uses an eventually consistent design to maintain availability, different replicas may in rare cases contain different versions of an object. This can occur if multiple clients update the same piece of data at the same time, or if nodes are down or lagging. These conflicts arise only a small fraction of the time, but they are important to understand. Riak has a number of mechanisms for detecting and resolving them when they occur. For more on how Riak achieves availability and the tradeoffs involved, see our documentation on the subject.
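
For illustration, when divergent versions (siblings) do surface with allow_mult enabled, application-side resolution with the Erlang client might look roughly like this. Here merge_carts/1 is a hypothetical application-specific merge function and Pid is an existing client connection; treat the whole block as a sketch rather than prescribed practice.

```erlang
%% Illustrative sibling resolution with riakc; merge_carts/1 is hypothetical.
{ok, Obj} = riakc_pb_socket:get(Pid, <<"carts">>, <<"user-42">>),
case riakc_obj:get_values(Obj) of
    [Single] ->
        %% No conflict: exactly one value.
        Single;
    Siblings ->
        %% Conflict: merge the divergent versions and write back.
        Merged = merge_carts(Siblings),
        Resolved = riakc_obj:update_value(Obj, Merged),
        riakc_pb_socket:put(Pid, Resolved)
end.
```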

For many use cases today, high availability and fault tolerance are critical to the user experience and the company’s revenue. Unavailability has a negative impact on your revenue, damages user trust and leads to a poor user experience. For use cases such as online retail, shopping carts, advertising, social and mobile platforms or anything with critical data needs, high availability is key and Riak may be the right choice.

Sign up for the webcast here or read our whitepaper on moving from relational to Riak.

Basho

Why Your Riak Cluster Should Have At Least Five Nodes

April 26, 2012

Here at Basho we want to make sure that your Riak implementations are set up from the beginning to succeed. While you can use the Riak Fast Track to quickly set up a 3-node dev/test environment, we recommend that all production deployments use a minimum of 5 nodes, ensuring you benefit from the architectural principles that underpin Riak’s availability, fault-tolerance and scaling properties.

TL;DR: Deployments of five nodes or greater provide a foundation for the best performance and growth as the cluster expands. Since Riak scales linearly with the addition of more nodes, users see improved performance, reliability, and throughput with larger clusters. Smaller deployments can compromise the fault tolerance of the system: with a “sane” replication requirement for availability (we default to three copies), node failures in smaller clusters mean that replication requirements may not be met, which can result in degraded performance and a risk of data loss. Additionally, in clusters smaller than five nodes, a high percentage of the nodes (75-100%) must respond to each request, putting undue load on the cluster that may degrade performance.

Let’s take a closer look in the scenario of a three- and four-node cluster.

Performance and Fault Tolerance Concerns in a 3-Node Cluster

To ensure that the cluster is always available to respond to read and write requests, Basho recommends a “sane default” for data replication: three copies of the data on three different nodes. The default configuration of Riak requires four nodes at minimum to ensure no single node holds more than one copy of any particular piece of data. (In future versions of Riak we’ll be able to guarantee that each replica lives on a separate physical node. At this point it’s almost at 100%, but we won’t tell you it’s guaranteed until it is.) While it is possible to change the settings to ensure that the three replicas are on distinct nodes in a three-node cluster, you still run into issues of replica placement during a node failure or network partition.

In the event of node failure or a network partition in a three-node cluster, the default requirement for replication remains three but there are only two nodes available to service requests. This will result in degraded performance and carries a risk of data loss.

Performance and Fault Tolerance Concerns in a 4-Node Cluster

With a requirement of three replicas, any one request for a particular piece of data from a 4-node cluster will require a response from 75-100% of the nodes in the cluster, which may result in degraded performance. In the event of node failure or a network partition in a 4-node cluster, you are back to the issues we outline above.
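
As a rough back-of-the-envelope check (evaluated in an Erlang shell), here is the share of the cluster involved in a single request at the default of three replicas, for a few cluster sizes:

```erlang
%% Percentage of nodes touched by one request, assuming N=3 replicas
%% and the three vnodes spread across distinct physical nodes.
[{Nodes, 3 / Nodes * 100} || Nodes <- [3, 4, 5, 6, 8]].
%% roughly: 100%, 75%, 60%, 50%, 37.5%
```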

What if I want to change the replication default?

If using a different data replication number is right for your implementation, just make sure to use a cluster of at least N + 2 nodes, where N is the number of replicas, for the reasons outlined above.

Going With 5 Nodes

As you add nodes to a Riak cluster that starts with 5 nodes, the percentage of the cluster required to service each request goes down. Riak scales linearly and predictably from this point on. When a node is taken out of service or fails, the number of nodes remaining is large enough to protect you from data loss.

So do your development and testing with smaller clusters, but when it comes to production, start with five nodes.

Happy scaling.

Shanley

Riak Core – The Coordinator

April 19, 2011

This was originally posted on Ryan Zezeski’s working blog Try Try Try.

At the end of my vnode post I asked the question “Where’s the redundancy?” There is none in RTS, thus far. Riak Core isn’t magic, but rather a suite of tools for building distributed, highly available systems. You have to build your own redundancy. In this post I’ll talk about the coordinator and show how to implement one.

What is a Coordinator?

Logically speaking, a coordinator is just what it sounds like: its job is to coordinate incoming requests. It enforces the consistency semantics of N, R and W and performs anti-entropy services like read repair. In simpler terms, it’s responsible for distributing data across the cluster and re-syncing data when it finds conflicts. You could think of vnodes as the things that Get Shit Done (TM) and the coordinators as the other things telling them what to do and overseeing the work. They work in tandem to make sure your request is handled as well as it can be.

To be more concrete, a coordinator is a gen_fsm. Each request is handled in its own Erlang process. A coordinator communicates with the vnode instances to fulfill requests.

To wrap up, a coordinator

  • coordinates requests
  • enforces the consistency requirements
  • performs anti-entropy
  • is an Erlang process that implements the gen_fsm behavior
  • and communicates with the vnode instances to execute the request

Implementing a Coordinator

Unlike the vnode, Riak Core doesn’t define a coordinator behavior. You have to roll your own each time. I used Riak’s get and put coordinators for guidance. You’ll notice they both have a similar structure. I’m going to propose a general structure here that you can use as your guide, but remember that there’s nothing set in stone on how to write a coordinator.

Before moving forward it’s worth mentioning that you’ll want to instantiate these coordinators under a simple_one_for_one supervisor. If you’ve never heard of simple_one_for_one before then think of it as a factory for Erlang processes of the same type. An incoming request will at some point call supervisor:start_child/2 to instantiate a new FSM dedicated to handling this specific request.
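
For reference, a minimal sketch of such a supervisor might look like the following. The module and child names are illustrative assumptions rather than the actual RTS source, and rts_get_fsm:start_link is assumed to exist and start the coordinator with the per-request arguments.

```erlang
%% Illustrative simple_one_for_one supervisor for per-request coordinators.
%% Module and child names are assumptions, not necessarily what RTS uses.
-module(rts_get_fsm_sup).
-behaviour(supervisor).

-export([start_link/0, start_get_fsm/1]).
-export([init/1]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

%% Called once per incoming request; Args becomes the argument list
%% passed to rts_get_fsm:start_link/N.
start_get_fsm(Args) ->
    supervisor:start_child(?MODULE, Args).

init([]) ->
    GetFsm = {undefined,
              {rts_get_fsm, start_link, []},
              temporary, 5000, worker, [rts_get_fsm]},
    {ok, {{simple_one_for_one, 10, 10}, [GetFsm]}}.
```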

init(Args) -> {ok, InitialState, SD, Timeout}

```erlang
Args = term()
InitialState = atom()
SD = term()
Timeout = integer()
```

This is actually part of the gen_fsm behavior, i.e. it’s a callback you must implement. Its job is to specify the InitialState name and its data (SD). In this case you’ll also want to specify a Timeout value of 0 in order to immediately go to the InitialState, prepare.

A get coordinator for RTS is passed four arguments.

  1. ReqId: A unique id for this request.
  2. From: Who to send the reply to.
  3. Client: The name of the client entity — the entity that is writing log events to RTS.
  4. StatName: The name of the statistic the requester is interested in.

All this data will be passed as a list to init and the only work that needs to be done is to build the initial state record and tell the FSM to proceed to the prepare state.

```erlang
init([ReqId, From, Client, StatName]) ->
    SD = #state{req_id=ReqId,
                from=From,
                client=Client,
                stat_name=StatName},
    {ok, prepare, SD, 0}.
```

The write coordinator for RTS is very similar but has two additional arguments.

  1. Op: The operation to be performed, one of set, append, incr,
    incrby or sadd.
  2. Val: The value of the operation. For the incr op this is undefined.

Here is the code.

```erlang
init([ReqID, From, Client, StatName, Op, Val]) ->
    SD = #state{req_id=ReqID,
                from=From,
                client=Client,
                stat_name=StatName,
                op=Op,
                val=Val},
    {ok, prepare, SD, 0}.
```

prepare(timeout, SD0) -> {next_state, NextState, SD, Timeout}

```erlang
SD0 = SD = term()
NextState = atom()
Timeout = integer()
```

The job of prepare is to build the preference list. The preference list is the preferred set of vnodes that should participate in this request. Most of the work is actually done by riak_core_util:chash_key/1 and riak_core_apl:get_apl/3. Both the get and write coordinators do the same thing here.

  1. Calculate the index in the ring that this request falls on.
  2. From this index determine the N preferred partitions that should handle the request.

Here is the code.

```erlang
prepare(timeout, SD0=#state{client=Client,
                            stat_name=StatName}) ->
    DocIdx = riak_core_util:chash_key({list_to_binary(Client),
                                       list_to_binary(StatName)}),
    Prelist = riak_core_apl:get_apl(DocIdx, ?N, rts_stat),
    SD = SD0#state{preflist=Prelist},
    {next_state, execute, SD, 0}.
```

The fact that the key is a two-tuple is simply a consequence of Riak Core having been extracted from Riak; some of its key-value semantics carried over during the extraction. In the future things like this may change.

execute(timeout, SD0) -> {next_state, NextState, SD}

```erlang
SD0 = SD = term()
NextState = atom()
```

The execute state executes the request by sending commands to the vnodes in the preflist and then putting the coordinator into a waiting state. The code to do this in RTS is really simple; call the vnode command passing it the preference list. Under the covers the vnode has been changed to use riak_core_vnode_master:command/4 which will distribute the commands across the Preflist for you. I’ll talk about this later in the post.

Here’s the code for the get coordinator.

```erlang
execute(timeout, SD0=#state{req_id=ReqId,
                            stat_name=StatName,
                            preflist=Prelist}) ->
    rts_stat_vnode:get(Prelist, ReqId, StatName),
    {next_state, waiting, SD0}.
```

The code for the write coordinator is almost identical except it’s parameterized on Op.

```erlang
execute(timeout, SD0=#state{req_id=ReqID,
                            stat_name=StatName,
                            op=Op,
                            val=undefined,
                            preflist=Preflist}) ->
    rts_stat_vnode:Op(Preflist, ReqID, StatName),
    {next_state, waiting, SD0}.
```

waiting(Reply, SD0) -> Result

```erlang
Reply = {ok, ReqID}
Result = {next_state, NextState, SD}
       | {stop, normal, SD}
NextState = atom()
SD0 = SD = term()
```

This is probably the most interesting state in the coordinator, as its job is to enforce the consistency requirements and possibly perform anti-entropy in the case of a get. The coordinator waits for replies from the various vnode instances it called in execute and stops once its requirements have been met. The typical shape of this function is to pattern match on the Reply, check the state data SD0, and then either continue waiting or stop, depending on the current state data.

The get coordinator waits for replies with the correct ReqId, increments the reply count and adds the Val to the list of Replies. If the quorum R has been met then return the Val to the requester and stop the coordinator. If the vnodes didn’t agree on the value then return all observed values. In this post I am punting on the conflict resolution and anti-entropy part of the coordinator and exposing the inconsistent state to the client application. I’ll implement them in my next post. If the quorum hasn’t been met then continue waiting for more replies.

```erlang
waiting({ok, ReqID, Val}, SD0=#state{from=From, num_r=NumR0, replies=Replies0}) ->
    NumR = NumR0 + 1,
    Replies = [Val|Replies0],
    SD = SD0#state{num_r=NumR, replies=Replies},
    if
        NumR =:= ?R ->
            Reply =
                case lists:any(different(Val), Replies) of
                    true ->
                        Replies;
                    false ->
                        Val
                end,
            From ! {ReqID, ok, Reply},
            {stop, normal, SD};
        true -> {next_state, waiting, SD}
    end.
```

The write coordinator has things a little easier here because all it cares about is knowing that W vnodes executed its write request.

```erlang
waiting({ok, ReqID}, SD0=#state{from=From, num_w=NumW0}) ->
    NumW = NumW0 + 1,
    SD = SD0#state{num_w=NumW},
    if
        NumW =:= ?W ->
            From ! {ReqID, ok},
            {stop, normal, SD};
        true -> {next_state, waiting, SD}
    end.
```

What About the Entry Coordinator?

Some of you may be wondering why I didn’t write a coordinator for the entry vnode. If you don’t remember, this vnode is responsible for matching an incoming log entry and then executing its trigger function. For example, any incoming log entry from an access log in combined logging format will cause the total_reqs stat to be incremented by one. I only want this action to occur at most once per entry. There is no notion of N. I could write a coordinator that tries to make some guarantees about its execution, but for now I’m OK with possibly dropping data occasionally.

Changes to rts.erl and rts_stat_vnode

Now that we’ve written coordinators to handle requests to RTS we need to refactor the old rts.erl and rts_stat_vnode. The model has changed from rts calling the vnode directly to delegating the work to rts_get_fsm which will call the various vnodes and collect responses.

```text
rts:get ----> rts_stat_vnode:get (local)

                                                       /--> stat_vnode@rts1
rts:get ----> rts_get_fsm:get ----> rts_stat_vnode:get --|--> stat_vnode@rts2
                                                       \--> stat_vnode@rts3
```

Instead of performing a synchronous request the rts:get/2 function now calls the get coordinator and then waits for a response.

```erlang
get(Client, StatName) ->
    {ok, ReqID} = rts_get_fsm:get(Client, StatName),
    wait_for_reqid(ReqID, ?TIMEOUT).
```
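
The wait_for_reqid/2 helper isn’t shown here; a minimal sketch, assuming the {ReqID, ok} and {ReqID, ok, Val} reply shapes the coordinators send above, might look like this:

```erlang
%% Sketch of the reply-wait helper; message shapes follow the
%% From ! {ReqID, ok} / {ReqID, ok, Reply} sends in the coordinators above.
wait_for_reqid(ReqID, Timeout) ->
    receive
        {ReqID, ok}      -> ok;
        {ReqID, ok, Val} -> {ok, Val}
    after Timeout ->
        {error, timeout}
    end.
```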

The write requests underwent a similar refactoring.

```erlang
do_write(Client, StatName, Op) ->
    {ok, ReqID} = rts_write_fsm:write(Client, StatName, Op),
    wait_for_reqid(ReqID, ?TIMEOUT).

do_write(Client, StatName, Op, Val) ->
    {ok, ReqID} = rts_write_fsm:write(Client, StatName, Op, Val),
    wait_for_reqid(ReqID, ?TIMEOUT).
```

The rts_stat_vnode was refactored to use riak_core_vnode_master:command/4, which takes a Preflist, Msg, Sender and VMaster as arguments:

  • Preflist: The list of vnodes to send the command to.
  • Msg: The command to send.
  • Sender: A value describing who sent the request, in this case the coordinator. This is used by the vnode to correctly address the reply message.
  • VMaster: The name of the vnode master for the vnode type to send this command to.

```erlang
get(Preflist, ReqID, StatName) ->
    riak_core_vnode_master:command(Preflist,
                                   {get, ReqID, StatName},
                                   {fsm, undefined, self()},
                                   ?MASTER).
```

Coordinators in Action

Talk is cheap, let’s see it in action. Towards the end of the vnode post I made the following statement:

“If you start taking down nodes you’ll find that stats start to disappear.”

One of the main objectives of the coordinator is to fix this problem. Let’s see if it worked.

Build the devrel

```bash
make
make devrel
```

Start the Cluster

```bash
for d in dev/dev*; do $d/bin/rts start; done
for d in dev/dev{2,3}; do $d/bin/rts-admin join rts1@127.0.0.1; done
```

Feed in Some Data

```bash
gunzip -c progski.access.log.gz | head -100 | ./replay --devrel progski
```

Get Some Stats

```text
./dev/dev1/bin/rts attach
(rts1@127.0.0.1)1> rts:get("progski", "total_reqs").
{ok,97}
(rts1@127.0.0.1)2> rts:get("progski", "GET").
{ok,91}
(rts1@127.0.0.1)3> rts:get("progski", "total_sent").
{ok,445972}
(rts1@127.0.0.1)4> rts:get("progski", "HEAD").
{ok,6}
(rts1@127.0.0.1)5> rts:get("progski", "PUT").
{ok,not_found}
(rts1@127.0.0.1)6> rts:get_dbg_preflist("progski", "total_reqs").
[{730750818665451459101842416358141509827966271488,
  'rts3@127.0.0.1'},
 {753586781748746817198774991869333432010090217472,
  'rts1@127.0.0.1'},
 {776422744832042175295707567380525354192214163456,
  'rts2@127.0.0.1'}]
(rts1@127.0.0.1)7> rts:get_dbg_preflist("progski", "GET").
[{274031556999544297163190906134303066185487351808,
  'rts1@127.0.0.1'},
 {296867520082839655260123481645494988367611297792,
  'rts2@127.0.0.1'},
 {319703483166135013357056057156686910549735243776,
  'rts3@127.0.0.1'}]
```

Don’t worry about what I did on lines 6 and 7 yet, I’ll explain in a second.

Kill a Node

```text
(rts1@127.0.0.1)8> os:getpid().
"91461"
Ctrl^D
kill -9 91461
```

Verify it’s Down

```bash
$ ./dev/dev1/bin/rts ping
Node 'rts1@127.0.0.1' not responding to pings.
```

Get Stats on rts2

Your results may not exactly match mine, as it depends on which vnode instances responded first. The coordinator only cares about getting R responses.

```text
./dev/dev2/bin/rts attach
(rts2@127.0.0.1)1> rts:get("progski", "total_reqs").
{ok,97}
(rts2@127.0.0.1)2> rts:get("progski", "GET").
{ok,[not_found,91]}
(rts2@127.0.0.1)3> rts:get("progski", "total_sent").
{ok,445972}
(rts2@127.0.0.1)4> rts:get("progski", "HEAD").
{ok,[not_found,6]}
(rts2@127.0.0.1)5> rts:get("progski", "PUT").
{ok,not_found}
```

Let’s Compare the Before and After Preflist

Notice that some gets on rts2 return a single value as before, whereas others return a list of values. The reason is that the preflist calculation now includes fallback vnodes. A fallback vnode is one that is not on its appropriate physical node. Since we killed rts1, its vnode requests must be routed somewhere else. That somewhere else is a fallback vnode. Since the request-reply model between the coordinator and vnode is asynchronous, the reply value will depend on which vnode instances reply first. If the instances with values reply first, you get a single value; otherwise you get a list of values. My next post will improve this behavior slightly to take advantage of the fact that we know there are still two nodes with the data, so there should be no reason to return conflicting values.

```text
(rts2@127.0.0.1)6> rts:get_dbg_preflist("progski", "total_reqs").
[{730750818665451459101842416358141509827966271488,
  'rts3@127.0.0.1'},
 {776422744832042175295707567380525354192214163456,
  'rts2@127.0.0.1'},
 {753586781748746817198774991869333432010090217472,
  'rts3@127.0.0.1'}]
(rts2@127.0.0.1)7> rts:get_dbg_preflist("progski", "GET").
[{296867520082839655260123481645494988367611297792,
  'rts2@127.0.0.1'},
 {319703483166135013357056057156686910549735243776,
  'rts3@127.0.0.1'},
 {274031556999544297163190906134303066185487351808,
  'rts2@127.0.0.1'}]
```

In both cases either rts2 or rts3 stepped in for the missing rts1. Also, in each case, one of these vnodes is going to return not_found since it’s a fallback. I added another debug function to determine which one.

```text
(rts2@127.0.0.1)8> rts:get_dbg_preflist("progski", "total_reqs", 1).
[{730750818665451459101842416358141509827966271488,
  'rts3@127.0.0.1'},
 97]
(rts2@127.0.0.1)9> rts:get_dbg_preflist("progski", "total_reqs", 2).
[{776422744832042175295707567380525354192214163456,
  'rts2@127.0.0.1'},
 97]
(rts2@127.0.0.1)10> rts:get_dbg_preflist("progski", "total_reqs", 3).
[{753586781748746817198774991869333432010090217472,
  'rts3@127.0.0.1'},
 not_found]
(rts2@127.0.0.1)11> rts:get_dbg_preflist("progski", "GET", 1).
[{296867520082839655260123481645494988367611297792,
  'rts2@127.0.0.1'},
 91]
(rts2@127.0.0.1)12> rts:get_dbg_preflist("progski", "GET", 2).
[{319703483166135013357056057156686910549735243776,
  'rts3@127.0.0.1'},
 91]
(rts2@127.0.0.1)13> rts:get_dbg_preflist("progski", "GET", 3).
[{274031556999544297163190906134303066185487351808,
  'rts2@127.0.0.1'},
 not_found]
```

Notice the fallbacks are at the end of each list. Also notice that, since we’re on rts2, total_reqs will almost always return a single value because its fallback is on another node, whereas GET has a local fallback that is more likely to reply first.

Conflict Resolution & Read Repair

In the next post I’ll be making several enhancements to the get coordinator by performing basic conflict resolution and implementing read repair.

Ryan

Schema Design in Riak – Introduction

March 19, 2010

One of the challenges of switching from a relational database (Oracle, MySQL, etc.) to a “NoSQL” database like Riak is understanding how to represent your data within the database. This post is the beginning of a series of entries on how to structure your data within Riak in useful ways.

Choices have consequences

There are many reasons why you might choose Riak for your database, and I’m going to explain how a few of those reasons will affect the way your data is structured and manipulated.

One oft-cited reason for choosing Riak, and other alternative databases, is the need to manage huge amounts of data, collectively called “Big Data”. If you’re storing lots of data, you’re less likely to be doing online queries across large swaths of the data. You might be doing real-time aggregation in addition to calculating longer-term information in the background or offline. You might have one system collecting the data and another processing it. You might be storing loosely-structured information like log data or ad impressions. All of these use-cases call for low ceremony, high availability for writes, and little need for robust ways of finding data — perfect for a key/value-style scheme.

Another reason one might pick Riak is for flexibility in modeling your data. Riak will store any data you tell it to in a content-agnostic way — it does not enforce tables, columns, or referential integrity. This means you can store binary files right alongside more programmer-transparent formats like JSON or XML. Using Riak as a sort of “document database” (semi-structured, mostly de-normalized data) and “attachment storage” will have different needs than the key/value-style scheme — namely, the need for efficient online-queries, conflict resolution, increased internal semantics, and robust expressions of relationships.

The third reason for choosing Riak I want to discuss is related to CAP – in that Riak prefers A (Availability) over C (Consistency). In contrast to a traditional relational database system, in which transactional semantics ensure that a datum will always be in a consistent state, Riak chooses to accept writes even if the state of the object has been changed by another client (in the case of a race-condition), or if the cluster was partitioned and the state of the object diverges. These architecture choices bring to the fore something we should have been considering all along — how should our applications deal with inconsistency? Riak lets you choose whether to let the “last one win” or to resolve the conflict in your application by automated or human-assisted means.

More mindful domain modeling

What’s the moral of these three stories? When modeling your data in Riak, you need to understand better the shape of your data. You can no longer rely on normalization, foreign key constraints, secondary indexes and transactions to make decisions for you.

Questions you might ask yourself when designing your schema:

  • Will my access pattern be read-heavy, write-heavy, or balanced?
  • Which datasets churn the most? Which ones require more sophisticated conflict resolution?
  • How will I find this particular type of data? Which method is most efficient?
  • How independent/interrelated is this type of data with this other type of data? Do they belong together?
  • What is an appropriate key-scheme for this data? Should I choose my own or let Riak choose?
  • How much will I need to do online queries on this data? How quickly do I need them to return results?
  • What internal structure, if any, best suits this data?
  • Does the structure of this data promote future design modifications?
  • How resilient will the structure of the data be if requirements change? How can the change be effected without serious interruption of service?

I like to draw up my domain concepts on a pad of unlined paper or a whiteboard with boxes and arrows, then figure out how they map onto the database. Ultimately, the concepts define your application, so get those solid before you even worry about Riak.

Thinking non-relationally

Once you’ve thought carefully about the questions described above, it’s time to think about how your data will map to Riak. We’ll start from the small scale in this post (single domain concepts) and work our way out in future installments.

Internal structure

For a single class of objects in your domain, let’s consider the structure of that data. Here’s where you’re going to decide two interrelated issues — how this class of data will be queried and how opaque its internal structure will be to Riak.

The first issue, how the data will be queried, depends partly on how easy it is to intuit the key of a desired object. For example, if your data is user profiles that are mostly private, perhaps the user’s email or login name would be appropriate for the key, which would be easy to establish when the user logs in. However, if the key is not so easy to determine, or is arbitrary, you will need map-reduce or link-walking to find it.

The second issue, how opaque the data is to Riak, is affected by how you query but also by the nature of the data you’re storing. If you need to do intricate map-reduce queries to find or manipulate the data, you’ll likely want it in a form like JSON (or an Erlang term) so your map and reduce functions can reason about the data. On the other hand, if your data is something like an image or PDF, you don’t want to shoehorn that into JSON. If you’re in the situation where you need both a form that’s opaque to Riak, and to be able to reason about it with map-reduce, have your application add relevant metadata to the object. These are created using X-Riak-Meta-* headers in HTTP or riak_object:update_metadata/2 in Erlang.
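
As a rough sketch of the client-side approach, the usermeta calls below are from the riak-erlang-client and are the protocol buffers counterpart of the X-Riak-Meta-* headers; treat the exact function names, bucket, key, and file path as assumptions.

```erlang
%% Sketch: store a PDF as an opaque value and attach user metadata.
%% Names and paths are illustrative; usermeta functions are from riakc_obj
%% (treat them as an assumption if you are on an older client).
{ok, Bin} = file:read_file("invoice-123.pdf"),
Obj0 = riakc_obj:new(<<"invoices">>, <<"invoice-123">>, Bin, "application/pdf"),
MD0  = riakc_obj:get_update_metadata(Obj0),
MD1  = riakc_obj:set_usermeta(MD0, [{<<"customer">>, <<"42">>},
                                    {<<"year">>, <<"2010">>}]),
Obj1 = riakc_obj:update_metadata(Obj0, MD1),
ok   = riakc_pb_socket:put(Pid, Obj1).
```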

Rule of thumb: if it’s an abstract datatype, use a map-reduce-friendly format like JSON; if it’s a concrete form, use its original representation. Of course, there are exceptions to every rule, so think carefully about your modeling problem.

Consistency, replication, conflict resolution

The second issue I would consider for each type of data is the access pattern and desired level of consistency. This is related to the questions above of read/write loads, churn, and conflicts.

Riak provides a few knobs you can turn at schema-design time and at request-time that relate to these issues. The first is allow_mult, or whether to allow recording of divergent versions of objects. In a write-heavy load or where clients are updating the same objects frequently, possibly at the same time, you probably want this on (true), which you can change by setting the bucket properties. The tradeoffs are that the vector clock may grow quickly and your application will need to decide how to resolve conflicts.

The second knob you can turn is the n_val, or how many replicas of each object to store, also a per-bucket setting. The default value is 3, which will work for many applications. If you need more assurance that your data is going to withstand failures, you might increase the value. If your data is non-critical or in large chunks, you might decrease the value to get greater performance. Knowing what to choose for this value will depend on an honest assessment of both the value of your data and operational concerns.

The third knob you can turn is per-request quorums. For reads, this is the R request parameter: how many replicas need to agree on the value for the read to succeed (the default is 2). For writes, there are two parameters, W and DW. W is how many replicas need to acknowledge the write request before it succeeds (default is 2). DW (durable writes) is how many replica backends need to confirm that the write finished before the entire write succeeds (default is 0). If you need greater consistency when reading or writing your data, you’ll want to increase these numbers. If you need greater performance and can sacrifice some consistency, decrease them. In any case, your R, W, and DW values can be no larger than n_val if you want the request to succeed.
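
As a rough illustration of turning these knobs with the Erlang client (the bucket name, key, and values are only examples, and Pid is an existing riakc_pb_socket connection):

```erlang
%% Sketch: per-bucket settings and per-request quorums with riakc.
ok = riakc_pb_socket:set_bucket(Pid, <<"carts">>,
                                [{allow_mult, true}, {n_val, 3}]),

%% Read with a stricter quorum than the default...
{ok, Obj} = riakc_pb_socket:get(Pid, <<"carts">>, <<"user-42">>, [{r, 3}]),

%% ...and write requiring all replicas to acknowledge, one durably.
ok = riakc_pb_socket:put(Pid, Obj, [{w, 3}, {dw, 1}]).
```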

What do these have to do with your data model? Fundamentally understanding the structure and purpose of your data will help you determine how you should turn these knobs. Some examples:

  • Log data: You’ll probably want low R and W values so that writes are accepted quickly. Because these are fire-and-forget writes, you won’t need allow_mult turned on. You might also want a low n_val, depending on how critical your data is.
  • Binary files: Your n_val is probably the most significant issue here, mostly depending on how large your files are and how many replicas of them you can tolerate (storage consumption).
  • JSON documents (abstract types): The defaults will work in most cases. Depending on how frequently the data is updated, and how many you update within a single conceptual operation with the application, you may want to enable allow_mult to prevent blind overwrites.

Sean Cribbs