前言
看分片算法的时候了解到了一致性哈希,梳理一下相关内容。
其主要的使用场景包括:分布式存储(缓存、DB、文件)、负载均衡、分布式计算等。
简单哈希函数的“扩容困境”
对于简单哈希函数m = hash(o) mod n
,其中,o
为对象名称,n
为机器的数量,m
为机器编号。当n
增大时,所有o
对应的m
都可能发生变化,这些变化意味着将对象o
的数据迁移到新的机器m
。
假设我们有以下对象和机器:
- 对象:
o1
,o2
,o3
,o4
- 初始机器数量:
n = 3
假设对每个对象应用哈希函数得到的哈希值如下:
hash(o1) = 10
hash(o2) = 15
hash(o3) = 20
hash(o4) = 25
可计算出初始数据分布如下:
- 机器 0:
o2
- 机器 1:
o1
,o4
- 机器 2:
o3
现在,我们将机器数量增加到 n = 5
,使用相同的哈希值,重新计算每个对象的新机器编号,得到的数据分布如下:
- 机器 0:
o1
,o2
,o3
,o4
- 机器 1:无
- 机器 2:无
- 机器 3:无
- 机器 4:无
可以发现,要迁移的数据将十分多。
一致性哈希(Consistent Hash)
一致性哈希(Consistent Hash)算法是1997年提出,是一种特殊的哈希算法,目的是解决分布式系统的数据分区问题:当分布式集群移除或者添加一个服务器时,必须尽可能小地改变已存在的服务请求与处理请求服务器之间的映射关系。
原理简述
一致性哈希算法本质上也是一种取模算法。只不过是对固定值2^32
取模,这就使得一致性算法具备良好的单调性,即不管集群中有多少个节点,只要key值固定,那所请求的服务器节点也同样是固定的。
其算法的工作原理如下:
- 一致性哈希算法将整个哈希值空间映射成一个虚拟的圆环,整个哈希空间的取值范围为
[0, 2^32 - 1]
; - 计算各服务器节点的哈希值,并映射到哈希环上;
- 将服务发来的数据请求使用哈希算法算出对应的哈希值;
- 将计算的哈希值映射到哈希环上,同时沿圆环顺时针方向查找,遇到的第一台服务器就是所对应的处理请求服务器。
- 当增加或者删除一台服务器时,受影响的数据仅仅是新添加或删除的服务器到其环空间中前一台的服务器(也就是顺着逆时针方向遇到的第一台服务器)之间的数据,其他都不会受到影响。
细节剖析
算法原理其实很容易理解,但是这里面涉及到一些细节问题:
- 数据请求如何寻找目标服务器?
- 计算请求的哈希值后进行顺序查找,查找到哈希环末尾时跳回第一个元素继续顺序查找。
- 在分布式存储的场景下扩容时,原定向到B节点的部分数据需要迁移到新节点C,如何对这部分存量数据进行迁移?
- 从C节点逆序定位到上一个节点B;
- 对B中全部数据进行重哈希,识别出需迁移数据的范围;
- 将需迁移数据复制到C节点中,确认一致;
- 删除B节点上已迁移的数据。
- 在分布式存储的场景下缩容时,原定向到B节点的所有数据需要迁移到新节点C,如何对这部分存量数据进行迁移?
- 与扩容相似,先复制再删除,只是不需要确定数据范围,因为被删除节点的所有数据都需要迁移。
- 在负载均衡场景下发生节点宕机时,一致性哈希有什么好处?
- 仅宕机节点的流量会去到下一个节点,其余节点的流量不受影响。
- 在分布式存储场景下发生节点故障怎么办?
- 采用数据副本机制,在多个节点之间存储数据的副本,确保在节点失效时可以通过其他副本恢复数据。
- 实现健康检查机制,定期检测节点的状态,并在发现故障时快速重新分配数据。
数据倾斜与虚拟节点
数据倾斜(Data Skew)是指在分布式系统或大数据处理场景中,数据在某些节点或分片之间分布不均匀的现象。这种不均匀性导致某些节点或处理单元负载过重,而其他节点则相对空闲,从而影响系统的性能和响应时间。
有两种情况可能导致数据倾斜:
- 当节点数量较少时,数据可能不均匀地分布在哈希环上,导致某些节点负载过重,而其他节点相对空闲。
- 如果节点在哈希环上的位置不够分散(用的哈希函数不够散列),可能导致某些节点的负载过高。
可以通过虚拟节点的机制来解决数据倾斜问题。虚拟节点机制对每一个物理服务节点映射多个虚拟节点,将这些虚拟节点计算哈希值并映射到哈希环上,当请求找到某个虚拟节点后,将被重新映射到具体的物理节点。虚拟节点越多,哈希环上的节点就越多,数据分布就越均匀,从而避免了数据倾斜的问题。
假设一台服务器有 n 个虚拟节点。那么哈希计算时,可以使用IP + 端口 + 编号
的形式进行哈希值计算。其中的编号就是 0 到 n 的数字。由于IP + 端口
是一样的,所以这 n 个节点都是指向的同一台机器。
Demo实现
数据节点:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44public class Node {
private static final int VIRTUAL_NODE_NO_PER_NODE = 200;
private final String ip;
private final List<Integer> virtualNodeHashes = new ArrayList<>(VIRTUAL_NODE_NO_PER_NODE);
private final Map<Object, Object> cacheMap = new HashMap<>();
public Node(String ip) {
Objects.requireNonNull(ip);
this.ip = ip;
initVirtualNodes();
}
private void initVirtualNodes() {
String virtualNodeKey;
for (int i = 1; i <= VIRTUAL_NODE_NO_PER_NODE; i++) {
virtualNodeKey = ip + "#" + i;
virtualNodeHashes.add(HashUtils.hashcode(virtualNodeKey));
}
}
public void addCacheItem(Object key, Object value) {
cacheMap.put(key, value);
}
public Object getCacheItem(Object key) {
return cacheMap.get(key);
}
public void removeCacheItem(Object key) {
cacheMap.remove(key);
}
public List<Integer> getVirtualNodeHashes() {
return virtualNodeHashes;
}
public String getIp() {
return ip;
}
}
一致性哈希:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75public class ConsistentHash {
private final TreeMap<Integer, Node> hashRing = new TreeMap<>();
public List<Node> nodeList = new ArrayList<>();
/**
* 增加节点
* 每增加一个节点,就会在闭环上增加给定虚拟节点
* 例如虚拟节点数是2,则每调用此方法一次,增加两个虚拟节点,这两个节点指向同一Node
* @param ip
*/
public void addNode(String ip) {
Objects.requireNonNull(ip);
Node node = new Node(ip);
nodeList.add(node);
for (Integer virtualNodeHash : node.getVirtualNodeHashes()) {
hashRing.put(virtualNodeHash, node);
System.out.println("虚拟节点[" + node + "] hash:" + virtualNodeHash + ",被添加");
}
}
/**
* 移除节点
* @param node
*/
public void removeNode(Node node){
nodeList.remove(node);
}
/**
* 获取缓存数据
* 先找到对应的虚拟节点,然后映射到物理节点
* @param key
* @return
*/
public Object get(Object key) {
Node node = findMatchNode(key);
System.out.println("获取到节点:" + node.getIp());
return node.getCacheItem(key);
}
/**
* 添加缓存
* 先找到hash环上的节点,然后在对应的节点上添加数据缓存
* @param key
* @param value
*/
public void put(Object key, Object value) {
Node node = findMatchNode(key);
node.addCacheItem(key, value);
}
/**
* 删除缓存数据
*/
public void evict(Object key) {
findMatchNode(key).removeCacheItem(key);
}
/**
* 获得一个最近的顺时针节点
* @param key 为给定键取Hash,取得顺时针方向上最近的一个虚拟节点对应的实际节点
* * @return 节点对象
* @return
*/
private Node findMatchNode(Object key) {
Map.Entry<Integer, Node> entry = hashRing.ceilingEntry(HashUtils.hashcode(key));
if (entry == null) {
entry = hashRing.firstEntry();
}
return entry.getValue();
}
}
哈希函数:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27public class HashUtils {
/**
* FNV1_32_HASH
*
* @param obj
* object
* @return hashcode
*/
public static int hashcode(Object obj) {
final int p = 16777619;
int hash = (int) 2166136261L;
String str = obj.toString();
for (int i = 0; i < str.length(); i++)
hash = (hash ^ str.charAt(i)) * p;
hash += hash << 13;
hash ^= hash >> 7;
hash += hash << 3;
hash ^= hash >> 17;
hash += hash << 5;
if (hash < 0)
hash = Math.abs(hash);
//System.out.println("hash computer:" + hash);
return hash;
}
}
测试:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51public class ConsistentHashTest {
public static final int NODE_SIZE = 10;
public static final int STRING_COUNT = 100 * 100;
private static ConsistentHash consistentHash = new ConsistentHash();
private static List<String> sList = new ArrayList<>();
public static void main(String[] args) {
// 增加节点
for (int i = 0; i < NODE_SIZE; i++) {
String ip = new StringBuilder("10.2.1.").append(i)
.toString();
consistentHash.addNode(ip);
}
// 生成需要缓存的数据;
for (int i = 0; i < STRING_COUNT; i++) {
sList.add(RandomStringUtils.randomAlphanumeric(10));
}
// 将数据放入到缓存中。
for (String s : sList) {
consistentHash.put(s, s);
}
for(int i = 0 ; i < 10 ; i ++) {
int index = RandomUtils.nextInt(0, STRING_COUNT);
String key = sList.get(index);
String cache = (String) consistentHash.get(key);
System.out.println("Random:"+index+",key:" + key + ",consistentHash get value:" + cache +",value is:" + key.equals(cache));
}
// 输出节点及数据分布情况
for (Node node : consistentHash.nodeList){
System.out.println(node);
}
// 新增一个数据节点
consistentHash.addNode("10.2.1.110");
for(int i = 0 ; i < 10 ; i ++) {
int index = RandomUtils.nextInt(0, STRING_COUNT);
String key = sList.get(index);
String cache = (String) consistentHash.get(key);
System.out.println("Random:"+index+",key:" + key + ",consistentHash get value:" + cache +",value is:" + key.equals(cache));
}
// 输出节点及数据分布情况
for (Node node : consistentHash.nodeList){
System.out.println(node);
}
}
}
参考文献
[1] https://developer.aliyun.com/article/1082388
[2] https://zhuanlan.zhihu.com/p/129049724
[3] https://zhuanlan.zhihu.com/p/636719716
[4] https://zhuanlan.zhihu.com/p/53711866
后记
下一篇:Chord协议,第一代分布式哈希表(DHT)。
首发于 silencezheng.top,转载请注明出处。