Tuesday, September 25, 2018

Some things I want to remember

Build routines

We have great ideas and goals. When we make a goal, distractions arise. So we cannot trust our will power. It is better to develop routines and habits which make us easy to do what we decided to do. 

Even great writers have routines where their muses visit them exactly at the same time of the day. The more scheduled time you have, the more peaceful and guilt free your unstructured time is.  

Compartmentalize our days and weeks so that we always have a chance to reflect and recommit to what we want to do. 

Do what you do with full focus for the day. After that have fun and sleep 

Fully Commit & Back Yourself to Win

Really commit to what you're doing. That moves mountains and when you burn your boats, you have no chance but to fight. 

Once you decide the whole world conspires to make it happen, provided you're ready to put in the work it requires. 

If you have less balls in your hand, you are more likely to care about making them work. 

 Have environment which enhances your positive traits 

Everyone has many thoughts - some positive, some negative. 
There are some people around with whom you always feel negative. And there are others who spread an air of positivity around them.... 

There are some articles which when you read makes you feel alive, positive and work to overcome your problems where are there are some which only cater to your base thoughts and make you addicted but really not helping you any way.. 

There are some places which by being noisy and distracting always make you lose your concentration.. 

Lot of internet products and websites are built to be addictive. There might be some articles which are good for you but most of them are addictive. And the worst part is they don't give the control to you choose what you want. It is better leave them than trying to look out for the positive. 

 It is a time of great opportunity 

It is a great time of development in technology and I'm really excited about the advances in software and AI to automate the boring parts of our work day. 

It is an exciting time to work towards shaping the future in the vision we have. It is a chance to work on problems you care about solving and make the lives of not only our people but all people better. 

 You cannot do it alone 

But this is not something you can do alone. By being generous with our ideas, being kind with our deeds, clear with our thoughts, we can work with like minded people to work together - enriching each others lives and achieving big goals. 

  Be patient & kind  

Hopes are high but that doesn't mean the results are also as fast. So it takes time and requires solving hard challenges. Sometimes you might doubt your own intentions.

 Be patient. Let the compounding magic work. Anything worthwhile takes a long time but with the magic of doing things consistently, you will reach it sooner rather than later. So just stay put. 

But during this time, be kind with people and more than anyone, be kind with yourself. 

Be Grateful & and let them know 

In this journey, you will come across many people. You will have been better for having known some people. Lot of people would have helped you with their gifts. You're where you are because of their generosity with their time, work and help. So take the time to count your blessings and let them know whenever you get a chance. Actually take the chance.  I take this chance to thank all of them who have helped me be where I am. 

Tuesday, September 18, 2018

Design of Data Intensive Systems - Part 5 Consistency & Consensus

Consistency Guarantees


make the system appear as if there is only one copy of data and all operations on it are atomic. It means all clients must be able to read the latest data once it is written. It is a recency guarantee

Linearizability is useful in

Locking and leader election
constraints and uniqueness guarantees (single up-to-date values which all nodes agree on)
When there are cross channel timing dependencies.
Implementing linearlizable systems

Single leader replication can be made linearlizable. But there could be stale replicas and split brain scenarios with the leaders and hence not completely linearlizable.

Multiple leader replication cannot be made linearizable as it needs conflict resolution which means there is no single copy of data

Leaderless replication can be made serializable if the reader performs read repair before returning and writer must read the status of a quorum of nodes before sending the writes. But in practice most of the leaderless system do not provide linearizability.

If a application requires linearizability it will not be available to some replicas during a network partition. It was work with the old replicas but then it is not linearlizable. And linearlizable systems are not that performant as the read and write timings are atleast proportional to the network delays.

Consensus alogorithms are linearizable. They have a way to avoid the split brain and stale replicas.

Linearizable systems are slow always not just during network faults. So latency sensitive systems might make some tradeoffs

Ordering Guarantees

Ordering is important to provide causality. In a single leader replication, all the writes go through the leader to determine the order of writes. In a serializable transaction, the writes are processed in a serial order. Timestamps and clocks are also an attempt to order the writes.

If a happens before b, then a could have caused b. So the causal order is a -> b. If a system obeys the causal order, it is said to be causally consistent. If you read something from database, if you see a piece of data, you should be able to see any data that precedes it.

Linearizable system maintains total order as it behaves as if there is only one piece of data

Two operations are consistent if they happen at the same time. If two events are ordered if they are causally related, but they are incomparable if they are concurrent. So causality defines a partial order not a total order

