复习环境变量
-
export A=1
定义的变量,会对自己所在的shell进程以及子进程生效 -
B=1
定义的变量,只对自己所在的shell进程生效 - 在
script.sh
中定义的变量,在当前登录的shell进程中source script.sh
启动zookeeper集群
在mini1上启动mini2上的zookeeper
ssh mini2 "source /etc/profile;/apps/zookeeper/zookeeper-3.4.5/zkServer.sh start"
启动zookeeper集群中的所有zookeeper(mini1、mini2、mini3)的脚本
startzk.sh
#!/bin/sh
echo "start zkServer..."
for i in 1 2 3
do
ssh mini$i "source /etc/profile;/apps/zookeeper/zookeeper-3.4.5/bin/zkServer.sh start"
done
Zookeeper-API
基本使用
org.apache.zookeeper.Zookeeper
是客户端入口主类,负责建立与server的会话
它提供了表 1 所示几类主要方法 :
Zookeeper的增删改查
package cn.huachao.zookeeper01;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;
public class TestZK {
/**
* 如果zookeeper集群的配置文件zoo.conf使用的mini1,则在此必须使用mini1
*/
private static final String connectString = "mini1:2181,mini2:2181,mini3:2181";
private static final int sessionTimeout = 2000;
private ZooKeeper client = null;
@Before
public void init() throws Exception{
client = new ZooKeeper(connectString , sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println(event.getType() +" --> "+ event.getPath());
try {
client.getChildren("/", true);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
// 创建数据节点到zk中
@Test
public void save() throws Exception{
/*
* 参数1:要创建的节点的路径
* 参数2:节点大数据
* 参数3:节点的权限
* 参数4:节点的类型
*/
//上传的数据可以是任何类型,但都要转成byte[]
String res = client.create("/api", "this is my api".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("create-->"+res);
}
@Test
public void exist() throws Exception{
Stat stat = client.exists("/api", false);
System.out.println(stat==null?"not exist":"exist");
}
// 获取子节点
@Test
public void getChildren() throws Exception {
List<String> children = client.getChildren("/", true);
for (String child : children) {
System.out.println(child);
}
Thread.sleep(Long.MAX_VALUE);
}
//获取znode的数据
@Test
public void getData() throws Exception {
byte[] data = client.getData("/api", false, null);
System.out.println(new String(data));
}
//删除znode
@Test
public void deleteZnode() throws Exception {
//参数2:指定要删除的版本,-1表示删除所有版本
client.delete("/api", -1);
}
//删除znode
@Test
public void setData() throws Exception {
client.setData("/app1", "imissyou angelababy".getBytes(), -1);
byte[] data = client.getData("/api", false, null);
System.out.println(new String(data));
}
}
Zookeeper应用
实现分布式应用的(主节点HA)及客户端动态更新主节点状态
某分布式系统中,主节点可以有多台,可以动态上下线
任意一台客户端都能实时感知到主节点服务器的上下线
-
DistributedServer
,服务端,连接zookeeper,注册server节点
package cn.huachao.zkdist;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;
public class DistributedServer {
private static final String connectString = "mini1:2181,mini2:2181,mini3:2181";
private static final int sessionTimeout = 2000;
private static final String parentNode = "/servers";
private ZooKeeper zkClient = null;
/*
* 创建zk客户端
*/
public void getConnection() throws Exception{
zkClient = new ZooKeeper(connectString , sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println(event.getType() +" --> "+ event.getPath());
try {
zkClient.getChildren("/", true);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
/**
* 注册服务
* @param hostname
* @throws Exception
*/
public void registerServer(String hostname) throws Exception{
exist();
String res = zkClient.create(parentNode+"/server", hostname.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostname+" is online..."+res);
}
/**
* 根节点不在的,就创建
* @throws Exception
*/
@Test
public void exist() throws Exception{
Stat stat = zkClient.exists(parentNode, false);
if(stat==null){
zkClient.create(parentNode, "servers".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
/**
* 业务处理
* @param hostname
* @throws Exception
*/
public void handleBussiness(String hostname) throws Exception{
System.out.println(hostname+" is working ...");
Thread.sleep(Integer.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
String hostname = "mini2";
DistributedServer server = new DistributedServer();
server.getConnection();
server.registerServer(hostname);
server.handleBussiness(hostname);
}
}
-
DistributeClient
,客户端,连接zookeeper,查看服务节点,并监听节点变化
package cn.huachao.zkdist;
import java.util.ArrayList;
import java.util.List;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
public class DistributeClient {
private static final String connectString = "mini1:2181,mini2:2181,mini3:2181";
private static final int sessionTimeout = 2000;
private volatile List<String> serverList;
private static final String parentNode = "/servers";
private ZooKeeper zkClient = null;
/*
* 创建zk客户端
*/
public void getConnection() throws Exception{
zkClient = new ZooKeeper(connectString , sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println(event.getType() +" --> "+ event.getPath());
try {
getServerList();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
/**
* 获取服务器信息列表
* @throws Exception
*/
public void getServerList() throws Exception{
List<String> children = zkClient.getChildren(parentNode, true);
List<String> servers = new ArrayList<>();
for(String child : children){
byte[] data = zkClient.getData(parentNode+"/"+child, false, null);
String server = new String(data);
servers.add(server);
System.out.println(server);
}
serverList = servers;
}
/**
* 业务功能
*
* @throws InterruptedException
*/
public void handleBussiness() throws InterruptedException {
System.out.println("client start working.....");
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
DistributeClient client = new DistributeClient();
client.getConnection();
client.getServerList();
client.handleBussiness();
}
}
分布式共享锁的简单实现
package cn.huachao.zklock;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class DistributedClientLock {
// 会话超时
private static final int SESSION_TIMEOUT = 2000;
// zookeeper集群地址
private String hosts = "mini1:2181,mini2:2181,mini3:2181";
private String groupNode = "locks";
private String subNode = "sub";
private boolean haveLock = false;
private ZooKeeper zk;
// 记录自己创建的子节点路径
private volatile String thisPath;
/**
* 连接zookeeper
*/
public void connectZookeeper() throws Exception {
zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher() {
public void process(WatchedEvent event) {
try {
// 判断事件类型,此处只处理子节点变化事件
if (event.getType() == EventType.NodeChildrenChanged && event.getPath().equals("/" + groupNode)) {
//获取子节点,并对父节点进行监听
List<String> childrenNodes = zk.getChildren("/" + groupNode, true);
String thisNode = thisPath.substring(("/" + groupNode + "/").length());
// 去比较是否自己是最小id
Collections.sort(childrenNodes);
if (childrenNodes.indexOf(thisNode) == 0) {
//访问共享资源处理业务,并且在处理完成之后删除锁
doSomething();
//重新注册一把新的锁
thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
// 1、程序一进来就先注册一把锁到zk上
thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
// wait一小会,便于观察
Thread.sleep(new Random().nextInt(1000));
// 从zk的锁父目录下,获取所有子节点,并且注册对父节点的监听
List<String> childrenNodes = zk.getChildren("/" + groupNode, true);
//如果争抢资源的程序就只有自己,则可以直接去访问共享资源
if (childrenNodes.size() == 1) {
doSomething();
thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
}
}
/**
* 处理业务逻辑,并且在最后释放锁
*/
private void doSomething() throws Exception {
try {
System.out.println("gain lock: " + thisPath);
Thread.sleep(2000);
// do something
} finally {
System.out.println("finished: " + thisPath);
//
zk.delete(this.thisPath, -1);
}
}
public static void main(String[] args) throws Exception {
DistributedClientLock dl = new DistributedClientLock();
dl.connectZookeeper();
Thread.sleep(Long.MAX_VALUE);
}
}