
Partitioning

Without partitioning, a single node has to hold the entire dataset and absorb the full query load, which quickly becomes a bottleneck.

The main reason for wanting to partition data is scalability. Partitioning almost always goes together with replication, so that the data in each partition stays available even when a node fails.

(figure: partitioning with replication)

Partitioning of Key-Value Data

Partitioning by Key Range

Assign a contiguous range of keys to each partition, e.g. userIDs 0-10000 go to one node.

Pros: Range queries are efficient, because keys within a partition are kept in sorted order and adjacent keys live next to each other.

Cons: Certain access patterns create hot spots, e.g. if the key is a timestamp, all writes for the current day land on the same partition.

Partitioning by Hash Key

Use a hash function to map keys to reasonably evenly distributed hash values, and assign a range of hash values to each partition, stored on a particular node.

(figure: partition by hash of key)

Important: some programming languages’ built-in hash functions are not suitable for partitioning. Java’s Object.hashCode(), for example, can return different hash values for the same key in different processes.

Pros: Keys are distributed fairly evenly across partitions, which helps avoid skew and hot spots.

Cons: Range queries become inefficient, because keys that are adjacent in key order are scattered across partitions.
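
As the note above suggests, partitioning needs a hash that is stable across processes. A minimal Go sketch of hash-range partitioning (illustrative only; FNV-1a and the helper names are my choices, not from any particular database):

package main

import (
    "fmt"
    "hash/fnv"
)

// stableHash returns the same value for the same key in every process,
// unlike language-level hashes such as Java's Object.hashCode().
func stableHash(key string) uint32 {
    h := fnv.New32a()
    h.Write([]byte(key))
    return h.Sum32()
}

// partitionOf maps the 32-bit hash space onto numPartitions contiguous ranges.
func partitionOf(key string, numPartitions int) int {
    return int(uint64(stableHash(key)) * uint64(numPartitions) >> 32)
}

func main() {
    fmt.Println(partitionOf("user:12345", 8)) // same partition every run, on every machine
}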

Solution for range queries

Use multiple columns as a compound primary key: only the first part of the key is hashed to determine the partition, and the remaining columns are stored in sorted order to support efficient range queries. E.g. with (userID, updateTimestamp), userID is hashed to pick the partition, and entries within the partition are sorted by updateTimestamp, so we can easily fetch all updates of a particular user within a time interval.
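
A rough Go sketch of this layout, with hypothetical types and helper names: the partition is chosen from userID alone, while rows inside a partition are kept sorted by timestamp, so a per-user time-range query touches a single partition.

package main

import (
    "fmt"
    "hash/fnv"
    "sort"
)

const numPartitions = 8

// Entry is a hypothetical row keyed by the compound key (UserID, UpdateTimestamp).
type Entry struct {
    UserID          string
    UpdateTimestamp int64
    Payload         string
}

// partitions[i] holds the rows of partition i, kept sorted by (UserID, UpdateTimestamp).
var partitions [numPartitions][]Entry

// partitionFor hashes only the first part of the compound key,
// so every entry of one user lands in the same partition.
func partitionFor(userID string) int {
    h := fnv.New32a()
    h.Write([]byte(userID))
    return int(h.Sum32() % numPartitions)
}

func insert(e Entry) {
    p := partitionFor(e.UserID)
    partitions[p] = append(partitions[p], e)
    sort.Slice(partitions[p], func(i, j int) bool {
        a, b := partitions[p][i], partitions[p][j]
        if a.UserID != b.UserID {
            return a.UserID < b.UserID
        }
        return a.UpdateTimestamp < b.UpdateTimestamp
    })
}

// rangeQuery reads a single partition to get one user's entries in [from, to).
func rangeQuery(userID string, from, to int64) []Entry {
    var out []Entry
    for _, e := range partitions[partitionFor(userID)] {
        if e.UserID == userID && e.UpdateTimestamp >= from && e.UpdateTimestamp < to {
            out = append(out, e)
        }
    }
    return out
}

func main() {
    insert(Entry{"alice", 100, "a"})
    insert(Entry{"alice", 200, "b"})
    insert(Entry{"bob", 150, "c"})
    fmt.Println(rangeQuery("alice", 50, 150)) // only alice's entries in the interval
}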

The only problem is that all the data of a particular user is stored in a single partition, which can lead to the celebrity problem.

Solution for the celebrity problem

A hot topic or a celebrity can cause all requests for that key to be directed to the same partition. A simple mitigation is to append (or prepend) a small random number to the key: a two-digit decimal number yields 100 distinct keys, which lets the writes spread across different partitions. The cost is that a read now has to fetch all 100 keys and combine the results.
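
A minimal Go sketch of this key-splitting trick (the key format and helper names are made up): writes pick one of 100 sub-keys at random, reads enumerate all of them and combine the results.

package main

import (
    "fmt"
    "math/rand"
)

// writeKey spreads writes for a hot key across 100 sub-keys by appending
// a random two-digit suffix.
func writeKey(hotKey string) string {
    return fmt.Sprintf("%s_%02d", hotKey, rand.Intn(100))
}

// readKeys returns all 100 sub-keys that a read must fetch and merge.
func readKeys(hotKey string) []string {
    keys := make([]string, 0, 100)
    for i := 0; i < 100; i++ {
        keys = append(keys, fmt.Sprintf("%s_%02d", hotKey, i))
    }
    return keys
}

func main() {
    fmt.Println(writeKey("topic:hot"))      // e.g. topic:hot_37
    fmt.Println(len(readKeys("topic:hot"))) // 100
}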

Partitioning of Secondary Indexes

Sometimes people add a secondary index for better filtering performance, e.g. to query all cars that are red. But how are the secondary indexes partitioned?

Document based partitioning or Local index

(figure: document-based partitioning)

Each partition maintains its own secondary indexes, covering only the data stored in that partition. On write, only the secondary indexes of the partition where the document lives need to be added, updated, or deleted. On read, the query has to be sent to all partitions and the results combined (scatter/gather), as sketched below.

Pros: Writes are cheap, a document write only needs to update the local index of the partition that holds the document.

Cons: Reads are expensive, a secondary-index query has to scatter to every partition and gather the results, which amplifies tail latency.
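
A small Go sketch of such a scatter/gather read, with made-up types: every partition keeps a local color index, so a query for red cars has to visit every partition.

package main

import "fmt"

// Car is a hypothetical document with a secondary attribute (color).
type Car struct {
    ID    int
    Color string
}

// Partition holds its documents plus a local secondary index: color -> IDs in this partition.
type Partition struct {
    docs       map[int]Car
    colorIndex map[string][]int
}

// queryByColor must scatter the query to every partition and gather the results,
// because each local index only covers its own partition's documents.
func queryByColor(partitions []Partition, color string) []Car {
    var out []Car
    for _, p := range partitions {
        for _, id := range p.colorIndex[color] {
            out = append(out, p.docs[id])
        }
    }
    return out
}

func main() {
    p0 := Partition{map[int]Car{1: {1, "red"}}, map[string][]int{"red": {1}}}
    p1 := Partition{map[int]Car{2: {2, "red"}, 3: {3, "blue"}}, map[string][]int{"red": {2}, "blue": {3}}}
    fmt.Println(queryByColor([]Partition{p0, p1}, "red")) // cars from both partitions
}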

Term based partitioning or Global index

(figure: term-based partitioning)

Instead of letting each partition keep its own secondary indexes, we construct a global index, which is itself partitioned across the nodes (keeping it on a single node would make that node a bottleneck). Which partition stores which part of the index? We can partition the index by the term itself or by a hash of the term.

Pros: Reads are efficient, a query only needs to contact the partition(s) holding the terms it is looking for.

Cons: Writes are slower and more complicated, a single document write may touch several partitions of the global index, so in practice updates to global indexes are often asynchronous.

Rebalancing Partitions

DO NOT USE HASH MOD N (N = number of nodes or number of partitions)

It needs to be said three times: DO NOT USE HASH MOD N! DO NOT USE HASH MOD N! DO NOT USE HASH MOD N! The reason is that when N changes, most of the keys end up assigned to a different node and have to be moved. One important rule of rebalancing is to move as little data as possible. We use consistent hashing!
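
A quick Go demonstration of the problem (illustrative only): count how many keys are reassigned when N grows from 10 to 11 under hash mod N.

package main

import (
    "fmt"
    "hash/fnv"
)

// h is a stable hash for demo purposes.
func h(key string) uint32 {
    f := fnv.New32a()
    f.Write([]byte(key))
    return f.Sum32()
}

func main() {
    // count how many of 100,000 keys change node when N goes from 10 to 11
    moved := 0
    for i := 0; i < 100000; i++ {
        key := fmt.Sprintf("user:%d", i)
        if h(key)%10 != h(key)%11 {
            moved++
        }
    }
    fmt.Printf("%.1f%% of keys moved\n", float64(moved)/1000) // roughly 90%
}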

Fixed number of partitions

(figure: rebalancing with a fixed number of partitions)

E.g. with 10 nodes and the total number of partitions fixed at 1000, each node holds about 100 partitions. When a node is added, it steals a few partitions from every existing node; when a node is removed, its partitions are spread over the remaining nodes. The key-to-partition assignment never changes, only the partition-to-node assignment does.

This approach is used by Elasticsearch, Couchbase and Voldemort.

Pros: Rebalancing only moves whole partitions between nodes; the number of partitions and the key-to-partition assignment never change, and nodes with more capacity can simply take more partitions.

Cons: The number of partitions must be chosen up front. Too many and the per-partition overhead adds up; too few and individual partitions grow very large, making rebalancing and recovery expensive.
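
A minimal Go sketch of the two-level mapping this approach implies (names are illustrative; the partition-to-node map would normally live in a coordination service such as ZooKeeper): rebalancing edits only the partition-to-node map, never the key-to-partition function.

package main

import (
    "fmt"
    "hash/fnv"
)

const numPartitions = 1000 // fixed up front, far more partitions than nodes

// partitionOf never changes for a given key, no matter how many nodes exist.
func partitionOf(key string) int {
    h := fnv.New32a()
    h.Write([]byte(key))
    return int(h.Sum32() % numPartitions)
}

// partitionToNode is the only thing rebalancing changes.
var partitionToNode = map[int]string{}

func nodeFor(key string) string {
    return partitionToNode[partitionOf(key)]
}

func main() {
    // 10 nodes, 100 partitions each
    for p := 0; p < numPartitions; p++ {
        partitionToNode[p] = fmt.Sprintf("node%d", p%10)
    }
    fmt.Println(nodeFor("user:42"))

    // a new node joins: it takes over whole partitions; keys never change partition
    partitionToNode[0] = "node10"
    partitionToNode[11] = "node10"
}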

Dynamic partitioning

Set a maximum and minimum size ahead of time. When the data in a partition grows beyond the maximum, the partition is split into two, and one of the two halves can be transferred to another node to balance the load. When the data shrinks below the minimum, the partition can be merged into an adjacent one. Databases like MongoDB and HBase support pre-splitting, so that even an empty database starts with a pre-defined number of partitions. This mitigates the problem that, on an empty database with multiple nodes, all requests would otherwise hit a single partition until the first split happens.

Have a fixed number of partitions per node

E.g. 100 partitions per node: each partition grows in proportion to the dataset size as long as the number of nodes is unchanged, and when a new node joins it splits a few existing partitions and takes over one half of each.

Request Routing

The client knows the key it wants to query and sends the request to the backend service. How does the service know which partition, and which node, the data lives on?

Using the hash function, we calculate the hash value of the key. From the hash value we can determine which partition holds the key (with consistent hashing, this is the next partition clockwise from the key’s position on the ring). The system also needs to maintain a mapping from partition to node IP (for example in ZooKeeper), so we know which node to route the request to.

Usually there are three approaches:

1. Send the request to any node (e.g. via a round-robin load balancer). If that node owns the partition, it handles the request; otherwise it forwards the request to the right node.
2. Send all requests to a dedicated routing tier first, which forwards each request to the right node.
3. Make the client aware of the partition assignment, so it can connect to the right node directly.

(figure: the three request-routing / service-discovery approaches)

If ZooKeeper is used, it tracks the cluster metadata. When a partition changes ownership, or a node is added or removed, ZooKeeper notifies the routing tier so that it can keep its routing information up to date.

(figure: service discovery with ZooKeeper)

For Redis, the above three approaches correspond to the following:

Disadvantages of partitioning

How consistent hashing works

To add the list of nodes to the ring hash, each one is hashed m.replicas times with slightly different names (“0 node1”, “1 node1”, “2 node1”, …). The hash values are added to the m.nodes slice, and the mapping from hash value back to node name is stored in m.hashMap. Finally the m.nodes slice is sorted so we can use a binary search during lookup.
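
For context, here is a minimal sketch of the Map type and package scaffolding these methods assume, reconstructed from the fields referenced above (the hash function is pluggable, e.g. crc32.ChecksumIEEE from hash/crc32):

package main

import (
    "sort"
    "strconv"
)

// Hash is a pluggable hash function, e.g. crc32.ChecksumIEEE.
type Hash func(data []byte) uint32

type Map struct {
    hash     Hash           // hash function used for nodes and keys
    replicas int            // number of virtual nodes per real node
    nodes    []int          // sorted hash values of all virtual nodes
    hashMap  map[int]string // virtual-node hash -> real node name
}

func New(replicas int, fn Hash) *Map {
    return &Map{hash: fn, replicas: replicas, hashMap: make(map[int]string)}
}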

// Add hashes each node m.replicas times ("0 node1", "1 node1", ...) and
// records every resulting virtual node on the ring.
func (m *Map) Add(nodes ...string) {
    for _, n := range nodes {
        for i := 0; i < m.replicas; i++ {
            hash := int(m.hash([]byte(strconv.Itoa(i) + " " + n)))
            m.nodes = append(m.nodes, hash)
            m.hashMap[hash] = n
        }
    }
    // keep the ring sorted so Get can binary-search it
    sort.Ints(m.nodes)
}

To see which node a given key is stored on, the key is hashed into an integer. The sorted nodes slice is binary-searched to find the smallest node hash value greater than or equal to the key hash (with a special case if we need to wrap around to the start of the circle). That node hash is then looked up in the map to determine the node it came from.

// Get returns the node responsible for the given key: the first virtual node
// clockwise from the key's hash, wrapping around at the end of the ring.
func (m *Map) Get(key string) string {
    hash := int(m.hash([]byte(key)))
    idx := sort.Search(len(m.nodes), func(i int) bool {
        return m.nodes[i] >= hash
    })
    if idx == len(m.nodes) {
        idx = 0 // wrapped past the last virtual node, go back to the first
    }
    return m.hashMap[m.nodes[idx]]
}

Downsides of consistent hashing

First, the load distribution across the nodes can still be uneven. With 100 replicas (“vnodes”) per server, the standard deviation of load is about 10%. The 99% confidence interval for bucket sizes is 0.76 to 1.28 of the average load (i.e., total keys / number of servers). This sort of variability makes capacity planning tricky. Increasing the number of replicas to 1000 points per server reduces the standard deviation to ~3.2%, and gives a much smaller 99% confidence interval of 0.92 to 1.09. But this comes with a significant memory cost: for 1000 nodes, this is 4MB of data, with O(log n) searches (for n = 1e6) that are all processor cache misses, even with nothing else competing for the cache.

Improvements on consistent hashing

Jump Hash addresses the two disadvantages of ring hashes: it has no memory overhead and virtually perfect key distribution. (The standard deviation of buckets is 0.000000764%, giving a 99% confidence interval of 0.99999998 to 1.00000002.) Jump Hash is also fast. The loop executes O(ln n) times, faster by a constant amount than the O(log n) binary search for ring hash, and made faster still by the fact that the computation is done entirely in a few registers and doesn’t pay the overhead of cache misses.
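
A short Go version of the jump hash loop described above, following the Lamping-Veach formulation (the large constant is the 64-bit LCG multiplier from their paper):

package main

import "fmt"

// jumpHash maps a 64-bit key to a bucket in [0, numBuckets), moving only a
// minimal fraction of keys when numBuckets changes.
func jumpHash(key uint64, numBuckets int32) int32 {
    var b int64 = -1
    var j int64
    for j < int64(numBuckets) {
        b = j
        key = key*2862933555777941757 + 1
        j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1)))
    }
    return int32(b)
}

func main() {
    // note: jump hash returns a bucket number, not a node name, so it suits
    // numbered shards better than arbitrary node additions and removals
    fmt.Println(jumpHash(12345, 10)) // some bucket in [0, 10)
}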

Multi-probe consistent hashing takes a different approach: instead of hashing the nodes multiple times and bloating the memory usage, the nodes are hashed only once, but the key is hashed k times on lookup and the closest node over all k probes is returned. The value of k is determined by the desired variance. For a peak-to-mean ratio of 1.05 (meaning that the most heavily loaded node is at most 5% higher than the average), k is 21. With a tricky data structure you can get the total lookup cost from O(k log n) down to just O(k). My implementation uses the tricky data structure.

Rendezvous hashing (highest random weight): the idea is that you hash the node and the key together and use the node that provides the highest hash value. The downside is that it’s hard to avoid the O(n) lookup cost of iterating over all the nodes.
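
A minimal Go sketch of rendezvous hashing (the hash choice and names are mine): score every node against the key and return the highest.

package main

import (
    "fmt"
    "hash/fnv"
)

// rendezvousGet hashes each (node, key) pair and picks the node with the
// highest score; removing a node only moves the keys it was highest for.
func rendezvousGet(nodes []string, key string) string {
    var best string
    var bestScore uint64
    for _, n := range nodes {
        h := fnv.New64a()
        h.Write([]byte(n))
        h.Write([]byte(":"))
        h.Write([]byte(key))
        if score := h.Sum64(); best == "" || score > bestScore {
            best, bestScore = n, score
        }
    }
    return best
}

func main() {
    nodes := []string{"node1", "node2", "node3"}
    fmt.Println(rendezvousGet(nodes, "user:42"))
}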

Maglev hashing: one of its primary goals was lookup speed and low memory usage, as compared with ring hashing or rendezvous hashing. The algorithm effectively produces a lookup table that allows finding a node in constant time. The two downsides are that generating a new table on node failure is slow (the paper assumes backend failure is rare), and that this effectively limits the maximum number of backend nodes. Maglev hashing also aims for “minimal disruption” when nodes are added and removed, rather than optimal; for Maglev’s use case as a software load balancer, this is sufficient. The table is effectively a random permutation of the nodes. A lookup hashes the key and checks the entry at that location. This is O(1) with a small constant (just the time to hash the key).
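
A compact Go sketch of the table-population idea, not Google’s implementation (the table size, hash choices, and helper names are mine): each backend claims slots according to its own permutation until the table is full, and a lookup is one hash plus one table read.

package main

import (
    "fmt"
    "hash/fnv"
)

const tableSize = 65537 // should be prime; real deployments use a much larger prime

func hashString(s, seed string) uint64 {
    h := fnv.New64a()
    h.Write([]byte(seed))
    h.Write([]byte(s))
    return h.Sum64()
}

// buildTable fills a Maglev-style lookup table: each backend claims its
// preferred slots, in turn, until every slot is owned by some backend.
func buildTable(backends []string) []int {
    n := len(backends)
    offsets := make([]uint64, n)
    skips := make([]uint64, n)
    next := make([]uint64, n)
    for i, b := range backends {
        offsets[i] = hashString(b, "offset") % tableSize
        skips[i] = hashString(b, "skip")%(tableSize-1) + 1
    }
    table := make([]int, tableSize)
    for i := range table {
        table[i] = -1
    }
    filled := 0
    for filled < tableSize {
        for i := 0; i < n && filled < tableSize; i++ {
            // walk backend i's permutation until an unclaimed slot is found
            for {
                slot := (offsets[i] + next[i]*skips[i]) % tableSize
                next[i]++
                if table[slot] == -1 {
                    table[slot] = i
                    filled++
                    break
                }
            }
        }
    }
    return table
}

// lookup is O(1): hash the key and read the table entry at that position.
func lookup(table []int, backends []string, key string) string {
    return backends[table[hashString(key, "key")%tableSize]]
}

func main() {
    backends := []string{"node1", "node2", "node3"}
    table := buildTable(backends)
    fmt.Println(lookup(table, backends, "user:42"))
}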

How to partition a tree or graph dataset

How to partition a graph dataset

How to partition a billion-node graph (pdf)

There are two ways mentioned in the paper above:

How industry handles data sharding

Redis cluster

https://redis.io/topics/cluster-tutorial#redis-cluster-data-sharding

https://redis.io/topics/cluster-spec#keys-distribution-model

https://developpaper.com/complete-steps-for-adding-and-deleting-redis-cluster/

References