Which means there are no concurrent operations in a linearlizable datastore.

Generally, the causal ordering is sufficient for most applications. So to provide causal ordering, we can use monotonically increasing sequence numbers to order the events. When it is single leader it is easy, but when there are multiple nodes processing, there are ways to do the sequence, but that sequence doesn’t provide causality.

One way to generate sequence numbers that provide causual ordering is Lamport Timestamps. Here each node will generate sequence numbers and in addition to that it will also have the node id It will be a pair of (counter, node id). It can provide causal order in the following way. Each client will always send the maximum counter it saw among all nodes in each request. So say a node 1 has a counter of 5 and node 2 has max counter of 1. A client got a request from node 1 (with counter 5) and it sent a request to node 2 ( with counter 1), node 2 will see that and increment its counter to 6. That way it provides total ordering of all the operations .

But these timestamps cannot solve the problem of unique constraints as we will know the total order after the fact. When a system is trying to generate a unique username, then it needs to know if it is unique or not and if other systems are already processing that username. In additional to knowing total order, we also need to know when it is finalized. This idea of saying when a total order is finalized is called total order broadcast

Total order broadcast can be used to solve the uniqueness problem.

Write a username request into the log using a compare and set operation.
Read the log
If your request was the first request when you read, then you can commit the transaction. If some other’s node request was the first request in the log, then you abort the transaction as the username is already taken.
The implementation of the atomic compare and set operation requires consensus if multiple nodes request the same username.

If we have a linearlizable increment and get register, then it is possible to have total order broadcast. Each message that needs to be sent via total order broadcast has to get a sequence number from the linearizable register and attach it to the message and send it to all nodes. If a node delivers a message with sequence 4 and it gets a message with sequence 6, it knows that it has to wait for sequence 5 before it can deliver 6. For lamport timestamps, the counter may have gaps for e.g. if a node sees 6, for the next message it will send 7, but it will not wait for 5, but with total order broadcast, it has to wait for 5.

Implementing linearizable compare and set or total order broadcast both require consensus.

Consensus is required for

leader election to avoid multiple leaders and split brain. Consensus is required to select and agree on one leader.
Atomic commit — In a db supporting transactions on multiple nodes, if it fails on some nodes, then all the nodes should agree to commit or rollback on all nodes to maintain the atomicity of the transaction.

2 Phase Commit

For a transaction over multiple nodes, it is not possible to have an atomic transaction if each node can do its own commit/rollback process. It needs to go through a process of 2 phase commit. In this case, there is a coordinator to coordinate the transaction.

The application can send the changes across multiple nodes. Once the transaction is ready to commit, the coordinator starts phase 1 called prepare, where each node will prepare to commit by writing it in the log and when both the nodes are ok to commit, the coordinator will start phase 2, write the transaction in its logs and send the ok to both nodes. The both nodes then commit the transaction. If any of the nodes didn’t say an ok to the transaction in phase 1, then the coordinator aborts the transaction.

If any of the nodes or network fails during transaction, the coordinator aborts.

If the coordinator fails before confirming the phase 2, both the nodes will be waiting for coordinator to send the ok. Otherwise they cannot abort or commit because doing anything unilaterally will violate the atomicity. So it will block till the coordinator recovers and issues the commit or rollback.

There is something called 3PC (three phase commit) which assumes a system with bounded network delays and bounded response times. Basically if the crash of a node can be detected correctly, then 3PC can provide non-blocking transactions. But that cannot be assumed in general.

In practice 2 phase commit is implemented by distributed transactions XA (which is a coordinator api with which multiple applications interface). As discussed above during coordinator failure, there is a possibility for the locks to be held indefinitely. And either admin users have to resolve the locks or applications can use heuristics to unilaterally abort or commit.

XA can work with different applications, database and a message queue etc. If a transaction is within a homogeneous distributed system like a database, it can work well. but with heterogeneous distributed systems, are more challenging as they have to work with different technologies and cannot be optimized.

Disadvantages of 2PC

If a coordinator is not replicated and runs on single machine it become single point of failure
The application becomes stateful because it has to maintain the state in the coordinator log
Since it needs to work with disparate systems, it has only basic functions. I cannot detect deadlocks and doesn’t work with SSI (Serializable snapshot isolation)
For distributed systems, 2PC will amplify failures instead of helping to build fault tolerant systems.


In consensus, one or mode nodes propose values and the algorithm decides a value. More formally the following properties must be satisfied

No two nodes decide differently
No node decides twice for same decision
If a node decides value v, then value v was proposed by some node.
Every node that doesn’t crash eventually decides a value.
Best known consensus algorithms are

