A Bluffers Guide to CRDTs in Riak (2013)

Riak 1.4 included counters. This was a break from the norm in
Riak. We’ve always said: “Your data is opaque to Riak”, but with
counters that isn’t true. Riak knows what you’re storing against a
counter key, and how to increment it and decrement it. You tell Riak
this with the counter API. You never fetch, mutate and put a counter,
you just say: increment by 5 or decrement by 100. You never send a
vclock and most importantly, Riak knows how to merge concurrent writes
to a counter. You never see siblings for a counter, you always see a
single value. No writes are lost and eventually, the counter will
reflect all writes and reach a consistent value.

More datatypes, please

Counters are OK, but you can’t build many application on just
counters. For Riak 2.0 we’ve add some more data types. We believe with
the addition of these data types you can model many applications’ data
storage needs with greater simplicity, and never have to write sibling
merge functions again.

So, What Are CRDTs?

When we’ve talked in the past about adding data types to Riak we’ve
spoken about CRDTs. CRDT stands for (variously) Conflict-free
Replicated Data Type, Convergent Replicated Data Type, Commutative
Replicated Data Type, and maybe others. The key, repeated phrase shows
that we’re dealing with Replicated Data Types.

Replication is normal for Riak. It is what the N-Value defines. Data
Types are pretty common in computing. Sets, Bags, Lists, Registers,
Maps, Counters…etc. That leaves us with the C to deal with.

Conflict Free lala – Join Semi Yada

State based, or Convergent CRDTs were thought up 15 years ago by
Carlos Baquero and Franciso Moura[1] as an application of something
called a Join Semi-Lattice: A triple of a Partially Ordered Set, a
Least Element (Bottom hehehehehe) and a function that produces a Least
Upper Bound. This function must be idempotent, associative, and
commutative and when applied to two Sets returns a merged, or least
upper bound Set that is also an instance of the Partially Ordered Set.

But what does that mean, and how is it useful?

What They Mean to Me? – Opaque No More!

Riak is an eventually consistent system. It leans very much toward the
AP end of the CAP spectrum. We achieve this Availability with things
like sloppy quorum writes to fallback nodes. However, even without
partitions and many nodes, interleaved or concurrent writes can lead
to conflicts. Traditionally Riak keeps all values and presents them to
the user to resolve. The client application must have a deterministic
way to resolve conflicts. It might be to pick the highest timestamp,
or union all the values in a list, or something more complex. What
ever it does, it is ad hoc, and created specifically for the data
model and application at hand. But it must look just like that LUB
function of the Join Semi-Lattice: It must be idempotent, commutative,
and associative.

Conflict Free? – Free from Conflicting Values!

In Riak, the Conflict-free C is kind of a lie. There’s still conflict,
it is just that the resolution is part of the data types’ design. Of
all the C’s above, Convergent is the one that matters to us. The data
types we’ve created for Riak converge automatically, at write and read
time, on the server. If a client application can model its data using
the data types we provide, they will never see sibling values or need
to write ad hoc, custom merge functions.

Why Do I Need Them?

If you’re storing data in Riak, and you have allow_mult set to
true, then you need to handle conflicting writes. If you have to
handle it in your code you have to either simplify your data model or
code complex merge functions.

Example?

The classic example from the dynamo paper[2] is the Amazon shopping
cart. A single user adds two items to their cart, both adds hit
different partitioned servers. There are now two carts one with item
A and one with item B. The merge logic is simple: union the carts
to get a single correct value. But what of removes? How can you tell
if item A is absent from cart two because it was added by A and
not yet seen, or removed from cart two and cart one was unaware?

Maybe your application is slightly more complex, you need to add a
count of how many of each item are in the cart. Now your merge
function has to decide when presented with 6 instances of “hairbrush”
in one sibling and 4 in another if the user added 6 and then removed 2
or added 4 and then added 2 more. Maybe you start to add
tombstones. Or record operations (like StateBox.) But quickly it gets
complex. And what if you need to record more information, or evolve
your data model, your merge function must grow, adapt, work on
different versions of your data? It becomes very complex very quickly.

And every new data model needs new ad hoc merge functions And when you
come to store some other data in Riak, say user profiles, you need to
start from scratch building up your data model and merge logic.

Primitives

When modeling an application’s domain in a programming language
developers are used to composing state from a few primitive data
types, like Sets, Maps, Registers, Integers, Booleans etc. Data Types
in Riak give the developer back that power and expressivity, and
relieve them of the burden of designing and testing deterministic
merge functions. The key is that the data is no longer opaque to
Riak. When you use the Data Types API Riak “knows” what type of thing
you are storing and is able to perform that semantic merge for
you. Riak already detects conflicts, with Data Types it is able to
merge them too.

