为什么需要leader选举
在多节点的集群中,如分布式缓存、分布式队列等分布式应用,为了保证一致性,一般采用一主多从的的架构,leader负责写请求的处理并将写操作同步到从节点上去写,follow负责读请求的处理,follow接收到写请求也会转发给leader,交由leader来处理。
所以leader的选举和leader崩溃重选是集群服务的核心。zookeeper本身也是这种架构,它采用的是paxos算法的变种zab协议,达到CAP理论的CA两个要求,是一个可靠的分布式应用协调者。
此处不讨论zk自身是如何实现leader选举和集群管理的,它作为一个有用的协调者,如何帮助我们进行我们的application的leader选举呢。
zk提供的leader选举思路
zk有一类node支持序列号,我们可以通过序列号的大小来选择最小序列号的作为leader把ip等信息写入父节点的data中,供外界访问和寻找leader。
另外,如果leader挂了,借助临时node的特性,这个临时node就会被移除,应用可以通过watch事件,重新选择最小的作为leader。
数据的结构大致为:
/parent/
-- node-1
-- node-2
-- node-3
watch同样有两种策略,
- watch 父节点
- watch 比自己的序列号小1的zNode,避免惊群效应
leader election
public class Election extends Connect implements Watcher {
private final static Logger LOGGER = LoggerFactory.getLogger(Election.class);
private final static String nodePrefix = "/node-";
/*有序节点名*/
private String path;
private String root;
/*启动时进行选举,确认leader*/
public Election(String address, String root, String name) {
super(address);
try {
this.root = formatRoot(root);
if (zooKeeper.exists(this.root, false) == null) {
zooKeeper.create(this.root, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
// 监控父节点
zooKeeper.getChildren(this.root, this);
path = zooKeeper.create(this.root + nodePrefix, name.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
LOGGER.info("this node path is :{}", path);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
/*格式化根节点的name*/
private String formatRoot(String root) {
return root.startsWith("/") ? root : "/" + root;
}
/* 父节点变化时,重新进行leader选举*/
@Override
public void process(WatchedEvent event) {
try {
List<String> children = zooKeeper.getChildren(root, this);
long miniestNum = findMinima(children);
String path = root.concat(nodePrefix).concat(StringUtils.leftPad(String.valueOf(miniestNum), 10, "0"));
Stat stat = zooKeeper.exists(path, false);
byte[] data = zooKeeper.getData(path, false, stat);
Stat statRoot = zooKeeper.exists(root, false);
zooKeeper.setData(root, data, statRoot.getVersion());
System.out.println(new String(zooKeeper.getData(root, false, statRoot)));
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
/*找到最小的序列号*/
private Long findMinima(List<String> children) {
List<Long> seqs = new ArrayList<>();
for (String child : children) {
seqs.add(getNumber(child));
}
Collections.sort(seqs);
return seqs.get(0);
}
/*取nodeName中的数值*/
private long getNumber(String path) {
String seq = path.substring(root.length() + nodePrefix.length());
return Long.parseLong(seq);
}
}
测试
public static void main(String[] args) {
Election election = new Election(StaticUtils.ZK_ADDRESS, "myapp", "d");
// 模拟应用服务中
LockSupport.park();
}