View stamped replication VSR
These algorithms decide a sequence of values which makes them total order broadcast algorithms. In each round the nodes propose what value they want to send next and decide the next message in total order.

These algorithms expect to work when a quorum of nodes are available.

To select a leader, we need consensus. When a leader is down, they start an election process which has an epoch number. When say an old leader from a previous epoch comes and tries to write something, the nodes which elected leader in a new epoch will not agree to it and hence the write fails. Basically it is a two step process. First a leader will be elected by consensus and when the leader wants to write something, again they need to agree and one of the node that participated in the leader election has to agree to this write. If any of the node knows of a different leader, it will not agree and hence quorum cannot happen and the write fails.

Limitations of Consensus:

The election process is like synchronous replication
It requires strict majority to operate
Since they rely on timeouts to detect failed nodes, transient network errors can cause frequent leader elections and terrible performance
Raft is particularly sensitive to network problems. If there is a weak network link, the leadership continually bounces and the system doesn’t work.

Membership and Coordination services:

Zookeeper and etcd are designed to hold small amount of data in memory. This data is replicated across multiple nodes through a fault tolerant total order broadcast algorithm

It has the following features

Linearlizable atomic operations
Using an atomic compare and set operation, a lock can be implemented. If multiple nodes try to set the value, only one will succeed. Distributed locks are implemented as leases which expires and eventually gets released if the client fails.

Total ordering of operations
When a resource is protected by a lock or lease, it needs a fencing token to prevent clients from conflicting with each other in case of a process pause. Zookeeperr implements this by totally ordering all operations and giving each operation a monotonically increasing transaction id called zxid and a version cversion

Failure detection
Clients maintain long lived session on Zookeeper servers. If there is a temporary failure or a zookeeper node goes down, the session is active. But if the heart beat fails for a more than a configured timeout, Zookeeper will expire the session and can be configured to release all the locks and leases held by that client. [These are called ephemeral nodes in zookeeper ]

Change Notifications
Clients can read the locks and values set by other clients and also watch for changes to them. Thus a client can find out when a new node joined the cluster and also when a node leaves the cluster (loss of ephemeral nodes). So the client can get these without polling by just subscribing to the changes.


Allocation work to nodes - To identify a leader in a database scenario and also in a job scheduler scenario.
If there are lot of nodes, the Zookeeper doesn’t need to do leader election on all nodes, it can just work with 3 or 5 nodes, but can serve all the clients.

Zookeeper is useful to save only data which doesn’t change often. If you need to replicate application state, use another application called Apache BookKeeper.

2. Service discovery - In a cloud datacenter env, when a new service comes up, it can register with zookeeper so that its endpoints can be found by other services. It doesn’t need consensus but it can be useful for other services to find the leader since ZK already knows it.

3. Membership Services - A membership service determines which nodes are alive and are part of a cluster. A failure detection mechanism with a consensus mechanism can help determine which nodes are part of a cluster and which are not.

Source : https://books.google.com/books/about/Designing_Data_Intensive_Applications.html?id=zFheDgAAQBAJ

Monday, September 10, 2018

Design of Data Intensive applications - 4 Problems of Distributed Systems

Faults & Partial Failures
On a single machine if there is any internal fault, we expect the system to crash instead of giving wrong values.

But in a distributed system, there are always systems that are broken while others are working as expected. This is called a partial failure. Partial failures are non-deterministic. Also we don’t even know if something has succeeded or failed because the time taken for message travelling across a network is non-deterministic. So we don’t know which systems have failed in between also we don’t know if a system has failed or not. That is why working with distributed systems is difficult .

Unreliable Networks
Distributed systems are shared-nothing systems. They communicate only via asynchronous message passing through the network. But the network is unreliable.

If a request is sent many things could go wrong

Request may have been lost (unplugged network cable)
Request may have been queued
Remote node has crashed and is unavailable.
Remote node has paused temporarily (for garbage collection)
Remote node has processed the request, but response got lost in the network.
Response is delayed because our network is overloaded.
In all these scenarios, it is difficult to tell why we didn’t get a response. We usually handle these with a timeout.

When part of a network is cut off from another part due to a network fault, it is called a network partition

Timeout is only way to detect a fault, but there are trade-offs

If timeout is too long, then during this time the system is unavailable and users might see error messages