When reading a Data Type value you will only ever see a single
value. That value is still Eventually Consistent, but it will be as
correct as it can be given the amount of entropy in the database, and
when the system is quiescent, all values will converge on a single,
deterministic, correct value.

What do you have?

We have some Data Types that you can store against a Key in Riak, that
we’re calling Top Level Types.

Top Level

  • Counters: As in Riak 1.4.

  • Sets: Which are collections of things. In Riak we expect you to
    store binaries, which is how we encode Strings of text in Erlang. The
    kind of thing you’d store in a Set might be the members of a team or
    department, followers on social network, or maybe objects in some real
    world collection.

  • Maps. A Map is way to compose Data Types into a richer, more complex
    structure. A Map is a collection of fields. A field is a name and Data
    Type pair. This is so that we don’t have to deal with merging fields
    of a different type. If two fields with the same name but different
    types are added to a Map, then they’re two different fields. You may
    only store Data Types in a Map.

Second Level

You can store any of the top level types in a field in a Map,
including a Map. And we’ve also added:-

  • Registers: A binary value. It might be an email address, or a
    first name
  • Flags: A Boolean.

Semantics

The semantics for each of these data types differs from their regular,
linear counterparts. Though we think the semantics we have chosen are
the most intuitive and useful, and least surprising.

EC is still EC

Since we’re storing our data types in Riak, (in riak_objects even) all
the trade offs of Eventual Consistency apply (except of course
conflict resolution.) That means that the Counters are not for
creating unique, ordered IDs. And Sets and Maps do not have the atomic
and blocking operations of their Redis counterparts.

Non-idempotent Counting

Nothing has changed for counters. They’re still not idempotent. If
Riak returns an Error for a counter operation, it may have only
partially failed, and a retry may lead to a double count. However,
adding an element to a Set, or a Field to a Map is idempotent.

Add Wins

The semantic we’ve chosen for the Set and Map is “Add Wins”. The
literature also calls this “Observed Remove” but that is an
implementation detail of how the Add Wins (and I’ll cover it below.)

Set

When any pair of operations on a Set are concurrent, and one adds an
element, while the other removes it, the add wins. If the remove
causally follows the Add, then the Remove is effective. Concurrent
operations on different elements work as you’d expect.

Map

The Map borrows its behaviour directly from the Set. Except that every
time you update the contents of a field (say increment the counter in
the “likes” field, or add a buddy to the “follows” field) then that
counts as “adding” the field. This way a concurrent removal of a field
with an update to a field will see the update winning. Add wins again.

The difficult to answer question is what should the value of a field
be when it is concurrently updated and removed. The answer is that the
update wins, and the field remains, and it’s value is that of all
surviving replicas’ merged.

Say a counter in a Map field is incremented to 5 at replica A and
replicated to B and C. Concurrently the counter field is removed from
A and incremented by 3 at C, the merged value will be 8. That is to
say the remove at A does not reverse all A’s previous actions.

There is something surprising and imperfect about this semantic. If A
had incremented the counter by 2 after it was partitioned from C but
before it removed the counter field, that update is lost. Only the
values that B and C have seen for A will remain. Removes are tricky.

There is another odd edge from removes that may also be
surprising. Imagine that concurrently Replica A coordinates a removal
of a Set Field from a Map while Replica B coordinates the removal of
all elements from that same Set. As per the rules above, field updates
count as “Adds” (for the add wins semantic) so the Field remains in
the Map, albeit as an empty Set.

Registers and Flags

These Map only types have simple semantics. The Register is Last Write
Wins, using a timestamp on the node handling the write. All the
caveats about clock synchronization therefore apply. Flags start out
Off, and you can turn them On. For any pair of concurrent, conflicting
operations (On | Off) On wins. Again, that same Add Wins semantic.


How Do I Use Them?

More on this later, but all additive operations (that is everything
except Set member and Map field removal) can be performed by simply
sending operations to Riak. Not the usual Get, Modify, Update
cycle. This “action at a distance” was introduced in Riak 1.4 with
counters and extends to the new data types.

Client API

There are two APIs as ever, PB and HTTP. As of time of writing the
HTTP API is unfinished so I’ll talk about the PB API. Assume that
there will be parity.

The API allows you to specify operations to be performed on Data Types
at a replica in Riak. For counters you may only send a single
operation, “increment” with an amount (negative for decrement.)

Sets

You may send a list of operations. The list may contain both Add ElementX and Remove ElementY operations. If you are removing
elements we strongly recommend that you first fetch the Set and it’s
context, and send the context with the remove operation(s.)

