Tuesday, August 28, 2018

Designing Data-Intensive Applications - 2: Replication

One of the main concerns in the design of data-intensive applications is replication.
Why?

For distributing data geographically to reduce latency
For fault tolerance — if one node dies, the replica can serve as the db
For faster reads and scale-out: reads can be spread across several machines instead of all going to one.
Different Types of Replication
Single Leader:
All the writes are routed to the single leader and the leader will send the changes to the followers.

In this scenario there are two ways to replicate:

Synchronous: the leader waits for all followers to confirm that they have received the update before reporting the write as successful.

With fully synchronous replication, if even one follower goes down or is slow, the whole system grinds to a halt.
To overcome this, what is usually meant by synchronous in practice is that only one follower is synchronous and the rest are asynchronous. That way every write exists on at least two nodes. This is called semi-synchronous replication.
Asynchronous: the leader sends the updates to the followers but does not wait for them to be applied.

In this case, if the leader fails, any writes that have not yet been replicated to the followers are lost. This means writes are not guaranteed to be durable even after they have been confirmed to the client.
The advantage is that the leader can keep processing writes even if all of its followers have fallen behind.
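As a rough illustration, here is a minimal in-memory sketch of the three acknowledgement modes; the Leader and Follower classes and their methods are made up for this example, not any real database's API.

```python
# A toy in-memory sketch of synchronous, semi-synchronous, and asynchronous
# replication acknowledgement (illustrative only).

class Follower:
    def __init__(self, name):
        self.name = name
        self.log = []

    def apply(self, record):
        # In a real system this would be a network call to the follower.
        self.log.append(record)
        return True   # acknowledgement

class Leader:
    def __init__(self, followers, mode="semi-sync"):
        self.followers = followers
        self.mode = mode   # "sync", "semi-sync", or "async"
        self.log = []

    def write(self, record):
        self.log.append(record)
        if self.mode == "sync":
            # Wait for every follower before confirming the write.
            return all(f.apply(record) for f in self.followers)
        if self.mode == "semi-sync":
            # Wait for one follower synchronously; the rest are asynchronous,
            # so the write exists on at least two nodes when confirmed.
            sync_follower, *async_followers = self.followers
            ok = sync_follower.apply(record)
            for f in async_followers:
                f.apply(record)   # in reality: queued, not awaited
            return ok
        # "async": confirm immediately without waiting for anyone.
        for f in self.followers:
            f.apply(record)       # in reality: queued, not awaited
        return True

leader = Leader([Follower("f1"), Follower("f2")], mode="semi-sync")
print(leader.write({"key": "x", "value": 1}))   # True once f1 has the record
```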
Setting up a new follower :

Take a consistent snapshot of the leader at some point in time.
Copy the snapshot to the follower node.
The follower connects to the leader and asks for all the changes made after the time snapshot was taken. The leader knows the position in the replication log when the snapshot was taken.
The leader sends the new updates from that point on and when all the changes are processed, the follower is caught up.
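A toy sketch of this catch-up procedure, with made-up in-memory Leader and Follower classes standing in for the real snapshot and replication-log machinery:

```python
# A toy in-memory sketch of follower bootstrap (illustrative only).

class Leader:
    def __init__(self):
        self.log = []   # ordered replication log of all writes

    def take_snapshot(self):
        # A consistent snapshot plus the log position it corresponds to.
        return list(self.log), len(self.log)

    def changes_since(self, position):
        return self.log[position:]

class Follower:
    def __init__(self):
        self.log = []

    def restore(self, snapshot):
        self.log = list(snapshot)

    def apply(self, change):
        self.log.append(change)

leader = Leader()
leader.log.extend(["w1", "w2", "w3"])

snapshot, position = leader.take_snapshot()    # step 1: snapshot at a known log position
new_follower = Follower()
new_follower.restore(snapshot)                 # step 2: copy the snapshot over

leader.log.append("w4")                        # writes keep arriving in the meantime

for change in leader.changes_since(position):  # steps 3-4: request and apply the backlog
    new_follower.apply(change)

assert new_follower.log == leader.log          # the follower is now caught up
```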
Handling node outages:

Follower recovery:

If a follower needs to reboot or recover from failure, it can do so easily if it maintains a log of all the changes it has seen from the leader. When it comes back online, it can request the leader to send all the changes that have taken place after that point and get caught up.

Leader failure — failover:

When a leader fails then the following have to happen

A new leader has to be elected
Clients have to learn that this is the new leader and send their writes to it.
The other followers have to start getting updates from the new leader.
There is no foolproof way to determine that the leader has failed, since many things can go wrong: power failures, network failures, crashes, and so on. Nodes in the cluster send messages back and forth, and if a node does not respond to any messages for some timeout (say 30 seconds), it is assumed to have failed.
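A minimal sketch of such a timeout-based failure detector; the node names and the 30-second timeout are just illustrative.

```python
import time

# A toy timeout-based failure detector: a node that has not responded to any
# message within the timeout is presumed dead (illustrative only).

TIMEOUT_SECONDS = 30

def presumed_failed(last_heard_from: float, now: float,
                    timeout: float = TIMEOUT_SECONDS) -> bool:
    return (now - last_heard_from) > timeout

now = time.time()
last_heartbeat = {"node-a": now - 5, "node-b": now - 45}
dead = [node for node, t in last_heartbeat.items() if presumed_failed(t, now)]
print(dead)   # ['node-b'] -- but it might just be slow, not actually dead
```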

Choosing a new leader: this can be done with an election (the new leader is chosen by the remaining replicas). The best candidate is usually the replica with the most recent changes.

Reconfiguring the system to use the new leader: clients need to be routed to the new leader using one of the request routing methods. Also, if the old leader comes back up, it must recognize that it is no longer the leader and start accepting updates from the new leader.

With asynchronous replication, the new leader may not have received all the writes made to the old leader, so the two can end up with conflicting writes. The most common solution is to simply discard the old leader's unreplicated writes, even though this violates clients' durability expectations.
Sometimes there is a split-brain situation where both the old and the new leader believe they are the leader and both accept writes. The data then becomes inconsistent, so one of them has to be shut down carefully.
The timeout used to declare the leader dead also has to be chosen carefully: if it is too short, it causes unnecessary failovers (especially when the system is under high load); if it is too long, the system stays unavailable longer after a real failure.
Replication Log Implementations:

Statement based replication:

Here each statement is replicated (INSERT, UPDATE, DELETE etc).

Pros: Simple to send updates

Cons:

If any non-deterministic functions like rand() or now() are used, the values will be different on each replica. This can be fixed by having the leader replace the function call with its concrete value before sending the statement to the replicas (see the sketch after this list).
If a statement uses an auto-incrementing column, the statements must be executed in exactly the same order on every replica, otherwise the values will differ.
If triggers, stored procedures, or functions have non-deterministic side effects, they can also cause problems.
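A toy Python illustration of the non-determinism problem, with plain functions standing in for SQL statements, and the leader-side fix of substituting concrete values before shipping the statement:

```python
import random
import time

# A toy illustration of why replicating the statement itself breaks with
# non-deterministic functions, and the usual fix: the leader evaluates the
# non-deterministic parts once and ships literals instead.

def non_deterministic_statement():
    # Each replica would evaluate rand()/now() on its own and diverge.
    return {"id": 1, "token": random.random(), "created_at": time.time()}

replica_1 = non_deterministic_statement()
replica_2 = non_deterministic_statement()
print(replica_1 == replica_2)   # almost certainly False -- replicas diverge

# Fix: the leader evaluates once and replicates the concrete row values.
row_from_leader = {"id": 1, "token": random.random(), "created_at": time.time()}

def statement_with_literals():
    return dict(row_from_leader)

print(statement_with_literals() == statement_with_literals())   # True
```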
Write Ahead Log shipping

The write-ahead log is an append-only sequence of bytes containing all writes to the database. It records which bytes were changed in which disk blocks.

Pros:

Since the exact bytes are shipped, there are no non-determinism problems and the follower ends up with exactly the same data.

Cons:

Since the log is so low-level, it is tightly coupled to the storage engine. If a new version changes how data is stored on disk, or if we want to switch to a different storage engine, the log cannot be replayed.

Logical Log Replication
Here a separate log format is used for replication, decoupled from the storage engine's physical representation.

For an inserted row, the log contains the new values of all columns.

For a deleted row, it will contain enough information to uniquely identify the row that was deleted. It would be the primary key but if there is no primary key, then values of all the columns will be logged.

For an updated row, it will contain the identifier and the values of all the changed columns.

A transaction that modifies multiple rows generates multiple such log records, followed by a record indicating that the transaction was committed.

Pros:

This format is also easy for external applications to parse, so it is used to send data to other systems such as data warehouses, or to build custom indexes and caches. This is called change data capture.
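As a rough sketch, logical log records might look something like the following made-up JSON-style records; this is not any particular database's actual wire format.

```python
# A sketch of what logical (row-based) log records could look like
# (illustrative format only).

logical_log = [
    # Inserted row: the new values of all columns.
    {"tx": 42, "op": "insert", "table": "users",
     "columns": {"id": 7, "name": "Alice", "email": "alice@example.com"}},
    # Updated row: an identifier for the row plus only the changed columns.
    {"tx": 42, "op": "update", "table": "users",
     "key": {"id": 7}, "changed": {"email": "alice@new-example.com"}},
    # Deleted row: enough information to uniquely identify it (the primary key).
    {"tx": 42, "op": "delete", "table": "users", "key": {"id": 7}},
    # Final record marking the transaction as committed.
    {"tx": 42, "op": "commit"},
]

# A change-data-capture consumer (feeding a warehouse, cache, or search index)
# only needs to understand this format, not the storage engine's disk layout.
for record in logical_log:
    print(record["op"], record.get("table", ""))
```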

Trigger based replication
Many databases have triggers that can invoke specific functions or stored procedures when data changes. These can also be used for replication, although with more overhead than the database's built-in replication.

Problems with Replication Lag
When you write to the leader and read from a follower that is replicated asynchronously, you might read stale data because the latest write has not reached that follower yet. The data will become consistent eventually once writes stop, but "eventually" could be anywhere from a few seconds to a few minutes. This is called replication lag. These are the problems it causes:

Read your own writes:

A client wrote something to the database. Since it was a write, the request went through the leader. The client then tried to read the same record. Since it was a read, the request went to a follower that did not yet have the latest write, so the client could not see the data it had just written.

To overcome this, we need read-your-writes consistency (also called read-after-write consistency). We can achieve it in one of these ways:

When reading something the user may have modified, read from the leader; otherwise read from a follower. For example, a user can edit their own profile but not other people's profiles, so load the user's own profile from the leader and other people's profiles from the followers.
If the application is write-heavy, most reads would then go to the leader, negating the benefit of read scaling. In that case, after a client writes to the database, serve its reads from the leader for the next minute or so and from the followers after that. You can also monitor replication lag and avoid querying any follower that is lagging by more than a minute.
The client can remember the timestamp of its most recent write; if it queries a follower that has not caught up to that timestamp, it can either query another follower or wait until the follower catches up before returning the data (see the sketch after this list).
If the replicas are geographically distributed, any request that must be served by the leader has to be routed to the datacenter that contains the leader.
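A minimal sketch of the timestamp approach; the Replica class and read_your_writes helper are hypothetical, not a real client library.

```python
import time

# A sketch of timestamp-based read-after-write consistency, using made-up
# in-memory replica objects (illustrative only).

class Replica:
    def __init__(self):
        self.data = {}
        self.replicated_up_to = 0.0   # timestamp of the last write applied here

    def read(self, key):
        return self.data.get(key)

def read_your_writes(followers, key, last_write_ts, leader):
    """Serve the read from any follower that has caught up to the client's
    most recent write; otherwise fall back to the leader."""
    for follower in followers:
        if follower.replicated_up_to >= last_write_ts:
            return follower.read(key)
    return leader.read(key)

leader, stale_follower = Replica(), Replica()

# The client writes through the leader and remembers the write's timestamp.
last_write_ts = time.time()
leader.data["profile:1"] = "new bio"
leader.replicated_up_to = last_write_ts

# The stale follower has not caught up, so the read falls back to the leader.
print(read_your_writes([stale_follower], "profile:1", last_write_ts, leader))
```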
There are more issues to consider if we want to provide cross-device read after write consistency.

If the user updates some data on the desktop and wants to see the change on mobile, the timestamp-based approach becomes harder because there are multiple devices; that metadata needs to be stored in a central place.

Monotonic Reads:
If you read once from one replica and then from another replica that is lagging further behind, it can look as if some updates were lost, as if you were moving backwards in time. For example, if a user viewing the comments on a blog post sees three comments on the first read and then only two, it causes confusion. Monotonic reads is a guarantee that this will not happen.

One way to do that is to make sure each user's reads always go to the same replica, for example chosen based on a hash of the user ID. But if that replica is down, the request needs to be re-routed to a different replica.
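A minimal sketch of this routing rule; the replica names are made up.

```python
import hashlib

# Route each user's reads to the same replica by hashing the user id
# (illustrative only).

REPLICAS = ["replica-0", "replica-1", "replica-2"]

def replica_for(user_id: str, available=REPLICAS) -> str:
    digest = hashlib.sha256(user_id.encode()).hexdigest()
    return available[int(digest, 16) % len(available)]

print(replica_for("user-42"))   # deterministic: same replica for this user every time
# If the chosen replica is down, re-route within the remaining replicas:
print(replica_for("user-42", [r for r in REPLICAS if r != "replica-1"]))
```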

Consistent Prefix Reads :
Sometimes when replicating, the causal order between events is lost. For example, there are two events: 1. A asks a question and 2. B answers it. If a replica sees 2 before 1, a client reading from it will see the answer before the question, which is very confusing. To avoid this we can use versions and version vectors to order the events: each write carries the version it is based on, so responses can be merged in a way that preserves the causal order.

If eventual consistency does not work for your system, design for stronger guarantees.
Transactions solve most of these problems, but they are often dismissed as hurting performance and availability.

Multi-Leader Replication
In single-leader replication, if the leader goes down, the application is unavailable for writes for some time; failover addresses this. Another option is to allow multiple nodes to accept writes. This rarely makes sense within a single datacenter, but it is useful across multiple datacenters, basically having one leader per datacenter.

Performance improves, as writes can be handled by the leader in the local datacenter.
If one datacenter fails, writes can still be processed in the other datacenter.
A temporary disconnection between the two datacenters with asynchronous replication does not cause problems, as each leader can keep taking requests.
The big disadvantage is that concurrent writes on the two leaders can conflict, and these write conflicts need to be resolved.
Use cases:

Clients with offline capabilities can be thought of as a multi-leader setup: each device accepts local writes, keeps the changes isolated while disconnected, and syncs when the network is available. CouchDB is designed for this mode of operation.

Collaborative editing, as in Google Docs, can also be thought of as a multi-leader setup, and resolving concurrent edits is a conflict resolution problem.

Conflict Resolution:

Avoid conflicts by locking down different parts of the data or doing the edits sequentially, but this degenerates into the single-leader case and is not that useful here.
Last write wins: give each write a monotonically increasing number (or timestamp) and let the write with the highest number win. This is simple but causes data loss (see the sketch after this list).
Merge: sort the conflicting values and merge them, or show both values.
Let the client deal with it: store both values and let the user resolve the conflict. CouchDB stores the conflicting versions in a data structure, the application prompts the user to resolve them, and the final copy is written back to the database.
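A minimal sketch of last-write-wins resolution, assuming each write carries a timestamp; the record fields and leader names are made up.

```python
# Last write wins: the write with the highest timestamp survives; the losing
# write is silently discarded, which is the data-loss problem noted above.

def last_write_wins(conflicting_writes):
    return max(conflicting_writes, key=lambda w: w["ts"])

conflicting_writes = [
    {"ts": 1001, "value": "B", "origin": "leader-eu"},
    {"ts": 1003, "value": "C", "origin": "leader-us"},
]
print(last_write_wins(conflicting_writes))   # the ts=1003 write wins; "B" is lost
```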
Algorithms for automatic conflict resolution

CRDTs: conflict-free replicated data types are a family of data structures (sets, maps, ordered lists, counters, etc.) that can be concurrently edited by multiple users and automatically resolve conflicts in sensible ways.
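As a concrete illustration, here is a sketch of one of the simplest CRDTs, a grow-only counter: each replica increments only its own slot, and merging takes the element-wise maximum, so concurrent increments converge without conflicts.

```python
# A minimal grow-only counter (G-counter) CRDT sketch (illustrative only).

class GCounter:
    def __init__(self, replica_id, n_replicas):
        self.replica_id = replica_id
        self.counts = [0] * n_replicas

    def increment(self, amount=1):
        # Each replica only ever increments its own slot.
        self.counts[self.replica_id] += amount

    def merge(self, other):
        # Element-wise maximum: commutative, associative, idempotent.
        self.counts = [max(a, b) for a, b in zip(self.counts, other.counts)]

    def value(self):
        return sum(self.counts)

a, b = GCounter(0, 2), GCounter(1, 2)
a.increment(3)   # concurrent updates on different replicas
b.increment(2)
a.merge(b)
b.merge(a)
print(a.value(), b.value())   # 5 5 -- both replicas converge to the same total
```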

Mergeable persistent data structures: these track history explicitly, similarly to the Git version control system, and use a three-way merge function (CRDTs use two-way merges).

Operational transformation is used to implement collaborative editing; it was designed for concurrent editing of an ordered list of items, such as the characters of a text document.

Circular, star, and all-to-all topologies can be used for multi-leader replication.

Leaderless Replication
Here there is no concept of a leader: every replica can accept both writes and reads.

A write request is sent to multiple replicas, each of which applies it; once a configured number of replicas have confirmed the write, it is considered successful. All replicas eventually receive the write.

Similarly, a read request is sent to multiple replicas and succeeds once a configured number of them respond. The client can examine the responses and take the latest version.

Riak, Voldemort, Cassandra, and Amazon's Dynamo use leaderless replication.

If one replica goes down but the configured number of replicas is still reachable, the write still succeeds; the failed replica simply misses it. When it comes back up and a client reads, the client sends the request to multiple replicas and may get different values from different replicas. The client can use version numbers to determine which value is the latest.

The client can also identify which replicas are stale and send the newer value back to them (this is called read repair).

Some databases also have a background process that compares the data on replicas and copies any missing data between them. This is called anti-entropy, and unlike a replication log it does not copy writes in any particular order.

Quorums for reading and writing

If we have n replicas, a write is considered successful if w nodes confirm it, and a read is considered successful if we get the value from r nodes. As long as w + r > n, we can be sure that at least one of the r nodes read from has the latest value. So w and r can be thought of as the minimum number of votes required for a write or a read to be valid.
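A quick sketch of the quorum condition with some typical numbers:

```python
# Quorum condition: with n replicas, w write acks and r read responses, every
# read overlaps with every write whenever w + r > n.

def quorum_overlaps(n: int, w: int, r: int) -> bool:
    return w + r > n

print(quorum_overlaps(3, 2, 2))   # True  -> at least one of the r nodes has the latest value
print(quorum_overlaps(3, 1, 1))   # False -> a read may miss the latest write entirely
```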

Problems With Quorum:

Sloppy Quorum & Hinted Handoff :

With only n home nodes for a value, any failure or network partition among them can make the quorum unreachable and the system unavailable. In a system like Dynamo, when a network partition occurs, writes are instead accepted by reachable nodes outside the n preferred nodes for that value; these may be different from the first n nodes encountered while walking the consistent hashing ring. This is called a sloppy quorum. The writes stored on these stand-in nodes carry metadata (a hint) recording which of the preferred nodes they were originally meant for; once those nodes are reachable again, the stand-in nodes send the data over to them. This is called hinted handoff.

So until the hinted handoff completes, some consistency is lost: even a quorum read of the home nodes may not see the latest writes.

When two writes happen concurrently, it is not possible to know which happened first.

Also, when a failed node is restored from an older backup, the writes that are newer than the backup are lost.

Not Linearizable:

Quorum reads and writes do not preserve ordering as if there were only a single copy of the data.

Edge cases can arise depending on timing. Say writes use a quorum of three (w = 3) and reads use a quorum of two (r = 2).

A writer writes to replicas 1, 2, and 3, changing a value from 0 to 1.

The write has reached replica 3, but has not yet been applied on replicas 1 and 2.

Reader 1 reads at this moment, sees value 1 from replica 3 and value 0 from replica 2, and returns the newer value 1.

Next, reader 2 reads from replicas 1 and 2 and sees only the old value 0, even though it started reading after reader 1 already saw the new value. So the execution is not linearizable (it does not behave as if there were a single copy of the data).

This can be solved by having reader 1 perform read repair synchronously, before returning its result to the application.
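A toy sketch of this fix: a quorum read that synchronously repairs the stale replicas it contacted before returning. The in-memory replicas and version numbers are made up for illustration.

```python
# A toy quorum read with synchronous read repair, using versioned values
# (illustrative only; real systems like Dynamo/Cassandra have far more detail).

replicas = {
    "r1": {"version": 0, "value": 0},
    "r2": {"version": 0, "value": 0},
    "r3": {"version": 1, "value": 1},   # only r3 has applied the new write so far
}

def quorum_read(contacted, r=2):
    responses = {name: replicas[name] for name in contacted[:r]}
    latest = max(responses.values(), key=lambda v: v["version"])
    # Read repair: push the latest version back to the stale replicas that took
    # part in this read, before returning the result to the application.
    for name in responses:
        if replicas[name]["version"] < latest["version"]:
            replicas[name] = dict(latest)
    return latest["value"]

print(quorum_read(["r3", "r2"]))   # reader 1 sees 1 and repairs r2
print(quorum_read(["r1", "r2"]))   # reader 2 now also sees 1, since r2 was repaired
```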

Leaderless replication is also well suited to multi-datacenter operation, since it is designed to tolerate conflicting concurrent writes, network interruptions, and latency spikes.

We can use versions and version vectors to deal with concurrent updates and conflicts

Source: https://www.amazon.com/Designing-Data-Intensive-Applications-Reliable-Maintainable/dp/1449373321