If timeout is too short, and the node is actually alive and busy with some action, and another node takes over, the action can happen twice. When a node is declared dead, then its responsibilities are transferred to other nodes. If the system is busy, then removing a machine might exaggerate the problem. It could cause all nodes to declare dead and cause cascading system failure.

Most of the network delay is due to queuing. Since all the requests are sharing the same pipe, there will be some queuing when the traffic is high. Different places where queues are possible

If several nodes send data to the same destination, the network switch must queue them up and feed them into the destination network link one by one. On a busy network link, the data has to wait in the queue till it can get a slot. If the switch queue fills up, some packets may be dropped.
When data reaches the destination machine, if all CPUs are currently busy, then the OS queues up the request until application is ready to handle it. Here also the wait time is not bounded.
In virtualized environments, the running OS is paused for 10s of microseconds when the virtual machine wants to use the CPU code. In this case VM cannot consume any data from the network, so incoming data is queued by the VM monitor increasing the variability of network delays
TCP performs backpressure in which a node limits its own rate of sending to avoid overloading network links or destination node.
If some packets are lost TCP will retransmit them which adds to the delay
In cloud environments, the machines are shared. So based on the workload of neighbors, the network delays are variable.
So the best way to set timeouts is based on experience. Also a system could monitor the delays and change the timeouts accordingly.

Unreliable Clocks
In distributed systems, time handling is tricky because of variable network delays in communication.

Each machine has its own hardware clock, which has minor calibration differences compared to others and can be slightly faster or slower compared to other machines. The clocks can be synchronized using NTP (Network Time Protocol) which allows the time to be adjusted according to the time reported by a group of servers. These servers are in turn get their time from a more accurate source like GPS recievers.

Computers have two types of clocks

Time of day clock — Returns the current wall clock time. These clocks are adjusted using NTP. So they should ideally be same on all machines, but if a machine’s local clock is too fast, the time might be reset, so the time can appear to be going backwards.So these cannot be used to calculate a duration.

System.currentTimeMillis() or clock_getTime(CLOCK_REALTIME) (Linux) return wall clock time.

Monotonic Clock — These clocks have time that always increases or grows. So these can be used to calculate the duration. System.nanoTime in Java or clock_getTime(CLOCK_MONOTONIC) in Linux does this.

Multiple CPUs on a machine means it could be using multiple timers, but the system presents a uniform monotonic time.
When NTP needs to reset the time, it will fasten or slow the timer based on how much faster it is going, but it will never make the time go backward.
NTP synchronization issues

The local computer quartz might drift
If local time differs a lot from the NTP, then the system might refuse to sync or it might be forcibly reset which causes the time to go backwards
If a node is firewalled from internet, then it is difficult to detect the clock is out of sync
If there is large network delay, then the synchronization might not work correctly
NTP services might be reporting wrong time
Leap seconds (59 or 61 seconds in a minute)can cause problems with systems which are not designed with leap seconds in mind.
In virtual machines, the clock is virtualized. If a VM is paused for some time, then the application sees the clock jumps forward by those many seconds or minutes.
If the device is not controlled by the application, then it is difficult to assume the time accurate. User can change the time on the device.
So if your application depends on clock synchronization, you have monitor and take action if a clock drifts too far from the others.

If you use timestamp to determine the ordering, you might lose some database writes due to clock skew. Use version vectors for ordering

Clocks can have a confidence interval based on the precise clocks and transactions can wait for some time to be confident that transactions are linearizable. Google Spanners True Time API uses confidence intervals. In order to ensure that transaction timestamps reflect causality, Spanner deliberately waits for the length of the confidence interval before committing a read-write transaction. By doing so, it ensures that any transaction that may read the data is at a sufficiently later time, so their confidence intervals do not overlap. In order to keep thewait time as short as possible, Spanner needs to keep the clock uncertainty as small as possible; for this purpose, Google deploys a GPS receiver or atomic clock in each datacenter, allowing clocks to be synchronized to within about 7 ms

Process Pauses

Even if everything is synchronized properly, it is difficult to rely on time because of arbitrary pauses

It could be stop the world GC
It could be unexpected IO for e.g. a JVM will lazily load a classfile when first used
A virtual machine may be suspended and resumed
User may close the laptop and open it
CPU switches to different thread
swapping to disk because of paging
Unix process SIGSTOPed and resumed later with SIGCONT
In all these cases the paused code will start running when execution continues without knowing that time has elapsed.

Ideas to overcome process pauses

Treat a pause like a outage. If it is possible to know before that a process is going to need GC, the app can stop sending requests to the node and other nodes can take the work.
Rolling restart frequently so that long term memory is never GCed which will not cause the stop the world pause.
Knowledge, Truth & Lies
Truth is determined by a majority

