目录
1. PhxPaxos源码分析之关于PhxPaxos
2. PhxPaxos分析之网络基础部件
3. PhxPaxos源码分析之Proposer、Acceptor
4. PhxPaxos源码分析之Learner
5. PhxPaxos源码分析之状态机
6. PhxPaxos源码分析之归档机制
7. PhxPaxos源码分析之整体架构
3.1 基本概念
Paxos算法包含三个角色:
-
Proposer
提案发起者。负责发起提案,提案本身由编号(number)、值(value)两部分组成。 -
Acceptor
提案接收者。负责接收Proposer的提案,通过一定的规则首先确定提案编号、最终确定提案值。 -
Learner
选中提案值习得者。不参与提案确定过程,只负责学习已确定好的提案值。
简单来说,提案由每个节点的Proposer、Acceptor参与决策,如果某个节点由于网络故障等原因未参与决策,由Learner负责学习已经选中提案值。
Paxos算法本身并不好理解,这里按通俗易懂的说法讲述整个提案确定过程,完整的Paxos算计介绍请参见Paxos made simple 释译。
注意:一个完整的提案由{编号n,提案值v}两部分组成。我们只关心最终的提案值,提案编号的唯一用途在于确定提案值。
- Proposer选择新的编号n,并发送到所有的Acceptor。
- Acceptor接收到请求后完成如下工作:
2.1 承诺(promise):承诺不再accept所有编号小于n的请求,即reject编号小于n的请求。
2.2 返回(reply):返回当前小于n的最大编号所对应的提议值v(如果存在)。- 如果Proposer发起的提案编号被半数以上的Acceptor接受,此时它可以真正的发起提案值,否则回到步骤一重新确定提案编号。
- 发起的提案值不是Proposer随意确定的。确定规则如下:
4.1 如果步骤2.2中,一个或者多个Acceptor返回了已经被接受(accept)的提案值,新发起的提案值必须是最大编号返回的提案值。
4.2 如果步骤2.2中,没有任何Acceptor返回已被接受(accept)的提案值,由Proposer自行发起新的提案值。- Proposer发起的提案值如果被接受则提案结束,否则重复上述过程,直到确定提案值。
Paxos算法分为提案选定阶段(Prepare)、确定提案值(accept)两个阶段。每个阶段都有可能需要执行多次,每次都花费一个RTT网络耗时。为了减少网络耗时,达到产品级应用的目的,PhxPaxos采用选主并在主上执行Paxos算法的方式避免提案冲突,随后为所有的提案执行noop操作,有条件跳过Prepare阶段将网络花费降至理论最小值(一个RTT)。
本章先来看Proposer、Acceptor这两个角色,Learner将在下一节讲解。
3.2 Proposer
Proposer为提案的发起者,入口函数如下:
int Proposer :: NewValue(const std::string& sValue)
{
BP->GetProposerBP()->NewProposal(sValue);
if (m_oProposerState.GetValue().size() == 0)
{
m_oProposerState.SetValue(sValue);
}
m_iLastPrepareTimeoutMs = START_PREPARE_TIMEOUTMS;
m_iLastAcceptTimeoutMs = START_ACCEPT_TIMEOUTMS;
// 如果允许跳过Prepare阶段 && 当前未被其他节点拒绝
if (m_bCanSkipPrepare && !m_bWasRejectBySomeone)
{
BP->GetProposerBP()->NewProposalSkipPrepare();
// 直接进入提案值确定阶段
PLGHead("skip prepare, directly start accept");
Accept();
}
else
{
// 先进入Prepare阶段,随后进入提案值确定阶段
//if not reject by someone, no need to increase ballot
Prepare(m_bWasRejectBySomeone);
}
return 0;
}
满足如下场景,允许跳过Prepare阶段:
- 本节点之前已经执行过Prepare阶段,并且Prepare阶段的执行结果为Accept。
当不满足上述场景时,需要执行完整的Paxos两阶段流程。来看Prepare阶段。
3.2.1 Prepare阶段
Prepare的入口函数如下:
void Proposer :: Prepare(const bool bNeedNewBallot)
{
PLGHead("START Now.InstanceID %lu MyNodeID %lu State.ProposalID %lu State.ValueLen %zu",
GetInstanceID(), m_poConfig->GetMyNodeID(), m_oProposerState.GetProposalID(),
m_oProposerState.GetValue().size());
BP->GetProposerBP()->Prepare();
m_oTimeStat.Point();
//重置Proposer状态,准备进入Prepare阶段。
ExitAccept();
m_bIsPreparing = true;
m_bCanSkipPrepare = false;
m_bWasRejectBySomeone = false;
m_oProposerState.ResetHighestOtherPreAcceptBallot();
//分配新的提案编号
if (bNeedNewBallot)
{
m_oProposerState.NewPrepare();
}
PaxosMsg oPaxosMsg;
oPaxosMsg.set_msgtype(MsgType_PaxosPrepare);
oPaxosMsg.set_instanceid(GetInstanceID());
oPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
oPaxosMsg.set_proposalid(m_oProposerState.GetProposalID());
m_oMsgCounter.StartNewRound();
//Prepare超时定时器
AddPrepareTimer();
PLGHead("END OK");
//发送Prepare消息
BroadcastMessage(oPaxosMsg);
}
发起Prepare主要做4件事:
- Proposer状态重置。表明当前开始进入Prepare阶段。
- 按需分配新的提案编号。按需的意思是指,当有其他节点明确拒绝了该提案,按Paxos协议必须使用新的提案编号重写发起提案;而如果并无其他节点拒绝,即由于超时等原因导致的重新发起提案,可沿用原有的编号。
- 设置Prepare超时定时器。Prepare超时原因有很多,比如网络丢包。当Prepare超时时,处理方式也很简单,重新执行Prepare。
void Proposer::OnPrepareTimeout()
{
PLGHead("OK");
//本轮提案已经选举结束,不再执行任何操作。
if (GetInstanceID() != m_llTimeoutInstanceID)
{
PLGErr("TimeoutInstanceID %lu not same to NowInstanceID %lu, skip",
m_llTimeoutInstanceID, GetInstanceID());
return;
}
BP->GetProposerBP()->PrepareTimeout();
//重新发起Prepare
Prepare(m_bWasRejectBySomeone);
}
- 发送Prepare消息。消息采用UDP方式发送。发送内容包括本轮提案的实例编号、节点编号、提案编号、消息类型等。如果我们依次发起了多轮提案,每轮实例编号依次为1、2、3...,某些节点如果落后于其他节点,需要通过实例编号隔离不同的提案请求。关于整个实例的概念,后面后有详细的讲解。
Prepare消息是发往各个节点的Acceptor的,Acceptor处理完成后发送Replay消息。Proposer交由OnPrepareReply处理Replay消息,逻辑如下:
- 判定消息的有效性。包括实例编号、提案编号是否一致;当前是否处在Prepare阶段等。代码略。
- 根据消息内容更新Proposer状态。包括更新接收节点信息、接收或拒绝状态更新。
//更新:已从node id节点获取数据
m_oMsgCounter.AddReceive(oPaxosMsg.nodeid());
if (oPaxosMsg.rejectbypromiseid() == 0)
{
//更新:接受该提案,并将该节点的promise id(承诺提案编号)、提案值更新到本地。
BallotNumber oBallot(oPaxosMsg.preacceptid(), oPaxosMsg.preacceptnodeid());
PLGDebug("[Promise] PreAcceptedID %lu PreAcceptedNodeID %lu ValueSize %zu",
oPaxosMsg.preacceptid(), oPaxosMsg.preacceptnodeid(), oPaxosMsg.value().size());
m_oMsgCounter.AddPromiseOrAccept(oPaxosMsg.nodeid());
m_oProposerState.AddPreAcceptValue(oBallot, oPaxosMsg.value());
}
else
{
//更新:该提案被拒绝,标明本轮存在拒绝请求的节点(重新发起提案时需要更新提案值)。并将该节点已承诺的提案编号更新到本地。
PLGDebug("[Reject] RejectByPromiseID %lu", oPaxosMsg.rejectbypromiseid());
m_oMsgCounter.AddReject(oPaxosMsg.nodeid());
m_bWasRejectBySomeone = true;
m_oProposerState.SetOtherProposalID(oPaxosMsg.rejectbypromiseid());
}
- 根据已收到的各个节点的回复,判定是否进入Accept阶段,或者重新发起Prepare。
//已收到超过半数的Accept回复消息,直接进入Accept阶段。
if (m_oMsgCounter.IsPassedOnThisRound())
{
int iUseTimeMs = m_oTimeStat.Point();
BP->GetProposerBP()->PreparePass(iUseTimeMs);
PLGImp("[Pass] start accept, usetime %dms", iUseTimeMs);
//最近一次发起的Prepare被接受,后续可跳过Prepare阶段。
m_bCanSkipPrepare = true;
//进入Accept阶段。
Accept();
}
//已收到超过半数的Reject回复消息或者所有节点已回复(这个判断并不需要),重新进入Prepare阶段
else if (m_oMsgCounter.IsRejectedOnThisRound() || m_oMsgCounter.IsAllReceiveOnThisRound())
{
BP->GetProposerBP()->PrepareNotPass();
PLGImp("[Not Pass] wait 30ms and restart prepare");
//重置Prepare超时定时器,提前触发Prepare超时。
AddPrepareTimer(OtherUtils::FastRand() % 30 + 10);
}
3.2.2 Accept阶段
Accept的入口函数如下:
void Proposer::Accept()
{
PLGHead("START ProposalID %lu ValueSize %zu ValueLen %zu",
m_oProposerState.GetProposalID(), m_oProposerState.GetValue().size(), m_oProposerState.GetValue().size());
BP->GetProposerBP()->Accept();
m_oTimeStat.Point();
//标识进入Accept阶段
ExitPrepare();
m_bIsAccepting = true;
PaxosMsg oPaxosMsg;
oPaxosMsg.set_msgtype(MsgType_PaxosAccept);
oPaxosMsg.set_instanceid(GetInstanceID());
oPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
oPaxosMsg.set_proposalid(m_oProposerState.GetProposalID());
oPaxosMsg.set_value(m_oProposerState.GetValue());
oPaxosMsg.set_lastchecksum(GetLastChecksum());
m_oMsgCounter.StartNewRound();
//添加Accept超时定时器
AddAcceptTimer();
PLGHead("END");
//发送Accept消息
BroadcastMessage(oPaxosMsg, BroadcastMessage_Type_RunSelf_Final);
}
Accept和Prepare阶段的request操作如出一辙:
- Proposer状态重置。表明当前开始进入Accept阶段。
- 设置Accept超时定时器。Accept超时原因有很多,比如网络丢包。当Accept超时时,处理方式也很简单,重新进入Prepare阶段。
void Proposer::OnAcceptTimeout()
{
PLGHead("OK");
//本轮提案已经选举结束,不再执行任何操作。
if (GetInstanceID() != m_llTimeoutInstanceID)
{
PLGErr("TimeoutInstanceID %lu not same to NowInstanceID %lu, skip",
m_llTimeoutInstanceID, GetInstanceID());
return;
}
BP->GetProposerBP()->AcceptTimeout();
//重新发起Prepare
Prepare(m_bWasRejectBySomeone);
}
- 发送Accept消息到其他节点。消息采用UDP方式发送。发送内容包括本轮提案的实例编号、节点编号、提案编号、消息类型、提案值等。
OnAcceptReply处理逻辑和OnPrepareReply类似,包括如下几步:
- 判定消息有效性。包括实例编号、提案编号是否一致;当前是否处在Accept阶段等。
- 根据消息内容更新Proposer状态。包括更新接收节点信息、接收或拒绝状态更新。
- 根据已收到的各个节点的回复,判定Accept阶段是否已完成,或者重新发起Prepare。
//提案被接受,提前退出Accept阶段
if (m_oMsgCounter.IsPassedOnThisRound())
{
int iUseTimeMs = m_oTimeStat.Point();
BP->GetProposerBP()->AcceptPass(iUseTimeMs);
PLGImp("[Pass] Start send learn, usetime %dms", iUseTimeMs);
ExitAccept();
//通知Learner,本轮提案已完成
m_poLearner->ProposerSendSuccess(GetInstanceID(), m_oProposerState.GetProposalID());
}
else if (m_oMsgCounter.IsRejectedOnThisRound() || m_oMsgCounter.IsAllReceiveOnThisRound())
{
BP->GetProposerBP()->AcceptNotPass();
PLGImp("[Not pass] wait 30ms and Restart prepare");
//Accept失败,终止Accept阶段。
AddAcceptTimer(OtherUtils::FastRand() % 30 + 10);
}
如果提案为获得半数通过,即未被选中(chosen)将从prepare阶段重新发起提案。如果获得半数通过,learner通知所有节点提案被选中,即从accept状态改为chosen状态。
3.3 Acceptor
Acceptor做为提案的被动参与者,也分为Prepare和Accept两个阶段
3.3.1 Prepare阶段
Proposer发送的Prepare消息由Acceptor的OnPrepare处理,逻辑如下:
int Acceptor :: OnPrepare(const PaxosMsg & oPaxosMsg)
{
PLGHead("START Msg.InstanceID %lu Msg.from_nodeid %lu Msg.ProposalID %lu",
oPaxosMsg.instanceid(), oPaxosMsg.nodeid(), oPaxosMsg.proposalid());
BP->GetAcceptorBP()->OnPrepare();
PaxosMsg oReplyPaxosMsg;
oReplyPaxosMsg.set_instanceid(GetInstanceID());
oReplyPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
oReplyPaxosMsg.set_proposalid(oPaxosMsg.proposalid());
oReplyPaxosMsg.set_msgtype(MsgType_PaxosPrepareReply);
BallotNumber oBallot(oPaxosMsg.proposalid(), oPaxosMsg.nodeid());
//当前的提案编号大于已承诺的编号,接受该提案
if (oBallot >= m_oAcceptorState.GetPromiseBallot())
{
PLGDebug("[Promise] State.PromiseID %lu State.PromiseNodeID %lu "
"State.PreAcceptedID %lu State.PreAcceptedNodeID %lu",
m_oAcceptorState.GetPromiseBallot().m_llProposalID,
m_oAcceptorState.GetPromiseBallot().m_llNodeID,
m_oAcceptorState.GetAcceptedBallot().m_llProposalID,
m_oAcceptorState.GetAcceptedBallot().m_llNodeID);
//返回之前承诺的提案编号
oReplyPaxosMsg.set_preacceptid(m_oAcceptorState.GetAcceptedBallot().m_llProposalID);
oReplyPaxosMsg.set_preacceptnodeid(m_oAcceptorState.GetAcceptedBallot().m_llNodeID);
//如果之前已经接受过某个提案的提案值,返回该提案值
if (m_oAcceptorState.GetAcceptedBallot().m_llProposalID > 0)
{
oReplyPaxosMsg.set_value(m_oAcceptorState.GetAcceptedValue());
}
//将承诺的提案更新为当前提案
m_oAcceptorState.SetPromiseBallot(oBallot);
//保证P2.c的不变性,数据需要入库
int ret = m_oAcceptorState.Persist(GetInstanceID(), GetLastChecksum());
if (ret != 0)
{
BP->GetAcceptorBP()->OnPreparePersistFail();
PLGErr("Persist fail, Now.InstanceID %lu ret %d",
GetInstanceID(), ret);
return -1;
}
BP->GetAcceptorBP()->OnPreparePass();
}
else
{
BP->GetAcceptorBP()->OnPrepareReject();
PLGDebug("[Reject] State.PromiseID %lu State.PromiseNodeID %lu",
m_oAcceptorState.GetPromiseBallot().m_llProposalID,
m_oAcceptorState.GetPromiseBallot().m_llNodeID);
//已存在更高编号的提案,拒绝该提案并返回已承诺提案编号
oReplyPaxosMsg.set_rejectbypromiseid(m_oAcceptorState.GetPromiseBallot().m_llProposalID);
}
nodeid_t iReplyNodeID = oPaxosMsg.nodeid();
PLGHead("END Now.InstanceID %lu ReplyNodeID %lu",
GetInstanceID(), oPaxosMsg.nodeid());;
//发送Replay消息
SendMessage(iReplyNodeID, oReplyPaxosMsg);
return 0;
}
OnPrepare函数看似并未做任何有效性校验,但这部分校验是必不可少的,并未省去,而是出现在了调用OnPrepare的Instance类的上层函数中。关于Instance后面也会单独说明,这里的校验主要是保证参数中的instance id和acceptor一致。
另外一点需要说明的是:前面一直提到的提案编号并不为完整的Paxos意义上的提案编号,完整的提案编号由提案编号(proposal id)+节点Id(node id)两部分组成。代码上看,BallotNumber代表了Paxos意义上的提案编号。
class BallotNumber
{
public:
......
uint64_t m_llProposalID;
nodeid_t m_llNodeID;
};
3.3.2 Accept阶段
Proposer发送的Accept消息由Acceptor的OnAccept处理,逻辑如下:
void Acceptor :: OnAccept(const PaxosMsg & oPaxosMsg)
{
PLGHead("START Msg.InstanceID %lu Msg.from_nodeid %lu Msg.ProposalID %lu Msg.ValueLen %zu",
oPaxosMsg.instanceid(), oPaxosMsg.nodeid(), oPaxosMsg.proposalid(), oPaxosMsg.value().size());
BP->GetAcceptorBP()->OnAccept();
PaxosMsg oReplyPaxosMsg;
oReplyPaxosMsg.set_instanceid(GetInstanceID());
oReplyPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
oReplyPaxosMsg.set_proposalid(oPaxosMsg.proposalid());
oReplyPaxosMsg.set_msgtype(MsgType_PaxosAcceptReply);
BallotNumber oBallot(oPaxosMsg.proposalid(), oPaxosMsg.nodeid());
//当前的提案编号大于或等于已承诺的编号,接受该提案
if (oBallot >= m_oAcceptorState.GetPromiseBallot())
{
PLGDebug("[Promise] State.PromiseID %lu State.PromiseNodeID %lu "
"State.PreAcceptedID %lu State.PreAcceptedNodeID %lu",
m_oAcceptorState.GetPromiseBallot().m_llProposalID,
m_oAcceptorState.GetPromiseBallot().m_llNodeID,
m_oAcceptorState.GetAcceptedBallot().m_llProposalID,
m_oAcceptorState.GetAcceptedBallot().m_llNodeID);
//更新提案编号、提案值
m_oAcceptorState.SetPromiseBallot(oBallot);
m_oAcceptorState.SetAcceptedBallot(oBallot);
m_oAcceptorState.SetAcceptedValue(oPaxosMsg.value());
//按Paxos协议P2.c不变性要求,数据需要持久化(包括实例号、提案编号、提案值)
int ret = m_oAcceptorState.Persist(GetInstanceID(), GetLastChecksum());
if (ret != 0)
{
BP->GetAcceptorBP()->OnAcceptPersistFail();
PLGErr("Persist fail, Now.InstanceID %lu ret %d",
GetInstanceID(), ret);
return;
}
BP->GetAcceptorBP()->OnAcceptPass();
}
else
{
BP->GetAcceptorBP()->OnAcceptReject();
PLGDebug("[Reject] State.PromiseID %lu State.PromiseNodeID %lu",
m_oAcceptorState.GetPromiseBallot().m_llProposalID,
m_oAcceptorState.GetPromiseBallot().m_llNodeID);
//已存在更高编号的提案,拒绝该提案并返回已承诺提案编号
oReplyPaxosMsg.set_rejectbypromiseid(m_oAcceptorState.GetPromiseBallot().m_llProposalID);
}
nodeid_t iReplyNodeID = oPaxosMsg.nodeid();
PLGHead("END Now.InstanceID %lu ReplyNodeID %lu",
GetInstanceID(), oPaxosMsg.nodeid());
SendMessage(iReplyNodeID, oReplyPaxosMsg);
}
有一点需要说明,OnPrepare、OnAccept两个阶段中,如果处理结果为接受,那么需要保证不管在何种情况下,这个结果是不变的。因此,数据需要被持久化,但这个持久化数据只需要本地保存即可,PhxPaxos中采用leveldb做本地持久化数据库。
3.5 并发
前面的处理逻辑中,没有看到加锁或者其他并发保护机制。那么,如果一个instance正在处理Prepare请求时,同时接收到了Accept消息怎么办呢?其实,PhxPaxos中还有另外一个IOLoop线程,专门负责接收网络消息,并串行执行。
IOLoop线程的run函数实现如下:
void IOLoop :: run()
{
m_bIsEnd = false;
m_bIsStart = true;
while(true)
{
BP->GetIOLoopBP()->OneLoop();
int iNextTimeout = 1000;
DealwithTimeout(iNextTimeout);
//PLGHead("nexttimeout %d", iNextTimeout);
OneLoop(iNextTimeout);
if (m_bIsEnd)
{
PLGHead("IOLoop [End]");
break;
}
}
}
其中,OneLoop是我们本次关注的重点:
void IOLoop :: OneLoop(const int iTimeoutMs)
{
std::string * psMessage = nullptr;
m_oMessageQueue.lock();
bool bSucc = m_oMessageQueue.peek(psMessage, iTimeoutMs);
if (!bSucc)
{
m_oMessageQueue.unlock();
}
else
{
m_oMessageQueue.pop();
m_oMessageQueue.unlock();
if (psMessage != nullptr && psMessage->size() > 0)
{
m_iQueueMemSize -= psMessage->size();
m_poInstance->OnReceive(*psMessage);
}
delete psMessage;
BP->GetIOLoopBP()->OutQueueMsg();
}
DealWithRetry();
//must put on here
//because addtimer on this funciton
m_poInstance->CheckNewValue();
}
OneLoop从m_oMessageQueue中取出一个网络消息随后交给Instance处理。每个Group的Instance启动一个IOLoop线程,不同Group彼此隔离,并发处理互不影响。
3.4 总结
本章简要介绍了Paxos算法原理,了解到Paxos算法的三大角色:Proposer、Acceptor、Learner。讲解了Proposer、Learner两个角色的主要代码实现,以及二者如何参与到Prepare、Accept两个阶段中。
至于最后一个角色Learner,原本的理解认为应该是参与度最低的,逻辑最少的角色。但PhxPaxos中,Learner是三者中实现最复杂的,这部分内容将在下一章单独讲解。
【转载请注明】随安居士. 3. PhxPaxos源码分析之Proposer、Acceptor. 2017.11.14