Tuesday, August 28, 2018

Design of Data Intensive Applications - 2 Replication

One of the concerns in the design of data intensive applications is replication.
Why ?

For distributing data geographically to reduce latency
For fault tolerance — if one node dies, the replica can serve as the db
For faster reads and scale out: reads can be served by the replicas, so the read load is spread across several machines.
Different Types of Replication
Single Leader:
All the writes are routed to the single leader and the leader will send the changes to the followers.

In this scenario there can be two ways to replicate

synchronous : The leader waits for all followers to confirm that they got the update before committing

With fully synchronous replication, if even one follower goes down, writes block and the whole system grinds to a halt.
To overcome this problem, in practice synchronous replication usually means that one follower is updated synchronously and all the others asynchronously. This way every update is on at least two nodes. This is called semi-synchronous replication.
asynchronous : The leader just sends the updates to the followers but doesn’t wait for them to be complete

In this case if the leader fails, all the writes that are not yet replicated to the followers are lost. This means writes are not guaranteed to be durable even if confirmed to the client.
The advantage is that the leader can keep processing writes even if all the followers have fallen behind.
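
The difference between the three modes comes down to how many follower acknowledgements the leader waits for before confirming the write. Here is a minimal in-memory sketch of that idea. The `Follower` and `Leader` classes and the `sync_count` parameter are made up for illustration and do not correspond to any real database API.

```python
from dataclasses import dataclass, field


@dataclass
class Follower:
    up: bool = True
    log: list = field(default_factory=list)

    def apply(self, record):
        """Store the record and acknowledge it; a node that is down cannot."""
        if self.up:
            self.log.append(record)
        return self.up


@dataclass
class Leader:
    followers: list
    sync_count: int  # followers that must acknowledge before the client is told "ok"
    log: list = field(default_factory=list)
    backlog: list = field(default_factory=list)  # (follower, record) pairs still to be shipped

    def write(self, record):
        self.log.append(record)
        acks = 0
        for follower in self.followers:
            # Wait for followers until enough have acknowledged; the rest
            # (and any that failed) are updated in the background.
            if acks < self.sync_count and follower.apply(record):
                acks += 1
            else:
                self.backlog.append((follower, record))
        return acks >= self.sync_count
```

With `sync_count` equal to the number of followers this behaves like fully synchronous replication, with `sync_count=1` like semi-synchronous, and with `sync_count=0` like fully asynchronous replication.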
Setting up a new follower :

Take a consistent snapshot of the leader at some point of time.
Copy the snapshot to the follower node.
The follower connects to the leader and asks for all the changes made after the time snapshot was taken. The leader knows the position in the replication log when the snapshot was taken.
The leader sends the new updates from that point on and when all the changes are processed, the follower is caught up.
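
The steps above can be sketched with a toy leader whose replication log is just a Python list, so that a log position is a list index. The class and method names are assumptions for illustration only.

```python
class Leader:
    def __init__(self):
        self.log = []   # replication log: ordered (key, value) writes
        self.data = {}

    def write(self, key, value):
        self.log.append((key, value))
        self.data[key] = value

    def snapshot(self):
        """Return a copy of the data and the log position it corresponds to."""
        return dict(self.data), len(self.log)

    def changes_since(self, position):
        return self.log[position:]


class Follower:
    def __init__(self, snapshot, position):
        self.data = snapshot
        self.position = position  # how far into the leader's log we have applied

    def catch_up(self, leader):
        for key, value in leader.changes_since(self.position):
            self.data[key] = value
            self.position += 1
```

A follower that was offline for a while can call `catch_up` again after it restarts, because it remembers its own position in the log.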
Handling node outages:

Follower recovery:

If a follower needs to reboot or recover from failure, it can do so easily if it maintains a log of all the changes it has seen from the leader. When it comes back online, it can request the leader to send all the changes that have taken place after that point and get caught up.

Leader failure — failover:

When a leader fails then the following have to happen

A new leader has to be elected
Clients have to know that this is the new leader and they should send writes to this.
Other followers should get updates from this leader.
There is no foolproof way to determine that the leader has failed, as there are many things that could go wrong: power failures, network failures, crashes etc. The nodes in a cluster send messages back and forth, and if a node doesn’t respond for some period of time (say, 30 seconds), it is assumed to have failed.

Choosing a new leader: this can be done with an election, where the new leader is chosen by the remaining replicas. The best candidate is usually the replica with the most recent changes from the old leader.

Reconfiguring the system to use the new leader: clients need to be routed to the new leader using any of the request routing methods. Also, if the old leader comes back up, it must be told that it is no longer the leader and that it should accept updates from the new leader.

With asynchronous replication, the new leader might not have received all the writes made to the old leader, and if the old leader rejoins, those writes can conflict with what the new leader has accepted since. The usual solution is to discard the old leader’s unreplicated writes, which may violate the clients’ durability expectations.
Sometimes there is a split brain situation where both the old and the new leader believe they are the leader and both accept writes. The data then becomes inconsistent, so one of the two has to be shut down, carefully, so that both are not shut down by mistake.
The timeout for declaring the leader dead must be chosen carefully. If it is too short, it causes unnecessary failovers (especially when the system is under high load). If it is too long, recovery takes longer and the system stays unavailable in the meantime.
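
Failure detection by timeout and the "most up to date replica wins" rule can be sketched as below. It assumes each replica is described by the time of its last heartbeat and its replication log position. The function names and the input format are hypothetical, and a real election also needs consensus among the nodes, which this leaves out.

```python
def suspected_dead(last_heartbeat, now, timeout=30.0):
    """A node is presumed dead if it has been silent for longer than the timeout."""
    return now - last_heartbeat > timeout


def choose_new_leader(replicas, now, timeout=30.0):
    """replicas maps a node name to (last_heartbeat, log_position).

    Returns the live replica with the highest log position, or None.
    """
    alive = {
        name: position
        for name, (heartbeat, position) in replicas.items()
        if not suspected_dead(heartbeat, now, timeout)
    }
    return max(alive, key=alive.get) if alive else None
```

Note how a larger `timeout` keeps a slow old leader in charge for longer, while a smaller one triggers failover sooner.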
Replication Log Implementations:

Statement based replication:

Here each statement is replicated (INSERT, UPDATE, DELETE etc).

Pros: Simple to send updates

Cons:

If any non-deterministic function like rand() or now() is used, the value will be different on each replica. This can be fixed by having the leader replace the function call with a fixed value before sending the statement to the replicas.
If a statement uses an auto-incrementing column, the statements have to be executed in the same order on all replicas, otherwise the values will differ.
If triggers, stored procedures or user-defined functions have non-deterministic side effects, the replicas can diverge.
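
To see the non-determinism problem concretely, here is a small experiment with two in-memory SQLite databases standing in for a leader and a follower. This setup is an assumption for illustration, since SQLite has no built-in replication. The first statement is shipped as text and evaluated on each node. The second is evaluated once on the leader and shipped as a literal value.

```python
import sqlite3


def make_node():
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE tokens (value INTEGER)")
    return conn


def rows(conn):
    return [r[0] for r in conn.execute("SELECT value FROM tokens ORDER BY rowid")]


leader, follower = make_node(), make_node()

# Naive statement shipping: each node calls random() itself,
# so the two nodes will almost certainly store different values.
statement = "INSERT INTO tokens VALUES (random())"
leader.execute(statement)
follower.execute(statement)

# Fix: the leader evaluates the function once and ships the resulting value.
value = leader.execute("SELECT random()").fetchone()[0]
for node in (leader, follower):
    node.execute("INSERT INTO tokens VALUES (?)", (value,))

print(rows(leader))
print(rows(follower))
```

The first row will almost always differ between the two nodes, while the second row is identical on both.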
Write Ahead Log shipping

The write ahead log (WAL) is an append only sequence of bytes containing all writes to the database. It describes which bytes were changed in which disk blocks.

Pros:

Since the log contains the exact bytes, there are no problems with non-determinism and the follower ends up with exactly the same values.

Cons:

Since it is so low level, it is tightly coupled to the storage engine. If a new version changes how things are stored, or if we want to move to a different storage engine, the log cannot be replayed.

Logical Log Replication
In this kind of replication, we use a separate log format for replication that is decoupled from the storage engine’s physical representation.

For an inserted row, the log contains the new values of all the columns.

For a deleted row, it will contain enough information to uniquely identify the row that was deleted. It would be the primary key but if there is no primary key, then values of all the columns will be logged.

For an updated row, it will contain the identifier and the values of all the changed columns.


A transaction that modifies multiple rows creates multiple log records, followed by a final record saying that the transaction was committed.


This format is also easy for other applications to parse, so it is used to send data to external systems such as data warehouses, custom indexes and caches. This is called change data capture.
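
As a rough illustration of what such a row-level log might look like, here is a sketch that replays insert, update, delete and commit records against a table held in a dict keyed by primary key. The record layout (`op`, `key`, `row`, `changed`) is invented for this example and is not the format of any particular database.

```python
def apply_logical_log(table, records):
    """Apply row-level change records to `table`, a dict keyed by primary key.

    Changes are buffered and only applied when the commit record arrives.
    """
    pending = []
    for record in records:
        if record["op"] != "commit":
            pending.append(record)
            continue
        for change in pending:
            if change["op"] == "insert":
                table[change["key"]] = dict(change["row"])          # all column values
            elif change["op"] == "update":
                table[change["key"]].update(change["changed"])      # changed columns only
            elif change["op"] == "delete":
                del table[change["key"]]                            # key identifies the row
        pending.clear()
    return table
```

Any consumer that understands these records, a follower or a cache or a search index, can rebuild its own view of the data from the same log.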

Trigger based replication
Many databases have triggers where specific functions or stored procedures can be called when data changes. These can be used for replication also. It has more overhead than the database’s default replication.

Problems with Replication Lag
When you write to a leader and read from a follower whose data is replicated asynchronously, you might read older data because the latest write on the leader has not been replicated yet. The data becomes consistent eventually once the writes stop, but that could take anywhere from a few seconds to a few minutes. This delay is called replication lag. These are the problems it causes:

Read your own writes:

One client wrote something to the database. Since it was a write request it went through the leader. Then it tried to read the record. Since it was a read request it was sent to the follower which didn’t have the latest write. So it was not able to read the data that it wrote.

To overcome this, we need read-your-writes consistency, also called read-after-write consistency. We can achieve this in any of these ways:

When reading something user may have modified, read from the leader else the follower. For e.g. a user can edit data on his profile, but not on other people’s profiles. So load user’s profile from the leader but other people’s profile from the followers.
But if most things in the application can be edited by the user, most reads would have to go to the leader, which negates the benefit of read scaling. In that case, read from the leader for a minute or so after each write by that client, and from a follower after that. You can also monitor the replication lag and stop sending queries to any follower that is more than a minute behind.
The client can remember the timestamp of its most recent write. If it queries a follower that has not caught up to that timestamp, the read can be handed to another follower, or wait until the follower has caught up.
If the replicas are geographically distributed, any request that has to be served by the leader must be routed to the datacenter that contains the leader.
There are more issues to consider if we want to provide cross-device read after write consistency.

If the user updates some data on the desktop and wants to see the changes on the mobile, it is difficult to implement the timestamp based approach, as one device does not know about the writes made on the other. That metadata needs to be stored in a central place.
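
The "read from the leader for a minute after a write" rule can be sketched as a small router that remembers when each user last wrote. The `ReadRouter` class is hypothetical, and time is passed in explicitly to keep the example deterministic. Because the last-write times live in one place, the same router would also cover the cross-device case.

```python
class ReadRouter:
    def __init__(self, window_seconds=60.0):
        self.window = window_seconds
        self.last_write = {}  # user_id -> time of that user's most recent write

    def record_write(self, user_id, now):
        self.last_write[user_id] = now

    def choose(self, user_id, now):
        """Return which kind of node should serve this user's read."""
        last = self.last_write.get(user_id)
        if last is not None and now - last < self.window:
            return "leader"    # the write may not have reached the followers yet
        return "follower"
```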

Monotonic Reads :
If you read once from one replica and later from another replica which is lagging more than the first replica, it will appear as if some updates are lost or as if we are moving back in time. For e.g. if user is seeing comments on a blog post and he sees three comments the first time and then sees only two comments, it will cause confusion. Monotonic reads is a guarantee that this will not happen

One way to do that is to make sure each user’s reads always go to the same replica. The replica can be chosen based on a hash of the user’s id. But if that replica is down, the user’s requests need to be rerouted to a different replica.
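
A minimal sketch of hash based replica selection with a fallback is shown below. The function name and the "walk to the next replica" fallback are assumptions for illustration. It uses `hashlib` rather than the built-in `hash()`, because Python randomizes string hashes per process, which would send the same user to different replicas after a restart.

```python
import hashlib


def replica_for(user_id, replicas, down=frozenset()):
    """Pick a stable replica for this user, skipping replicas known to be down."""
    digest = hashlib.sha256(str(user_id).encode("utf-8")).digest()
    start = int.from_bytes(digest[:8], "big") % len(replicas)
    for offset in range(len(replicas)):
        candidate = replicas[(start + offset) % len(replicas)]
        if candidate not in down:
            return candidate
    raise RuntimeError("no replica available")
```

While the user is on the fallback replica the monotonic guarantee can still be broken once, if the fallback is further behind than the original.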

Consistent Prefix Reads :
Sometimes when replicating something, the causal order between events is lost. For example, say there are two events: 1. A asks a question and 2. B answers the question. If a replica sees 2 before 1, it will be very confusing, as the client will see the answer before the question. To avoid this we can use versions and version vectors to order the events. Each read returns the version of the data, and the client sends the version it is updating along with each write. Then it is possible to merge the responses while preserving the causal order.

If eventual consistency doesn’t work for your system, then design for stronger guarantees
Transactions overcome most of these problems, but they are often dismissed as being too expensive in terms of performance and availability.

Multiple Leader
In single leader replication all writes go through the leader, so if the leader goes down the application cannot accept writes until failover completes. Another option is to allow multiple nodes to accept writes. This rarely makes sense within a single datacenter, but it does make sense across datacenters, with one leader per datacenter.

Performance improves, as each write can go to the leader in the local datacenter instead of a single remote leader.
If one datacenter fails, the writes can still work from the other datacenter
A temporary disconnection between two datacenters with async replication will not cause issues as each leader can still take requests
The big disadvantage is that concurrent writes on the two leaders can conflict, and those conflicts need to be resolved.
Use cases:

Clients with offline capabilities can be thought of as multi leader databases: each device is a leader that keeps its changes locally while offline and syncs when the network is available. CouchDB is made for this mode of operation.

Collaborative editing like Google Docs can also be thought of as a multi leader database, where resolving concurrent edits is a conflict resolution problem.

Conflict Resolution:

Avoid conflicts by locking down different parts, or by doing the edits sequentially. But this is like the single leader case and not that useful here.
Last write wins: give each write a monotonically increasing number (or a timestamp) and let the write with the highest number win. But this causes data loss, as the other writes are silently discarded.
Merge: sort the values and merge them, so that both values are kept and shown.
Let the client deal with it: store both values and let the user resolve the conflict. CouchDB stores the conflicting versions in a data structure, the application prompts the user to resolve the conflict, and the final copy is written back to the db.
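
The difference between last write wins and keeping both values is easy to see in code. In this sketch each conflicting version is a `(sequence_number, value)` pair. Both function names are made up for illustration.

```python
def last_write_wins(versions):
    """Keep only the value with the highest sequence number; the rest are lost."""
    return max(versions, key=lambda version: version[0])[1]


def keep_siblings(versions):
    """Keep every distinct value, sorted, so a user or the application can merge them."""
    return sorted({value for _, value in versions})
```

For two concurrent writes `(5, "A")` and `(7, "B")`, the first strategy silently drops "A", while the second returns both values for someone to reconcile.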
Algorithms for automatic conflict resolution

CRDTs : Conflict-free replicated datatypes are a family of data structures (sets, maps, ordered lists, counters) that can be concurrently edited by multiple users and that automatically resolve conflicts in sensible ways.

Mergeable persistent data structures : These track history explicitly, like the git version control system, and use a three way merge function (CRDTs use a two way merge).

Operational Transformation is used to implement collaborative editing. It was designed for concurrent editing of an ordered list of items, like the characters of a text.
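
As one concrete example of a CRDT, here is a grow-only counter (usually called a G-Counter). This is a standard textbook construction and not something taken from the book's text. Each node only increments its own slot, and merging takes the per-node maximum, so merges can be applied in any order and any number of times with the same result.

```python
class GCounter:
    def __init__(self, node_id):
        self.node_id = node_id
        self.counts = {}  # node_id -> increments observed from that node

    def increment(self, amount=1):
        self.counts[self.node_id] = self.counts.get(self.node_id, 0) + amount

    def value(self):
        return sum(self.counts.values())

    def merge(self, other):
        """Two way merge: take the per-node maximum of both states."""
        for node, count in other.counts.items():
            self.counts[node] = max(self.counts.get(node, 0), count)
```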

Circular, star and all-to-all topologies can be used for multi leader replication.

Leaderless Replication
Here there is no concept of a leader, and every replica can accept writes and reads.

Here a request is sent to multiple replicas and they will all apply the write and once a configured number of replicas successfully write it, the write is successful. All the replicas eventually get the write.

Similarly a read request is sent to multiple replicas and it gets the data from a configured number of replicas. The client can examine the responses and take the latest version.

Riak, Voldemort, Cassandra and Amazon Dynamo work on leaderless replication.

If one replica goes down and the configured number of replicas is still available, the write will be successful. But when the failed replica comes back up, it will have missed the write. When a client reads, it sends the request to multiple replicas and may get different values from different replicas. The client can use the version numbers to determine which value is the latest.

It can also identify which replicas are lagging and can send an update request to them (this is called read repair) .
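
Read repair can be sketched in a few lines if each replica is modelled as a dict that maps a key to a `(version, value)` pair. That model and the function name are assumptions for illustration.

```python
def read_with_repair(replicas, key):
    """Read `key` from every replica, return the newest value,
    and write it back to any replica that returned an older version."""
    responses = [replica.get(key, (0, None)) for replica in replicas]
    latest = max(responses, key=lambda response: response[0])
    for replica, response in zip(replicas, responses):
        if response[0] < latest[0]:
            replica[key] = latest  # repair the stale replica
    return latest[1]
```

Values that are rarely read never get repaired this way, which is one reason a background process is useful as well.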

Some databases also have a background process that compares the data on the replicas and copies any missing data from one replica to another. This process is called anti-entropy. Unlike a replication log, it does not copy writes in any particular order.

Quorums for reading and writing

If we have n nodes, we can consider a write successful if w nodes confirm it. A read is considered successful if we get the value from r nodes. As long as w + r > n, the set of nodes written to and the set of nodes read from must overlap, so we can be sure that at least one of the r nodes has the latest value. So w and r can be thought of as the minimum number of votes needed for a write or a read to be valid.
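
The overlap claim can be checked by brute force for small clusters. The sketch below tries every possible write set of size w and read set of size r and reports whether they always share a node. The function name is made up.

```python
from itertools import combinations


def quorums_always_overlap(n, w, r):
    """True if every set of w nodes shares at least one node with every set of r nodes."""
    nodes = range(n)
    return all(
        set(write_set) & set(read_set)
        for write_set in combinations(nodes, w)
        for read_set in combinations(nodes, r)
    )
```

For n = 3, the common choice w = 2, r = 2 always overlaps, while w = 1, r = 2 does not.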

Problems With Quorum:

Sloppy Quorum & Hinted Handoff :

If a value can only live on its n designated nodes, then a failure or network partition that makes too many of them unreachable makes the value unavailable for writes. So in a system like Dynamo, during a network partition writes are accepted by nodes outside the n preferred nodes for that key, so the nodes used may differ from the first n nodes encountered while walking the consistent hashing ring. This is called a sloppy quorum. When a write goes to a node that is not one of the preferred nodes, metadata (a hint) is stored with it saying which preferred node the write was meant for. When the preferred nodes come back up, the stand-in nodes send the data on to them. This is called hinted handoff.

So until the hinted handoff completes, a read from r nodes is not guaranteed to see the latest write.

When two writes happen concurrently, it is not possible to know which happened first.

Also when restoring data on a failed node from an older backup, the new writes are lost.

Not Linearizable:

Quorum reads and writes do not make the system behave as if there were only one copy of the data.

Sometimes there are edge cases depending on the timing. Say there are 3 replicas, a write quorum of 3 (w=3) and a read quorum of 2 (r=2).

Say writer one writes to replicas 1, 2 and 3, changing the value of a variable from 0 to 1.

The write to replica 3 has finished, but the writes to replicas 1 and 2 have not been applied yet.

Reader 1 reads at this time from replicas 2 and 3. It sees value 1 from replica 3 and 0 from replica 2, and returns the newer value 1.

Next, reader 2 reads from replicas 1 and 2. It only sees value 0, even though reader 1 has already seen 1. So this execution is not linearizable (it does not look as if the operations happened one after another on a single copy).

This can be solved by reader 1 doing read repair synchronously before returning the result to the application.

Leaderless replication is also suitable for multi datacenter replication as it tolerates conflicting writes, network interruptions and latency spikes

We can use versions and version vectors to deal with concurrent updates and conflicts
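
A version vector keeps one counter per replica, and comparing two vectors tells us whether one write happened after the other or whether they were concurrent. A minimal sketch, with vectors as plain dicts and a made-up function name:

```python
def compare(a, b):
    """Compare two version vectors (dicts mapping replica name to counter)."""
    nodes = set(a) | set(b)
    a_ahead = any(a.get(node, 0) > b.get(node, 0) for node in nodes)
    b_ahead = any(b.get(node, 0) > a.get(node, 0) for node in nodes)
    if a_ahead and b_ahead:
        return "concurrent"  # neither has seen the other's write: a real conflict
    if a_ahead:
        return "a_newer"     # a has seen everything b has, and more
    if b_ahead:
        return "b_newer"
    return "equal"
```

Only the "concurrent" case needs conflict resolution. In the other cases the newer value can safely overwrite the older one.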

Source: https://www.amazon.com/Designing-Data-Intensive-Applications-Reliable-Maintainable/dp/1449373321

Tuesday, August 21, 2018

Design of data intensive applications -1


An application has to meet various requirements to be useful. They are functional and non-functional requirements. 

Some of the non functional requirements are Scalability, Maintainability & Reliability

Reliability : the system continues to work correctly in the face of adversity like software errors, hardware failures and human errors

Scalability: As the system grows there should be ways of dealing with that growth, because a system that works well today may degrade under increased load.

Describing load: Load can be described with load parameters, which depend on the architecture of the system. It may be the number of requests per second, the ratio of reads to writes in a database, or the rate of cache misses. In Twitter one of the useful load parameters is the fan out of a tweet.

To describe performance we can ask two questions: when the load on a system is increased, how is the performance affected, and what resources are needed to keep the performance the same when the load increases? To answer these we need performance numbers. We can take the average, but it is not representative of the pain individual users feel. Percentiles are better, as they tell us what percentage of requests are served within a given time. It is also important to measure response times on the client side.
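
A small example shows why the average hides the pain. The response times below are invented numbers, and `percentile` uses the simple nearest-rank definition, which is only one of several definitions in use.

```python
import math


def percentile(values, p):
    """Nearest-rank percentile: the smallest value with at least p% of the data at or below it."""
    ordered = sorted(values)
    rank = max(1, math.ceil(p * len(ordered) / 100))
    return ordered[rank - 1]


# Hypothetical response times in milliseconds; one request hit a slow path.
response_ms = [12, 15, 11, 14, 13, 16, 12, 15, 14, 900]

mean_ms = sum(response_ms) / len(response_ms)
median_ms = percentile(response_ms, 50)
p99_ms = percentile(response_ms, 99)
print(mean_ms, median_ms, p99_ms)
```

The mean is pulled far above what a typical request sees by the single slow one, while the median describes the typical request and the high percentile exposes the slow one.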

A system that runs on a single server is easy to work with, but the capacity of a single machine is limited, so at some point we cannot avoid scaling out (using multiple machines). This is easy if the application is stateless, but replicating state across multiple machines is complex. So if possible keep the database on a single node and scale out only if necessary.

There is no magical scaling sauce. The architecture needs to be rethought at every order of magnitude increase in load. It also depends on which requests are frequent and which are rare, i.e. on the load parameters.

Maintainability: over time maintaining current system and adapting it to new use cases should be easy.

Operability — Operators should be able to quickly view the state of the system and correct any issues easily. So they need metrics, automated deployment etc

Simplicity - simple things are easy to reason about. So strive to keep things as simple as possible. Good abstractions help with that.

Evolvability - the system should be able to evolve as needed. Again good processes like agile help in making change easy. Good abstractions also help by being easy to reason about.

Source: Designing Data-Intensive Applications - Martin Kleppmann (book)

Monday, August 13, 2018

Statistics & Probability - Part 4 - Modelling data distributions

Modelling Data Distributions

Calculating Percentiles:

For any distribution we can calculate percentiles to express the percentage of data below a point.

For example, let’s calculate the percentile rank of the point whose value is 6 in a dot plot with 14 points.

Total number of points : 14
Number of points below 6 : 7

Data that is below 6 = 7/14 = 50 percent
Data that is below or equal to 6 = 8/14 = 57 percent
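
The same calculation in code. The `points` list is a made-up data set chosen only to be consistent with the counts above (14 points, half of them below 6 and one equal to 6), since the original dot plot is not reproduced here.

```python
def percent_below(data, x):
    return 100 * sum(value < x for value in data) / len(data)


def percent_at_or_below(data, x):
    return 100 * sum(value <= x for value in data) / len(data)


# Hypothetical data, not the original dot plot.
points = [1, 2, 2, 3, 4, 5, 5, 6, 7, 7, 8, 9, 9, 10]
print(percent_below(points, 6), percent_at_or_below(points, 6))
```

Both definitions are in use, so it is worth stating which one is meant when quoting a percentile.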

Cumulative Relative Frequency Graphs

If we have a cumulative relative frequency graph, with the cumulative percentage on the y axis and the data values on the x axis,
we can use it to estimate the percentile of any point, and to read off the median and IQR of the data.

Z-scores

The z-score of a data point is the number of standard deviations it is away from the mean. If the point is less than the mean, the z-score is negative, and if it is greater than the mean, the z-score is positive.

We can use z-scores to compare values from distributions with different means and standard deviations, because the z-score puts them on the same scale.
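
For example, here is a z-score comparison of two exam scores from different exams. The scores, means and standard deviations are invented for illustration.

```python
def z_score(x, mean, std_dev):
    """Number of standard deviations that x lies above (+) or below (-) the mean."""
    return (x - mean) / std_dev


# Hypothetical exams: 85 on an exam with mean 75 and standard deviation 5,
# versus 90 on an exam with mean 80 and standard deviation 10.
z_first = z_score(85, 75, 5)
z_second = z_score(90, 80, 10)
print(z_first, z_second)
```

The raw score of 90 is higher, but the 85 is the better result relative to its own distribution.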

Effect of Linear Transformation on data 

When a constant value is added to the data (data is shifted), the mean and median both shift by that constant value.
Measures of spread like standard deviation and IQR don’t change.

When the data is multiplied by a constant value (data is scaled), the mean and median are scaled by that factor, and the standard deviation and IQR are scaled by its absolute value.
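
This is easy to verify numerically with Python's `statistics` module. The data set below is arbitrary.

```python
from statistics import mean, median, pstdev


def summary(values):
    """Return (mean, median, population standard deviation)."""
    return mean(values), median(values), pstdev(values)


data = [2, 4, 4, 6, 9]
shifted = [value + 10 for value in data]  # add a constant
scaled = [value * 3 for value in data]    # multiply by a constant

print(summary(data))
print(summary(shifted))
print(summary(scaled))
```

Shifting moves the mean and median by 10 and leaves the standard deviation alone, while scaling multiplies all three by 3.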

Density Curves

Density curve is an idealized representation of a distribution where the area under the curve is 1.
We can estimate the mean, median and skew from a density curve.
For a symmetric density curve, the mean and median are both exactly at the center.
For a non-symmetric density curve, the median is still the point that splits the area under the curve in half, but the mean is the balancing point, where the curve would balance on a fulcrum.

For a left skewed distribution, the mean is to the left of the median and the opposite for a right skewed distribution.

We can calculate the area under parts of density curve by approximating the areas to be rectangles and triangles.

Normal Distribution

A normal distribution is a symmetrical, bell shaped distribution that models many random variables.
In a normal distribution about 68% of the values lie within 1 standard deviation of the mean, 95% within 2 standard deviations and 99.7% within 3 standard deviations. This is called the empirical rule for normal distributions and can be used to estimate the proportions above and below a certain value. For other values, we first find the z-score and then use a z-table to look up the percentile of data below that z-score. From that we can find the proportion of values lower than, higher than, or between two values.

We can also do a reverse lookup: find the z-score for a given percentile in the table, and convert it back to the data point at that percentile.
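
Instead of a printed z-table, Python's `statistics.NormalDist` (available since Python 3.8) gives both the forward lookup (`cdf`) and the reverse lookup (`inv_cdf`). The helper names below are made up.

```python
from statistics import NormalDist

standard = NormalDist(mu=0, sigma=1)


def share_within(k):
    """Proportion of a normal distribution within k standard deviations of the mean."""
    return standard.cdf(k) - standard.cdf(-k)


def value_at_percentile(p, mean, std_dev):
    """Reverse lookup: the data value below which p percent of the distribution lies."""
    z = standard.inv_cdf(p / 100)
    return mean + z * std_dev


for k in (1, 2, 3):
    print(k, round(share_within(k), 4))
```

The three printed proportions are where the 68%, 95% and 99.7% of the empirical rule come from.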

Monday, August 6, 2018

Statistics & Probability - Unit 3 - Summarizing Quantitative Data

Quantitative data can be summarized by trying to find the central tendency of the data.
There are various methods to find the central tendency. Common ones are

Mean : The arithmetic mean of the data. It is sensitive to outliers. So one large or small value can skew the mean.

Median: The central value in the dataset sorted from least to greatest. If there is an even number of data points, it is the arithmetic mean of the central two values. The median is not affected much by outliers.

Mode: The mode is the most frequently occurring item in the data set. If no value repeats, there is no mode.

If an extreme value is removed, it usually changes the mean more than the median.
If an extreme value is changed, it often will not change the median but will change the mean.

Next we look at some measures of the spread of the population, that is, how far apart the data points are.

Range: Range is the difference between the largest and the smallest value.

InterQuartile Range:
IQR is the difference between the 3rd quartile and the 1st quartile. The median divides the data in half, so half of the population is at or below the median and half is at or above it.
The first quartile is the median of the first half of the data.
The third quartile is the median of the second half of the data.
So the data with the different quartiles can be shown as a 5 number summary

Min   Q1    Median Q3  Max

These numbers divide the data into four quarters. The data between min and Q1 is the first quarter, between Q1 and the median the second, between the median and Q3 the third, and between Q3 and max the fourth.

IQR is the difference between Q3 and Q1.

1.5 IQR Rule to find outliers:
This rule is used to identify outliers in the data. Data points that are more than 1.5 IQR below Q1 or more than 1.5 IQR above Q3 are considered outliers.

Outliers < Q1 - 1.5 IQR
Outliers > Q3 + 1.5 IQR
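
Here is the rule in code, using the "median of each half" definition of quartiles described above. For an odd number of points this sketch leaves the median out of both halves, which is one common convention and an assumption here. The data set is made up.

```python
from statistics import median


def quartiles(data):
    """Return (Q1, median, Q3) using the median-of-halves method."""
    ordered = sorted(data)
    half = len(ordered) // 2
    lower = ordered[:half]
    upper = ordered[half + len(ordered) % 2:]  # skip the middle value when n is odd
    return median(lower), median(ordered), median(upper)


def outliers(data):
    q1, _, q3 = quartiles(data)
    iqr = q3 - q1
    low, high = q1 - 1.5 * iqr, q3 + 1.5 * iqr
    return [value for value in data if value < low or value > high]


sample = [1, 3, 4, 5, 5, 6, 7, 8, 30]
print(quartiles(sample), outliers(sample))
```

Here Q1 is 3.5 and Q3 is 7.5, so the IQR is 4 and anything below -2.5 or above 13.5 counts as an outlier, which flags only the 30.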

Variance of a population:

Variance is defined as the mean of the squared distances of each point from the mean.

If there are 4 values v1, v2, v3, v4, then the mean u (greek mu) is

u = (v1 + v2 + v3 + v4) / 4, where 4 is the number of values

The variance of the population is

((v1 - u)^2 + (v2 - u)^2 + (v3 - u)^2 + (v4 - u)^2) / 4

If we don't have a way to find all the data of the population, we work with a small representative sample from the population.

If we apply the same formula to a sample, using the sample mean in place of the population mean, it tends to underestimate the variance. Intuitively this makes sense: the sample mean sits in the middle of the sample points, so they are on average closer to it than to the true population mean, which may even lie outside the range of the sample. This estimate is therefore called the biased estimate of variance.

The unbiased estimate of the variance from a sample is

Sum of squared differences from the sample mean / (n - 1), where n is the sample size
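
Both formulas side by side, checked against `pvariance` and `variance` from Python's `statistics` module. The data values are arbitrary.

```python
from statistics import pvariance, variance


def population_variance(values):
    """Mean of the squared distances from the mean (divide by n)."""
    mu = sum(values) / len(values)
    return sum((v - mu) ** 2 for v in values) / len(values)


def sample_variance(sample):
    """Unbiased estimate from a sample (divide by n - 1)."""
    m = sum(sample) / len(sample)
    return sum((v - m) ** 2 for v in sample) / (len(sample) - 1)


data = [2, 4, 4, 6, 9]
print(population_variance(data), sample_variance(data))
```

The mean is 5 and the squared distances add up to 28, so dividing by 5 gives 5.6 and dividing by 4 gives 7. The n - 1 version is always the larger of the two.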

Standard Deviation
Standard Deviation (Greek sigma)  is the square root of the variance (Greek sigma square).

Box and whisker plots give the 5 number summary of the population.

Another way of measuring spread is the mean absolute deviation (MAD).
It is the mean of the absolute distances from the mean, instead of the squared distances.
The MAD for the above distribution with v1, v2, v3, v4 as values and u as the mean is

(|v1 - u| + |v2 - u| + |v3 - u| + |v4 - u|) / 4
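
And the same formula as a short function, with arbitrary data:

```python
def mean_absolute_deviation(values):
    """Mean of the absolute distances of each value from the mean."""
    mu = sum(values) / len(values)
    return sum(abs(v - mu) for v in values) / len(values)


data = [2, 4, 4, 6, 9]
print(mean_absolute_deviation(data))
```

For this data the mean is 5 and the absolute distances are 3, 1, 1, 1 and 4, so the MAD is 10 / 5 = 2.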