A node cannot trust itself completely as there could have been a GC pause and all the threads were oblivious to it. There could be a network breakage where a node is able to recieve messages and send but other nodes are not able to recieve it.

So the decision is made by majority ( half or more nodes )

So in case of leader locks, when a lock is held by a node and a gc pause occurs and when the node comes back up after the pause, its lock lease might have expired. In that case other node might have become the leader and it might have lock. So when the old node tries to write the db should check if the token is still valid. It can be implemented with fencing tokens (monotonically increasing token numbers )

If the nodes arbitrarily lie by saying it didn’t recieve a response when it did, then it is called a Byzantine fault and the consensus problem is called byzantine Generals problem.

Dealing with weak forms of lying

If there are corrupted packets, then protocols like TCP and UDP use checksums to determine and fix or retry them.

Sanitize inputs from users

Configure a client with multiple NTP server addresses so that one of them being bad can be tolerated.

System Models

To assume what assumptions an apply for each type of systems, we can consider the following systems

Synchronous: This model assumes bounded network delay, bounded process pauses and bounded clock error. This is not a realistic model for distributed systems because of unbounded network delay.

Partially-Synchronous: In this model, the system is synchronous most of the time, but sometimes there may be unbounded delays, or pauses or clock error. This is a good model for distributed systems as they work well most of the time except sometimes.

Asynchronous: Here there are no time assumptions. Some algorithms can work with asynchronous design but it is very restrictive.

Node failure models

Crash stop — In this model a node can go down by crashing and will never come back.

Crash Recovery — In this model a node can go down temporarily but will recover after some time and be available. The nodes have stable storage that is preserved across failures whereas the in-memory state is lost.

Arbitrary faults- Nodes may do absolutely anything including tricking other nodes.

For modelling distributed systems, partially synchronous system with crash-recovery is the best model.

Correctness Models

Properties of a distributed system that will indicate its correctness for a fencing tokens algorithm.

Uniqueness — No two tokens are same

Monotonic Sequence — If a request x was returned at tx and y was returned at ty and x completed before y, then tx
Availability — The node that requested a fencing token didn’t crash and eventually receives a response.

Safety and liveness

In the above example uniqueness and monotonicity are safety property, but availability is a liveness property. Safety is defined as nothing bad happens whereas liveness is defined as eventually something good happens. Safety properties should always hold, i.e never be violated in a system. But for liveness there could be some leeway, saying that the system will be available only if majority of the nodes are not crashed.

Theoretically, even if a system satisfies all the correctness properties, it is not guaranteed to run correctly in real life as the model is just a simplification and not necessarily holds always. For e.g. in a partially synchronous system, we assumed that stable storage survives restarts. What if it that storage becomes corrupt. In that case we still need to handle it. But the theoretical model becomes a good approximation to reason about.

Source : https://books.google.com/books/about/Designing_Data_Intensive_Applications.html?id=zFheDgAAQBAJ

Tuesday, September 4, 2018

Design of Data Intensive Application - Part 3 Partitioning


All data doesn’t fit in one machine. So different partitions are made on different machines and each piece of data will live in one partition. This is useful for scaling applications.

We have to see

How to distribute the data among partitions
How does indexes work with distributed partitions
How to rebalance the partition if some nodes go down or if one partition size increases more than limit.
Partitioning is combined with replication so that copies of each partition can be stored at multiple nodes. Even though one piece of data belongs to one node, it will be stored at multiple places for fault tolerance.

Skewed partitions and hotspots

Ideally, we would want the data to be distributed to partitions equally so that the load is evenly distributed. But if we don’t partition it correctly and all the data is on a single partition then it is called a skewed partition and it will cause a hotspot.

Random partitioning

To make sure data is partitioned equally, we can assign data to partitions randomly. But the drawback is that then when we need to search the data, we have to search in all the partitions which is wasteful.

Key Range Partitioning

Another way of partitioning is to assign a continuous range of keys to a partition like a encyclopedia. The key ranges don’t need to be evenly spaced, but can be changed based on the data. The partition boundaries need to adapt based on the data to avoid hotspots. The administrator can change the boundaries or the software can automatically change them.

If the keys are sorted, then range scans become very easy. For e.g. in the data is some sensor data, then if timestamp is the key for the data, it will be very easy to scan for temperatures in a week. But one disadvantage of that is that since all the data is together, when writing the data all the data will go into the same partition while all the other partitions sit idle.