Why? See below.

All operations are executed atomically at the coordinating replica. If
any operation in the list fails (only removes can fail!) then none of
the operations are applied.

Maps

You may send a list of operations. These are either field operations,
or field update operations. Field operations Add or Remove fields from
the Map. I find it helps to think of the Map as a schema for a (JSON
like?) document. Field operations alter the schema of the Map.

Field Update Operations act on the data stored in the Map. You may
send any number of operations batched together. You may mix Field
Operations and Field Update Operations.

For example, if you model Game State as a Map for a particular user
and game. You could send an operation when the user starts a game that
creates a Map, adds fields of Counters for points and lives, a Set
for achievements unlocked, and a Map for inventory that contains two
Sets (armor and weapons) and a Counter for Hit Points.

As the game is played operations that update multiple Counters, add
and remove elements from Sets and so on can be sent as batches that
execute atomically at the coordinating replica. This does not suggest
that you can enforce co-invariants between values in the Map.

What counts for Sets counts for Maps, we strongly recommend you send a
context with any batch of operations that contain a Field or Set
element Remove, no matter how deeply nested in the Map.

If you are only updating Fields in the Map then you do not need to
fetch first, and you do not need a context, you may just send
operations.

You do not need to explicitly create a field. Updating a field that is
not present at the coordinating replica will create and update the
field. For example, adding 10 to the Counter in Field <<”gold”>>
in the Map at key Game1 will create the field if it is not present,
and then increment by 10.

Context For a Remove

Why the context? Two reasons, the first is simple:

We don’t allow you to remove something from a Set / Map that is not
there. Since there is no guarantee that the replica coordinating your
remove operation(s) contains the value(s) you want to remove (imagine
an empty fallback spun up to accept the request) the context “seeds”
the handling replica with the values you’ve seen. If you don’t send
the context, and the replica doesn’t have the value(s) you want to
remove, the operations fails with “precondition failure” error. A
precondition of removing an element or Field is that it is present.

The second reason is more subtle and is going to need some
implementation specifics, which I’ll cover later, to really
understand. At this point it is enough to say that without a context
for a remove, you may remove more than you planned to. The “Add Wins”
semantic is based on “Observed Remove”, which means only remove that
which you have seen. The context tells the replica handling the
operation what you’ve seen. If an “Add” for the element you want to
remove was handled or seen by the replica after you sent your remove,
and there was no context, the remove would win over the concurrent
add. There maybe times you want this, but in general, use the context
for removes.

The context is a compact binary encoding of the Set or Map. We hope to
minimize it further in future releases.

What Do They Cost?

The main cost is that the Data Types take up space. There is some
computational cost to the merge functions, that will be performed on
your Riak servers, rather than in your client application. We have yet
to measure this.

How Big?

We store the Data Types is riak_objects. This is so they play nice
with all Riak’s systems, like AAE, Enterprise Multi Data Center
Replication, read repair etc. So off the bat, we have the overhead of
a riak_object.

The Data Types themselves are at least as big as what they contain,
plus a version vector, plus some Dots (see below.) We’ve tried to
keep them small with an efficient binary representation (and we’ll
keep improving on that), but they are larger than one might first
imagine.

Counter

Expect: a single integer.

IRL: A version vector with two integers per actor that has coordinated
an increment. Each actor is 8 bytes. Expect at least N-val actors.

Set

Expect: The sum of the size of its members.

IRL: The sum of the size of its members, plus a version vector, and a
minimal version vector (at most the size of the Set vector, typically
one {actor, count} pair) per member. The size of the version vector
depends on N-Val, MDC, Cluster Churn etc. Again, 8 bytes per actor,
though we only store each actor once.

Map

Expect: The sum of the size of the keys plus the sum of the size of
the values.

IRL: Each key is a pair of {name, int} where int maps to a module
that implements the Data Type of the field. Each member also has a
minimal clock (as for the Set.)

What Can’t They Do?

  • No invariants.
  • Counter increments aren’t idempotent.
  • You can’t store non-CRDT values in the Maps.
  • No 2i, riak_search or Yokozuna as of 2.0pre5.
  • No JavaScript MapReduce over CRDTs as they’re a binary format.
  • Can’t cook your dinner.

You know enough to use Riak’s Data Types now. If you really want to
know how the sausage is made, read on.

How do they work underneath?

It is worth remembering at this point that Vnodes are the unit of
concurrency in Riak. Whenever I say Actor I mean Vnode and Vnode Id. Correctness in Riak depends on individual Actors acting serially.

Counter

The Counter is a CRDT called a PN-Counter, where P is Positive and N
is Negative. It is a list of triples of {actor, positive, negative}
where the value is the difference between the sum of all positives and
the sum of all negatives. An actor may only update its own entry in
the list. When two counters merge we take the maximum of positive and
negative for each actor. When an actor is only in one counter, we just
keep its value in the merged counter.

Flags

Flags are logically equivalent to a Set that can only contain one
element. Whether the element is present or absent is equivalent to
whether the flag is On or Off, respectively. The same “Add Wins” /
“Observed Remove” behavior applies, except with Flags we call it
“Observed Disable”. To the user Flags look like Boolean values.

Registers

Registers are a pair of {value, timestamp}. They converge on the
highest timestamp. Much like a value in Cassandra. They require well
synchronized clocks. When two registers merge, the pair with the
highest timestamp is the merged value.

Sets (and Maps)

The crucial part of any convergent, or state based CRDT is its merge
function. The merge function is the LUB of the Join-Semi-Lattice, and
it is what defines the semantic of the data type, as well as being a
generalization of all those ad hoc conflict resolutions customers
might have had to write.

The merge function for an optimized OR-Set[3] is pretty simple for
any elements in both sets: they’re in the set. The difficulties arise
when an element is only in one of the two Sets being merged.

The Problem of Absence

When two replicas merge, and one contains an element in its Set that
the other does not, why is it there? It can either be:

  1. The element was added to one replica and the other is yet to see it
  2. The element was once in both replicas but one has removed it
    already.

We need to know why an element is only in one set to arrive at a
correct merged value. As ever in these things, causality to the
rescue. It stands to reason that if the element was once in Set A but
is no longer, it was removed. We could store a tombstone value for the
removed item, but that means our sets never get smaller. A Set with
one member that once had 100 members, would be the same size as a Set
with 100 members.

Instead what we do is attach a version vector to the Set, and every
time an element is added to the Set, we increment the entry in the
version vector for the replica that added the element. We also store
the {actor, count} pair that results from the increment against the
element (which I’m going to call the Dot[4] from now on.) If there
is already a Dot associated with the element we keep that too. What we
end up with looks like
[{actor1, count}=Dot1, {actor2, count}=Dot2,…], which is a version
vector, but it is a minimal clock, that stores only the Dots or
events when the element was added. Note that the version vector
attached to the whole set will always dominate all the minimal clocks
for all elements.

When an element is removed from the set we simply remove the Element
and its minimal clock.

Now, when a merge occurs we compare the two sets. We take all the
elements that are in Set A and not in Set B and compare their
minimal clocks to Set B’s set version vector. Every element whose
minimal clock is dominated has been removed from Set B, and does not
make it into the merged set. As a slight optimization, we also drop
any dots from the minimal clock that are dominated by Set B’s
clock. This keeps the minimal clock minimal. You can think of it as
subtracting the minimal clock from Set B’s set version vector, if any
Dots are left, the element is in the merged set with those remaining
Dots as the new minimal clock.

We repeat the process the other way, comparing all Set B’s elements
that are not in Set A to Set A’s set version vector.

We keep all elements that are in both Sets, merging their minimal
clocks. Finally we merge the two set version vectors to ensure the
property that the Set version vector always dominates all minimal
clocks is maintained.

This kind of fine grained causality tracking is very closely related
to Dotted Version Vectors[4]

Sets are implemented as a version vector, and a dictionary of mappings
from element -> minimal clock.

Maps

As far as implementation goes, Maps are just like the Set described
above. They use the same Map version vector, and a minimal clock per
entry to decide what to do with a Field that is only on one side of a
merge.

The main difference is of course when an element is in both Maps: we
call the Data Type’s merge function to get a single, convergent
value. Conceptually merging two Maps is the same as merging two Sets
of Fields, and then calling merge on all common Fields’ values.

[1] Specication of Convergent Abstract Data Types for Autonomous
Mobile Computing. Carlos Baquero Francisco Moura
http://gsd.di.uminho.pt/members/cbm/ps/scadt3.pdf

[2] Dynamo: amazon’s highly available key-value store
http://dl.acm.org/citation.cfm?id=1294281

[3] An Optimized Conflict-free Replicated Set Annette Bieniusa et al
http://arxiv.org/pdf/1210.3368.pdf

[4] Dotted Version Vectors: Logical Clocks for Optimistic Replication
Nuno Preguiça, Carlos Baquero, Paulo Sérgio Almeida, Victor Fonte,
Ricardo Gonçalves http://arxiv.org/abs/1011.5808

Read More

262588213843476