一 客户端种类
zookeeper有三种Java客户端:
- Zookeeper,Zookeeper官方提供的原生Java客户端
- Zkclient,在原生Zookeeper基础上进行扩展的开源第三方Java客户端
- Curator,Netflix公司在原生zookeeper基础上开源的Java客户端
今天我们简单说一下第1种,下一篇详细说一下第3种。
二 Zookeeper官方客户端的使用
- 引入maven,具体版本最好和服务端zookeeper版本一致,因为我们安装的是apache-zookeeper-3.5.10
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.10</version>
</dependency>
- 连接zookeeper
connStr:ZooKeeper服务器列表,由英文逗号分开的host:port字符串组成,每一个都代表一台ZooKeeper机器,如,host1:port1,host2:port2,host3:port3。另外,也可以在connectString中设置客户端连接上ZooKeeper后的根目录,方法是在host:port字符串之后添加上这个根目录,例如,host1:port1,host2:port2,host3:port3/zk-base,这样就指定了该客户端连接上ZooKeeper服务器之后,所有对ZooKeeper的操作,都会基于这个根目录。例如,客户端对/sub-node 的操作,最终创建 /zk-node/sub-node, 这个目录也叫Chroot,即客户端隔离命名空间。
sessionTimeout:会话的超时时间,是一个以“毫秒”为单位的整型值。在ZooKeeper中有
会话的概念,在一个会话周期内,ZooKeeper客户端和服务器之间会通过心跳检
测机制来维持会话的有效性,一旦在sessionTimeout时间内没有进行有效
的心跳检测,会话就会失效。
private static final String ZK_ADDRESS = "192.168.59.130:2181";
private static final int SESSION_TIMEOUT = 5000;
public static ZooKeeper getZKclient(String connStr, int sessionTimeout){
try {
CountDownLatch countDownLatch = new CountDownLatch(1);
ZooKeeper zkClient = new ZooKeeper(connStr, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if(Event.KeeperState.SyncConnected == watchedEvent.getState()){
if(Event.EventType.None == watchedEvent.getType()){
//如果收到了服务端的响应事件,连接成功
System.out.println("zookeeper服务器连接成功!");
countDownLatch.countDown();
}
}
}
});
countDownLatch.await();
System.out.println("状态:"+zkClient.getState());
return zkClient;
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
- 创建节点
public static void createNode(ZooKeeper zkClient){
try {
//需要手动创建父节点
zkClient.create(ZNODE_PATH,
"data1".getBytes(),
//zookeeper提供了几个默认的权限列表,在ZooDefs.Ids中
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
System.out.println("节点创建成功!");
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
- 获取节点数据,可同步获取数据,异步获取数据,添加数据监听。
注意:
在做异步监听测试时,需要代码最后添加TimeUnit.SECONDS.sleep(100);线程睡眠防止程序结束,回调不触发。
public static void getData(ZooKeeper zkClient) {
Stat stat = new Stat();
try {
//1. 同步获取节点数据
byte[] data = zkClient.getData(ZNODE_PATH, false, stat);
System.out.println("nodeVal: "+new String(data)+",state:"+stat);
//2. 异步获取节点数据
String ctxParam = "ctxVal";
zkClient.getData(ZNODE_PATH, false, new AsyncCallback.DataCallback() {
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
System.out.println("rc:"+rc+",path:"+path+",ctx:"+ctx+",data:"+new String(data)+",stat:"+stat);
}
},ctxParam);
//3. 同步获取数据添加监听
zkClient.getData(ZNODE_PATH,new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("触发同步监听:");
System.out.println("receive node:"+watchedEvent.getPath());
System.out.println("watchedEvent:"+watchedEvent.toString());
}
}, stat);
//修改节点数据,触发监听
zkClient.setData(ZNODE_PATH,"data2".getBytes(),stat.getVersion());
//4. 异步获取数据添加监听
zkClient.getData(ZNODE_PATH, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("触发异步监听:");
System.out.println("receive node:"+watchedEvent.getPath());
System.out.println("watchedEvent:"+watchedEvent.toString());
}
}, new AsyncCallback.DataCallback() {
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
System.out.println("rc:"+rc+",path:"+path+",ctx:"+ctx+",data:"+new String(data)+",stat:"+stat);
}
},ctxParam);
Stat stat2 = new Stat();
zkClient.getData(ZNODE_PATH,false, stat2);
zkClient.setData(ZNODE_PATH,"data2".getBytes(),stat2.getVersion());
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
- 删除节点
public static void deleteData(ZooKeeper zkClient) {
try {
String nodePath = ZNODE_PATH+"/node1";
//创建节点,用于测试删除
zkClient.create(nodePath,
"data1".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
List<String> children = zkClient.getChildren(ZNODE_PATH, false);
System.out.println("删除前children:"+children.toString());
Stat stat = new Stat();
byte[] data = zkClient.getData(nodePath, false, stat);
System.out.println("nodeVale:"+new String(data));
zkClient.delete(nodePath,stat.getVersion());
children = zkClient.getChildren(ZNODE_PATH, false);
System.out.println("删除后children:"+children.toString());
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
- 查询子节点,并添加子目录监听
public static void getChildren(ZooKeeper zkClient){
try{
List<String> children = zkClient.getChildren(ZNODE_PATH, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("触发子节点列表变化通知:");
System.out.println("receive nodeChange:" + watchedEvent.getPath());
System.out.println("watchedEvent:" + watchedEvent);
}
});
String nodePath = ZNODE_PATH+"/node1";
//创建节点
zkClient.create(nodePath,
"data1".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}catch (Exception e){
e.printStackTrace();
}
}