zookeeper通过使用watcher可以实现发布订阅的功能,实际上就是基于监听的事件触发。
示例
以下示例在zk上创建一个ZNode来存储app的配置信息,然后监听配置变化并做出相应的动作。
模拟配置信息类
/**
 * Sample application configuration that is stored as the data of a ZooKeeper node.
 *
 * <p>A plain mutable bean: every property has a getter and a setter so that a JSON library can
 * serialize and deserialize it.
 */
public class SampleConf {
    private String url;
    private int port;
    private String name;
    private String password;

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    /**
     * Renders this configuration as a JSON object with the keys {@code url}, {@code port},
     * {@code name} and {@code password}, in that order.
     *
     * <p>String values are JSON-escaped so the result stays parseable when a value contains
     * quotes, backslashes or control characters; an unset (null) value is written as JSON
     * {@code null}. Note that the password is included in clear text.
     *
     * @return the JSON representation of this object
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("{");
        sb.append("\"url\":");
        appendJsonString(sb, url);
        sb.append(",\"port\":").append(port);
        sb.append(",\"name\":");
        appendJsonString(sb, name);
        sb.append(",\"password\":");
        appendJsonString(sb, password);
        sb.append('}');
        return sb.toString();
    }

    /** Appends {@code value} to {@code sb} as a quoted, escaped JSON string, or {@code null}. */
    private static void appendJsonString(StringBuilder sb, String value) {
        if (value == null) {
            sb.append("null");
            return;
        }
        sb.append('"');
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            switch (c) {
                case '"':
                    sb.append("\\\"");
                    break;
                case '\\':
                    sb.append("\\\\");
                    break;
                case '\n':
                    sb.append("\\n");
                    break;
                case '\r':
                    sb.append("\\r");
                    break;
                case '\t':
                    sb.append("\\t");
                    break;
                case '\b':
                    sb.append("\\b");
                    break;
                case '\f':
                    sb.append("\\f");
                    break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters must be written as \\uXXXX in JSON.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        sb.append('"');
    }
}
简单封装的zk工具类
import com.alibaba.fastjson.JSON;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
 * Minimal static helper around one shared {@link ZooKeeper} client connected to
 * {@code localhost:2181}.
 *
 * <p>All node data is read and written as UTF-8 text. ZooKeeper failures
 * ({@link KeeperException}, {@link InterruptedException}) are logged rather than propagated;
 * read/write methods then return {@code null}.
 */
public class SimpleZKUtils {
    private SimpleZKUtils() {
    }

    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleZKUtils.class);
    private static final String hostPort = "localhost:2181";
    private static final int SESSION_TIMEOUT_MS = 3000;
    private static final ZooKeeper zk;

    static {
        try {
            // The default watcher only logs every event it receives.
            zk = new ZooKeeper(hostPort, SESSION_TIMEOUT_MS,
                    event -> LOGGER.info("zk event: {}", JSON.toJSONString(event)));
        } catch (IOException e) {
            // Fail fast: without a client every method below would fail with an NPE anyway.
            throw new ExceptionInInitializerError(e);
        }
    }

    /**
     * Overwrites the data of an existing node.
     *
     * @param path node path
     * @param data new content, stored as UTF-8
     * @return the node's new {@link Stat}, or {@code null} if the node does not exist or the
     *     operation failed
     */
    public static Stat set(String path, String data) {
        try {
            Stat stat = zk.exists(path, false);
            if (null != stat) {
                // Passing the version read above makes the write fail if the node changed meanwhile.
                return zk.setData(path, data.getBytes(StandardCharsets.UTF_8), stat.getVersion());
            }
        } catch (KeeperException | InterruptedException e) {
            logFailure("set", path, e);
        }
        return null;
    }

    /**
     * Reads a node's data and leaves a watch for the client's default watcher.
     *
     * @param path node path
     * @return the content decoded as UTF-8, or {@code null} if the node does not exist or the
     *     operation failed
     */
    public static String get(String path) {
        try {
            Stat stat = zk.exists(path, false);
            if (null != stat) {
                return new String(zk.getData(path, true, stat), StandardCharsets.UTF_8);
            }
        } catch (KeeperException | InterruptedException e) {
            logFailure("get", path, e);
        }
        return null;
    }

    /**
     * Reads a node's data and registers {@code watcher} on it.
     *
     * @param path node path
     * @param watcher watcher passed to {@code getData} for this node
     * @return the content decoded as UTF-8, or {@code null} if the node does not exist or the
     *     operation failed
     */
    public static String get(String path, Watcher watcher) {
        try {
            Stat stat = zk.exists(path, false);
            if (null != stat) {
                return new String(zk.getData(path, watcher, stat), StandardCharsets.UTF_8);
            }
        } catch (KeeperException | InterruptedException e) {
            logFailure("get", path, e);
        }
        return null;
    }

    /**
     * Creates a persistent node with an open ACL.
     *
     * @param path node path
     * @param data initial content, stored as UTF-8
     * @throws RuntimeException if the node already exists
     */
    public static void create(String path, String data) {
        try {
            Stat stat = zk.exists(path, false);
            if (null == stat) {
                zk.create(path, data.getBytes(StandardCharsets.UTF_8),
                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            } else {
                throw new RuntimeException("node is already existed");
            }
        } catch (KeeperException | InterruptedException e) {
            logFailure("create", path, e);
        }
    }

    /**
     * Deletes an existing node.
     *
     * @param path node path
     * @throws RuntimeException if the node does not exist
     */
    public static void del(String path) {
        try {
            Stat stat = zk.exists(path, false);
            if (null != stat) {
                zk.delete(path, stat.getVersion());
            } else {
                throw new RuntimeException("node does not exist: " + path);
            }
        } catch (KeeperException | InterruptedException e) {
            logFailure("delete", path, e);
        }
    }

    /** Logs a failed operation and restores the interrupt flag if the thread was interrupted. */
    private static void logFailure(String operation, String path, Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        LOGGER.error("zk {} failed for path {}", operation, path, e);
    }
}
测试
- 创建ZNode
// Parent node; created before the configuration node in testCreate.
public static final String basePath = "/app1";
// Node whose data holds the JSON-serialized configuration.
public static final String path = "/app1/conf";
// Most recent configuration parsed by the watcher callbacks; null until one of them runs.
private static SampleConf sampleConf = null;
/** Creates the parent node and then the configuration node, both with empty data. */
@Test
public void testCreate() {
    // Order matters: parent first, then the child below it.
    for (String node : new String[] {basePath, path}) {
        SimpleZKUtils.create(node, "");
    }
}
/** Writes a sample configuration to the configuration node as JSON and prints what is stored. */
@Test
public void configure() {
    SampleConf conf = new SampleConf();
    conf.setUrl("localhost");
    conf.setPort(2181);
    conf.setName("jk");
    conf.setPassword("helloworld");

    String json = JSON.toJSONString(conf);
    SimpleZKUtils.set(path, json);

    String stored = SimpleZKUtils.get(path);
    System.out.println(stored);
}
- 监听
/**
 * Reads the configuration node with a {@code ConfigWatcher} attached, then blocks so that
 * change notifications can be observed.
 *
 * <p>On each notification the callback re-reads the node, passing the same watcher again so it
 * stays registered, and prints the parsed configuration.
 */
@Test
public void testPubSub() {
    String conf = SimpleZKUtils.get(path, new ConfigWatcher((watcher) -> {
        System.out.println("watcher execute");
        sampleConf = JSON.parseObject(SimpleZKUtils.get(path, watcher), SampleConf.class);
        System.out.println(JSON.toJSONString(sampleConf));
    }));
    System.out.println(conf);
    // Block this thread to observe the actions triggered by the watcher.
    // park() may return spuriously, so keep parking until the thread is interrupted.
    while (!Thread.currentThread().isInterrupted()) {
        LockSupport.park();
    }
}
/**
 * {@link Watcher} implementation that forwards data-change events to a callback.
 *
 * <p>The callback receives this watcher instance so it can hand the same instance back to the
 * zk get method.
 */
static class ConfigWatcher implements Watcher {
    private final Consumer<Watcher> onDataChanged;

    ConfigWatcher(Consumer<Watcher> myWatch) {
        this.onDataChanged = myWatch;
    }

    @Override
    public void process(WatchedEvent event) {
        boolean dataChanged = event.getType().equals(Watcher.Event.EventType.NodeDataChanged);
        if (!dataChanged) {
            return;
        }
        // Pass this instance along so the callback can set it on the zk get call again.
        onDataChanged.accept(this);
    }
}
因为zk的watcher是一次性的,触发一次后就会失效,所以每次事件触发时都需要重新注册watcher,才能继续响应后续的事件。此处我用Consumer接口包了一层来复用最外层的watcher实例,因为lambda表达式里的this指向的是外层类的实例,而不是watcher本身,没法直接把watcher传进去。改用匿名内部类则可以直接传this:
/**
 * Same scenario as {@code testPubSub}, but with an anonymous {@link Watcher} that passes
 * {@code this} directly when re-reading the node.
 */
@Test
public void testPubSub2() {
    String conf = SimpleZKUtils.get(path, new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent) {
            // React to data changes only, matching ConfigWatcher; other event types
            // (session state, deletion) carry no new configuration to parse.
            if (!watchedEvent.getType().equals(Watcher.Event.EventType.NodeDataChanged)) {
                return;
            }
            System.out.println("watcher execute");
            // Passing this sets the same watcher again for the next change.
            sampleConf = JSON.parseObject(SimpleZKUtils.get(path, this), SampleConf.class);
            System.out.println(JSON.toJSONString(sampleConf));
        }
    });
    System.out.println(conf);
    // park() may return spuriously, so keep parking until the thread is interrupted.
    while (!Thread.currentThread().isInterrupted()) {
        LockSupport.park();
    }
}