本文基于实际生产环境中的Thrift+zookeeper实现的rpc调用总结,大致有以下几个部分:
1: 服务端将服务注册在zk中
1.1 解析服务端的网卡IP;
1.2 获取zookeeper客户端对象;
1.3 实现服务接口的注册;
2: 基于zookeeper实现服务接口的自动发现
3: 实现客户端连接池和客户端通过代理调用服务
一 服务端将服务注册在zk中
代码展示
1.1解析thrift-server端IP地址,用于注册服务
接口
public interface ThriftServerIpResolve {
// 获取服务所在机器的Ip
String getServerIp() throws Exception;
String getServerIp(boolean publicIpOnly) throws Exception;
void reset();
//当IP变更时,将会调用reset方法
static interface IpRestCalllBack{
public void rest(String newIp);
}
}
实现
public class ThriftServerIpLocalNetworkResolve implements ThriftServerIpResolve {
private Logger logger = LoggerFactory.getLogger(getClass());
//缓存
private String serverIp;
public void setServerIp(String serverIp) {
this.serverIp = serverIp;
}
@Override
public String getServerIp() {
return getServerIp(false);
}
@Override
public String getServerIp(boolean publicIpOnly) {
if (serverIp != null) {
return serverIp;
}
// 一个主机有多个网络接口
try {
Enumeration<NetworkInterface> netInterfaces = NetworkInterface.getNetworkInterfaces();
while (netInterfaces.hasMoreElements()) {
NetworkInterface netInterface = netInterfaces.nextElement();
// 每个网络接口,都会有多个"网络地址",比如一定会有lookback地址,会有siteLocal地址等.以及IPV4或者IPV6 .
Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
while (addresses.hasMoreElements()) {
InetAddress address = addresses.nextElement();
if(address instanceof Inet6Address){
continue;
}
if (!address.isLoopbackAddress()) {
if (publicIpOnly && !address.isSiteLocalAddress()) {
serverIp = address.getHostAddress();
logger.info("resolve server ip :" + serverIp);
continue;
} else if (!publicIpOnly && address.isSiteLocalAddress()) {
serverIp = address.getHostAddress();
logger.info("resolve server ip :" + serverIp);
continue;
}
}
}
}
} catch (SocketException e) {
e.printStackTrace();
}
return serverIp;
}
@Override
public void reset() {
serverIp = null;
}
}
1.2 获取zookeeper客户端链接对象
public class ZookeeperFactory implements FactoryBean<CuratorFramework> ,Closeable{
private String zkHosts;
// session超时
private int sessionTimeout = 30000;
private int connectionTimeout = 30000;
// 共享一个zk链接
private boolean singleton = true;
// 全局path前缀,常用来区分不同的应用
private String namespace;
private final static String ROOT = "rpc";
private CuratorFramework zkClient;
public void setZkHosts(String zkHosts) {
this.zkHosts = zkHosts;
}
public void setSessionTimeout(int sessionTimeout) {
this.sessionTimeout = sessionTimeout;
}
public void setConnectionTimeout(int connectionTimeout) {
this.connectionTimeout = connectionTimeout;
}
public void setSingleton(boolean singleton) {
this.singleton = singleton;
}
public void setNamespace(String namespace) {
this.namespace = namespace;
}
public void setZkClient(CuratorFramework zkClient) {
this.zkClient = zkClient;
}
@Override
public CuratorFramework getObject() throws Exception {
if (singleton) {
if (zkClient == null) {
zkClient = create();
zkClient.start();
}
return zkClient;
}
return create();
}
@Override
public Class<?> getObjectType() {
return CuratorFramework.class;
}
@Override
public boolean isSingleton() {
return singleton;
}
public CuratorFramework create() throws Exception {
if (StringUtils.isEmpty(namespace)) {
namespace = ROOT;
} else {
namespace = ROOT +"/"+ namespace;
}
return create(zkHosts, sessionTimeout, connectionTimeout, namespace);
}
// 使用CuratorFramework创建zk客户端对象
/**
* connectString zk集群的地址,包括ip和端口
* sessionTimeout
* connectionTimeout
* namespace 不同的应用可以使用不同的命名空间区分
* ExponentialBackoffRetry表示重试机制,重连的时间间隔随着重
* 试的次数递增的,如果时间间隔计算出来大于默认的最大sleep时
* 间的话,则取最大sleep时间。ExponentialBackoffRetry 除了时间
* 的限制以外,还有最大重连次数的限制。而
* ExponentialBackoffRetry策略只是让用户设置最大sleep时间而
* 已。默认的最大时间是Integer.MAX_VALUE毫秒。
**/
public static CuratorFramework create(String connectString, int sessionTimeout, int connectionTimeout, String namespace) {
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
return builder.connectString(connectString).sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(30000)
.canBeReadOnly(true).namespace(namespace).retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
.defaultData(null).build();
}
public void close() {
if (zkClient != null) {
zkClient.close();
}
}
}
1.3 将服务接口注册到zookeeper中
1.3.1 服务端注册工厂调用注册服务的方法,异步启动服务
/**
* 服务端注册服务工厂
*/
public class ThriftServiceServerFactory implements InitializingBean ,Closeable{
// 服务注册本机端口
private Integer port = 8299;
// 优先级
private Integer weight = 1;// default
// 服务实现类
private Object service;// serice实现类
//服务版本号
private String version;
// 是否只取公网ip,如果是true,zk中只注册公网ip;如果是false,zk中只注册私网ip
private boolean publicIpOnly = false;
// 解析本机IP
private ThriftServerIpResolve thriftServerIpResolve;
//服务注册
private ThriftServerAddressRegister thriftServerAddressRegister;
private ServerThread serverThread;
private boolean zkUse = true;
public void setPort(Integer port) {
this.port = port;
}
public void setWeight(Integer weight) {
this.weight = weight;
}
public void setService(Object service) {
this.service = service;
}
public void setVersion(String version) {
this.version = version;
}
public void setZkUse(boolean zkUse) {
this.zkUse = zkUse;
}
public void setPublicIpOnly(boolean publicIpOnly) {
this.publicIpOnly = publicIpOnly;
}
public void setThriftServerIpResolve(ThriftServerIpResolve thriftServerIpResolve) {
this.thriftServerIpResolve = thriftServerIpResolve;
}
public void setThriftServerAddressRegister(ThriftServerAddressRegister thriftServerAddressRegister) {
this.thriftServerAddressRegister = thriftServerAddressRegister;
}
@Override
public void afterPropertiesSet() throws Exception {
if (thriftServerIpResolve == null) {
thriftServerIpResolve = new ThriftServerIpLocalNetworkResolve();
}
String serverIP = thriftServerIpResolve.getServerIp(publicIpOnly);
if (StringUtils.isEmpty(serverIP)) {
throw new ThriftException("cant find server ip...");
}
String hostname = serverIP + ":" + port + ":" + weight;
Class<?> serviceClass = service.getClass();
// 获取实现类接口
Class<?>[] interfaces = serviceClass.getInterfaces();
if (interfaces.length == 0) {
throw new IllegalClassFormatException("service-class should implements Iface");
}
// reflect,load "Processor";
TProcessor processor = null;
String serviceName = null;
for (Class<?> clazz : interfaces) {
String cname = clazz.getSimpleName();
if (!cname.equals("Iface")) {
continue;
}
serviceName = clazz.getEnclosingClass().getName();
String pname = serviceName + "$Processor";
try {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
Class<?> pclass = classLoader.loadClass(pname);
if (!TProcessor.class.isAssignableFrom(pclass)) {
continue;
}
Constructor<?> constructor = pclass.getConstructor(clazz);
processor = (TProcessor) constructor.newInstance(service);
break;
} catch (Exception e) {
//
}
}
if (processor == null) {
throw new IllegalClassFormatException("service-class should implements Iface");
}
//需要单独的线程,因为serve方法是阻塞的.
serverThread = new ServerThread(processor, port);
serverThread.start();
// 注册服务
if (zkUse && thriftServerAddressRegister != null) {
thriftServerAddressRegister.register(serviceName, version, hostname);
}
}
class ServerThread extends Thread {
private TServer server;
ServerThread(TProcessor processor, int port) throws Exception {
---------------------
/** TThreadedSelectorServer模式是thrift-server最高级的工作模
式:主要有以下几个不分组成
TThreadedSelectorServer模式是目前Thrift提供的最高级的模式,
它内部有如果几个部分构成:
(1) 一个AcceptThread线程对象,专门用于处理监听socket上的新连接;
(2) 若干个SelectorThread对象专门用于处理业务socket的网络I/O操作,所有网络数据的读写均是有这些线程来完成;
(3) 一个负载均衡器SelectorThreadLoadBalancer对象,主要用于AcceptThread线程接收到一个新socket连接请求时,决定将这个新连接请求分配给哪个SelectorThread线程。
(4) 一个ExecutorService类型的工作线程池,在SelectorThread线程中,监听到有业务socket中有调用请求过来,则将请求读取之后,交个ExecutorService线程池中的线程完成此次调用的具体执行;
---------------------
**/
TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);
TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport);
TProcessorFactory processorFactory = new TProcessorFactory(processor);
tArgs.processorFactory(processorFactory);
tArgs.transportFactory(new TFramedTransport.Factory());
tArgs.protocolFactory( new TBinaryProtocol.Factory(true, true));
tArgs.maxReadBufferBytes = 1024 * 1024L; // 防止direct memory oom
tArgs.selectorThreads(4); // 设置selector线程数,默认是2
tArgs.workerThreads(32); // 设置工作线程数,默认是5,在数据库负载高时有可能会堵塞
server = new TThreadedSelectorServer(tArgs);
}
@Override
public void run(){
try{
//启动服务
server.serve();
}catch(Exception e){
//
}
}
public void stopServer(){
server.stop();
}
}
public void close() {
serverThread.stopServer();
}
}
1.3.1 真正的注册实现
第一步:监听zk连接变化;
第二步:将服务暴露的ip和port以虚拟节点的形式创建在zk的接口服务路径下,切换到指定的组;
第三步:注册成功后使用NodeCache监听节点内容变化,如果原本节点不存在,那么Cache就会在节点被创建时触发监听事件,如果该节点被删除,就无法再触发监听事件。任意节点的内容变化都会重新注册。
// 启动服务
@Override
public void afterPropertiesSet() throws Exception {
if (thriftServerIpResolve == null) {
thriftServerIpResolve = new ThriftServerIpLocalNetworkResolve();
}
String serverIP = thriftServerIpResolve.getServerIp(publicIpOnly);
if (StringUtils.isEmpty(serverIP)) {
throw new ThriftException("cant find server ip...");
}
String hostname = serverIP + ":" + port + ":" + weight;
Class<?> serviceClass = service.getClass();
// 获取实现类接口
Class<?>[] interfaces = serviceClass.getInterfaces();
if (interfaces.length == 0) {
throw new IllegalClassFormatException("service-class should implements Iface");
}
// reflect,load "Processor";
TProcessor processor = null;
String serviceName = null;
for (Class<?> clazz : interfaces) {
String cname = clazz.getSimpleName();
if (!cname.equals("Iface")) {
continue;
}
serviceName = clazz.getEnclosingClass().getName();
String pname = serviceName + "$Processor";
try {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
Class<?> pclass = classLoader.loadClass(pname);
if (!TProcessor.class.isAssignableFrom(pclass)) {
continue;
}
Constructor<?> constructor = pclass.getConstructor(clazz);
processor = (TProcessor) constructor.newInstance(service);
break;
} catch (Exception e) {
//
}
}
if (processor == null) {
throw new IllegalClassFormatException("service-class should implements Iface");
}
//需要单独的线程,因为serve方法是阻塞的.
serverThread = new ServerThread(processor, port);
serverThread.start();
// 注册服务
if (zkUse && thriftServerAddressRegister != null) {
thriftServerAddressRegister.register(serviceName, version, hostname);
}
}
class ServerThread extends Thread {
private TServer server;
ServerThread(TProcessor processor, int port) throws Exception {
TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);
TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport);
TProcessorFactory processorFactory = new TProcessorFactory(processor);
tArgs.processorFactory(processorFactory);
tArgs.transportFactory(new TFramedTransport.Factory());
tArgs.protocolFactory( new TBinaryProtocol.Factory(true, true));
tArgs.maxReadBufferBytes = 1024 * 1024L; // 防止direct memory oom
tArgs.selectorThreads(4); // 设置selector线程数,默认是2
tArgs.workerThreads(32); // 设置工作线程数,默认是5,在数据库负载高时有可能会堵塞
server = new TThreadedSelectorServer(tArgs);
}
@Override
public void run(){
try{
//启动服务
server.serve();
}catch(Exception e){
//
}
}
public void stopServer(){
server.stop();
}
}
// 实现注册和节点监听
/**
* 初始化注册方法
* @param service 服务接口名称,一个产品中不能重复
* @param version 服务接口的版本号,默认1.0.0
* @param address 服务发布的地址和端口
*/
@Override
public void register(String service, String version, String address) throws InterruptedException {
// 输入校验
String[] parts = address.split(":");
if (parts.length < 3) {
logger.error("ThriftZookeeper Register Error: address invalid '" + address + "'");
throw new ThriftException("ThriftZookeeper Register Error: address invalid '" + address + "'");
}
// 拆解内容
String ip = parts[0];
String port = parts[1];
String weight = parts[2];
// 增加连接状态监听
setConnectionListener(service, version, ip, port, weight);
// 注册
generalRegister(service, version, ip, port, weight);
//监听组别变化后重新注册
setGroupListener(service, version, ip, port, weight);
}
public void setConnectionListener(final String service, final String version, final String ip, final String port, final String weight) {
// 如果zk尚未启动,则启动
if (zkClient.getState() == CuratorFrameworkState.LATENT) {
zkClient.start();
}
logger.info("设置ZK连接状态监听的Listener");
zkClient.getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
logger.info("ZkClient状态变化:" + newState.name());
if (newState == ConnectionState.LOST) { //处理session过期
logger.info("ZkClient连接断开,session过期");
int i = 0;
while (true) {
logger.info("尝试重新连接到zk..." + (i++));
try {
if (client.getZookeeperClient().blockUntilConnectedOrTimedOut()) {
logger.info("尝试重新注册zk的临时节点...");
// 重新注册
generalRegister(service, version, ip, port, weight);
break;
}
} catch (InterruptedException e) {
logger.info("尝试重新注册zk的临时节点异常(InterruptedException)", e);
break;
} catch (Exception e) {
logger.info("尝试重新注册zk的临时节点异常(Exception)", e);
break;
}
}
}
}
});
}
/**
* 监听数据节点的变化重新注册
* @param version 版本号
* @param weight 服务权重信息
* @throws InterruptedException
*/
public void setGroupListener(final String service, final String version, final String ip, final String port, final String weight) throws InterruptedException {
final String ipServicePath = getInstanceGroupPath(localInstance, ip);
try {
createGroupNodeIfNotExists(localInstance, ip);
final NodeCache nodeCache = new NodeCache(zkClient, ipServicePath, false);
nodeCache.start(true);
nodeCache.getListenable().addListener(
new NodeCacheListener() {
public void nodeChanged() throws Exception {
synchronized (lock) {
logger.info("服务端监听到节点: " + ipServicePath + " 变化, for service:" + service);
generalRegister(service, version, ip, port, weight);
}
}
}
);
} catch (Exception e) {
logger.error("nodeCache start exception:",e);
throw new ThriftException("nodeCache start exception:", e);
}
}
/**
* 通用注册方法
* @param service 服务名
* @param version 版本号
* @param ip 服务地址,IP
* @param port 服务端口
* @param weight 权重信息,格式为1-10的整数字符串形式,例如"5"
*
* 获取当前zk中组别配置,如果和本地不同,则删除zk中旧组别下的注册地址,在新组别下注册
*/
public void generalRegister(String service, String version, String ip, String port, String weight) {
logger.info("开始注册ServiceLocate, 服务: " + service);
// 如果zk尚未启动,则启动
if (zkClient.getState() == CuratorFrameworkState.LATENT) {
zkClient.start();
}
String ipPort = ip + ":" + port;
// 获取组别
String ipInstancePath = getInstanceGroupPath(localInstance, ip);
String groupJson = null;
String group = "";
try {
createGroupNodeIfNotExists(localInstance, ip);
groupJson = new String(zkClient.getData().forPath(ipInstancePath));
GroupConfig groupConfig = JSON.parseObject(groupJson, GroupConfig.class);
if (groupConfig == null || groupConfig.getGroup() == null) {
throw new ThriftException("获取到错误的分组配置,分组注册不会进行,请检查配置内容是否正确");
}
group = groupConfig.getGroup();
} catch (Exception e) {
logger.error("获取组别失败, 按默认组别执行", e);
group = defaultGroup;
}
logger.info("注册到group: " + group);
// 注册
RegisterConfig registerConfig = new RegisterConfig();
registerConfig.setWeight(weight);
registerConfig.setGroup(group);
String groupWeightString = JSON.toJSONString(registerConfig);
try {
String serviceLocatePath = getServiceLocatePath(service);
// 创建当前服务定位存储节点,结构:
// serviceLocatePath
// |-- address1(服务定位存储节点,临时节点)
// |-- address2(服务定位存储节点,临时节点)
if (zkClient.checkExists().forPath(serviceLocatePath + "/" + ipPort) != null) {
// 已经存在则修改Data
zkClient.setData().forPath(serviceLocatePath + "/" + ipPort, groupWeightString.getBytes());
} else {
// 不存在则创建
// 使用creatingParentContainersIfNeeded创建服务定位根目录(固定)
zkClient.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(serviceLocatePath + "/" + ipPort, groupWeightString.getBytes());
}
// 切换当前注册组别
currentRegisterGroup = group;
logger.info("服务: " + service + " 注册到group: " + group + " 成功");
} catch (Exception e) {
logger.error("zk分组注册异常:", e);
throw new ThriftException("zk分组注册异常:", e);
}
}
/**
* 如果本地服务在ZK中不存在分组注册信息,则创建一个分组信息节点
*/
private void createGroupNodeIfNotExists(String localInstance, String ip) throws Exception {
if (zkClient.getState() == CuratorFrameworkState.LATENT) {
zkClient.start();
}
String serviceGroupPath = getInstanceGroupPath(localInstance, ip);
GroupConfig groupConfig = new GroupConfig();
groupConfig.setGroup(defaultGroup);
groupConfig.setWeight(defaultWeight);
if (zkClient.checkExists().forPath(serviceGroupPath) == null) {
logger.info("创建group_config,localInstance:" + localInstance + ",ip:" + ip);
zkClient.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath(serviceGroupPath, JSON.toJSONString(groupConfig).getBytes());
}
}
二 基于zk的服务自动发现
1.使用NodeCache监听zk节点,也就是服务的变化;
2.使用NodePathChild监听子节点,也就是ip:port的变化;
public void afterPropertiesSet() throws Exception {
logger.info("Provider初始化开始");
// 本机IP获取
if (thriftServerIpResolve == null) {
thriftServerIpResolve = new ThriftServerIpLocalNetworkResolve();
}
String ipAddress = thriftServerIpResolve.getServerIp(publicIpOnly);
if (StringUtils.isEmpty(ipAddress)) {
throw new ThriftException("can not find server ip...");
}
logger.info("Provider创建ServiceLocate监听器. targetService:" + targetService);
buildServiceLocateListener();
cachedPath.start(StartMode.POST_INITIALIZED_EVENT);
countDownLatch.await();
logger.info("Provider创建GroupConfig监听器,localInstance:" + localInstance);
buildInstanceGroupListener(ipAddress);
logger.info("Provider初始化完成");
}
private void buildInstanceGroupListener(final String ip) {
// 如果zk尚未启动,则启动
if (zkClient.getState() == CuratorFrameworkState.LATENT) {
zkClient.start();
}
final String serviceGroupPath = getInstanceGroupPath(localInstance, ip);
try {
// 如果不存在则先创建节点
createGroupNodeIfNotExists(localInstance, ip);
groupNodeCache = new NodeCache(zkClient, serviceGroupPath, false);
groupNodeCache.getListenable().addListener(
new NodeCacheListener() {
public void nodeChanged() throws Exception {
synchronized (lock) {
groupNodeCache.rebuild();
createGroupNodeIfNotExists(localInstance, ip);
rebuildGroup();
}
}
}
);
groupNodeCache.start(true);
rebuildGroup();
} catch (Exception e) {
logger.error("nodeCache start exception:",e);
throw new ThriftException("nodeCache start exception:", e);
}
}
/**
* 如果本地服务在ZK中不存在分组注册信息,则创建一个分组信息节点
*/
private void createGroupNodeIfNotExists(String localInstance, String ip) throws Exception {
if (zkClient.getState() == CuratorFrameworkState.LATENT) {
zkClient.start();
}
String serviceGroupPath = getInstanceGroupPath(localInstance, ip);
GroupConfig groupConfig = new GroupConfig();
groupConfig.setGroup(defaultGroup);
groupConfig.setWeight(defaultWeight);
if (zkClient.checkExists().forPath(serviceGroupPath) == null) {
zkClient.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath(serviceGroupPath, JSON.toJSONString(groupConfig).getBytes());
}
}
private void rebuildGroup() throws UnsupportedEncodingException {
String data = new String(groupNodeCache.getCurrentData().getData(), "utf-8");
logger.info("客户端监听到分组变化,当前内容 " + data + " 进行同步");
GroupConfig groupConfig = JSON.parseObject(data, GroupConfig.class);
if (groupConfig == null || groupConfig.getGroup() == null) {
logger.error("分组数据错误,缓存区域不会变更,请查看分组配置区数据,localInstance:" + localInstance
+ "targetService:" + targetService + ",data: " + data);
return;
}
currentGroup = groupConfig.getGroup();
}
private void buildServiceLocateListener() throws Exception {
// 如果zk尚未启动,则启动
if (zkClient.getState() == CuratorFrameworkState.LATENT) {
zkClient.start();
}
// 服务地址监听
// 寻找目标分组
final String serviceLocatePath = getServiceLocatePath(targetService);
cachedPath = new PathChildrenCache(zkClient, serviceLocatePath, true);
PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() {
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
PathChildrenCacheEvent.Type eventType = event.getType();
switch (eventType) {
case CONNECTION_RECONNECTED:
logger.info("Connection is reconection.");
break;
case CONNECTION_SUSPENDED:
logger.info("Connection is suspended.");
break;
case CONNECTION_LOST:
logger.warn("Connection error,waiting...");
// 这个return很讲究,ZK挂掉后,目的是当ZK挂掉后,不影响本地缓存
return;
case INITIALIZED:
// countDownLatch.countDown();
logger.warn("Connection init ...");
break;
default:
}
// 任何节点的数据变动,都会rebuild,此处为一个"简单的"做法.
cachedPath.rebuild();
synchronized (lock) {
rebuild();
}
countDownLatch.countDown();
}
};
cachedPath.getListenable().addListener(childrenCacheListener);
}
protected void rebuild() {
logger.info("即将更新本地地址缓存, localService: " + localInstance + ", targetService: " + targetService);
List<ChildData> children = cachedPath.getCurrentData();
if (children == null || children.isEmpty()) {
// 有可能所有的thrift server都与zookeeper断开了链接
// 但是有可能,thrift client与thrift server之间的网络是良好的
// 因此此处是否需要清空container,是需要多方面考虑的.
container.clear();
trace.clear();
ipPortQueue.clear();
logger.error("在注册服务区无法找到子节点");
return;
}
String path = null;
Map<String, List<InetSocketAddress>> currentMap = new HashMap<String, List<InetSocketAddress>>();
try {
for (ChildData data : children) {
path = data.getPath();
String address = new String(path.getBytes(), "utf-8");
String[] parts = address.split("/");
String ipPort = parts[parts.length-1];
String jsonString = new String(data.getData(), "utf-8");
RegisterConfig registerConfig = JSON.parseObject(jsonString, RegisterConfig.class);
if (registerConfig.getWeight() == null) {
throw new ThriftException("获取权重失败");
}
String weight = registerConfig.getWeight();
String group = registerConfig.getGroup();
// 当前InetAddress列表
List<InetSocketAddress> addressList = transfer(weight, ipPort);
// 添加到容器currentMap
if (!currentMap.containsKey(group)) {
currentMap.put(group, new ArrayList<InetSocketAddress>());
}
List<InetSocketAddress> groupList = currentMap.get(group);
groupList.addAll(addressList);
currentMap.put(group, groupList);
}
trace.clear();
container.clear();
ipPortQueue.clear();
for (Map.Entry<String, List<InetSocketAddress>> entry : currentMap.entrySet()) {
String group = entry.getKey();
List<InetSocketAddress> current = entry.getValue();
Collections.shuffle(current);
// 先组装到备份容器
if (!trace.containsKey(group)) {
trace.put(group, new HashSet<InetSocketAddress>());
}
Set<InetSocketAddress> traceGroup = trace.get(group);
traceGroup.addAll(current);
// 组装容器
if (!container.containsKey(group)) {
container.put(group, new ArrayList<InetSocketAddress>());
}
List<InetSocketAddress> groupContainer = container.get(group);
groupContainer.addAll(current);
// 组装队列
if (!ipPortQueue.containsKey(group)) {
ipPortQueue.put(group, new LinkedList<InetSocketAddress>());
}
Queue<InetSocketAddress> groupQueue = ipPortQueue.get(group);
groupQueue.addAll(current);
}
logger.info("分组缓存重建完毕");
for (Map.Entry<String, List<InetSocketAddress>> entry : currentMap.entrySet()) {
logger.info("group:" + entry.getKey() + ", target:" + entry.getValue());
}
} catch (Exception e) {
logger.error("重建缓存失败" + e.getMessage());
throw new ThriftException("重建缓存失败", e);
}
}
/**
* 根据权重分配初始化"IP:PORT"集合
* @param weight 权重字符串,例如"5"
* @param ipPort 例如10.0.0.1:9050
* @return 根据权重信息返回类似10.0.0.1:9050集合
*/
private List<InetSocketAddress> transfer(String weight, String ipPort) {
List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();
InetAddress ipAddress = null;
try {
ipAddress = InetAddress.getByName(ipPort.split(":")[0]);
} catch (UnknownHostException e) {
logger.error("获取IP地址失败:" + e.getMessage());
}
int port = Integer.parseInt(ipPort.split(":")[1]);
InetSocketAddress inetSocketAddress = new InetSocketAddress(ipAddress, port);
for (int i = 0; i < Integer.parseInt(weight); i++) {
result.add(inetSocketAddress);
}
return result;
}
/**
* 根据权重情况随机获取IP:PORT
* 取当前分组的队列
*/
@Override
public synchronized InetSocketAddress selector() {
Queue<InetSocketAddress> currentQueue = ipPortQueue.get(currentGroup);
List<InetSocketAddress> currentContainer = container.get(currentGroup);
Set<InetSocketAddress> currentTrace = trace.get(currentGroup);
if (currentQueue == null || currentQueue.isEmpty()) {
if (currentContainer != null && !currentContainer.isEmpty()) {
currentQueue.addAll(currentContainer);
} else if(currentTrace != null && !currentTrace.isEmpty()) {
synchronized (lock) {
currentContainer.addAll(currentTrace);
Collections.shuffle(currentContainer);
currentQueue.addAll(currentContainer);
}
}
}
if (currentQueue == null || currentQueue.size() == 0) {
logger.error("找不到可用服务,localInstance:" + localInstance + "目标服务:" + targetService);
throw new ThriftException("找不到可用服务,localInstance:" + localInstance + "目标服务:" + targetService);
}
return currentQueue.poll();
}
@Override
public boolean validateGroup(String group, InetSocketAddress address) {
// 当前trace中是否存在该address若无则排除
Set<InetSocketAddress> cTrace = trace.get(currentGroup);
if (cTrace == null) {
logger.error("找不到可用服务,Trace为空");
return false;
}
return cTrace.contains(address);
}
@Override
public String getGroup() {
return currentGroup;
}
@Override
public List<InetSocketAddress> findServerAddressList() {
List<InetSocketAddress> currentContainer = container.get(currentGroup);
return Collections.unmodifiableList(currentContainer);
}
@Override
public String getService() {
return targetService;
}
@Override
public String getServiceUrl() {
return "";
}
public void close(){
try {
cachedPath.close();
groupNodeCache.close();
} catch (IOException e) {
}
zkClient.close();
}
三 客户端jdk代理实现客户端代理调用远程服务,客户端代理交给GenericObjectPool实现创建,销毁。
@Override
public void afterPropertiesSet() throws Exception {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
// 加载Iface接口
objectClass = classLoader.loadClass(serverAddressProvider.getService() + "$Iface");
// 加载Client.Factory类
Class<TServiceClientFactory<TServiceClient>> fi = (Class<TServiceClientFactory<TServiceClient>>) classLoader.loadClass(serverAddressProvider.getService() + "$Client$Factory");
TServiceClientFactory<TServiceClient> clientFactory = fi.newInstance();
// 实现客户端连接池
ThriftClientPoolFactory clientPool = new ThriftClientPoolFactory(serverAddressProvider, clientFactory, callback);
GenericObjectPool.Config poolConfig = new GenericObjectPool.Config();
poolConfig.maxActive = maxActive;
poolConfig.maxIdle = 1;
poolConfig.minIdle = 0;
poolConfig.minEvictableIdleTimeMillis = idleTime;
poolConfig.timeBetweenEvictionRunsMillis = idleTime * 2L;
poolConfig.testOnBorrow=true;
poolConfig.testOnReturn=false;
poolConfig.testWhileIdle=false;
pool = new GenericObjectPool<TServiceClient>(clientPool, poolConfig);
// 创建客户端代理,实现远程调用,异常,超时重试机制
proxyClient = Proxy.newProxyInstance(classLoader, new Class[] { objectClass }, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 记录调用开始时间t1
long t1 = System.currentTimeMillis();
// 调用远程的目标方法,重试3次
Object reObject = null;
Exception reException = null;
TServiceClient client = null;
boolean success = false;
int MAX_RETRY = 3, retry = 0;
while (++retry <= MAX_RETRY) {
client = pool.borrowObject();
boolean flag = true;
try {
reObject = method.invoke(client, args);
} catch (Exception e) {
flag = false;
reException = e;
logger.info("retry:"+e.getMessage(),e);
} finally {
logger.info("retry time:"+retry);
if (flag) {
pool.returnObject(client);
} else {
pool.invalidateObject(client);
}
}
// 执行成功,退出循环
if (flag) {
success = true;
break;
}
// 只有超时类异常进行重试
boolean needRetry = false;
if (reException != null && reException instanceof InvocationTargetException) {
Throwable cause1 = reException.getCause();
if (cause1 != null && cause1 instanceof TTransportException) {
Throwable cause2 = cause1.getCause();
if (cause2 != null && (cause2 instanceof SocketTimeoutException || cause2 instanceof ConnectTimeoutException)) {
if (method.getName() != null && method.getName().startsWith("get")) {
logger.info("timeout needRetry set true");
needRetry = true;
}
}
}
}
if (!needRetry)
break;
}
// 记录调用结束时间t2
long t2 = System.currentTimeMillis();
printLog(client, method, (t2 - t1), success, retry > MAX_RETRY ? MAX_RETRY : retry);
if (!success) {
if (reException instanceof InvocationTargetException)
throw ((InvocationTargetException) reException).getTargetException();
else
throw reException;
} else {
return reObject;
}
}
});
}
3.1 客户端连接池的实现
// 客户端销毁
@Override
public void destroyObject(TServiceClient client) throws Exception {
if (callback != null) {
try {
callback.destroy(client);
} catch (Exception e) {
logger.warn("destroyObject:{}", e);
}
}
clientGroupMap.remove(client);
clientAddressMap.remove(client);
logger.info("destroyObject:{}", client);
TTransport pin = client.getInputProtocol().getTransport();
pin.close();
TTransport pout = client.getOutputProtocol().getTransport();
pout.close();
}
// 客户端创建
@Override
public TServiceClient makeObject() throws Exception {
InetSocketAddress address = serverAddressProvider.selector();
String group = serverAddressProvider.getGroup();
if (address == null) {
new ThriftException("No provider available for remote service");
}
TTransport transport;
TProtocol protocol;
if (StringUtils.isEmpty(serverAddressProvider.getServiceUrl())) {
// 如果serviceUrl是空,则采用TFramedTransport,适用于Java的thrift服务
// socket超时30s,connect超时5s
TSocket tsocket = new TSocket(address.getHostName(), address.getPort(), 10000, 5000);
transport = new TFramedTransport(tsocket);
protocol = new TBinaryProtocol(transport);
} else {
// 如果serviceUrl不空,则采用THttpClient的transport,适用于php的thrift服务
String url = "";
try {
url = "http://" + address.getHostName() + ":" + address.getPort()
+ serverAddressProvider.getServiceUrl();
} catch (NullPointerException e) {
if (address == null) {
logger.error("address is null");
}
if (serverAddressProvider == null) {
logger.error("serverAddressProvider is null");
}
throw e;
}
transport = new THttpClient(url);
protocol = new TBinaryProtocol(transport);
}
TServiceClient client = this.clientFactory.getClient(protocol);
clientGroupMap.put(client, group);
clientAddressMap.put(client, address);
transport.open();
if (callback != null) {
try {
callback.make(client);
} catch (Exception e) {
logger.warn("makeObject:{}", e);
}
}
return client;
}