Zookeeper集群Leader选举源码剖析
关于这篇的理论基础,即Paxos算法和ZAB协议
、选举流程分析。 已经在另一篇博客单独写出来了,这里就不赘述了。但是这里还是贴一下选举流程的图,便于理解源码:
QuorumPeer工作流程
QuorumCnxManager:负责各台服务器之间的底层Leader选举过程中的网络通信对应的类。每台服务器在启动的过程中,会启动一个QuorumPeer 。
Zookeeper 对于每个节点 QuorumPeer
的设计相当的灵活, QuorumPeer 主要包括四个组件:
- 客户端请求接收器( ServerCnxnFactory )
ServerCnxnFactory负责维护与客户端的连接(接收客户端的请求并发送相应的响应);(1001行)
- 数据引擎( ZKDatabase )
ZKDatabase负责存储/加载/查找数据(基于目录树结构的KV+操作日志+客户端Session);(129行)
- 选举器( Election )
Election负责选举集群的一个Leader节点;(998行)
- 核心功能组件(Leader/Follower/Observer )。
Leader/Follower/Observer确认是QuorumPeer节点应该完成的核心职责;(1270行)
QuorumPeer 工作流程比较复杂,如下图:
QuorumPeer工作流程:
1:初始化配置
2:加载当前存在的数据
3:启动网络通信组件
4:启动控制台
5:开启选举协调者,并执行选举(这个过程是会持续,并不是一次操作就结束了)
QuorumCnxManager源码分析
QuorumCnxManager
内部维护了一系列的队列,用来保存接收到的、待发送的消息以及消息的发送器,除接收队列以外,其他队列都按照SID
分组形成队列集合,如一个集群中除了自身还有3台机器,那么就会为这3台机器分别创建一个发送队列,互不干扰。
QuorumCnxManager.Listener : 为了能够相互投票,Zookeeper集群中的所有机器都需要建立起网络连接。QuorumCnxManager
在启动时会创建一个ServerSocket
来监听Leader选举的通信端口。开启监听后,Zookeeper能够不断地接收到来自其他服务器地创建连接请求,在接收到其他服务器地TCP连接请求时,会进行处理。为了避免两台机器之间重复地创建TCP连接,Zookeeper只允许SID
大的服务器主动和其他机器建立连接,否则断开连接。在接收到创建连接请求后,服务器通过对比自己和远程服务器的SID值来判断是否接收连接请求,如果当前服务器发现自己的SID更大,那么会断开当前连接,然后自己主动和远程服务器将连接(自己作为“客户端”)。一旦连接建立,就会根据远程服务器的SID来创建相应的消息发送器SendWorker
和消息接收器RecvWorker
,并启动。
QuorumCnxManager.Listener 监听启动可以查看QuorumCnxManager.Listener 的 run 方法,源代码如下,可以断点调试看到此时监听的正是我们所说的投票端口:
上面是监听器,各个服务之间进行通信我们需要开启 ListenerHandler 线程,在QuorumCnxManager.Listener.ListenerHandler
的run
方法中有一个方法 acceptConnections()
调用,该方法就是用于接受每次选举投票的信息,如果只有一个节点或者没有投票信息的时候,此时方法会阻塞,一旦执行选举,程序会往下执行,我们可以先启动1台服务,再启动第2台、第3台,此时会收到有客户端参与投票链接,程序会往下执行,源码如下:
我们启动2台服务,效果如下:
上面虽然能证明投票访问了当前监听的端口,但怎么知道是哪台服务呢?我们可以沿着acceptConnections()
中调用的receiveConnection()
源码继续研究,源码如下:receiveConnection()
方法只是获取了数据流,并没做特殊处理,并且调用了 handleConnection()
方法,该方法源码如下:
这就表明了是通过网络连接获取数据sid,获取sid表示是哪一台连过来的。
FastLeaderElection算法源码分析
在 Zookeeper 集群中,主要分为三者角色,而每一个节点同时只能扮演一种角色,这三种角色分别是:
(1) Leader :接受所有Follower的提案请求并统一协调发起提案的投票,负责与所有的Follower进行内部的数据交换(同步);
(2) Follower : 直接为客户端提供服务并参与提案的投票,同时与 Leader 进行数据交换(同步);
(3) Observer : 直接为客户端服务但并不参与提案的投票,同时也与Leader 进行数据交换(同步);
FastLeaderElection
选举算法是标准的 Fast Paxos 算法实现,可解决 LeaderElection
选举算法收敛速度慢的问题。创建 FastLeaderElection 只需要 new FastLeaderElection()
即可,如下代码:
创建 FastLeaderElection 会调用 starter() 方法,该方法会创建 sendqueue 、 recvqueue 队列、Messenger
对象,其中 Messenger 对象的作用非常关键,方法源码如下:
创建Messenger的时候,会创建 WorkerSender 并封装成 wsThread
线程,创建 WorkerReceiver 并封装成 wrThread
线程,看名字就很容易理解, wsThread 用于发送数据, wrThread 用于接收数据,Messenger 创建源码如下:
创建完 FastLeaderElection 后接着会调用它的 start() 方法启动选举算法,代码如下:
start()方法如下:
上图意味着 wsThread 和 wrThread 线程都将启动。
wsThread 由 WorkerSender 封装而来,此时会调用 WorkerSender 的 run 方法,run方法会调用process()
方法,源码如下:
process 方法调用了 manager 的toSend
方法,此时是把对应的sid作为了消息发送出去,这里其实是发送投票信息,源码如下:
投票可以投自己,也可以投别人,如果是选票选自己,只需要把投票信息添加到 recvQueue 中即可,源码如下:
在WorkerReceiver.run
方法中会从 recvQueue 中获取 Message ,并把发送给其他服务的投票封装到sendqueue
队列中,交给 WorkerSender 发送处理,部分源码如下:
由于这块代码太多,所以就只截取了run方法中关键的代码
Zookeeper选举投票剖析
选举是个很复杂的过程,要考虑很多场景,而且选举过程中有很多概念需要理解。
选举概念
1)ZK服务状态:
2)服务角色:
3)投票消息广播:
4)选票模型:
5)消息发送对象:
选举过程
QuorumPeer本身是个线程,在集群启动的时候会执行 quorumPeer.start();
,此时会调用它重写的start()
方法,最后会调用父类的 start() 方法,所以该线程会启动执行,因此会执行它的run
方法,而run方法正是选举流程的入口,我们看run方法关键源码如下:
所有节点初始状态都为LOOKING,会进入到选举流程,选举流程首先要获取算法,获取算法的方法是makeLEStrategy()
,该方法返回的是FastLeaderElection
实例,核心选举流程是FastLeaderElection 中的 lookForLeader()
方法。lookForLeader()
是选举过程的关键流程,源码分析如下:
上面多个地方都用到了过半数以上的判断方法: hasAllQuorums()
该方法用到了 QuorumMaj
类,代码如下:
QuorumMaj 构造函数中体现了过半数以上的操作,代码如下:
投票规则
来看一下选票PK的方法 totalOrderPredicate() ,该方法其实就是Leader选举规则,规则有如下三个:
① EPOCH大的直接胜出
② EPOCH相同,事务id大的胜出
③事务id相同,服务器id大的胜出
源码如下: