Nacos clusters handle two kinds of data: persistent and ephemeral (non-persistent). Persistent data is replicated with a Raft-based protocol, while ephemeral data is sharded across the nodes. The following analyzes the persistent implementation.
1. Nacos persistent cluster
The Nacos persistent cluster resembles ZooKeeper: nodes take on a leader role or a follower role. The role names themselves hint that the cluster has an election mechanism; without elections, the roles would more likely be named master/slave.
1.1 Election algorithm
Nacos uses the Raft protocol to guarantee data consistency (weak consistency, like ZooKeeper) and to perform leader election.
Raft reference: http://thesecretlivesofdata.com/raft/
It is strongly recommended to understand the Raft protocol first; reading the code below is then just a matter of following the flow.
There are three roles:
leader
follower
candidate

An election is triggered at two moments:
1. when the service starts
2. when the leader dies

All nodes start in the follower state. If a follower receives no leader heartbeat for some period of time (either because there is no leader yet, or because the leader has died), it becomes a candidate and starts an election. Before the election it increments its term, which serves the same purpose as the epoch in ZooKeeper.
The candidate votes for itself, sends its vote to the other nodes, and waits for their replies.
Several outcomes are possible during this process: it receives more than half of the votes and becomes leader; it is told that another node has already become leader and switches itself to follower; or no majority arrives within a period of time and it starts a new election.
Constraint: within any single term, a node may cast at most one vote.

The election scenarios:
First scenario: after winning the election, the leader sends heartbeats to all nodes to stop other nodes from triggering a new election.
Second scenario: suppose there are three nodes A, B and C. A and B start elections at the same time, and A's vote request reaches C first, so C votes for A. When B's request reaches C, the constraint above means C cannot vote for B, and A and B obviously will not vote for each other. After A wins, it sends heartbeats to B and C; B sees that A's term is not lower than its own, realizes a leader already exists, and converts to follower.
Third scenario: no node wins more than half of the votes, i.e. a split vote. Suppose there are four nodes (A/B/C/D): Node C and Node D become candidates at the same time, but Node A votes for Node D and Node B votes for Node C, producing a split vote. Everyone then waits until the timeout expires and restarts the election. Split votes extend the time the system is unavailable, so Raft introduces randomized election timeouts to make them unlikely.
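The randomized timeout trick can be sketched in isolation. This is a minimal sketch, not the Nacos implementation; the class name and constants here are illustrative (in Nacos the real values live in `GlobalExecutor` and `RaftPeer`, and match the [0, 15000) / 15000 + [0, 5000) ranges discussed in the MasterElection code below):

```java
import java.util.Random;

// Minimal sketch of randomized election timeouts: each peer picks a random
// due time so that peers rarely time out (and start elections) simultaneously.
public class ElectionTimeout {
    static final Random RANDOM = new Random();
    static final long LEADER_TIMEOUT_MS = 15_000L;
    static final long RANDOM_MS = 5_000L;

    // On startup every peer picks a random due time in [0, 15000) ms.
    static long initialLeaderDue() {
        return RANDOM.nextInt((int) LEADER_TIMEOUT_MS);
    }

    // After each heartbeat (or before voting) the due time is reset to
    // 15000 ms plus a random jitter in [0, 5000) ms.
    static long resetLeaderDue() {
        return LEADER_TIMEOUT_MS + RANDOM.nextInt((int) RANDOM_MS);
    }

    public static void main(String[] args) {
        long due = initialLeaderDue();
        System.out.println(due >= 0 && due < LEADER_TIMEOUT_MS);           // true
        long reset = resetLeaderDue();
        System.out.println(reset >= LEADER_TIMEOUT_MS && reset < 20_000L); // true
    }
}
```

Because every peer's timer expires at a different moment, one peer usually starts (and wins) an election before the others even become candidates.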
Source code analysis:
The server node list is produced by ServerListManager, which reads the list from a file and from memory and reconciles the two.
public class ServerListManager{
@PostConstruct
public void init() {
//fetch the server list; list updates also fire the corresponding event
GlobalExecutor.registerServerListUpdater(new ServerListUpdater());
//server status monitoring
GlobalExecutor.registerServerStatusReporter(new ServerStatusReporter(), 2000);
}
}
Below is the core Raft implementation:
public class RaftCore {
// ...
@PostConstruct
public void init() throws Exception {
Loggers.RAFT.info("initializing Raft sub-system");
executor.submit(notifier);
long start = System.currentTimeMillis();
raftStore.loadDatums(notifier, datums);
//set the term: defaults to 0, but if the local meta file contains a term, that value is used
setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));
Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm());
//not the key point here: just make sure the pending service/instance notification tasks finish before the election starts
while (true) {
if (notifier.tasks.size() <= 0) {
break;
}
Thread.sleep(1000L);
}
initialized = true;
//a scheduled task responsible for leader (master) election
GlobalExecutor.registerMasterElection(new MasterElection());
//scheduled heartbeat handling: the leader periodically sends heartbeats to the other nodes
GlobalExecutor.registerHeartbeat(new HeartBeat());
Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",
GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
}
}
1) Leader election
Election is initiated locally at startup.
//Master election task
public class MasterElection implements Runnable {
@Override
public void run() {
try {
//skip this round unless the peer set is ready (tracked via listeners)
if (!peers.isReady()) {
return;
}
RaftPeer local = peers.local();
//how long to wait before starting an election:
//1. local.leaderDueMs starts as a random value in [0, 15000) milliseconds,
//2. it is then decremented in 500 ms ticks; once it drops to <= 0, an election is triggered (after resetting the timeout first),
//3. resetLeaderDue() re-assigns leaderDueMs, not with the initial random logic but as 15000 ms plus a random value in [0, 5000) milliseconds.
local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;
if (local.leaderDueMs > 0) {
return;
}
// reset timeout
local.resetLeaderDue();
local.resetHeartbeatDue();
//start the election
sendVote();
} catch (Exception e) {
Loggers.RAFT.warn("[RAFT] error while master election {}", e);
}
}
public void sendVote() {
RaftPeer local = peers.get(NetUtils.localServer());
Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}",
JSON.toJSONString(getLeader()), local.term);
peers.reset();
//increment the local term
local.term.incrementAndGet();
//vote for itself first
local.voteFor = local.ip;
//update its own state
local.state = RaftPeer.State.CANDIDATE;
Map<String, String> params = new HashMap<>(1);
params.put("vote", JSON.toJSONString(local));
//send the vote to every server node except itself
for (final String server : peers.allServersWithoutMySelf()) {
final String url = buildURL(server, API_VOTE);
try {
//handled by RaftController's raft/vote endpoint
HttpClient.asyncHttpPost(url, null, params, new AsyncCompletionHandler<Integer>() {
@Override
public Integer onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", response.getResponseBody(), url);
return 1;
}
RaftPeer peer = JSON.parseObject(response.getResponseBody(), RaftPeer.class);
Loggers.RAFT.info("received approve from peer: {}", JSON.toJSONString(peer));
//collect the vote result and decide the final leader
peers.decideLeader(peer);
return 0;
}
});
} catch (Exception e) {
Loggers.RAFT.warn("error while sending vote to server: {}", server);
}
}
}
}
//The Raft peer set
public class RaftPeerSet {
//decide the leader from the votes; executed on every vote response
public RaftPeer decideLeader(RaftPeer candidate) {
peers.put(candidate.ip, candidate);
SortedBag ips = new TreeBag();
int maxApproveCount = 0;
String maxApprovePeer = null;
for (RaftPeer peer : peers.values()) {
if (StringUtils.isEmpty(peer.voteFor)) {
continue;
}
ips.add(peer.voteFor);
if (ips.getCount(peer.voteFor) > maxApproveCount) {
maxApproveCount = ips.getCount(peer.voteFor);
maxApprovePeer = peer.voteFor;
}
}
//if the candidate with the most votes has reached a majority of all peers, make it the leader
if (maxApproveCount >= majorityCount()) {
RaftPeer peer = peers.get(maxApprovePeer);
peer.state = RaftPeer.State.LEADER;
if (!Objects.equals(leader, peer)) {
leader = peer;
//publish an event: leader election has finished
applicationContext.publishEvent(new LeaderElectFinishedEvent(this, leader));
Loggers.RAFT.info("{} has become the LEADER", leader.ip);
}
}
return leader;
}
}
Handling a received vote on a peer:
public class RaftController {
@NeedAuth
@PostMapping("/vote")
public JSONObject vote(HttpServletRequest request, HttpServletResponse response) throws Exception {
RaftPeer peer = raftCore.receivedVote(
JSON.parseObject(WebUtils.required(request, "vote"), RaftPeer.class));
return JSON.parseObject(JSON.toJSONString(peer));
}
}
//RaftCore
public synchronized RaftPeer receivedVote(RaftPeer remote) {
if (!peers.contains(remote)) {
throw new IllegalStateException("can not find peer: " + remote.ip);
}
RaftPeer local = peers.get(NetUtils.localServer());
//if the remote term is not greater than the local term, reject and return the local peer as the vote response
if (remote.term.get() <= local.term.get()) {
String msg = "received illegitimate vote" +
", voter-term:" + remote.term + ", votee-term:" + local.term;
Loggers.RAFT.info(msg);
if (StringUtils.isEmpty(local.voteFor)) {
local.voteFor = local.ip;
}
return local;
}
//otherwise accept: the first valid request in this term wins; become follower and vote for the remote peer
local.resetLeaderDue();
local.state = RaftPeer.State.FOLLOWER;
local.voteFor = remote.ip;
local.term.set(remote.term.get());
Loggers.RAFT.info("vote {} as leader, term: {}", remote.ip, remote.term);
return local;
}
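The decision inside receivedVote can be boiled down to one rule. The sketch below is illustrative (hypothetical class and parameter names), not the Nacos code itself:

```java
// Minimal sketch of the vote-granting rule: a vote request is granted only
// when the candidate's term is strictly greater than the local term;
// otherwise the peer keeps (or defaults to) its existing vote.
public class VoteRule {
    // Returns the ip this peer ends up voting for after seeing a request
    // from candidateIp carrying candidateTerm.
    static String vote(String localIp, long localTerm, String currentVoteFor,
                       String candidateIp, long candidateTerm) {
        if (candidateTerm <= localTerm) {
            // reject: answer with the existing vote, defaulting to ourselves
            return currentVoteFor != null ? currentVoteFor : localIp;
        }
        // accept: adopt the candidate's term and vote for it
        return candidateIp;
    }

    public static void main(String[] args) {
        System.out.println(vote("A", 5, null, "B", 6)); // B (higher term wins)
        System.out.println(vote("A", 5, null, "B", 5)); // A (stale term rejected)
    }
}
```

This is exactly why two simultaneous candidates in the same term cannot vote for each other: each sees the other's term as not greater than its own.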
To summarize the leader election above (keep a multi-node voting scenario in mind):
1. After startup, each node first checks that the peer set is ready; each node has an initial randomized leader-election due time, which is decremented by 500 ms per tick, and only when it drops to <= 0 does the node start an election.
2. When electing, the node increments its term by 1, sets its state to CANDIDATE, votes for itself, and sends vote requests to all other nodes.
3. When a node receives a vote request, if the remote term is not greater than its local term it simply returns its own vote to the caller; otherwise it votes for the remote peer.
4. Each vote response is added to the local tally; once the top candidate holds votes from more than half of the peers, it becomes the leader.
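The tallying in RaftPeerSet.decideLeader can be sketched with plain collections. This is a minimal sketch under assumed names (`VoteTally`, a map of peer ip to its voteFor), not the Nacos class:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of decideLeader-style tallying: count each peer's voteFor
// and promote the top candidate once it holds a strict majority.
public class VoteTally {
    // A majority is more than half of all peers.
    static int majorityCount(int peerCount) {
        return peerCount / 2 + 1;
    }

    // Returns the leader ip, or null if no candidate has a majority yet.
    static String decideLeader(Map<String, String> voteForByPeer) {
        Map<String, Integer> counts = new HashMap<>();
        String best = null;
        int bestCount = 0;
        for (String votedFor : voteForByPeer.values()) {
            if (votedFor == null) continue;   // this peer has not voted yet
            int c = counts.merge(votedFor, 1, Integer::sum);
            if (c > bestCount) { bestCount = c; best = votedFor; }
        }
        return bestCount >= majorityCount(voteForByPeer.size()) ? best : null;
    }

    public static void main(String[] args) {
        Map<String, String> votes = new HashMap<>();
        votes.put("A", "A"); votes.put("B", "A"); votes.put("C", "B");
        System.out.println(decideLeader(votes)); // A (2 of 3 votes)
    }
}
```

Note that the real decideLeader runs on every vote response, so the leader emerges as soon as the threshold is crossed rather than after all responses arrive.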
2) Heartbeat check on leader health
Heartbeats detect whether the leader has died; if it has, a re-election is required.
The leader actively sends a heartbeat to every follower node, which resets the followers' election timers.
When RaftCore was initialized above, we registered:
GlobalExecutor.registerHeartbeat(new HeartBeat());
Let's look at the HeartBeat task:
public class HeartBeat implements Runnable {
@Override
public void run() {
try {
if (!peers.isReady()) {
return;
}
//same idea as above: a heartbeat due-time buffer
RaftPeer local = peers.local();
local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
if (local.heartbeatDueMs > 0) {
return;
}
local.resetHeartbeatDue();
//send the heartbeat
sendBeat();
} catch (Exception e) {
Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
}
}
public void sendBeat() throws IOException, InterruptedException {
RaftPeer local = peers.local();
//in cluster mode, do nothing unless this node is the leader
if (local.state != RaftPeer.State.LEADER && !STANDALONE_MODE) {
return;
}
if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT.debug("[RAFT] send beat with {} keys.", datums.size());
}
//likewise, reset the leader-election due time
local.resetLeaderDue();
// build data
JSONObject packet = new JSONObject();
packet.put("peer", local);
JSONArray array = new JSONArray();
if (switchDomain.isSendBeatOnly()) {
Loggers.RAFT.info("[SEND-BEAT-ONLY] {}", String.valueOf(switchDomain.isSendBeatOnly()));
}
//related to the naming service's service/instance data: the current node packages its services and instances and sends them so followers can detect changes
//this is the default path unless explicitly switched off
if (!switchDomain.isSendBeatOnly()) {
for (Datum datum : datums.values()) {
JSONObject element = new JSONObject();
if (KeyBuilder.matchServiceMetaKey(datum.key)) {
element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));
} else if (KeyBuilder.matchInstanceListKey(datum.key)) {
element.put("key", KeyBuilder.briefInstanceListkey(datum.key));
}
element.put("timestamp", datum.timestamp);
array.add(element);
}
}
packet.put("datums", array);
// broadcast
Map<String, String> params = new HashMap<String, String>(1);
params.put("beat", JSON.toJSONString(packet));
String content = JSON.toJSONString(params);
//gzip-compress, to keep the payload small when service/instance data is included
ByteArrayOutputStream out = new ByteArrayOutputStream();
GZIPOutputStream gzip = new GZIPOutputStream(out);
gzip.write(content.getBytes(StandardCharsets.UTF_8));
gzip.close();
byte[] compressedBytes = out.toByteArray();
String compressedContent = new String(compressedBytes, StandardCharsets.UTF_8);
if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT.debug("raw beat data size: {}, size of compressed data: {}",
content.length(), compressedContent.length());
}
//send the heartbeat to every node except itself
for (final String server : peers.allServersWithoutMySelf()) {
try {
final String url = buildURL(server, API_BEAT);
if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT.debug("send beat to server " + server);
}
//async POST to the beat endpoint (API_BEAT)
HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new AsyncCompletionHandler<Integer>() {
@Override
public Integer onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
Loggers.RAFT.error("NACOS-RAFT beat failed: {}, peer: {}",
response.getResponseBody(), server);
MetricsMonitor.getLeaderSendBeatFailedException().increment();
return 1;
}
//on success, update the local peer info with the response
peers.update(JSON.parseObject(response.getResponseBody(), RaftPeer.class));
if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT.debug("receive beat response from: {}", url);
}
return 0;
}
@Override
public void onThrowable(Throwable t) {
Loggers.RAFT.error("NACOS-RAFT error while sending heart-beat to peer: {} {}", server, t);
MetricsMonitor.getLeaderSendBeatFailedException().increment();
}
});
} catch (Exception e) {
Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", server, e);
MetricsMonitor.getLeaderSendBeatFailedException().increment();
}
}
}
}
The code above simply iterates over every node except itself and asynchronously sends each one the leader's own state.
How the other nodes handle the request:
RaftController
@PostMapping("/beat")
public JSONObject beat(HttpServletRequest request, HttpServletResponse response) throws Exception {
String entity = new String(IoUtils.tryDecompress(request.getInputStream()), StandardCharsets.UTF_8);
String value = URLDecoder.decode(entity, "UTF-8");
value = URLDecoder.decode(value, "UTF-8");
JSONObject json = JSON.parseObject(value);
JSONObject beat = JSON.parseObject(json.getString("beat"));
//handled here
RaftPeer peer = raftCore.receivedBeat(beat);
return JSON.parseObject(JSON.toJSONString(peer));
}
//handle a received heartbeat
public RaftPeer receivedBeat(JSONObject beat) throws Exception {
final RaftPeer local = peers.local();
final RaftPeer remote = new RaftPeer();
remote.ip = beat.getJSONObject("peer").getString("ip");
remote.state = RaftPeer.State.valueOf(beat.getJSONObject("peer").getString("state"));
remote.term.set(beat.getJSONObject("peer").getLongValue("term"));
remote.heartbeatDueMs = beat.getJSONObject("peer").getLongValue("heartbeatDueMs");
remote.leaderDueMs = beat.getJSONObject("peer").getLongValue("leaderDueMs");
remote.voteFor = beat.getJSONObject("peer").getString("voteFor");
if (remote.state != RaftPeer.State.LEADER) {
Loggers.RAFT.info("[RAFT] invalid state from master, state: {}, remote peer: {}",
remote.state, JSON.toJSONString(remote));
throw new IllegalArgumentException("invalid state from master, state: " + remote.state);
}
if (local.term.get() > remote.term.get()) {
Loggers.RAFT.info("[RAFT] out of date beat, beat-from-term: {}, beat-to-term: {}, remote peer: {}, and leaderDueMs: {}"
, remote.term.get(), local.term.get(), JSON.toJSONString(remote), local.leaderDueMs);
throw new IllegalArgumentException("out of date beat, beat-from-term: " + remote.term.get()
+ ", beat-to-term: " + local.term.get());
}
if (local.state != RaftPeer.State.FOLLOWER) {
Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", JSON.toJSONString(remote));
// mk follower: if the current node is not a follower, force it into follower state
local.state = RaftPeer.State.FOLLOWER;
local.voteFor = remote.ip;
}
final JSONArray beatDatums = beat.getJSONArray("datums");
//reset the local leader-election due time
local.resetLeaderDue();
//reset the heartbeat due time
local.resetHeartbeatDue();
//1. the sender is the leader, so update the local view of the leader to this remote peer
//2. iterate over all local peers and demote any stale leader from LEADER to FOLLOWER
peers.makeLeader(remote);
//besides the heartbeat itself, the beat may carry other data (services/instances); the code below mainly parses that
if (!switchDomain.isSendBeatOnly()) {
Map<String, Integer> receivedKeysMap = new HashMap<>(datums.size());
//record the local keys; any key the leader did not send is considered stale, and this node will delete all of its data
for (Map.Entry<String, Datum> entry : datums.entrySet()) {
receivedKeysMap.put(entry.getKey(), 0);
}
// now check datums: batch-apply the datum updates locally
List<String> batch = new ArrayList<>();
int processedCount = 0;
if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT.debug("[RAFT] received beat with {} keys, RaftCore.datums' size is {}, remote server: {}, term: {}, local term: {}",
beatDatums.size(), datums.size(), remote.ip, remote.term, local.term);
}
for (Object object : beatDatums) {
processedCount = processedCount + 1;
JSONObject entry = (JSONObject) object;
String key = entry.getString("key");
final String datumKey;
if (KeyBuilder.matchServiceMetaKey(key)) {
datumKey = KeyBuilder.detailServiceMetaKey(key);
} else if (KeyBuilder.matchInstanceListKey(key)) {
datumKey = KeyBuilder.detailInstanceListkey(key);
} else {
// ignore corrupted key:
continue;
}
long timestamp = entry.getLong("timestamp");
receivedKeysMap.put(datumKey, 1);
try {
if (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp && processedCount < beatDatums.size()) {
continue;
}
//if the key does not exist locally, or the local timestamp is older than the remote one, queue it for a batch update
if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) {
batch.add(datumKey);
}
//flush when the batch reaches 50 entries, or when the last entry has been processed
if (batch.size() < 50 && processedCount < beatDatums.size()) {
continue;
}
String keys = StringUtils.join(batch, ",");
if (batch.size() <= 0) {
continue;
}
Loggers.RAFT.info("get datums from leader: {}, batch size is {}, processedCount is {}, datums' size is {}, RaftCore.datums' size is {}"
, getLeader().ip, batch.size(), processedCount, beatDatums.size(), datums.size());
// update datum entry: pull the datum details (service/instance) from the leader again
String url = buildURL(remote.ip, API_GET) + "?keys=" + URLEncoder.encode(keys, "UTF-8");
HttpClient.asyncHttpGet(url, null, null, new AsyncCompletionHandler<Integer>() {
@Override
public Integer onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
return 1;
}
List<JSONObject> datumList = JSON.parseObject(response.getResponseBody(), new TypeReference<List<JSONObject>>() {
});
//process the datums returned by the leader
for (JSONObject datumJson : datumList) {
OPERATE_LOCK.lock();
Datum newDatum = null;
try {
//fetch the old local datum
Datum oldDatum = getDatum(datumJson.getString("key"));
if (oldDatum != null && datumJson.getLongValue("timestamp") <= oldDatum.timestamp.get()) {
Loggers.RAFT.info("[NACOS-RAFT] timestamp is smaller than that of mine, key: {}, remote: {}, local: {}",
datumJson.getString("key"), datumJson.getLongValue("timestamp"), oldDatum.timestamp);
continue;
}
//for a service key, parse it as a Datum<Service>
if (KeyBuilder.matchServiceMetaKey(datumJson.getString("key"))) {
Datum<Service> serviceDatum = new Datum<>();
serviceDatum.key = datumJson.getString("key");
serviceDatum.timestamp.set(datumJson.getLongValue("timestamp"));
serviceDatum.value =
JSON.parseObject(JSON.toJSONString(datumJson.getJSONObject("value")), Service.class);
newDatum = serviceDatum;
}
//for an instance-list key, parse it as a Datum<Instances>
if (KeyBuilder.matchInstanceListKey(datumJson.getString("key"))) {
Datum<Instances> instancesDatum = new Datum<>();
instancesDatum.key = datumJson.getString("key");
instancesDatum.timestamp.set(datumJson.getLongValue("timestamp"));
instancesDatum.value =
JSON.parseObject(JSON.toJSONString(datumJson.getJSONObject("value")), Instances.class);
newDatum = instancesDatum;
}
if (newDatum == null || newDatum.value == null) {
Loggers.RAFT.error("receive null datum: {}", datumJson);
continue;
}
//write changed service/instance data to disk
raftStore.write(newDatum);
//update the in-memory datum
datums.put(newDatum.key, newDatum);
//publish an event: the service data changed
notifier.addTask(newDatum.key, ApplyAction.CHANGE);
//reset the local leader-election due time
local.resetLeaderDue();
//if the local term plus 100 exceeds the remote term, sync both the leader's and the local term to the remote value;
//why +100? because every service/instance update increments the term by 100
if (local.term.get() + 100 > remote.term.get()) {
getLeader().term.set(remote.term.get());
local.term.set(getLeader().term.get());
} else {
local.term.addAndGet(100);
}
//update the local meta file (meta.properties)
raftStore.updateTerm(local.term.get());
Loggers.RAFT.info("data updated, key: {}, timestamp: {}, from {}, local term: {}",
newDatum.key, newDatum.timestamp, JSON.toJSONString(remote), local.term);
} catch (Throwable e) {
Loggers.RAFT.error("[RAFT-BEAT] failed to sync datum from leader, datum: {}", newDatum, e);
} finally {
OPERATE_LOCK.unlock();
}
}
TimeUnit.MILLISECONDS.sleep(200);
return 0;
}
});
batch.clear();
} catch (Exception e) {
Loggers.RAFT.error("[NACOS-RAFT] failed to handle beat entry, key: {}", datumKey);
}
}
//collect the dead (stale) datum keys and delete them from memory and disk
List<String> deadKeys = new ArrayList<>();
for (Map.Entry<String, Integer> entry : receivedKeysMap.entrySet()) {
if (entry.getValue() == 0) {
deadKeys.add(entry.getKey());
}
}
for (String deadKey : deadKeys) {
try {
deleteDatum(deadKey);
} catch (Exception e) {
Loggers.RAFT.error("[NACOS-RAFT] failed to remove entry, key={} {}", deadKey, e);
}
}
}
return local;
}
To summarize the heartbeat handling:
- The leader packages its own peer info plus all of its local datums and sends them to the follower nodes.
- On receiving the beat, a follower immediately resets its leaderDue timer, preventing it from concluding the leader has died and triggering the next election.
- The follower then diffs the leader's datums against its own: stale datums are deleted and outdated ones are updated; during updates the term is adjusted using the +100 rule.
Note: a datum is the wrapper around the registry's service and instance data.
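The stale-key sweep at the end of receivedBeat can be sketched on its own. A minimal sketch with illustrative names (`DeadKeySweep`, sets of keys standing in for the datum maps):

```java
import java.util.*;

// Minimal sketch of the dead-key sweep: every local key starts marked 0,
// keys present in the leader's beat are marked 1, and keys still at 0
// afterwards are considered dead and deleted.
public class DeadKeySweep {
    static List<String> deadKeys(Set<String> localKeys, Set<String> beatKeys) {
        Map<String, Integer> received = new HashMap<>();
        for (String key : localKeys) {
            received.put(key, 0);            // assume stale until seen in the beat
        }
        for (String key : beatKeys) {
            received.put(key, 1);            // confirmed by the leader
        }
        List<String> dead = new ArrayList<>();
        for (Map.Entry<String, Integer> e : received.entrySet()) {
            if (e.getValue() == 0) {
                dead.add(e.getKey());        // leader no longer has this key
            }
        }
        return dead;
    }

    public static void main(String[] args) {
        Set<String> local = new HashSet<>(Arrays.asList("svc-a", "svc-b"));
        Set<String> beat = new HashSet<>(Collections.singletonList("svc-a"));
        System.out.println(deadKeys(local, beat)); // [svc-b]
    }
}
```

This is how followers converge on the leader's dataset even for deletions, which the per-datum timestamp comparison alone could not detect.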
2. Data handling
Transactional (write) operations are forwarded to the leader; non-transactional operations can be handled by any node. When publishing data, two things happen:
- If the current node is not the leader, the request is forwarded to the leader.
- If it is, the leader sends onPublish to all nodes; service registration goes through this path.
RaftCore
public void signalPublish(String key, Record value) throws Exception {
if (!isLeader()) {
JSONObject params = new JSONObject();
params.put("key", key);
params.put("value", value);
Map<String, String> parameters = new HashMap<>(1);
parameters.put("key", key);
//proxy the operation to the leader
raftProxy.proxyPostLarge(getLeader().ip, API_PUB, params.toJSONString(), parameters);
return;
}
try {
OPERATE_LOCK.lock();
long start = System.currentTimeMillis();
final Datum datum = new Datum();
datum.key = key;
datum.value = value;
if (getDatum(key) == null) {
datum.timestamp.set(1L);
} else {
datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
}
JSONObject json = new JSONObject();
json.put("datum", datum);
json.put("source", peers.local());
//this updates the local datum and the file cache; it also bumps the term by 100
onPublish(datum, peers.local());
final String content = JSON.toJSONString(json);
//majority strategy
final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
for (final String server : peers.allServersIncludeMyself()) {
if (isLeader(server)) {
latch.countDown();
continue;
}
final String url = buildURL(server, API_ON_PUB);
HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content, new AsyncCompletionHandler<Integer>() {
@Override
public Integer onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
datum.key, server, response.getStatusCode());
return 1;
}
latch.countDown();
return 0;
}
@Override
public STATE onContentWriteCompleted() {
return STATE.CONTINUE;
}
});
}
if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
// only majority servers return success can we consider this update success
Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
}
long end = System.currentTimeMillis();
Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
} finally {
OPERATE_LOCK.unlock();
}
}
public void onPublish(Datum datum, RaftPeer source) throws Exception {
RaftPeer local = peers.local();
if (datum.value == null) {
Loggers.RAFT.warn("received empty datum");
throw new IllegalStateException("received empty datum");
}
if (!peers.isLeader(source.ip)) {
Loggers.RAFT.warn("peer {} tried to publish data but wasn't leader, leader: {}",
JSON.toJSONString(source), JSON.toJSONString(getLeader()));
throw new IllegalStateException("peer(" + source.ip + ") tried to publish " +
"data but wasn't leader");
}
if (source.term.get() < local.term.get()) {
Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}",
JSON.toJSONString(source), JSON.toJSONString(local));
throw new IllegalStateException("out of date publish, pub-term:"
+ source.term.get() + ", cur-term: " + local.term.get());
}
local.resetLeaderDue();
// if data should be persistent, usually this is always true:
if (KeyBuilder.matchPersistentKey(datum.key)) {
raftStore.write(datum);
}
//update the in-memory value
datums.put(datum.key, datum);
//bump the term by 100
if (isLeader()) {
local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
} else {
if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
//set leader term:
getLeader().term.set(source.term.get());
local.term.set(getLeader().term.get());
} else {
local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
}
}
raftStore.updateTerm(local.term.get());
notifier.addTask(datum.key, ApplyAction.CHANGE);
Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
}
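The majority-wait in signalPublish above can be sketched independently of Nacos. This is a minimal sketch with illustrative names; the simulated follower acks stand in for the async HTTP callbacks:

```java
import java.util.concurrent.*;

// Minimal sketch of the write-quorum pattern in signalPublish: the leader
// counts itself, broadcasts to followers, and only treats the write as
// committed once a majority (peers/2 + 1) has acknowledged within a timeout.
public class MajorityPublish {
    static boolean publish(int peerCount, int followerAcks, long timeoutMs)
            throws InterruptedException {
        int majority = peerCount / 2 + 1;
        CountDownLatch latch = new CountDownLatch(majority);
        latch.countDown();                     // the leader counts as one ack
        ExecutorService pool = Executors.newCachedThreadPool();
        for (int i = 0; i < followerAcks; i++) {
            pool.submit(latch::countDown);     // simulate a follower's 200 OK
        }
        try {
            // only a majority of acks within the timeout makes the write succeed
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(publish(3, 1, 1000)); // leader + 1 of 3 peers -> true
        System.out.println(publish(5, 1, 200));  // leader + 1 of 5 peers -> false
    }
}
```

Note the design choice: because the latch starts at the majority count rather than the full peer count, slow or dead minority nodes cannot block a write, yet a write that fails to reach a majority times out and raises an error, exactly as in the real signalPublish.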