原谅链接: https://mp.weixin.qq.com/s/PgDXSFGnA7kZNX7Qpxg04A
之前写了一篇通过zkCli操作zookeeper的文章,这一篇是通过Java操作zookeeper的文章,代码在这: https://github.com/liangyt/ZookeeperTest/tree/master/base
因为使用的是 zookeeper-3.5.5 版本,所以使用对应的版本。
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.5</version>
</dependency>
有一些回调的定义类比较简单,这里不就列表出来了,可以进代码直接翻看就行了。
一、连接服务
要操作zookeeper第一件事就是连接上zookeeper服务器:
在类 {common.ZkConnect}
/**
* 创建zk连接
* @return
* @throws IOException
*/
public static ZooKeeper instance() throws IOException {
ZooKeeper zk = new ZooKeeper(
"localhost:2181", // 连接的服务地址
5000, // 会话超时时间, 在超时时间内会进行心跳检测;如果超过这个时间没有心跳检测,则服务端认为这个会话超时了
new DefaultWatcher() // 默认的会话监听器, 如果设置为 null 则表示没有默认的监听器了
);
return zk;
}
/**
* 创建zk连接
* @return
* @throws IOException
*/
public static ZooKeeper instance(CountDownLatch latch) throws IOException {
ZooKeeper zk = new ZooKeeper(
"localhost:2181", // 连接的服务地址
5000, // 会话超时时间, 在超时时间内会进行心跳检测;如果超过这个时间没有心跳检测,则服务端认为这个会话超时了
new DefaultWatcher(latch) // 默认的会话监听器, 如果设置为 null 则表示没有默认的监听器了
);
return zk;
}
/**
* 创建zk连接
* @param sessionId 会话id zk.getSessionId()
* @param sessionpwd 会话密码 zk.getSessionPasswd()
* @return
* @throws IOException
*/
public static ZooKeeper instance(long sessionId, byte[] sessionpwd) throws IOException {
ZooKeeper zk = new ZooKeeper(
"127.0.0.1:2181",
5000,
new DefaultWatcher(),
sessionId,
sessionpwd
);
return zk;
}
/**
* 创建zk连接
* @param sessionId 会话id zk.getSessionId()
* @param sessionpwd 会话密码 zk.getSessionPasswd()
* @Param latch 同步对象
* @return
* @throws IOException
*/
public static ZooKeeper instance(long sessionId, byte[] sessionpwd, CountDownLatch latch) throws IOException {
ZooKeeper zk = new ZooKeeper(
"127.0.0.1:2181",
5000,
new DefaultWatcher(latch),
sessionId,
sessionpwd
);
return zk;
}
创建了几个连接zookeeper服务器的简易方法,设置了默认的服务器地址[127.0.0.1:2181],这个地址可以是单台服务,也可以是集群服务,如果是群集的话则格式为 [ip1:port1,ip2:port2 ...]。
二、创建节点
节点的创建方法有几个:
这6个方法分同步和异步,不带 callback 的是同步方法。
具体使用方法看这个类:{base1.CreateTest}
private static void normalCreate(ZooKeeper zk) throws KeeperException, InterruptedException {
zk.create(
"/zk-java-01", // 节点路径
"你们好".getBytes(), // 节点内容
/**
* 节点权限 ZooDefs.Ids.OPEN_ACL_UNSAFE -> 是 world anyone 所有的权限
* 可以自已定义一个权限列表: ArrayList<ACL>
* new ACL("per", "ID") 构成一个权限对象 可以设置多个
* per 表示操作权限:READ = 1; WRITE = 2; CREATE = 4; DELETE = 8; ADMIN = 16; ALL = 31;
* ID 表示授权对象 new ID("scheme", "id") 如 new ID("world", "anyone")
*/
ZooDefs.Ids.OPEN_ACL_UNSAFE,
/**
* 节点类型: 使用 zkCli 客户端创建节点一般是只有前四种
* PERSISTENT 持久型
* PERSISTENT_SEQUENTIAL 持久有序型
* EPHEMERAL 临时型
* EPHEMERAL_SEQUENTIAL 临时有序型
* CONTAINER 容器节点,用于Leader、Lock等特殊用途,当容器节点不存在任何子节点时,容器将成为服务器在将来某个时候删除的候选节点
* PERSISTENT_WITH_TTL 有TTL[存活时间]的永久节点,节点在TTL时间之内没有得到更新并且无子节点,就会被自动删除 需要配合另外一个参数一起
* PERSISTENT_SEQUENTIAL_WITH_TTL 有TTL[存活时间]和有序的永久节点,节点在TTL时间之内没有得到更新并且无子节点,就会被自动删除 需要配合另外一个参数一起
*/
CreateMode.PERSISTENT // 永久节点
);
}
public static void statCreate(ZooKeeper zk, String path) throws KeeperException, InterruptedException {
// 用于保存节点创建完成后的状态信息
Stat stat = new Stat();
String p = zk.create(
path,
"带自定义状态".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT,
/**
* 节点状态:
* czxid;
* mzxid;
* ctime;
* mtime;
* version;
* cversion;
* aversion;
* ephemeralOwner;
* dataLength;
* numChildren;
* pzxid;
*/
stat
);
System.out.println("path ->" + p);
System.out.println("stat -> " + stat);
}
private static void asyncCreate(ZooKeeper zk) throws InterruptedException {
zk.create(
"/zk-java-async-01",
"带自定义状态".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT,
new BaseStringCallback(), // 节点创建完成的回调
"我是异步创建节点"
);
}
三、获取节点数据
获取节点数据有四个方法,使用主要看类:{base1.GetTest}
public static String getData(ZooKeeper zk, Stat stat) throws KeeperException, InterruptedException {
byte[] data = zk.getData(
"/zk-java-stat-01", // 节点路径
true, // 是否使用默认监听器
stat // 用于存放服务器返回的 stat
);
System.out.println("data -> " + new String(data));
System.out.println("aversion -> " + stat.getAversion());
System.out.println("ctime -> " + stat.getCtime());
System.out.println("cversion -> " + stat.getCversion());
System.out.println("dataLength -> " + stat.getDataLength());
System.out.println("version -> " + stat.getVersion());
return new String(data);
}
public static String getDataNewWatcher(ZooKeeper zk) throws KeeperException, InterruptedException {
Stat stat = new Stat();
byte[] data = zk.getData(
"/zk-java-stat-01",
new CustomWatcher(), // 注册节点内容变更监听
stat
);
System.out.println("data -> " + new String(data));
return new String(data);
}
private static void asyncGetData(ZooKeeper zk) {
zk.getData(
"/zk-java-stat-01",
false, // 不使用监听器
new DataCallback(), // 异步获取数据的回调
"异步获取数据"
);
zk.getData(
"/zk-java-stat-01",
new CustomWatcher(), // 使用自定义数据变更监听器 节点内容变更监听
new DataCallback(), // 异步获取数据的回调
"异步获取监听数据"
);
}
四、更新节点数据
更新节点数据方法只有两个了,使用方法看类:{base1.UpdateTest}
public static void main(String[] args) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
ZooKeeper zk = ZkConnect.instance(latch);
latch.await();
Stat gStat = new Stat();
String oldData = GetTest.getData(zk, gStat);
System.out.println("oldData -> " + oldData);
System.out.println("oldVersion -> " + gStat.getVersion());
Stat stat = zk.setData(
"/zk-java-stat-01", // 节点路径
"我是新数据".getBytes(), // 新数据
gStat.getVersion() // 如果版本号跟服务器上保存的不一样, 则此时出现异常 org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /zk-java-stat-01
);
// 此时版本号已更新
System.out.println("newVersion -> " + stat.getVersion());
CountDownLatch latch1 = new CountDownLatch(1);
zk.setData(
"/zk-java-stat-01", // 节点路径
"我是异步更新新数据".getBytes(), // 新数据
stat.getVersion(),
new StatCallback(latch1),
"异步更新数据"
);
latch1.await();
}
五、删除节点
两个方法,一个同步删除,一个异步删除。代码在类:{base1.DeleteTest}
private static void baseDelete(ZooKeeper zk) throws KeeperException, InterruptedException {
zk.delete(
"/zk-java-01", // 需要删除的节点全路径
0 // 删除节点的版本号 如果版本号对不上的话则删除失败
);
}
private static void asyncDelete(ZooKeeper zk) {
zk.delete(
"/zk-java-01",
0,
new BaseVoidCallback(), // 删除回调
"删除基本节点"
);
}
六、获取子节点列表
获取子节点列表的方法比较多,这里试了一个同步一个异步的,类:{base1.GetChildrenTest}
public static void main(String[] args) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
ZooKeeper zk = ZkConnect.instance(latch);
latch.await();
// 直接获取节点的子节点
List<String> children = getChildren(zk, "/zk-java-stat-01");
System.out.println("children -> " + children);
// 添加一个子节点
// 重复添加子节点出现节点重复异常
// org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /zk-java-stat-01/child
CreateTest.statCreate(zk, "/zk-java-stat-01/child01");
// 再次获取子节点
children = getChildren(zk, "/zk-java-stat-01");
System.out.println("children -> " + children);
// 异常获取子节点列表 并添加一个自定义子节点列表变更监听
latch = new CountDownLatch(1);
zk.getChildren(
"/zk-java-stat-01",
new CustomWatcher(),
new ChildrenCallback(latch),
"异步获取子节点列表"
);
latch.await();
}
private static List<String> getChildren(ZooKeeper zk, String pPath) throws KeeperException, InterruptedException {
// 返回的子节点列表路径都是相对于父节点的路径,而不是全路径
return zk.getChildren(
pPath, // 需要获取子节点列表的节点路径
false // 不添加监听
);
}
七、权限
设置权限有两个方法,也比较简单;类:{base1.AclTest}
public static void main(String[] args) throws Exception {
String node = "/zk-java-acl";
CountDownLatch latch = new CountDownLatch(1);
ZooKeeper zk = ZkConnect.instance(latch);
latch.await();
// 设置超级权限
/**
* 设置超级账户连接, super:id 这是 digest:id 的一种特殊方式
* 启用超级账户需要启动zookeeper服务的时候配置对应的参数(这种提供两种方式):
* 1. 启动的时候直接添加 -Dzookeeper.DigestAuthenticationProvider.superDigest=zookeeper:qW/HnTfCSoQpB5G8LgkwT3IbiFc=
* 2. 在配置文件 zoo.cfg 里面配置 DigestAuthenticationProvider.superDigest=zookeeper:qW/HnTfCSoQpB5G8LgkwT3IbiFc=
* 这两种方式都可以配置超级账户,账户可以自定义,结果: BASE64(SHA-1)
* 可以命令行生成也可以java代码运行生成:
* java: DigestAuthenticationProvider.generateDigest(name:password)
*
*/
zk.addAuthInfo("digest", "zookeeper:admin".getBytes());
// 先创建一个永久节点, 权限为 ZooDefs.Ids.OPEN_ACL_UNSAFE
// 如果报节点已存在异常 则把这一行注释掉
// org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /zk-java-acl
CreateTest.statCreate(zk, node);
// 获取节点的权限
Stat stat = new Stat();
List<ACL> acls = zk.getACL(
node, // 需要获取权限信息的节点
stat // 回设节点状态
);
//
// // [31,s{'world,'anyone}] 这是默认创建的节点权限
System.out.println("acls ->" + acls);
System.out.println("aclVersion -> " + stat.getAversion());
// 异步获取节点权限
zk.getACL(node, stat, new AsyncCallback.ACLCallback() {
@Override
public void processResult(int resultCode, String path, Object ctx, List<ACL> list, Stat stat) {
System.out.println("async acls ->" + list);
}
}, "异步获取权限");
//
String auth = "name:password";
// 对节点设置权限
acl(node, zk, stat, auth);
// 另起一个无权限会话 读取该节点的数据看看
// noAuthGetData(node);
// 另起一个会话并设置权限 读取该节点数据
authGetData(node, auth);
// 删除节点
zk.getData(node, null, stat);
zk.delete(node, stat.getVersion());
}
// 给节点设置权限
private static void acl(String node, ZooKeeper zk, Stat stat, String auth) throws NoSuchAlgorithmException, KeeperException, InterruptedException {
// 创建授权对象
Id id = new Id("digest", DigestUtil.digest(auth));
// 可以定义一个 ip 模式的授权对象
// Id ipId = new Id("ip", "192.168.3.17");
// 定义操作权限
ACL aclRead = new ACL(ZooDefs.Perms.READ, id); // 读取权限
ACL aclCrd = new ACL(ZooDefs.Perms.CREATE, id); // 创建权限
ACL aclDel = new ACL(ZooDefs.Perms.DELETE, id); // 删除权限
ACL aclUpd = new ACL(ZooDefs.Perms.WRITE, id); // 更新权限
ACL aclAdm = new ACL(ZooDefs.Perms.ADMIN, id); // 权限管理权限
// 对节点 /zk-java-acl 设置权限
List<ACL> acls = new ArrayList<>();
acls.add(aclRead);
acls.add(aclUpd);
// 返回状态 权限版本号已改变了
Stat aclStat = zk.setACL(node, acls, stat.getAversion());
System.out.println("aclStatVersion -> " + aclStat.getAversion());
}
private static void authGetData(String node, String auth) throws IOException, InterruptedException, KeeperException {
CountDownLatch latch1 = new CountDownLatch(1);
ZooKeeper authZk = ZkConnect.instance(latch1);
latch1.await();
// 设置会话权限
authZk.addAuthInfo("digest", auth.getBytes());
Stat stat = new Stat();
byte[] dataAuth = authZk.getData(node, false, stat);
System.out.println("dataAuth -> " + new String(dataAuth));
// 没有更新权限,看看能否更新
authZk.setData(node, "试试能否更新成功".getBytes(), stat.getVersion());
}
// 测试没有读取权限的会话去读取数据是什么样的
private static void noAuthGetData(String node) throws IOException, InterruptedException, KeeperException {
CountDownLatch latch1 = new CountDownLatch(1);
ZooKeeper noAuthZk = ZkConnect.instance(latch1);
latch1.await();
// 出现异常
// org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /zk-java-acl
byte[] dataNoAuth = noAuthZk.getData(node, false, null);
System.out.println("dataNoAuth -> " + new String(dataNoAuth));
}