Hash Partitioning

To overcome the drawbacks with the key range partitioning, the key can be passed through a hash function. Each partition can contain a range of hash values. The hash values are different even for closely related keys, so this will distribute the data evenly among different partitions.

The partition boundaries can be evenly spaced or chosen pseudo randomly (called consistent hashing — it uses randomly chosen partition boundaries to avoid the need for central control or distributed consensus. )

The disadvantage of hash partitioning is that we need to scan all the partitions for range queries.

To overcome this Cassandra supports a compound primary key, which contains two keys. In this the first key is used for hash partitioning and the second key can be used for range scans. This is useful because if a key is setup as (user_id, update_timestamp), then it is easy to get all the latest updates for a user.

Relieving hotspots:

In a social media application, any power user having lot of followers can cause hot keys. i.e if they do anything, it has to be written to all the followers. Since the key is the user’s id, it will cause lot of activity for the key and can cause a hotspot. To overcome this their key can be split further by adding a random two digits to the key for some of the records. This way the load can be split into different partitions. Then the application should also remember which keys were split as most of the data will be regular without splitting.

Secondary indexes:

We have seen how the primary key is partitioned, but most of the databases support secondary indexes. Secondary index allows us to query the data by a different key or term.

There are two ways to partition the secondary index.

each partition can have the secondary index based on the documents that are part of the partition called local index. The advantages are that writes are faster as the data will be updating only one partition but the reads are difficult because for a given term, all the data will not be in one partition but the query will have to be done in parallel on all partitions (scatter) and then the results assembled (gather)
Another option is for the seconday index to be seperate from the main partitions and it could be partitioned in such a way that all the documents for a single term stay at same place (global index). The advantage of this is that search, range scan will be easy as all the data for an index is at the same place, but writing will be slow as one document has to touch multiple replicas
Re balancing Indexes:

Rebalancing is needed when we add new machines or we remove old failed nodes.

when rebalancing is complete,

load should be shared fairly between nodes
while rebalancing is happening, db should continue accepting reads and writes
no more data than necessary should be moved between nodes.
Rebalancing Stratgies

Hash mod n is a bad strategy because if the n changes, most of the keys have to be moved.
Fixed number of partitions: Create a large number of partitions more than the number of nodes and assign for e.g. 1000 partitions for 10 nodes and assign 100 for each node. If a new node is added, then few of the partitions for each node can be assigned to the new node and removal can proceed in the opposite way. Also the partitions need not be assigned equally to all nodes, bigger machines can take higher load compared to the others
Dynamic Partitioning: For dbs with key range partitioning with fixed number of partitions, if we choose wrong boundaries then all the data will be in one partition. For these dbs it is possible to have dynamic partitions where initially there will one partition and as the data size increases and once it goes more than a threshold, the partitions are split and it continues on. The disadvantage is that initially all the requests will be served by one node only. So to overcome this, the db can start with a fixed number of partitions and then increase as required. It can also decrease if large number of keys are deleted. Here the number of partitions adapt to the total data volume.
Partitioning Proportionally to nodes: In fixed number of partitions, size of each partition is proportional to the size of the dataset whereas in dynamic partitions the number of partitions is proportional to the size of the dataset. A third option used by Cassandra is to have number of partitions proportional to the number of nodes. initially it will have fixed number of partitions per node and when a new node joins the cluster, new partitions are added, so the size of the partition decreases.
Rebalancing can be fully automatic or manual, but since it is an involved process and if something goes wrong it is better to have an operator in the loop to review the processing.

Request Routing:

Since the data is now split into different partitions how does the client know which node to connect to ? That is the service discovery problem and there are multiple tools for this

Approaches to solve this are

Client can connect to any node. If that node has the data it will handle the request. Otherwise it will query the right node, get the result and pass the data to the client.
Client can send the request to a routing tier. It will send the request to the right node and forwards accordingly.
Clients can be aware of the partitions. If they are aware they can send to the right node without the need of an intermediary.
When the data is rebalanced, the routing tier or the partionaware client has to know of the change. Generally this is a consensus problem and there are different solutions.

One solution is to use a coordination tool like Zookeeper to maintain the mapping of partitions to nodes. The routing tier can subscribe to the changes for these mappings.

Some dbs like Cassandra and Riak use a gossip protocol where cluster is aware of changes and clients can send requests to any node and the cluster will forward the request as required.

Source: https://www.amazon.com/Designing-Data-Intensive-Applications-Reliable-Maintainable/dp/1449373321/