1.zookeeper基本应用
1.1 ZooKeeper系统模型
ZooKeeper数据模型Znode
如图,在ZooKeeper中,数据信息被保存在⼀个个数据节点上,这些节点被称为znode。ZNode 是Zookeeper 中最⼩数据单位,在 ZNode 下⾯⼜可以再挂 ZNode,这样⼀层层下去就形成了⼀个层次化命名空间 ZNode 树,我们称为ZNode Tree。
ZNode Tree它采⽤了类似⽂件系统的层级树状结构进⾏管理。在 Zookeeper 中,每⼀个数据节点都是⼀个 ZNode,下图根⽬录下有两个节点,分别是:app1 和app2,其中 app1 下⾯⼜有三个⼦节点,所有ZNode按层次化进⾏组织,形成这么⼀颗树,ZNode的节点路径标识⽅式和Unix⽂件系统路径⾮常相似,都是由⼀系列使⽤斜杠(/)进⾏分割的路径表示,开发⼈员可以向这个节点写⼊数据,也可以在这个节点下⾯创建⼦节点。
ZNode 类型
Zookeeper 节点类型可以分为三⼤类:
持久性节点(Persistent)
临时性节点(Ephemeral)
顺序性节点(Sequential)
而在开发中在创建节点的时候通过组合可以⽣成以下四种节点类型
(1)持久节点:是Zookeeper中最常⻅的⼀种节点类型,所谓持久节点,就是指节点被创建后会⼀直存在服务器,直到删除操作主动清除
(2)持久顺序节点:就是有顺序的持久节点,节点特性和持久节点是⼀样的,只是额外特性表现在顺序上。顺序特性实质是在创建节点的时候,会在节点名后⾯加上⼀个数字后缀,来表示其顺序。
(3)临时节点:就是会被⾃动清理掉的节点,它的⽣命周期和客户端会话绑在⼀起,客户端会话结束,节点会被删除掉。与持久性节点不同的是,临时节点不能创建⼦节点
(4)临时顺序节点:就是有顺序的临时节点,和持久顺序节点相同,在其创建的时候会在名字后⾯加上数字后缀。
zookeeper事务ID
ZooKeeper中,事务是指能够改变ZooKeeper服务器状态的操作,我们也称之为事务操作或更新操
作,⼀般包括数据节点创建与删除、数据节点内容更新等操作。对于每⼀个事务请求,ZooKeeper都会为其分配⼀个全局唯⼀的事务ID,⽤ZXID 来表示,通常是⼀个 64 位的数字。每⼀个 ZXID 对应⼀次更新操作,从这些ZXID中可以间接地识别出ZooKeeper处理这些更新操作请求的全局顺序
ZNode 的状态信息
ZNode 节点内容包括两部分:节点数据内容和节点状态信息。图中quota 是数据内容,其他的属
于状态信息
cZxid 就是 Create ZXID,表示节点被创建时的事务ID。
ctime 就是 Create Time,表示节点创建时间。
mZxid 就是 Modified ZXID,表示节点最后⼀次被修改时的事务ID。
mtime 就是 Modified Time,表示节点最后⼀次被修改的时间。
pZxid 表示该节点的⼦节点列表最后⼀次被修改时的事务 ID。只有⼦节点列表变更才会更新 pZxid,⼦节点内容变更不会更新。
cversion 表示⼦节点的版本号。
dataVersion 表示内容版本号。
aclVersion 标识acl版本
ephemeralOwner 表示创建该临时节点时的会话 sessionID,如果是持久性节点那么值为 0
dataLength 表示数据⻓度。
numChildren 表示直系⼦节点数。
Watcher模型--数据变更通知
Zookeeper使⽤Watcher机制实现分布式数据的发布/订阅功能
ZooKeeper 允许客户端向服务端注册⼀个 Watcher 监听,当服务端的⼀些指定事件触发了这个 Watcher,那么就会向指定客户端发送⼀个事件通知来实现分布式的通知功能。
Zookeeper的Watcher机制主要包括客户端线程、客户端WatcherManager、Zookeeper服务器三部分。
具体⼯作流程为:客户端在向Zookeeper服务器注册的同时,会将Watcher对象存储在客户端的
WatcherManager当中。当Zookeeper服务器触发Watcher事件后,会向客户端发送通知,客户端线程从WatcherManager中取出对应的Watcher对象来执⾏回调逻辑。
ACL--保障数据的安全
Zookeeper作为⼀个分布式协调框架,其内部存储了分布式系统运⾏时状态的元数据,这些元数据会直接影响基于Zookeeper进⾏构造的分布式系统的运⾏状态,因此,如何保障系统中数据的安全,从⽽避免因误操作所带来的数据随意变更⽽导致的数据库异常⼗分重要,在Zookeeper中,提供了⼀套完善的
ACL(Access Control List)权限控制机制来保障数据的安全。
我们可以从三个⽅⾯来理解ACL机制:权限模式(Scheme)、授权对象(ID)、权限
(Permission),通常使⽤"scheme: id : permission"来标识⼀个有效的ACL信息。
权限模式:Scheme
权限模式⽤来确定权限验证过程中使⽤的检验策略,有如下四种模式:
(1). IP
IP模式就是通过IP地址粒度来进⾏权限控制,如"ip:192.168.0.110"表示权限控制针对该IP地址,
同时IP模式可以⽀持按照⽹段⽅式进⾏配置,如"ip:192.168.0.1/24"表示针对192.168.0.*这个⽹段
进⾏权限控制。
(2). Digest
Digest是最常⽤的权限控制模式,要更符合我们对权限控制的认识,其使
⽤"username:password"形式的权限标识来进⾏权限配置,便于区分不同应⽤来进⾏权限控制。
当我们通过“username:password”形式配置了权限标识后,Zookeeper会先后对其进⾏SHA-1加密
和BASE64编码。
(3). World
World是⼀种最开放的权限控制模式,这种权限控制⽅式⼏乎没有任何作⽤,数据节点的访问权限
对所有⽤户开放,即所有⽤户都可以在不进⾏任何权限校验的情况下操作ZooKeeper上的数据。
另外,World模式也可以看作是⼀种特殊的Digest模式,它只有⼀个权限标识,即“world:
anyone”
- Super
Super模式,顾名思义就是超级⽤户的意思,也是⼀种特殊的Digest模式。在Super模式下,超级
⽤户可以对任意ZooKeeper上的数据节点进⾏任何操作。
授权对象:ID
授权对象指的是权限赋予的⽤户或⼀个指定实体,例如 IP 地址或是机器等。在不同的权限模式下,授权对象是不同的,表中列出了各个权限模式和授权对象之间的对应关系。
权限Permission
在ZooKeeper中,所有对数据的操作权限分为
以下五⼤类:
CREATE(C):数据节点的创建权限,允许授权对象在该数据节点下创建⼦节
DELETE(D):⼦节点的删除权限,允许授权对象删除该数据节点的⼦节点。
READ(R):数据节点的读取权限,允许授权对象访问该数据节点并读取其数据内容或⼦节点列表等。
WRITE(W):数据节点的更新权限,允许授权对象对该数据节点进⾏更新操作。
ADMIN(A):数据节点的管理权限,允许授权对象对该数据节点进⾏ ACL 相关的设置操作。
1.2.zookeeper的命令行操作
首先启动一台zookeeper,进入zookeeper的bin目录后,启动客户端
./zkcli.sh 连接本地的zookeeper服务器 ./zkCli.sh -server ip:port 连接指定的服务器
创建节点
(1)创建顺序节点
使⽤ create -s /zk-test 123 命令创建zk-test顺序节点,执⾏完后,就在根节点下创建了⼀个叫做/zk-test的节点,该节点内容就是123,同时可以看到创建的zk-test节点后⾯添加了⼀串数字以示区别
(2)创建临时节点
使⽤ create -e /zk-temp 123 命令创建zk-temp临时节点,临时节点在客户端会话结束后,就会⾃动删除
(3)使⽤ create /zk-permanent 123 命令创建zk-permanent永久节点,到永久节点不同于顺序节点,不会⾃动在后⾯添加⼀串数字
查看与读取节点
ls path: path表示的是指定数据节点的节点路径;命令可以列出Zookeeper指定节点下的所有⼦节点,但只能查看指定节点下的第⼀级的所有⼦节点;
get path:get命令可以获取Zookeeper指定节点的数据内容和属性信息
更新节点
set path data [version]:set命令,可以更新指定节点的数据内容,data就是要更新的新内容,version表示数据版本,如将/zk-permanent节点的数据更新为456,可以使⽤如下命令:set /zk-permanent 456,更新后dataversion会发生改变
删除节点
delete path [version]:例如delete /zk-permanent 删除创建的节点,注意,若删除节点存在⼦节点,那么⽆法删除该节点,必须先删除⼦节点,再删除⽗节点
1.3 Zookeeper的api使⽤
准备⼯作:导⼊依赖
建⽴会话
如下代码,客户端可以通过创建一个zk实例来连接zk服务,但是,客户端和服务端会话的建⽴是⼀个异步的过程,也就是说在程序中,构造⽅法会在处理完客户端初始化⼯作后⽴即返回,在⼤多数情况下,此时并没有真正建⽴好⼀个可⽤的会话,在会话的⽣命周期中处于“CONNECTING”的状态。 当该会话真正创建完毕后ZooKeeper服务端会向会话对应的客户端发送⼀个事件通知,以告知客户端,客户端只有在获取这个通知之后,才算真正建⽴了会话。
package com.lagou.api;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class CreateSession implements Watcher {
private static CountDownLatch countDownLatch;
/*
客户端可以通过创建⼀个zk实例来连接zk服务器
new Zookeeper(connectString,sesssionTimeOut,Wather)
connectString: 连接地址:IP:端⼝
sesssionTimeOut:会话超时时间:单位毫秒
Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
*/
public static void main(String[] args) throws IOException, InterruptedException {
ZooKeeper zooKeeper = new ZooKeeper("192.168.0.104:2181", 5000, new
CreateSession());
System.out.println(zooKeeper.getState());
//计数工具,不让main方法结束,让线程处于等待阻塞
countDownLatch.await();
//表示会话真正建⽴
System.out.println("=========Client Connected to zookeeper==========");
}
/*
回调方法,处理来自服务端的watcher通知
*/
/* 当前类实现了Watcher接⼝,重写了process⽅法,该⽅法负责处理来⾃Zookeeper服务端的
watcher通知,在收到服务端发送过来的SyncConnected事件之后,解除主程序在CountDownLatch上
的等待阻塞,⾄此,会话创建完
*/
public void process(WatchedEvent watchedEvent) {
//如果连接创建了,服务端会给客户端发SyncConnected事件
if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
//解除等待阻塞
countDownLatch.countDown();
}
}
}
创建节点
关键方法String node = zookeeper.create(path,data,acl,createMode);
package com.lagou.api;
import org.apache.zookeeper.*;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.CountDownLatch;
public class CreateNode implements Watcher {
private static CountDownLatch countDownLatch;
private static ZooKeeper zooKeeper;
/*
客户端可以通过创建⼀个zk实例来连接zk服务器
new Zookeeper(connectString,sesssionTimeOut,Wather)
connectString: 连接地址:IP:端⼝
sesssionTimeOut:会话超时时间:单位毫秒
Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
*/
public static void main(String[] args) throws IOException, InterruptedException {
zooKeeper = new ZooKeeper("192.168.0.104:2181", 5000, new
CreateNode());
System.out.println(zooKeeper.getState());
//计数工具,不让main方法结束,让线程处于等待阻塞
countDownLatch.await();
//表示会话真正建⽴
System.out.println("=========Client Connected to zookeeper==========");
}
/*
回调方法,处理来自服务端的watcher通知
*/
/* 当前类实现了Watcher接⼝,重写了process⽅法,该⽅法负责处理来⾃Zookeeper服务端的
watcher通知,在收到服务端发送过来的SyncConnected事件之后,解除主程序在CountDownLatch上
的等待阻塞,⾄此,会话创建完
*/
public void process(WatchedEvent watchedEvent) {
//如果连接创建了,服务端会给客户端发SyncConnected事件
if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
//解除等待阻塞
countDownLatch.countDown();
}
//调⽤创建节点⽅法
try {
createNodeSync();
} catch (Exception e) {
e.printStackTrace();
}
}
private void createNodeSync() throws UnsupportedEncodingException, KeeperException, InterruptedException {
/**
* path :节点创建的路径
* data[] :节点创建要保存的数据,是个byte类型的
* acl :节点创建的权限信息(4种类型)
* ANYONE_ID_UNSAFE : 表示任何⼈
* AUTH_IDS :此ID仅可⽤于设置ACL。它将被客户机验证的ID替
换。
* OPEN_ACL_UNSAFE :这是⼀个完全开放的ACL(常⽤)-->
world:anyone
* CREATOR_ALL_ACL :此ACL授予创建者身份验证ID的所有权限
* createMode :创建节点的类型(4种类型)
* PERSISTENT:持久节点
* PERSISTENT_SEQUENTIAL:持久顺序节点
* EPHEMERAL:临时节点
* EPHEMERAL_SEQUENTIAL:临时顺序节点
String node = zookeeper.create(path,data,acl,createMode);
*/
String node_PERSISTENT =
zooKeeper.create("/lg_persistent", "持久节点内 容".getBytes("utf-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
String node_PERSISTENT_SEQUENTIAL =
zooKeeper.create("/lg_persistent_sequential", "持久节点内容".getBytes("utf-8"),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
String node_EPERSISTENT = zooKeeper.create("/lg_ephemeral", "临时节点内 容".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("创建的持久节点是:"+node_PERSISTENT);
System.out.println("创建的持久顺序节点是:"+node_PERSISTENT_SEQUENTIAL);
System.out.println("创建的临时节点是:"+node_EPERSISTENT);
}
}
获取节点数据
关键api zk.getData(path, watch, stat);
- path : 获取数据的路径
- watch : 是否开启监听 true/false
- stat : 节点状态信息,值为null: 表示获取最新版本的数据
- zk.getData(path, watch, stat);
zooKeeper.getChildren(path, watch);获取某一个节点的子节点列表
path:路径
watch:是否要启动监听,当⼦节点列表发⽣变化,会触发监听
如下代码例子
package com.lagou.api;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class GetNodeData implements Watcher {
//countDownLatch这个类使⼀个线程等待,主要不让main⽅法结束
private static CountDownLatch countDownLatch = new CountDownLatch(1);
private static ZooKeeper zooKeeper;
public static void main(String[] args) throws Exception {
zooKeeper = new ZooKeeper("10.211.55.4:2181", 10000, new GetNodeData());
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent watchedEvent) {
//⼦节点列表发⽣变化时,服务器会发出NodeChildrenChanged通知,但不会把变化情况告诉给客户端
// 需要客户端⾃⾏获取,且通知是⼀次性的,需反复注册监听
if(watchedEvent.getType() ==Event.EventType.NodeChildrenChanged){
//再次获取节点数据
try {
List<String> children =
zooKeeper.getChildren(watchedEvent.getPath(), true);
System.out.println(children);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//当连接创建了,服务端发送给客户端SyncConnected事件
if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
try {
//调⽤获取单个节点数据⽅法
getNoteData();
getChildrens();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}
private static void getNoteData() throws Exception {
/**
* path : 获取数据的路径
* watch : 是否开启监听
* stat : 节点状态信息
* null: 表示获取最新版本的数据
* zk.getData(path, watch, stat);
*/
byte[] data = zooKeeper.getData("/lg_persistent/lg-children", true,
null);
System.out.println(new String(data,"utf-8"));
}
private static void getChildrens() throws KeeperException, InterruptedException {
/*
path:路径
watch:是否要启动监听,当⼦节点列表发⽣变化,会触发监听
zooKeeper.getChildren(path, watch);
*/
List<String> children = zooKeeper.getChildren("/lg_persistent", true);
System.out.println(children);
}
}
修改节点数据
zooKeeper.setData(path, data,version);
path:路径
data:要修改的内容 byte[]
version:为-1,表示对最新版本的数据进⾏修改
private void updateNodeSync() throws Exception {
/*
path:路径
data:要修改的内容 byte[]
version:为-1,表示对最新版本的数据进⾏修改
zooKeeper.setData(path, data,version);
*/
byte[] data = zooKeeper.getData("/lg_persistent", false, null);
System.out.println("修改前的值:"+new String(data));
//修改 stat:状态信息对象 -1:最新版本
Stat stat = zooKeeper.setData("/lg_persistent", "客户端修改内
容".getBytes(), -1);
byte[] data2 = zooKeeper.getData("/lg_persistent", false, null);
System.out.println("修改后的值:"+new String(data2));
}
删除节点
zooKeeper.exists(path,watch) :判断节点是否存在
zookeeper.delete(path,version) : 删除节点