首先先回顾一下关于Leader 的一些思考:
温馨提示:本文是从源码的角度分析 DLedger 选主实现原理,可能比较枯燥,文末给出了选主流程图。
DLedger 关于选主的核心类图
多副本模块相关的配置信息,例如集群节点信息。
节点状态机,即 raft 协议中的 follower、candidate、leader 三种状态的状态机实现。
DLedger 客户端协议,主要定义如下三个方法,在后面的日志复制部分会重点阐述。
DLedger服务端协议,主要定义如下三个方法。
DLedgerClientProtocolHandler、DLedgerProtocolHander 协议处理器。
DLedger Server(节点)之间的网络通信,默认基于 Netty 实现,其实现类为:DLedgerRpcNettyService。
Leader 选举实现器。
Dledger Server,Dledger 节点的封装类。
接下来将从 DLedgerLeaderElector 开始剖析 DLedger 是如何实现 Leader 选举的。(基于raft 协议)。
我们先一一来介绍其属性的含义:
允许最大的 N 个心跳周期内未收到心跳包,状态为 Follower 的节点只有超过 maxHeartBeatLeak * heartBeatTimeIntervalMs 的时间内未收到主节点的心跳包,才会重新进入 Candidate 状态,重新下一轮的选举。
通过 DLedgerLeaderElector 的 startup 方法启动状态管理机,代码如下:
DLedgerLeaderElector#startup
public void startup() {
stateMaintainer.start(); // @1
for (RoleChangeHandler roleChangeHandler : roleChangeHandlers) { // @2
roleChangeHandler.startup();
}
}
代码@1:启动状态维护管理器。
代码@2:遍历状态改变监听器并启动它,可通过 DLedgerLeaderElector 的 addRoleChangeHandler 方法增加状态变化监听器。
其中的是启动状态管理器线程,其run方法实现:
public void run() {
while (running.get()) {
try {
doWork();
} catch (Throwable t) {
if (logger != null) {
logger.error("Unexpected Error in running {} ", getName(), t);
}
}
}
latch.countDown();
}
从上面来看,主要是循环调用 doWork 方法,接下来重点看其 doWork 的实现:
public void doWork() {
try {
if (DLedgerLeaderElector.this.dLedgerConfig.isEnableLeaderElector()) { // @1
DLedgerLeaderElector.this.refreshIntervals(dLedgerConfig); // @2
DLedgerLeaderElector.this.maintainState(); // @3
}
sleep(10); // @4
} catch (Throwable t) {
DLedgerLeaderElector.logger.error("Error in heartbeat", t);
}
}
代码@1:如果该节点参与 Leader 选举,则首先调用@2重置定时器,然后驱动状态机(@3),是接下来重点需要剖析的。
代码@4:每执行一次选主,休息10ms。
DLedgerLeaderElector#maintainState
private void maintainState() throws Exception {
if (memberState.isLeader()) {
maintainAsLeader();
} else if (memberState.isFollower()) {
maintainAsFollower();
} else {
maintainAsCandidate();
}
}
根据当前的状态机状态,执行对应的操作,从 raft 协议中可知,总共存在3种状态:
我们在继续往下看之前,需要知道 memberState 的初始值是什么?我们追溯到创建 MemberState 的地方,发现其初始状态为 CANDIDATE。那我们接下从 maintainAsCandidate 方法开始跟进。
温馨提示:在raft协议中,节点的状态默认为follower,DLedger的实现从candidate开始,一开始,集群内的所有节点都会尝试发起投票,这样第一轮要达成选举几乎不太可能。
整个状态机的驱动,由线程反复执行 maintainState 方法。下面重点来分析其状态的驱动。
DLedgerLeaderElector#maintainAsCandidate
if (System.currentTimeMillis() < nextTimeToRequestVote && !needIncreaseTermImmediately) {
return;
}
long term;
long ledgerEndTerm;
long ledgerEndIndex;
Step1:首先先介绍几个变量的含义:
下一次发起的投票的时间,如果当前时间小于该值,说明计时器未过期,此时无需发起投票。
是否应该立即发起投票。如果为 true,则忽略计时器,该值默认为 false,当收到从主节点的心跳包并且当前状态机的轮次大于主节点的轮次,说明集群中 Leade r的投票轮次小于从当前收到请求节点的投票轮次,应该立即发起新的投票。
DLedgerLeaderElector#maintainAsCandidate
synchronized (memberState) {
if (!memberState.isCandidate()) {
return;
}
if (lastParseResult == VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT || needIncreaseTermImmediately) {
long prevTerm = memberState.currTerm();
term = memberState.nextTerm();
logger.info("{}_[INCREASE_TERM] from {} to {}", memberState.getSelfId(), prevTerm, term);
lastParseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
} else {
term = memberState.currTerm();
}
ledgerEndIndex = memberState.getLedgerEndIndex();
ledgerEndTerm = memberState.getLedgerEndTerm();
}
Step2:初始化 team、ledgerEndIndex 、ledgerEndTerm 属性,其实现关键点如下:
如果上一次的投票结果为待下一次投票或应该立即开启投票,并且根据当前状态机获取下一轮的投票轮次,稍后会着重讲解一下状态机轮次的维护机制。
如果上一次的投票结果不是 WAIT_TO_VOTE_NEXT(等待下一轮投票),则投票轮次依然为状态机内部维护的轮次。
DLedgerLeaderElector#maintainAsCandidate
if (needIncreaseTermImmediately) {
nextTimeToRequestVote = getNextTimeToRequestVote();
needIncreaseTermImmediately = false;
return;
}
Step3:如果 needIncreaseTermImmediately 为 true,则重置该标记位为 false,并重新设置下一次投票超时时间,其实现代码如下:
private long getNextTimeToRequestVote() {
return System.currentTimeMillis() + lastVoteCost + minVoteIntervalMs + random.nextInt(maxVoteIntervalMs - minVoteIntervalMs);
}
下一次倒计时:当前时间戳 + 上次投票的开销 + 最小投票间隔(300ms) + (1000- 300 )之间的随机值。
final List<CompletableFuture<VoteResponse>> quorumVoteResponses = voteForQuorumResponses(term, ledgerEndTerm, ledgerEndIndex);
Step4:向集群内的其他节点发起投票请求,并返回投票结果列表,稍后会重点分析其投票过程。可以预见,接下来就是根据各投票结果进行仲裁。
final AtomicLong knownMaxTermInGroup = new AtomicLong(-1);
final AtomicInteger allNum = new AtomicInteger(0);
final AtomicInteger validNum = new AtomicInteger(0);
final AtomicInteger acceptedNum = new AtomicInteger(0);
final AtomicInteger notReadyTermNum = new AtomicInteger(0);
final AtomicInteger biggerLedgerNum = new AtomicInteger(0);
final AtomicBoolean alreadyHasLeader = new AtomicBoolean(false);
Step5:在进行投票结果仲裁之前,先来介绍几个局部变量的含义:
for (CompletableFuture<VoteResponse> future : quorumVoteResponses) {
// 省略部分代码
}
Step5:遍历投票结果,收集投票结果,接下来重点看其内部实现。
if (x.getVoteResult() != VoteResponse.RESULT.UNKNOWN) {
validNum.incrementAndGet();
}
Step6:如果投票结果不是 UNKNOW,则有效投票数量增1。
synchronized (knownMaxTermInGroup) {
switch (x.getVoteResult()) {
case ACCEPT:
acceptedNum.incrementAndGet();
break;
case REJECT_ALREADY_VOTED:
break;
case REJECT_ALREADY_HAS_LEADER:
alreadyHasLeader.compareAndSet(false, true);
break;
case REJECT_TERM_SMALL_THAN_LEDGER:
case REJECT_EXPIRED_VOTE_TERM:
if (x.getTerm() > knownMaxTermInGroup.get()) {
knownMaxTermInGroup.set(x.getTerm());
}
break;
case REJECT_EXPIRED_LEDGER_TERM:
case REJECT_SMALL_LEDGER_END_INDEX:
biggerLedgerNum.incrementAndGet();
break;
case REJECT_TERM_NOT_READY:
notReadyTermNum.incrementAndGet();
break;
default:
break;
}
}
Step7:统计投票结构,几个关键点如下:
try {
voteLatch.await(3000 + random.nextInt(maxVoteIntervalMs), TimeUnit.MILLISECONDS);
} catch (Throwable ignore) {
}
Step8:等待收集投票结果,并设置超时时间。
lastVoteCost = DLedgerUtils.elapsed(startVoteTimeMs);
VoteResponse.ParseResult parseResult;
if (knownMaxTermInGroup.get() > term) {
parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;
nextTimeToRequestVote = getNextTimeToRequestVote();
changeRoleToCandidate(knownMaxTermInGroup.get());
} else if (alreadyHasLeader.get()) {
parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;
nextTimeToRequestVote = getNextTimeToRequestVote() + heartBeatTimeIntervalMs * maxHeartBeatLeak;
} else if (!memberState.isQuorum(validNum.get())) {
parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
nextTimeToRequestVote = getNextTimeToRequestVote();
} else if (memberState.isQuorum(acceptedNum.get())) {
parseResult = VoteResponse.ParseResult.PASSED;
} else if (memberState.isQuorum(acceptedNum.get() + notReadyTermNum.get())) {
parseResult = VoteResponse.ParseResult.REVOTE_IMMEDIATELY;
} else if (memberState.isQuorum(acceptedNum.get() + biggerLedgerNum.get())) {
parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
nextTimeToRequestVote = getNextTimeToRequestVote();
} else {
parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;
nextTimeToRequestVote = getNextTimeToRequestVote();
}
Step9:根据收集的投票结果判断是否能成为 Leader。
温馨提示:在讲解关键点之前,我们先定义先将(当前时间戳 + 上次投票的开销 + 最小投票间隔(300ms) + (1000- 300 )之间的随机值)定义为“ 1个常规计时器”。
其关键点如下:
if (parseResult == VoteResponse.ParseResult.PASSED) {
logger.info("[{}] [VOTE_RESULT] has been elected to be the leader in term {}", memberState.getSelfId(), term);
changeRoleToLeader(term);
}
Step10:如果投票成功,则状态机状态设置为 Leader,然后状态管理在驱动状态时会调用 DLedgerLeaderElector#maintainState 时,将进入到 maintainAsLeader方法。
经过 maintainAsCandidate 投票选举后,被其他节点选举成为领导后,会执行该方法,其他节点的状态还是 Candidate,并在计时器过期后,又尝试去发起选举。接下来重点分析成为 Leader 节点后,该节点会做些什么?
DLedgerLeaderElector#maintainAsLeader
private void maintainAsLeader() throws Exception {
if (DLedgerUtils.elapsed(lastSendHeartBeatTime) > heartBeatTimeIntervalMs) { // @1
long term;
String leaderId;
synchronized (memberState) {
if (!memberState.isLeader()) { // @2
//stop sending
return;
}
term = memberState.currTerm();
leaderId = memberState.getLeaderId();
lastSendHeartBeatTime = System.currentTimeMillis(); // @3
}
sendHeartbeats(term, leaderId); // @4
}
}
代码@1:首先判断上一次发送心跳的时间与当前时间的差值是否大于心跳包发送间隔,如果超过,则说明需要发送心跳包。
代码@2:如果当前不是 leader 节点,则直接返回,主要是为了二次判断。
代码@3:重置心跳包发送计时器。
代码@4:向集群内的所有节点发送心跳包,稍后会详细介绍心跳包的发送。
当 Candidate 状态的节点在收到主节点发送的心跳包后,会将状态变更为 follower,那我们先来看一下在 follower 状态下,节点会做些什么事情?
private void maintainAsFollower() {
if (DLedgerUtils.elapsed(lastLeaderHeartBeatTime) > 2 * heartBeatTimeIntervalMs) {
synchronized (memberState) {
if (memberState.isFollower() && (DLedgerUtils.elapsed(lastLeaderHeartBeatTime) > maxHeartBeatLeak * heartBeatTimeIntervalMs)) {
logger.info("[{}][HeartBeatTimeOut] lastLeaderHeartBeatTime: {} heartBeatTimeIntervalMs: {} lastLeader={}", memberState.getSelfId(), new Timestamp(lastLeaderHeartBeatTime), heartBeatTimeIntervalMs, memberState.getLeaderId());
changeRoleToCandidate(memberState.currTerm());
}
}
}
}
如果 maxHeartBeatLeak (默认为3)个心跳包周期内未收到心跳,则将状态变更为Candidate。
状态机的驱动就介绍到这里,在上面的流程中,其实我们忽略了两个重要的过程,一个是发起投票请求与投票请求响应、发送心跳包与心跳包响应,那我们接下来将重点介绍这两个过程。
节点的状态为 Candidate 时会向集群内的其他节点发起投票请求(个人觉得理解为拉票更好),向对方询问是否愿意选举我为 Leader,对端节点会根据自己的情况对其投赞成票、拒绝票,如果是拒绝票,还会给出拒绝原因,具体由 voteForQuorumResponses、handleVote 这两个方法来实现,接下来我们分别对这两个方法进行详细分析。
发起投票请求。
private List<CompletableFuture<VoteResponse>> voteForQuorumResponses(long term, long ledgerEndTerm,
long ledgerEndIndex) throws Exception { // @1
List<CompletableFuture<VoteResponse>> responses = new ArrayList<>();
for (String id : memberState.getPeerMap().keySet()) { // @2
VoteRequest voteRequest = new VoteRequest(); // @3 start
voteRequest.setGroup(memberState.getGroup());
voteRequest.setLedgerEndIndex(ledgerEndIndex);
voteRequest.setLedgerEndTerm(ledgerEndTerm);
voteRequest.setLeaderId(memberState.getSelfId());
voteRequest.setTerm(term);
voteRequest.setRemoteId(id);
CompletableFuture<VoteResponse> voteResponse; // @3 end
if (memberState.getSelfId().equals(id)) { // @4
voteResponse = handleVote(voteRequest, true);
} else {
//async
voteResponse = dLedgerRpcService.vote(voteRequest); // @5
}
responses.add(voteResponse);
}
return responses;
}
代码@1:首先先解释一下参数的含义:
代码@3:构建投票请求。
代码@4:如果是发送给自己的,则直接调用 handleVote 进行投票请求响应,如果是发送给集群内的其他节点,则通过网络发送投票请求,对端节点调用各自的handleVote对集群进行响应。
接下来重点关注 handleVote 方法,重点探讨其投票处理逻辑。
由于 handleVote 方法会并发被调用,因为可能同时收到多个节点的投票请求,故本方法都被 synchronized 方法包含,锁定的对象为状态机 memberState 对象。
if (!memberState.isPeerMember(request.getLeaderId())) {
logger.warn("[BUG] [HandleVote] remoteId={} is an unknown member", request.getLeaderId());
return CompletableFuture.completedFuture(newVoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_UNKNOWN_LEADER));
}
if (!self && memberState.getSelfId().equals(request.getLeaderId())) {
logger.warn("[BUG] [HandleVote] selfId={} but remoteId={}", memberState.getSelfId(), request.getLeaderId());
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_UNEXPECTED_LEADER));
}
Step1:为了逻辑的完整性对其请求进行检验,除非有 BUG 存在,否则是不会出现上述问题的。
if (request.getTerm() < memberState.currTerm()) { // @1
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_EXPIRED_VOTE_TERM));
} else if (request.getTerm() == memberState.currTerm()) { // @2
if (memberState.currVoteFor() == null) {
//let it go
} else if (memberState.currVoteFor().equals(request.getLeaderId())) {
//repeat just let it go
} else {
if (memberState.getLeaderId() != null) {
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_ALREADY__HAS_LEADER));
} else {
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_ALREADY_VOTED));
}
}
} else { // @3
//stepped down by larger term
changeRoleToCandidate(request.getTerm());
needIncreaseTermImmediately = true;
//only can handleVote when the term is consistent
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_TERM_NOT_READY));
}
Step2:判断发起节点、响应节点维护的 team 进行投票“仲裁”,分如下3种情况讨论:
if (request.getLedgerEndTerm() < memberState.getLedgerEndTerm()) {
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_EXPIRED_LEDGER_TERM));
} else if (request.getLedgerEndTerm() == memberState.getLedgerEndTerm() && request.getLedgerEndIndex() < memberState.getLedgerEndIndex()) {
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_SMALL_LEDGER_END_INDEX));
}
if (request.getTerm() < memberState.getLedgerEndTerm()) {
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.getLedgerEndTerm()).voteResult(VoteResponse.RESULT.REJECT_TERM_SMALL_THAN_LEDGER));
}
Step3:判断请求节点的 ledgerEndTerm 与当前节点的 ledgerEndTerm,这里主要是判断日志的复制进度。
memberState.setCurrVoteFor(request.getLeaderId());
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.ACCEPT));
Step4:经过层层条件帅选,将宝贵的赞成票投给请求节点。
经过几轮投票,最终一个节点能成功被推举出来,选为主节点。主节点为了维持其领导地位,需要定时向从节点发送心跳包,接下来我们重点看一下心跳包的发送与响应。
Step1:遍历集群中的节点,异步发送心跳包。
CompletableFuture<HeartBeatResponse> future = dLedgerRpcService.heartBeat(heartBeatRequest);
future.whenComplete((HeartBeatResponse x, Throwable ex) -> {
try {
if (ex != null) {
throw ex;
}
switch (DLedgerResponseCode.valueOf(x.getCode())) {
case SUCCESS:
succNum.incrementAndGet();
break;
case EXPIRED_TERM:
maxTerm.set(x.getTerm());
break;
case INCONSISTENT_LEADER:
inconsistLeader.compareAndSet(false, true);
break;
case TERM_NOT_READY:
notReadyNum.incrementAndGet();
break;
default:
break;
}
if (memberState.isQuorum(succNum.get())
|| memberState.isQuorum(succNum.get() + notReadyNum.get())) {
beatLatch.countDown();
}
} catch (Throwable t) {
logger.error("Parse heartbeat response failed", t);
} finally {
allNum.incrementAndGet();
if (allNum.get() == memberState.peerSize()) {
beatLatch.countDown();
}
}
});
}
Step2:统计心跳包发送响应结果,关键点如下:
这些响应值,我们在处理心跳包时重点探讨。
beatLatch.await(heartBeatTimeIntervalMs, TimeUnit.MILLISECONDS);
if (memberState.isQuorum(succNum.get())) { // @1
lastSuccHeartBeatTime = System.currentTimeMillis();
} else {
logger.info("[{}] Parse heartbeat responses in cost={} term={} allNum={} succNum={} notReadyNum={} inconsistLeader={} maxTerm={} peerSize={} lastSuccHeartBeatTime={}",
memberState.getSelfId(), DLedgerUtils.elapsed(startHeartbeatTimeMs), term, allNum.get(), succNum.get(), notReadyNum.get(), inconsistLeader.get(), maxTerm.get(), memberState.peerSize(), new Timestamp(lastSuccHeartBeatTime));
if (memberState.isQuorum(succNum.get() + notReadyNum.get())) { // @2
lastSendHeartBeatTime = -1;
} else if (maxTerm.get() > term) { // @3
changeRoleToCandidate(maxTerm.get());
} else if (inconsistLeader.get()) { // @4
changeRoleToCandidate(term);
} else if (DLedgerUtils.elapsed(lastSuccHeartBeatTime) > maxHeartBeatLeak * heartBeatTimeIntervalMs) {
changeRoleToCandidate(term);
}
}
对收集的响应结果做仲裁,其实现关键点:
接下来我们重点看一下心跳包的处理逻辑。
if (request.getTerm() < memberState.currTerm()) {
return CompletableFuture.completedFuture(new HeartBeatResponse().term(memberState.currTerm()).code(DLedgerResponseCode.EXPIRED_TERM.getCode()));
} else if (request.getTerm() == memberState.currTerm()) {
if (request.getLeaderId().equals(memberState.getLeaderId())) {
lastLeaderHeartBeatTime = System.currentTimeMillis();
return CompletableFuture.completedFuture(new HeartBeatResponse());
}
}
Step1:如果主节点的 term 小于 从节点的 term,发送反馈给主节点,告知主节点的 term 已过时;如果投票轮次相同,并且发送心跳包的节点是该节点的主节点,则返回成功。
下面重点讨论主节点的 term 大于从节点的情况。
synchronized (memberState) {
if (request.getTerm() < memberState.currTerm()) { // @1
return CompletableFuture.completedFuture(new HeartBeatResponse().term(memberState.currTerm()).code(DLedgerResponseCode.EXPIRED_TERM.getCode()));
} else if (request.getTerm() == memberState.currTerm()) { // @2
if (memberState.getLeaderId() == null) {
changeRoleToFollower(request.getTerm(), request.getLeaderId());
return CompletableFuture.completedFuture(new HeartBeatResponse());
} else if (request.getLeaderId().equals(memberState.getLeaderId())) {
lastLeaderHeartBeatTime = System.currentTimeMillis();
return CompletableFuture.completedFuture(new HeartBeatResponse());
} else {
//this should not happen, but if happened
logger.error("[{}][BUG] currTerm {} has leader {}, but received leader {}", memberState.getSelfId(), memberState.currTerm(), memberState.getLeaderId(), request.getLeaderId());
return CompletableFuture.completedFuture(new HeartBeatResponse().code(DLedgerResponseCode.INCONSISTENT_LEADER.getCode()));
}
} else {
//To make it simple, for larger term, do not change to follower immediately
//first change to candidate, and notify the state-maintainer thread
changeRoleToCandidate(request.getTerm());
needIncreaseTermImmediately = true;
//TOOD notify
return CompletableFuture.completedFuture(new HeartBeatResponse().code(DLedgerResponseCode.TERM_NOT_READY.getCode()));
}
}
Step2:加锁来处理(这里更多的是从节点第一次收到主节点的心跳包)
代码@1:如果主节的投票轮次小于当前投票轮次,则返回主节点投票轮次过期。
代码@2:如果投票轮次相同。
代码@3:如果主节点的投票轮次大于从节点的投票轮次,则认为从节点并为准备好,则从节点进入Candidate 状态,并立即发起一次投票。
心跳包的处理就介绍到这里。
RocketMQ 多副本之 Leader 选举的源码分析就介绍到这里了,为了加强对源码的理解,先梳理流程图如下:
本文就介绍到这里了,如果对您有一定的帮助,麻烦帮忙点个在看,谢谢。