Tuesday, September 18, 2018

Design of Data Intensive Systems - Part 5 Consistency & Consensus


Consistency Guarantees

Linearizability

Linearizability makes the system appear as if there is only one copy of the data and all operations on it are atomic. Once a value has been written, all clients must be able to read that latest value. It is a recency guarantee.

Linearizability is useful for

Locking and leader election
Constraints and uniqueness guarantees (a single up-to-date value that all nodes agree on)
Cross-channel timing dependencies (for example, when a message queue and a file store can race with each other)
Implementing linearizable systems

Single-leader replication can potentially be made linearizable. But reads may be served from stale replicas, and split-brain scenarios can leave two nodes believing they are the leader, so in practice it is not completely linearizable.

Multi-leader replication cannot be made linearizable because it needs conflict resolution, which means there is no single copy of the data.

Leaderless replication can be made linearizable if a reader performs read repair synchronously before returning and a writer reads the latest state from a quorum of nodes before sending its writes. But in practice most leaderless systems do not provide linearizability.

If an application requires linearizability, it will be unavailable on some replicas during a network partition. It could keep working with the old replicas, but then it is no longer linearizable. Linearizable systems are also not very performant, because read and write times are at least proportional to the network delays.

Consensus algorithms are linearizable. They have a way to avoid split brain and stale replicas.

Linearizable systems are always slow, not just during network faults, so latency-sensitive systems might make some tradeoffs.

Ordering Guarantees


Ordering is important for preserving causality. In single-leader replication, all writes go through the leader, which determines the order of writes. In a serializable transaction, writes are processed in some serial order. Timestamps and clocks are also an attempt to order writes.

If a happens before b, then a could have caused b, so the causal order is a -> b. If a system obeys causal order, it is said to be causally consistent. For example, if you read a piece of data from the database, you should also be able to see any data that causally precedes it.

A linearizable system maintains a total order, because it behaves as if there is only one copy of the data.

Two operations are concurrent if neither happened before the other. Two events are ordered if they are causally related, but they are incomparable if they are concurrent. So causality defines a partial order, not a total order.

This means there are no concurrent operations in a linearizable datastore.

Generally, causal ordering is sufficient for most applications. To provide causal ordering, we can use monotonically increasing sequence numbers to order the events. With a single leader this is easy, but when multiple nodes generate sequence numbers independently (for example odd/even counters, wall-clock timestamps, or preallocated blocks of numbers), the resulting order is not consistent with causality.

One way to generate sequence numbers that do provide causal ordering is Lamport timestamps. Each node keeps its own counter, and a timestamp is the pair (counter, node id). It provides causal order in the following way: every node and every client keeps track of the maximum counter it has seen so far and includes it in each request, and when a node sees a maximum greater than its own counter, it advances its counter to that value. Say node 1 has a counter of 5 and node 2 has a counter of 1. A client that got a response from node 1 (with counter 5) then sends a request to node 2 carrying that maximum; node 2 sees it and bumps its counter to 6. That way it provides a total ordering of all the operations.
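Here is a minimal Python sketch of that rule (my own illustration, not code from the book), simulating two nodes and one client in a single process:

class LamportClock:
    def __init__(self, node_id):
        self.node_id = node_id
        self.counter = 0

    def tick(self):
        # A local event or write handled by this node.
        self.counter += 1
        return (self.counter, self.node_id)

    def observe(self, seen_counter):
        # The client attached the maximum counter it has seen so far;
        # move our counter forward so later timestamps are greater.
        self.counter = max(self.counter, seen_counter)
        return self.tick()

# Node 1 has processed five writes, node 2 only one.
node1, node2 = LamportClock(1), LamportClock(2)
for _ in range(5):
    ts = node1.tick()            # ts == (5, 1) after the loop
client_max = ts[0]
# The client now writes to node 2, passing along the max counter it saw (5),
# so node 2 jumps its counter to 6 and the causal order is preserved.
print(node2.observe(client_max))   # (6, 2)

Timestamps are compared first by counter and then by node id, which is what turns the partial causal order into a total order.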

But these timestamps cannot solve problems like uniqueness constraints, because the total order only emerges after the fact. When a node is trying to create a unique username, it needs to know right now whether another node is concurrently claiming the same username. In addition to knowing the total order, we also need to know when that order is finalized. This idea of knowing when a total order is finalized is called total order broadcast.

Total order broadcast can be used to solve the uniqueness problem.

Append a message to the log, tentatively claiming the username.
Read the log, waiting for your own message to be delivered back to you.
If the first claim for the username you want is your own message, you can commit the transaction. If some other node's claim appears first in the log, you abort the transaction because the username is already taken.
Because every node sees the claims in the same order, every node reaches the same decision; this behaves like an atomic compare-and-set, and implementing it requires consensus when multiple nodes request the same username. A sketch of the check follows.
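Below is a small Python sketch of the log-based uniqueness check (my own illustration). The "log" is simulated with a plain Python list; in a real system it would be a replicated log delivered to every node in the same order.

import uuid

LOG = []   # stands in for the totally ordered, replicated log

def append_to_log(message):
    LOG.append(message)

def read_log():
    return list(LOG)

def claim_username(username):
    request_id = str(uuid.uuid4())
    # 1. Tentatively claim the username by appending a message to the log.
    append_to_log({"username": username, "request_id": request_id})
    # 2. Read the log back in the agreed total order.
    for message in read_log():
        if message["username"] == username:
            # 3. The first claim for this username wins; since every node
            #    sees the same order, every node reaches the same decision.
            return message["request_id"] == request_id
    return False

print(claim_username("alice"))   # True: the first claim wins
print(claim_username("alice"))   # False: the name is already taken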

If we have a linearizable increment-and-get register, it is possible to implement total order broadcast. Each message that needs to be sent via total order broadcast first gets a sequence number from the linearizable register; the number is attached to the message, which is then sent to all nodes. If a node has delivered a message with sequence number 4 and it receives a message with sequence number 6, it knows it has to wait for sequence number 5 before it can deliver 6. Lamport timestamps may have gaps (a node that has seen 6 will use 7 for its next message and will never wait for 5), but with total order broadcast a node has to wait for 5.
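A rough Python sketch of this sequencer approach (again my own illustration): the register and the per-node delivery buffer are simulated in one process, where a real system would replace them with networked components.

import itertools

class Sequencer:
    # Stands in for a linearizable increment-and-get register.
    def __init__(self):
        self._counter = itertools.count(1)
    def increment_and_get(self):
        return next(self._counter)

class Receiver:
    def __init__(self):
        self.next_seq = 1
        self.pending = {}
    def on_message(self, seq, msg):
        # Buffer out-of-order messages; deliver only in gapless sequence order.
        self.pending[seq] = msg
        while self.next_seq in self.pending:
            deliver(self.pending.pop(self.next_seq))
            self.next_seq += 1

def deliver(msg):
    print("delivered:", msg)

sequencer = Sequencer()
node = Receiver()
node.on_message(sequencer.increment_and_get(), "a")   # seq 1, delivered immediately
seq2 = sequencer.increment_and_get()                   # seq 2 is delayed in transit
node.on_message(sequencer.increment_and_get(), "c")   # seq 3, buffered until 2 arrives
node.on_message(seq2, "b")                             # seq 2 arrives; "b" then "c" delivered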

Implementing linearizable compare and set or total order broadcast both require consensus.

Consensus is required for

Leader election - to avoid multiple leaders and split brain, the nodes need consensus to select and agree on one leader.
Atomic commit - in a database supporting transactions across multiple nodes, if the transaction fails on some nodes, all nodes must agree to either commit or roll back everywhere to maintain the atomicity of the transaction.

2 Phase Commit

For a transaction spanning multiple nodes, atomicity is not possible if each node runs its own commit/rollback process. The transaction needs to go through two-phase commit, in which a coordinator coordinates the transaction across the participants.

The application sends its changes to the multiple nodes. Once the transaction is ready to commit, the coordinator starts phase 1, called prepare: each node prepares to commit by writing the transaction to its log and replying whether it can commit. When all participating nodes say yes, the coordinator records the decision in its own log and starts phase 2, sending the commit to all participants, which then commit the transaction. If any node did not say yes in phase 1, the coordinator aborts the transaction on all nodes.
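A minimal, single-process Python sketch of that decision logic (my own illustration; the Participant class is made up). A real coordinator would also durably log its decision before starting phase 2 so it can recover after a crash.

class Participant:
    def prepare(self, txid):
        # Write the transaction to the local log and promise to commit if asked.
        return True              # or False if this node cannot commit
    def commit(self, txid):
        print("committed", txid)
    def abort(self, txid):
        print("aborted", txid)

def two_phase_commit(txid, participants):
    # Phase 1: ask every participant to prepare.
    if all(p.prepare(txid) for p in participants):
        # (Durably record the "commit" decision here before telling anyone.)
        for p in participants:   # Phase 2: everyone commits.
            p.commit(txid)
        return "committed"
    for p in participants:       # Any "no" vote aborts the whole transaction.
        p.abort(txid)
    return "aborted"

print(two_phase_commit("tx-42", [Participant(), Participant()]))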

If any of the nodes or the network fails during the prepare phase, the coordinator aborts the transaction.

If the coordinator fails before sending the phase 2 decision, the participant nodes are left waiting for it. They cannot unilaterally abort or commit, because doing so could violate atomicity, so they block until the coordinator recovers and issues the commit or rollback.

There is an alternative called 3PC (three-phase commit), which assumes a network with bounded delays and nodes with bounded response times. If the crash of a node can be detected reliably, 3PC can provide non-blocking atomic commit, but those assumptions cannot be made in general.

In practice, two-phase commit is implemented via distributed transactions using XA, a standard API through which applications interface with a transaction coordinator. As discussed above, during a coordinator failure there is a possibility of locks being held indefinitely; either administrators have to resolve the in-doubt transactions manually, or participants can use heuristic decisions to unilaterally abort or commit.

XA can span different technologies: databases, message queues, and so on. If a distributed transaction is internal to a homogeneous system such as a single distributed database, it can work well, but heterogeneous distributed transactions are more challenging because they have to work across different technologies and cannot be optimized for any one of them.

Disadvantages of 2PC

If the coordinator is not replicated and runs on a single machine, it becomes a single point of failure.
Application servers are no longer stateless, because the coordinator's logs become a crucial part of the durable state.
Since XA needs to work with disparate systems, it is a lowest-common-denominator interface: it cannot detect deadlocks across systems and does not work with SSI (serializable snapshot isolation).
For distributed systems, 2PC tends to amplify failures instead of helping to build fault-tolerant systems.

Consensus

In consensus, one or more nodes propose values and the algorithm decides on one of those values. More formally, the following properties must be satisfied:

Uniform agreement - no two nodes decide differently.
Integrity - no node decides twice.
Validity - if a node decides value v, then v was proposed by some node.
Termination - every node that does not crash eventually decides some value.
The best-known consensus algorithms are

Viewstamped Replication (VSR)
Paxos
Raft
Zab
These algorithms decide on a sequence of values, which makes them total order broadcast algorithms: in each round, the nodes propose the message they want to send next and then decide on the next message in the total order.

These algorithms expect to work when a quorum of nodes are available.

Selecting a leader itself requires consensus. When a leader is down, the nodes start an election process, which is tagged with an incremented epoch number. If an old leader from a previous epoch comes back and tries to write something, the nodes that elected a leader in a newer epoch will not accept the write, and so it fails. It is essentially a two-round process: first a leader is elected by a quorum vote, and then, for every decision the leader wants to make, it must again collect agreement from a quorum of nodes. Because the two quorums must overlap, at least one node that voted in the most recent election also votes on the write; if any such node knows of a leader with a higher epoch, it will not agree, the quorum cannot be reached, and the write fails.
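A rough Python sketch of the epoch check a follower might apply (my own illustration, not an algorithm from the book; the Follower class and its methods are made up), assuming each write is tagged with the epoch of the leader that issued it:

class Follower:
    def __init__(self):
        self.current_epoch = 0
    def vote_for_leader(self, epoch):
        # During an election, only vote for a candidate from a newer epoch.
        if epoch > self.current_epoch:
            self.current_epoch = epoch
            return True
        return False
    def accept_write(self, epoch, value):
        # A write from a leader of an older epoch is rejected, so a deposed
        # leader cannot assemble a quorum for its writes.
        return epoch >= self.current_epoch

f = Follower()
f.vote_for_leader(2)
print(f.accept_write(1, "x"))   # False: stale leader from epoch 1
print(f.accept_write(2, "y"))   # True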

Limitations of Consensus:

The process by which nodes vote on proposals before they are decided is a kind of synchronous replication.
It requires a strict majority of nodes to operate.
Since they rely on timeouts to detect failed nodes, transient network delays can cause frequent leader elections and terrible performance.
Raft is known to be particularly sensitive to some network problems: with one consistently weak network link, leadership can continually bounce between nodes and the system effectively never makes progress.

Membership and Coordination services:


ZooKeeper and etcd are designed to hold small amounts of data in memory. This data is replicated across the nodes using a fault-tolerant total order broadcast algorithm.

They provide the following features

Linearizable atomic operations
Using an atomic compare-and-set operation, a lock can be implemented: if multiple nodes try to set the value concurrently, only one will succeed. Distributed locks are usually implemented as leases, which have an expiry time and are eventually released if the client fails.
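A small Python sketch of a lease built on a compare-and-set register (my own illustration, simulated with local state and a monotonic clock; in ZooKeeper or etcd the register would be a znode or key and the expiry would be enforced by the service, not the client):

import time

class LeaseRegister:
    def __init__(self):
        self.holder = None
        self.expires_at = 0.0
    def compare_and_set(self, expected, new_holder, ttl):
        now = time.monotonic()
        if self.holder is not None and now < self.expires_at:
            current = self.holder
        else:
            current = None           # any previous lease has expired
        if current == expected:
            self.holder, self.expires_at = new_holder, now + ttl
            return True
        return False                 # someone else holds the lease

register = LeaseRegister()
print(register.compare_and_set(None, "client-a", ttl=10))   # True: lock acquired
print(register.compare_and_set(None, "client-b", ttl=10))   # False: already held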

Total ordering of operations
When a resource is protected by a lock or lease, a fencing token is needed to prevent clients from conflicting with each other in case of a process pause. ZooKeeper provides this by totally ordering all operations and giving each operation a monotonically increasing transaction ID (zxid) and version number (cversion).
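A sketch of how a storage service can use such tokens (my own illustration): it remembers the highest token it has seen and rejects writes carrying an older one, so a client that was paused while holding an expired lease cannot overwrite newer data. The token itself would come from the lock service (for example the zxid).

class FencedStorage:
    def __init__(self):
        self.highest_token = -1
        self.value = None
    def write(self, token, value):
        if token < self.highest_token:
            raise ValueError("stale fencing token; lease already superseded")
        self.highest_token = token
        self.value = value

store = FencedStorage()
store.write(34, "from client B")        # current lease holder
try:
    store.write(33, "from client A")    # paused client with the old lease
except ValueError as e:
    print(e)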

Failure detection
Clients maintain long-lived sessions on ZooKeeper servers. If there is a temporary interruption, or a ZooKeeper node fails, the session stays active. But if the client's heartbeats stop for longer than the configured session timeout, ZooKeeper declares the session dead, and any locks held by that session can be configured to be released automatically. [These are called ephemeral nodes in ZooKeeper.]

Change Notifications
Clients can read the locks and values created by other clients and also watch them for changes. Thus a client can find out when another node joins the cluster and also when a node leaves (through the loss of its ephemeral nodes). By subscribing to these notifications the client avoids having to poll.
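For illustration, a short sketch using the kazoo Python client for ZooKeeper; the hostnames and paths are made-up assumptions. The ephemeral node disappears if this client's session expires, and the watch tells other clients about membership changes without polling.

from kazoo.client import KazooClient

zk = KazooClient(hosts="zk1:2181,zk2:2181,zk3:2181")   # assumed ensemble
zk.start()

# Announce this node by creating an ephemeral znode under /cluster/members.
zk.create("/cluster/members/node-1", b"10.0.0.1:8080",
          ephemeral=True, makepath=True)

@zk.ChildrenWatch("/cluster/members")
def on_membership_change(children):
    # Called whenever a node joins or leaves (its ephemeral znode vanishes).
    print("current members:", children)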

Usecases

1. Allocating work to nodes - for example, choosing the leader of a replicated database, or deciding which worker handles which job in a job scheduler.
Even if there are a lot of nodes, ZooKeeper does not need to run leader election across all of them; the consensus algorithm runs on a small fixed ensemble of 3 or 5 ZooKeeper nodes, which can still serve a large number of clients.

ZooKeeper is useful for storing only data that changes slowly. If you need to replicate rapidly changing application state, use another tool such as Apache BookKeeper.

2. Service discovery - in a cloud datacenter environment, when a new service instance comes up, it can register itself with ZooKeeper so that its endpoints can be found by other services. Service discovery itself does not really need consensus, but ZooKeeper is still useful here, for example for letting other services find out who the current leader is, since ZooKeeper already knows it.

3. Membership services - a membership service determines which nodes are currently alive and members of a cluster. A failure detection mechanism combined with consensus lets all the nodes agree on which nodes are part of the cluster and which are not.

Source : https://books.google.com/books/about/Designing_Data_Intensive_Applications.html?id=zFheDgAAQBAJ
