
Cluster Management Improvements in Riak 1.2

September 13, 2012

Last month we released Riak 1.2, with a number of improvements to Riak stats, the protobufs API, the LevelDB backend, and repair/recovery capabilities. Riak 1.2 also features a new strategy for making cluster changes like adding and removing nodes. With the new approach, Riak allows you to stage changes, view their impact on the cluster, and then commit or abort them. The increased visibility lets Riak operators make more informed decisions about when and how to scale up, scale down, and upgrade or replace nodes. Additionally, you can now make multiple changes, like adding a number of nodes, at the same time – critical for large-scale clusters.

Pre-1.2 Cluster Management

In prior versions of Riak, users made changes to the cluster using commands under the “riak-admin” syntax. To add a node to or remove a node from the cluster, you would simply call “riak-admin join” or “riak-admin leave,” and the Riak cluster would immediately begin to hand off data and ownership as appropriate. While this approach was simple, it raised two issues we’ve tried to address with the new cluster management capabilities:

  • Coordinating cluster changes: Prior to Riak 1.2, there was no way to group changes together. Changes were entered sequentially, and if there was more than one change (e.g. joining multiple nodes to a cluster), the first change would happen in a single transition and the remaining changes (e.g. the rest of the joins) would occur together in a second transition. In the case of multiple joins, the first transition transfers data from the cluster to the first new node; the second transition then moves some of that data from the first new node to the other new nodes, wasting network bandwidth and disk space. This proved particularly problematic for production deployments in which nodes were frequently added or removed.
  • Planning: The pre-1.2 approach to cluster management didn’t give you visibility into how your changes would affect the cluster before you made them. For instance, the only way to know how many transfers a join would take would be to start the join and then run “riak-admin ring-status”. Likewise, you couldn’t know what ownership would look like until after the join.

Staged Clustering

We addressed both of the above issues with a new approach we’re calling ‘Staged Clustering’.

In Riak 1.2, instead of joins, leaves, etc. taking place immediately, they’re first staged. After staging cluster changes, you can view how the changes will affect the cluster, seeing how ring ownership will change and how many transfers between nodes will be needed to complete the transition. After looking at the plan, you can add or remove staged changes, scrap the plan, or execute it as is.

[Figure: Staged Clustering high-level process]

The ‘Staged Clustering’ interface is implemented in Riak’s command line tool, riak-admin, under the ‘cluster’ command. Underneath the ‘cluster’ command are subcommands used to stage, view, and commit cluster changes (e.g. to stage a join of the current node to the cluster containing dev1, you’d run: ‘riak-admin cluster join dev1’). You can read more about the new syntax in the Riak Wiki. Currently, the new approach to cluster management is not implemented in Riak Control, our open-source management and monitoring GUI, but is planned for a later release.
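Alongside join, the ‘cluster’ command provides subcommands for the rest of the staged workflow – leave, force-remove, replace, and force-replace to stage changes, plan to view the staged plan, and commit or clear to execute or abandon it (as documented for Riak 1.2).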


Let’s take a look at how the new cluster management strategy would work in a scenario where we want to add three nodes to an existing single-node cluster (dev1) to form a four-node cluster.

1. View the Current Member Status

First, we call ‘riak-admin member_status’ to get a view of the current cluster, the nodes in it and their current ring ownership:

[Figure: riak-admin member_status output]

2. Stage Joining New Nodes

Next, we’ll join three nodes (dev2, dev3, dev4) to the cluster using the cluster command.

dev2: riak-admin cluster join dev1
dev3: riak-admin cluster join dev1
dev4: riak-admin cluster join dev1

The joins are now staged for commit.

3. View How Staged Changes Will Affect the Cluster

Now we can use the new ‘riak-admin cluster plan’ command to see the impact of the joins on the cluster, viewing changes to ring ownership and transfers that need to occur.

riak-admin cluster plan

[Figure: riak-admin cluster plan output]

In the output, we see: the changes staged for commit, the number of resulting cluster transitions (1), how the data will be distributed around the ring after transition (25% on each node), and the number of transfers the transition will take (48 total).

4. Commit Changes

If we want to commit these changes, we use the commit command:

riak-admin cluster commit

These changes start taking place immediately. If we run ‘riak-admin member_status’, we can see the status of the transition. Additionally, we’ve fleshed out the ‘riak-admin transfers’ command to give you much more visibility into active transfers in Riak 1.2.

Other Resources

For more in-depth information on the new cluster management capabilities in Riak 1.2, check out this recorded webinar with Basho engineer Joseph Blomstedt and the updated docs.

Why Your Riak Cluster Should Have At Least Five Nodes

April 26, 2012

Here at Basho we want to make sure that your Riak implementations are set up from the beginning to succeed. While you can use the Riak Fast Track to quickly set up a 3-node dev/test environment, we recommend that all production deployments use a minimum of 5 nodes, ensuring you benefit from the architectural principles that underpin Riak’s availability, fault-tolerance and scaling properties.

TL;DR: Deployments of five nodes or greater will provide a foundation for the best performance and growth as the cluster expands. Since Riak scales linearly with the addition of more nodes, users find improved performance, reliability, and throughput with larger clusters. Smaller deployments can compromise the fault-tolerance of the system: with a “sane” replication requirement for availability (we default to three copies), node failures in smaller clusters mean that replication requirements may not be met. This can result in degraded performance and risk of data loss. Additionally, in clusters of fewer than five nodes with a sane replication requirement of 3, a high percentage of the nodes (75–100%) must respond to each request, putting undue load on the cluster that may degrade performance.

Let’s take a closer look at the three- and four-node scenarios.

Performance and Fault Tolerance Concerns in a 3-Node Cluster

To ensure that the cluster is always available to respond to read and write requests, Basho recommends a “sane default” for data replication: three copies of the data on three different nodes. The default configuration of Riak requires a minimum of four nodes to ensure no single node holds more than one copy of any particular piece of data. (In future versions of Riak we’ll be able to guarantee that each replica lives on a separate physical node. At this point it’s almost at 100%, but we won’t tell you it’s guaranteed until it is.) While it is possible to change the settings to ensure that the three replicas are on distinct nodes in a three-node cluster, you still run into issues of replica placement during a node failure or network partition.

In the event of node failure or a network partition in a three-node cluster, the default requirement for replication remains three but there are only two nodes available to service requests. This will result in degraded performance and carries a risk of data loss.

Performance and Fault Tolerance Concerns in a 4-Node Cluster

With a requirement of three replicas, any one request for a particular piece of data from a 4-node cluster will require a response from 75 – 100% of the nodes in the cluster, which may result in degraded performance. In the event of node failure or a network partition in a 4-node cluster, you are back to the issues we outline above.
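To make the arithmetic concrete: with three replicas on four nodes, a request that must hear from all three replicas touches 3 of the 4 nodes, or 75% of the cluster; when failures or replica placement pull the fourth node in as well, that rises to 4 of 4, or 100%.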

What if I want to change the replication default?

If using a different data replication number is right for your implementation, just make sure to use a cluster of N + 2 nodes, where N is the number of replicas, for the reasons outlined above.
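For example, with the default of three replicas this rule gives 3 + 2 = 5 nodes; if you raise the replication value to 5, plan on a cluster of at least 7.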

Going With 5 Nodes

As you add nodes to a Riak cluster that starts with 5 nodes, the percentage of the cluster required to service each request goes down. Riak scales linearly and predictably from this point on. When a node is taken out of service or fails, the number of nodes remaining is large enough to protect you from data loss.

So do your development and testing with smaller clusters, but when it comes to production, start with five nodes.

Happy scaling.


Riak Core – The Coordinator

April 19, 2011

This was originally posted on Ryan Zezeski’s working blog Try Try Try.

At the end of my vnode post I asked the question “Where’s the redundancy?” There is none in RTS, thus far. Riak Core isn’t magic, but rather a suite of tools for building distributed, highly available systems. You have to build your own redundancy. In this post I’ll talk about the coordinator and show how to implement one.

What is a Coordinator?

Logically speaking, a coordinator is just what it sounds like. Its job is to coordinate incoming requests. It enforces the consistency semantics of N, R and W and performs anti-entropy services like read repair. In simpler terms, it’s responsible for distributing data across the cluster and re-syncing data when it finds conflicts. You could think of vnodes as the things that Get Shit Done (TM) and the coordinators as the other things telling them what to do and overseeing the work. They work in tandem to make sure your request is handled as well as it can be.

To be more concrete, a coordinator is a gen_fsm, and each request is handled in its own Erlang process. A coordinator communicates with the vnode instances to fulfill requests.

To wrap up, a coordinator:

  • coordinates requests
  • enforces the consistency requirements
  • performs anti-entropy
  • is an Erlang process that implements the gen_fsm behavior
  • and communicates with the vnode instances to execute the request

Implementing a Coordinator

Unlike the vnode, Riak Core doesn’t define a coordinator behavior. You have to roll your own each time. I used Riak’s get and put coordinators for guidance. You’ll notice they both have a similar structure. I’m going to propose a general structure here that you can use as your guide, but remember that there’s nothing set in stone on how to write a coordinator.

Before moving forward it’s worth mentioning that you’ll want to instantiate these coordinators under a simple_one_for_one supervisor. If you’ve never heard of simple_one_for_one before then think of it as a factory for Erlang processes of the same type. An incoming request will at some point call supervisor:start_child/2 to instantiate a new FSM dedicated to handling this specific request.
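A minimal sketch of such a supervisor, assuming a module named rts_get_fsm_sup for the get coordinator (the actual RTS module may differ slightly):

-module(rts_get_fsm_sup).
-behavior(supervisor).

-export([start_link/0, start_get_fsm/1]).
-export([init/1]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

%% Called once per request; Args becomes the argument list passed to
%% rts_get_fsm:start_link and ultimately to init/1 of the FSM.
start_get_fsm(Args) ->
    supervisor:start_child(?MODULE, Args).

init([]) ->
    GetFsm = {undefined,                     % id is unused for simple_one_for_one
              {rts_get_fsm, start_link, []}, % Args are appended at start_child time
              temporary, 5000, worker, [rts_get_fsm]},
    {ok, {{simple_one_for_one, 10, 10}, [GetFsm]}}.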

init(Args) -> {ok, InitialState, SD, Timeout}

Args = term()
InitialState = atom()
SD = term()
Timeout = integer()

This is actually part of the gen_fsm behavior, i.e. it’s a callback you must implement. Its job is to specify the InitialState name and its data (SD). In this case you’ll also want to specify a Timeout value of 0 in order to immediately go to the InitialState, prepare.

A get coordinator for RTS is passed four arguments.

  1. ReqId: A unique id for this request.
  2. From: Who to send the reply to.
  3. Client: The name of the client entity — the entity that is writing log events to RTS.
  4. StatName: The name of the statistic the requester is interested in.

All this data will be passed as a list to init and the only work that needs to be done is to build the initial state record and tell the FSM to proceed to the prepare state.

init([ReqId, From, Client, StatName]) ->
    SD = #state{req_id=ReqId,
                from=From,
                client=Client,
                stat_name=StatName},
    {ok, prepare, SD, 0}.

The write coordinator for RTS is very similar but has two additional arguments.

  1. Op: The operation to be performed, one of set, append, incr,
    incrby or sadd.
  2. Val: The value of the operation. For the incr op this is undefined.

Here is the code.

init([ReqID, From, Client, StatName, Op, Val]) ->
    SD = #state{req_id=ReqID,
                from=From,
                client=Client,
                stat_name=StatName,
                op=Op,
                val=Val},
    {ok, prepare, SD, 0}.
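Both callbacks populate the same #state record. The record definition isn’t shown in this post; here is a sketch inferred from the fields used throughout (the defaults for num_r, num_w, and replies are assumptions):

-record(state, {req_id,       % unique request id
                from,         % pid to send the reply to
                client,       % client entity name
                stat_name,    % statistic being read or written
                op,           % write operation (write coordinator only)
                val,          % write value (write coordinator only)
                preflist,     % preference list built in prepare
                num_r=0,      % read replies received so far
                num_w=0,      % write acks received so far
                replies=[]}). % values returned by the vnodes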

prepare(timeout, SD0) -> {next_state, NextState, SD, Timeout}

SD0 = SD = term()
NextState = atom()
Timeout = integer()

The job of prepare is to build the preference list. The preference list is the preferred set of vnodes that should participate in this request. Most of the work is actually done by riak_core_util:chash_key/1 and riak_core_apl:get_apl/3. Both the get and write coordinators do the same thing here.

  1. Calculate the index in the ring that this request falls on.
  2. From this index determine the N preferred partitions that should handle the request.

Here is the code.

prepare(timeout, SD0=#state{client=Client,
                            stat_name=StatName}) ->
    DocIdx = riak_core_util:chash_key({list_to_binary(Client),
                                       list_to_binary(StatName)}),
    Prelist = riak_core_apl:get_apl(DocIdx, ?N, rts_stat),
    SD = SD0#state{preflist=Prelist},
    {next_state, execute, SD, 0}.

The fact that the key is a two-tuple is simply a consequence of Riak Core having been extracted from Riak; some of its key-value semantics crossed over during the extraction. In the future things like this may change.

execute(timeout, SD0) -> {next_state, NextState, SD}

SD0 = SD = term()
NextState = atom()

The execute state executes the request by sending commands to the vnodes in the preflist and then putting the coordinator into a waiting state. The code to do this in RTS is really simple; call the vnode command passing it the preference list. Under the covers the vnode has been changed to use riak_core_vnode_master:command/4 which will distribute the commands across the Preflist for you. I’ll talk about this later in the post.

Here’s the code for the get coordinator.

execute(timeout, SD0=#state{req_id=ReqId,
                            stat_name=StatName,
                            preflist=Prelist}) ->
    rts_stat_vnode:get(Prelist, ReqId, StatName),
    {next_state, waiting, SD0}.

The code for the write coordinator is almost identical except it’s parameterized on Op.

execute(timeout, SD0=#state{req_id=ReqID,
                            stat_name=StatName,
                            op=Op,
                            preflist=Preflist}) ->
    rts_stat_vnode:Op(Preflist, ReqID, StatName),
    {next_state, waiting, SD0}.

waiting(Reply, SD0) -> Result

Reply = {ok, ReqID}
Result = {next_state, NextState, SD}
| {stop, normal, SD}
NextState = atom()
SD0 = SD = term()

This is probably the most interesting state in the coordinator, as its job is to enforce the consistency requirements and possibly perform anti-entropy in the case of a get. The coordinator waits for replies from the various vnode instances it called in execute and stops once its requirements have been met. The typical shape of this function is to pattern match on the Reply, check the state data SD0, and then either continue waiting or stop, depending on the current state data.

The get coordinator waits for replies with the correct ReqId, increments the reply count and adds the Val to the list of Replies. If the quorum R has been met then return the Val to the requester and stop the coordinator. If the vnodes didn’t agree on the value then return all observed values. In this post I am punting on the conflict resolution and anti-entropy part of the coordinator and exposing the inconsistent state to the client application. I’ll implement them in my next post. If the quorum hasn’t been met then continue waiting for more replies.

waiting({ok, ReqID, Val}, SD0=#state{from=From, num_r=NumR0, replies=Replies0}) ->
    NumR = NumR0 + 1,
    Replies = [Val|Replies0],
    SD = SD0#state{num_r=NumR, replies=Replies},
    if
        NumR =:= ?R ->
            Reply =
                case lists:any(different(Val), Replies) of
                    true -> Replies;
                    false -> Val
                end,
            From ! {ReqID, ok, Reply},
            {stop, normal, SD};
        true -> {next_state, waiting, SD}
    end.
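The different/1 helper isn’t shown in this excerpt; a minimal sketch, matching its use inside lists:any/2 above (any reply that differs from Val signals a conflict):

%% Build a predicate that is true for any reply different from A.
different(A) -> fun(B) -> A =/= B end.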

The write coordinator has things a little easier here because all it cares about is knowing that W vnodes executed its write request.

waiting({ok, ReqID}, SD0=#state{from=From, num_w=NumW0}) ->
    NumW = NumW0 + 1,
    SD = SD0#state{num_w=NumW},
    if
        NumW =:= ?W ->
            From ! {ReqID, ok},
            {stop, normal, SD};
        true -> {next_state, waiting, SD}
    end.

What About the Entry Coordinator?

Some of you may be wondering why I didn’t write a coordinator for the entry vnode. If you don’t remember, this vnode is responsible for matching an incoming log entry and then executing its trigger function. For example, any incoming log entry from an access log in combined logging format will cause the total_reqs stat to be incremented by one. I only want this action to occur at most once per entry. There is no notion of N. I could write a coordinator that tries to make some guarantees about its execution, but for now I’m OK with possibly dropping data occasionally.

Changes to rts.erl and rts_stat_vnode

Now that we’ve written coordinators to handle requests to RTS, we need to refactor the old rts.erl and rts_stat_vnode. The model has changed from rts calling the vnode directly to delegating the work to rts_get_fsm, which calls the various vnodes and collects responses.

Before:

rts:get ----> rts_stat_vnode:get (local)

After:

                                                         /--> stat_vnode@rts1
rts:get ----> rts_get_fsm:get ----> rts_stat_vnode:get --|--> stat_vnode@rts2
                                                         \--> stat_vnode@rts3

Instead of performing a synchronous request, the rts:get/2 function now calls the get coordinator and then waits for a response.

get(Client, StatName) ->
    {ok, ReqID} = rts_get_fsm:get(Client, StatName),
    wait_for_reqid(ReqID, ?TIMEOUT).

The write requests underwent a similar refactoring.

do_write(Client, StatName, Op) ->
    {ok, ReqID} = rts_write_fsm:write(Client, StatName, Op),
    wait_for_reqid(ReqID, ?TIMEOUT).

do_write(Client, StatName, Op, Val) ->
    {ok, ReqID} = rts_write_fsm:write(Client, StatName, Op, Val),
    wait_for_reqid(ReqID, ?TIMEOUT).
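The wait_for_reqid/2 helper isn’t shown in the post either; a minimal sketch, assuming it simply blocks until a reply tagged with our ReqID arrives (matching the {ReqID, ok} and {ReqID, ok, Reply} sends in the coordinators):

wait_for_reqid(ReqID, Timeout) ->
    receive
        {ReqID, ok} -> ok;
        {ReqID, ok, Val} -> {ok, Val}
    after Timeout ->
        {error, timeout}
    end.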

The rts_stat_vnode was refactored to use riak_core_vnode_master:command/4, which takes a Preflist, Msg, Sender and VMaster as arguments.

Preflist: The list of vnodes to send the command to.

Msg: The command to send.

Sender: A value describing who sent the request, in this case the coordinator. This is used by the vnode to correctly address the reply message.

VMaster: The name of the vnode master for the vnode type to send this command to.

get(Preflist, ReqID, StatName) ->
    riak_core_vnode_master:command(Preflist,
                                   {get, ReqID, StatName},
                                   {fsm, undefined, self()},
                                   rts_stat_vnode_master).

Coordinators in Action

Talk is cheap, let’s see it in action. Towards the end of the vnode post I made the following statement:

“If you start taking down nodes you’ll find that stats start to disappear.”

One of the main objectives of the coordinator is to fix this problem. Let’s see if it worked.

Build the devrel

make devrel

Start the Cluster

for d in dev/dev*; do $d/bin/rts start; done
for d in dev/dev{2,3}; do $d/bin/rts-admin join rts1@127.0.0.1; done

Feed in Some Data

gunzip -c progski.access.log.gz | head -100 | ./replay --devrel progski

Get Some Stats

./dev/dev1/bin/rts attach
(rts1@127.0.0.1)1> rts:get("progski", "total_reqs").
(rts1@127.0.0.1)2> rts:get("progski", "GET").
(rts1@127.0.0.1)3> rts:get("progski", "total_sent").
(rts1@127.0.0.1)4> rts:get("progski", "HEAD").
(rts1@127.0.0.1)5> rts:get("progski", "PUT").
(rts1@127.0.0.1)6> rts:get_dbg_preflist("progski", "total_reqs").
(rts1@127.0.0.1)7> rts:get_dbg_preflist("progski", "GET").

Don’t worry about the get_dbg_preflist calls on lines 6 and 7 yet; I’ll explain them in a second.

Kill a Node

(rts1@127.0.0.1)8> os:getpid().
"91461"
kill -9 91461

Verify it’s Down

$ ./dev/dev1/bin/rts ping
Node 'rts1@127.0.0.1' not responding to pings.

Get Stats on rts2

Your results may not exactly match mine, as it depends on which vnode instances respond first. The coordinator only cares about getting R responses.

./dev/dev2/bin/rts attach
(rts2@127.0.0.1)1> rts:get("progski", "total_reqs").
(rts2@127.0.0.1)2> rts:get("progski", "GET").
(rts2@127.0.0.1)3> rts:get("progski", "total_sent").
(rts2@127.0.0.1)4> rts:get("progski", "HEAD").
(rts2@127.0.0.1)5> rts:get("progski", "PUT").

Let’s Compare the Before and After Preflist

Notice that some gets on rts2 return a single value as before, whereas others return a list of values. This is because the Preflist calculation now includes fallback vnodes. A fallback vnode is one that is not on its appropriate physical node. Since we killed rts1, its vnode requests must be routed somewhere else. That somewhere else is a fallback vnode. Since the request-reply model between the coordinator and the vnodes is asynchronous, our reply value will depend on which vnode instances reply first. If the instances with values reply first then you get a single value, otherwise you get a list of values. My next post will improve this behavior slightly to take advantage of the fact that we know there are still two nodes with the data, so there should be no reason to return conflicting values.

(rts2@127.0.0.1)6> rts:get_dbg_preflist("progski", "total_reqs").
(rts2@127.0.0.1)7> rts:get_dbg_preflist("progski", "GET").

In both cases either rts2 or rts3 stepped in for the missing rts1. Also, in each case, one of these vnodes is going to return not_found since it’s a fallback. I added another debug function to determine which one.

(rts2@127.0.0.1)8> rts:get_dbg_preflist("progski", "total_reqs", 1).
(rts2@127.0.0.1)9> rts:get_dbg_preflist("progski", "total_reqs", 2).
(rts2@127.0.0.1)10> rts:get_dbg_preflist("progski", "total_reqs", 3).
(rts2@127.0.0.1)11> rts:get_dbg_preflist("progski", "GET", 1).
(rts2@127.0.0.1)12> rts:get_dbg_preflist("progski", "GET", 2).
(rts2@127.0.0.1)13> rts:get_dbg_preflist("progski", "GET", 3).

Notice the fallbacks are at the end of each list. Also notice that, since we’re on rts2, total_reqs will almost always return a single value because its fallback is on another node, whereas GET has a local fallback that will be more likely to reply first.

Conflict Resolution & Read Repair

In the next post I’ll be making several enhancements to the get coordinator by performing basic conflict resolution and implementing read repair.


Introducing Riak Core

July 30, 2010

What is riak_core?

riak_core is a single OTP application which provides all the services necessary to write a modern, well-behaved distributed application. riak_core began as part of Riak. Since the code was generally useful in building all kinds of distributed applications, we decided to refactor and separate the core bits into their own codebase to make it easier to use.

Distributed systems are complex, and some of that complexity shows in the number of features available in riak_core. Rather than dive deeply into code, I’m going to separate the features into broad categories and give an overview of each.

Note: If you’re the impatient type and want to skip ahead and start reading code, you can check out the source to riak_core via hg or git.

Node Liveness & Membership

riak_core_node_watcher is the process responsible for tracking the status of nodes within a riak_core cluster. It uses net_kernel to efficiently monitor many nodes. riak_core_node_watcher also has the capability to take a node out of the cluster programmatically. This is useful in situations where a brief node outage is necessary but you don’t want to stop the server software completely.

riak_core_node_watcher also provides an API for advertising and locating services around the cluster. This is useful in clusters where nodes provide a specialized service, like a CUDA compute node, which is used by other nodes in the cluster.
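A sketch of what that might look like, assuming the node-watcher service API of this era (riak_core_node_watcher:service_up/2 to advertise, riak_core_node_watcher:nodes/1 to locate; the cuda_compute service name is hypothetical):

%% Advertise this process as a provider of the cuda_compute service,
%% then find all nodes currently providing that service.
advertise_and_locate() ->
    ok = riak_core_node_watcher:service_up(cuda_compute, self()),
    riak_core_node_watcher:nodes(cuda_compute).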

riak_core_node_watch_events cooperates with riak_core_node_watcher to generate events based on node activity, e.g. nodes joining or leaving the cluster. Interested parties can register callback functions which will be called as events occur.

Partitioning & Distributing Work

riak_core uses a master/worker configuration on each node to manage the execution of work units. Consistent hashing is used to determine the target node(s) for each request, and the master process on each node farms the request out to the actual workers. riak_core calls the worker processes vnodes; the coordinating process is the vnode_master.

The partitioning and distribution logic inside riak_core also handles hinted handoff when required. Hinted handoff occurs as a result of a node failure or outage. In order to assure availability, most clustered systems will use operational nodes in place of down nodes. When the down node comes back the cluster needs to migrate the data from its temporary home on the substitute nodes to the data’s permanent home on the restored node. This process is called hinted handoff and is managed by components inside riak_core. riak_core also handles migrating partitions to new nodes when they join the cluster such that all work continues to be evenly partitioned to all cluster members.

riak_core_vnode_master starts all the worker vnodes on a given node and routes requests to the vnodes as the cluster runs.

riak_core_vnode is an OTP behavior wrapping all the boilerplate logic required to implement a vnode. Application-specific vnodes need to implement a handful of callback functions in order to participate in handoff sessions and receive work units from the master.
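To make that concrete, here is a minimal sketch of an application-specific vnode; the module name, the put/get commands, and the dict-based storage are hypothetical, and the callback set reflects the riak_core_vnode behavior of this era:

-module(myapp_vnode).
-behavior(riak_core_vnode).

-export([start_vnode/1,
         init/1,
         handle_command/3,
         handle_handoff_command/3,
         handoff_starting/2,
         handoff_cancelled/1,
         handoff_finished/2,
         handle_handoff_data/2,
         encode_handoff_item/2,
         is_empty/1,
         delete/1,
         terminate/2]).

-record(state, {partition, data=dict:new()}).

start_vnode(I) ->
    riak_core_vnode_master:get_vnode_pid(I, ?MODULE).

init([Partition]) ->
    {ok, #state{partition=Partition}}.

%% Application-specific work units arrive here.
handle_command({put, K, V}, _Sender, State=#state{data=Data}) ->
    {reply, ok, State#state{data=dict:store(K, V, Data)}};
handle_command({get, K}, _Sender, State=#state{data=Data}) ->
    {reply, dict:find(K, Data), State}.

%% The remaining callbacks let the vnode participate in handoff
%% sessions; they are stubbed here for brevity.
handle_handoff_command(_Msg, _Sender, State) ->
    {noreply, State}.
handoff_starting(_TargetNode, State) -> {true, State}.
handoff_cancelled(State) -> {ok, State}.
handoff_finished(_TargetNode, State) -> {ok, State}.
handle_handoff_data(Bin, State=#state{data=Data}) ->
    {K, V} = binary_to_term(Bin),
    {reply, ok, State#state{data=dict:store(K, V, Data)}}.
encode_handoff_item(K, V) -> term_to_binary({K, V}).
is_empty(State=#state{data=Data}) ->
    {dict:size(Data) =:= 0, State}.
delete(State) ->
    {ok, State#state{data=dict:new()}}.
terminate(_Reason, _State) -> ok.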

Cluster State

A riak_core cluster stores global state in a ring structure. The state information is transferred between nodes in the cluster in a controlled manner to keep all cluster members in sync. This process is referred to as “gossiping”.

riak_core_ring is the module used to create and manipulate the ring state data shared by all nodes in the cluster. Ring state data includes items like partition ownership and cluster-specific ring metadata. Riak KV stores bucket metadata in the ring metadata, for example.

riak_core_ring_manager manages the cluster ring for a node. It is the main entry point for application code accessing the ring, via riak_core_ring_manager:get_my_ring/1, and also keeps a persistent snapshot of the ring in sync with the current ring state.

riak_core_gossip manages the ring gossip process and ensures the ring is generally consistent across the cluster.

What’s the plan?

Over the next several months I’m going to cover the process of building a real application in a series of posts to this blog, where each post covers some aspect of system building with riak_core. All of the source to the application will be published under the Apache 2 license and shared via a public repo on GitHub.

And what type of application will we build? Since the goal of this series is to illustrate how to build distributed systems using riak_core (and to satisfy my own technical curiosity), I’ve decided to build a distributed graph database. A graph database should provide enough use cases to really exercise riak_core while not obscuring the core learning experience in tons of complexity.

Thanks to Sean Cribbs and Andy Gross for providing helpful review and feedback.