thrift+zk实现服务的注册,发现,调用

本文基于实际生产环境中的Thrift+zookeeper实现的rpc调用总结,大致有以下几个部分:
1: 服务端将服务注册在zk中
1.1 解析服务端的网卡IP;
1.2 获取zookeeper客户端对象;
1.3 实现服务接口的注册;
2: 基于zookeeper实现服务接口的自动发现
3: 实现客户端连接池和客户端通过代理调用服务

一 服务端将服务注册在zk中

调用图:
服务注册流程图.png

代码展示

1.1解析thrift-server端IP地址,用于注册服务

接口
public interface ThriftServerIpResolve {
    // 获取服务所在机器的Ip
    String getServerIp() throws Exception;

    String getServerIp(boolean publicIpOnly) throws Exception;
    
    void reset();
    
    //当IP变更时,将会调用reset方法
    static interface IpRestCalllBack{
        public void rest(String newIp);
    }
}

实现
public class ThriftServerIpLocalNetworkResolve implements ThriftServerIpResolve {
    
    private Logger logger = LoggerFactory.getLogger(getClass());

    //缓存
    private String serverIp;
    
    public void setServerIp(String serverIp) {
        this.serverIp = serverIp;
    }

    @Override
    public String getServerIp() {
        return getServerIp(false);
    }

    @Override
    public String getServerIp(boolean publicIpOnly) {
        if (serverIp != null) {
            return serverIp;
        }
        // 一个主机有多个网络接口
        try {
            Enumeration<NetworkInterface> netInterfaces = NetworkInterface.getNetworkInterfaces();
            while (netInterfaces.hasMoreElements()) {
                NetworkInterface netInterface = netInterfaces.nextElement();
                // 每个网络接口,都会有多个"网络地址",比如一定会有lookback地址,会有siteLocal地址等.以及IPV4或者IPV6 .
                Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
                while (addresses.hasMoreElements()) {
                    InetAddress address = addresses.nextElement();
                    if(address instanceof Inet6Address){
                        continue;
                    }
                    if (!address.isLoopbackAddress()) {
                        if (publicIpOnly && !address.isSiteLocalAddress()) {
                            serverIp = address.getHostAddress();
                            logger.info("resolve server ip :" + serverIp);
                            continue;
                        } else if (!publicIpOnly && address.isSiteLocalAddress()) {
                            serverIp = address.getHostAddress();
                            logger.info("resolve server ip :" + serverIp);
                            continue;
                        }
                    }
                }
            }
        } catch (SocketException e) {
            e.printStackTrace();
        }
        return serverIp;
    }

    @Override
    public void reset() {
        serverIp = null;
    }
}

1.2 获取zookeeper客户端链接对象

public class ZookeeperFactory implements FactoryBean<CuratorFramework> ,Closeable{

    private String zkHosts;
    // session超时
    private int sessionTimeout = 30000;
    private int connectionTimeout = 30000;

    // 共享一个zk链接
    private boolean singleton = true;

    // 全局path前缀,常用来区分不同的应用
    private String namespace;

    private final static String ROOT = "rpc";

    private CuratorFramework zkClient;

    public void setZkHosts(String zkHosts) {
        this.zkHosts = zkHosts;
    }

    public void setSessionTimeout(int sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
    }

    public void setConnectionTimeout(int connectionTimeout) {
        this.connectionTimeout = connectionTimeout;
    }

    public void setSingleton(boolean singleton) {
        this.singleton = singleton;
    }

    public void setNamespace(String namespace) {
        this.namespace = namespace;
    }

    public void setZkClient(CuratorFramework zkClient) {
        this.zkClient = zkClient;
    }

    @Override
    public CuratorFramework getObject() throws Exception {
        if (singleton) {
            if (zkClient == null) {
                zkClient = create();
                zkClient.start();
            }
            return zkClient;
        }
        return create();
    }

    @Override
    public Class<?> getObjectType() {
        return CuratorFramework.class;
    }

    @Override
    public boolean isSingleton() {
        return singleton;
    }

    public CuratorFramework create() throws Exception {
        if (StringUtils.isEmpty(namespace)) {
            namespace = ROOT;
        } else {
            namespace = ROOT +"/"+ namespace;
        }
        return create(zkHosts, sessionTimeout, connectionTimeout, namespace);
    }

    // 使用CuratorFramework创建zk客户端对象
    /**
     *  connectString zk集群的地址,包括ip和端口
     *  sessionTimeout 
     *  connectionTimeout
     *  namespace 不同的应用可以使用不同的命名空间区分
     *  ExponentialBackoffRetry表示重试机制,重连的时间间隔随着重
     *  试的次数递增的,如果时间间隔计算出来大于默认的最大sleep时 
     *  间的话,则取最大sleep时间。ExponentialBackoffRetry 除了时间 
     *  的限制以外,还有最大重连次数的限制。而 
     *  ExponentialBackoffRetry策略只是让用户设置最大sleep时间而 
     *  已。默认的最大时间是Integer.MAX_VALUE毫秒。 
     **/
    public static CuratorFramework create(String connectString, int sessionTimeout, int connectionTimeout, String namespace) {
        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
        return builder.connectString(connectString).sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(30000)
                .canBeReadOnly(true).namespace(namespace).retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
                .defaultData(null).build();
    }

    public void close() {
        if (zkClient != null) {
            zkClient.close();
        }
    }
}

1.3 将服务接口注册到zookeeper中
1.3.1 服务端注册工厂调用注册服务的方法,异步启动服务

/**
 * 服务端注册服务工厂
 */
public class ThriftServiceServerFactory implements InitializingBean ,Closeable{
    // 服务注册本机端口
    private Integer port = 8299;
    // 优先级
    private Integer weight = 1;// default
    // 服务实现类
    private Object service;// serice实现类
    //服务版本号
    private String version;
    // 是否只取公网ip,如果是true,zk中只注册公网ip;如果是false,zk中只注册私网ip
    private boolean publicIpOnly = false;
    // 解析本机IP
    private ThriftServerIpResolve thriftServerIpResolve;
    //服务注册
    private ThriftServerAddressRegister thriftServerAddressRegister;

    private ServerThread serverThread;

    private boolean zkUse = true;
    
    public void setPort(Integer port) {
        this.port = port;
    }

    public void setWeight(Integer weight) {
        this.weight = weight;
    }

    public void setService(Object service) {
        this.service = service;
    }

    public void setVersion(String version) {
        this.version = version;
    }

    public void setZkUse(boolean zkUse) {
        this.zkUse = zkUse;
    }

    public void setPublicIpOnly(boolean publicIpOnly) {
        this.publicIpOnly = publicIpOnly;
    }

    public void setThriftServerIpResolve(ThriftServerIpResolve thriftServerIpResolve) {
        this.thriftServerIpResolve = thriftServerIpResolve;
    }

    public void setThriftServerAddressRegister(ThriftServerAddressRegister thriftServerAddressRegister) {
        this.thriftServerAddressRegister = thriftServerAddressRegister;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        if (thriftServerIpResolve == null) {
            thriftServerIpResolve = new ThriftServerIpLocalNetworkResolve();
        }
        String serverIP = thriftServerIpResolve.getServerIp(publicIpOnly);
        if (StringUtils.isEmpty(serverIP)) {
            throw new ThriftException("cant find server ip...");
        }

        String hostname = serverIP + ":" + port + ":" + weight;
        Class<?> serviceClass = service.getClass();
        // 获取实现类接口
        Class<?>[] interfaces = serviceClass.getInterfaces();
        if (interfaces.length == 0) {
            throw new IllegalClassFormatException("service-class should implements Iface");
        }
        // reflect,load "Processor";
        TProcessor processor = null;
        String serviceName = null;

        for (Class<?> clazz : interfaces) {
            String cname = clazz.getSimpleName();
            if (!cname.equals("Iface")) {
                continue;
            }
            serviceName = clazz.getEnclosingClass().getName();
            String pname = serviceName + "$Processor";
            try {
                ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
                Class<?> pclass = classLoader.loadClass(pname);
                if (!TProcessor.class.isAssignableFrom(pclass)) {
                    continue;
                }
                Constructor<?> constructor = pclass.getConstructor(clazz);
                processor = (TProcessor) constructor.newInstance(service);
                break;
            } catch (Exception e) {
                //
            }
        }
        if (processor == null) {
            throw new IllegalClassFormatException("service-class should implements Iface");
        }
        //需要单独的线程,因为serve方法是阻塞的.
        serverThread = new ServerThread(processor, port);
        serverThread.start();
        // 注册服务
        if (zkUse && thriftServerAddressRegister != null) {
            thriftServerAddressRegister.register(serviceName, version, hostname);
        }

    }
    class ServerThread extends Thread {
        private TServer server;
        ServerThread(TProcessor processor, int port) throws Exception {
--------------------- 
        /** TThreadedSelectorServer模式是thrift-server最高级的工作模 
       式:主要有以下几个不分组成
      TThreadedSelectorServer模式是目前Thrift提供的最高级的模式, 
      它内部有如果几个部分构成:

     (1)  一个AcceptThread线程对象,专门用于处理监听socket上的新连接;

     (2)  若干个SelectorThread对象专门用于处理业务socket的网络I/O操作,所有网络数据的读写均是有这些线程来完成;

     (3)  一个负载均衡器SelectorThreadLoadBalancer对象,主要用于AcceptThread线程接收到一个新socket连接请求时,决定将这个新连接请求分配给哪个SelectorThread线程。

     (4)  一个ExecutorService类型的工作线程池,在SelectorThread线程中,监听到有业务socket中有调用请求过来,则将请求读取之后,交个ExecutorService线程池中的线程完成此次调用的具体执行;
   --------------------- 
  **/
            TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);
            TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport);  
            TProcessorFactory processorFactory = new TProcessorFactory(processor);
            tArgs.processorFactory(processorFactory);
            tArgs.transportFactory(new TFramedTransport.Factory());  
            tArgs.protocolFactory( new TBinaryProtocol.Factory(true, true));
            tArgs.maxReadBufferBytes = 1024 * 1024L; // 防止direct memory oom
            tArgs.selectorThreads(4); // 设置selector线程数,默认是2
            tArgs.workerThreads(32); // 设置工作线程数,默认是5,在数据库负载高时有可能会堵塞
            server = new TThreadedSelectorServer(tArgs);
        }

        @Override
        public void run(){
            try{
                //启动服务
                server.serve();
            }catch(Exception e){
                //
            }
        }
        
        public void stopServer(){
            server.stop();
        }
    }
    
    public void close() {
        serverThread.stopServer();
    }
}

1.3.1 真正的注册实现
第一步:监听zk连接变化;
第二步:将服务暴露的ip和port以虚拟节点的形式创建在zk的接口服务路径下,切换到指定的组;
第三步:注册成功后使用NodeCache监听节点内容变化,如果原本节点不存在,那么Cache就会在节点被创建时触发监听事件,如果该节点被删除,就无法再触发监听事件。任意节点的内容变化都会重新注册。

// 启动服务
@Override
    public void afterPropertiesSet() throws Exception {
        if (thriftServerIpResolve == null) {
            thriftServerIpResolve = new ThriftServerIpLocalNetworkResolve();
        }
        String serverIP = thriftServerIpResolve.getServerIp(publicIpOnly);
        if (StringUtils.isEmpty(serverIP)) {
            throw new ThriftException("cant find server ip...");
        }

        String hostname = serverIP + ":" + port + ":" + weight;
        Class<?> serviceClass = service.getClass();
        // 获取实现类接口
        Class<?>[] interfaces = serviceClass.getInterfaces();
        if (interfaces.length == 0) {
            throw new IllegalClassFormatException("service-class should implements Iface");
        }
        // reflect,load "Processor";
        TProcessor processor = null;
        String serviceName = null;

        for (Class<?> clazz : interfaces) {
            String cname = clazz.getSimpleName();
            if (!cname.equals("Iface")) {
                continue;
            }
            serviceName = clazz.getEnclosingClass().getName();
            String pname = serviceName + "$Processor";
            try {
                ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
                Class<?> pclass = classLoader.loadClass(pname);
                if (!TProcessor.class.isAssignableFrom(pclass)) {
                    continue;
                }
                Constructor<?> constructor = pclass.getConstructor(clazz);
                processor = (TProcessor) constructor.newInstance(service);
                break;
            } catch (Exception e) {
                //
            }
        }
        if (processor == null) {
            throw new IllegalClassFormatException("service-class should implements Iface");
        }
        //需要单独的线程,因为serve方法是阻塞的.
        serverThread = new ServerThread(processor, port);
        serverThread.start();
        // 注册服务
        if (zkUse && thriftServerAddressRegister != null) {
            thriftServerAddressRegister.register(serviceName, version, hostname);
        }

    }
    class ServerThread extends Thread {
        private TServer server;
        ServerThread(TProcessor processor, int port) throws Exception {
            TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);
            TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport);  
            TProcessorFactory processorFactory = new TProcessorFactory(processor);
            tArgs.processorFactory(processorFactory);
            tArgs.transportFactory(new TFramedTransport.Factory());  
            tArgs.protocolFactory( new TBinaryProtocol.Factory(true, true));
            tArgs.maxReadBufferBytes = 1024 * 1024L; // 防止direct memory oom
            tArgs.selectorThreads(4); // 设置selector线程数,默认是2
            tArgs.workerThreads(32); // 设置工作线程数,默认是5,在数据库负载高时有可能会堵塞
            server = new TThreadedSelectorServer(tArgs);
        }

        @Override
        public void run(){
            try{
                //启动服务
                server.serve();
            }catch(Exception e){
                //
            }
        }
        
        public void stopServer(){
            server.stop();
        }
    }


// 实现注册和节点监听
/**
     * 初始化注册方法
     * @param service 服务接口名称,一个产品中不能重复
     * @param version 服务接口的版本号,默认1.0.0
     * @param address 服务发布的地址和端口
     */
    @Override
    public void register(String service, String version, String address) throws InterruptedException {
        // 输入校验
        String[] parts = address.split(":");
        if (parts.length < 3) {
            logger.error("ThriftZookeeper Register Error: address invalid '" + address + "'");
            throw new ThriftException("ThriftZookeeper Register Error: address invalid '" + address + "'");
        }

        // 拆解内容
        String ip = parts[0];
        String port = parts[1];
        String weight = parts[2];

        // 增加连接状态监听
        setConnectionListener(service, version, ip, port, weight);

        // 注册
        generalRegister(service, version, ip, port, weight);

        //监听组别变化后重新注册
        setGroupListener(service, version, ip, port, weight);
    }

    public void setConnectionListener(final String service, final String version, final String ip, final String port, final String weight)  {
        // 如果zk尚未启动,则启动
        if (zkClient.getState() == CuratorFrameworkState.LATENT) {
            zkClient.start();
        }

        logger.info("设置ZK连接状态监听的Listener");
        zkClient.getConnectionStateListenable().addListener(new ConnectionStateListener() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                logger.info("ZkClient状态变化:" + newState.name());
                if (newState == ConnectionState.LOST) { //处理session过期
                    logger.info("ZkClient连接断开,session过期");

                    int i = 0;
                    while (true) {
                        logger.info("尝试重新连接到zk..." + (i++));
                        try {
                            if (client.getZookeeperClient().blockUntilConnectedOrTimedOut()) {
                                logger.info("尝试重新注册zk的临时节点...");

                                // 重新注册
                                generalRegister(service, version, ip, port, weight);

                                break;
                            }
                        } catch (InterruptedException e) {
                            logger.info("尝试重新注册zk的临时节点异常(InterruptedException)", e);
                            break;
                        } catch (Exception e) {
                            logger.info("尝试重新注册zk的临时节点异常(Exception)", e);
                            break;
                        }
                    }
                }
            }
        });
    }

    /**
     * 监听数据节点的变化重新注册
     * @param version 版本号
     * @param weight 服务权重信息
     * @throws InterruptedException
     */
    public void setGroupListener(final String service, final String version, final String ip, final String port, final String weight) throws InterruptedException {
        final String ipServicePath = getInstanceGroupPath(localInstance, ip);
        try {
            createGroupNodeIfNotExists(localInstance, ip);
            final NodeCache nodeCache = new NodeCache(zkClient, ipServicePath, false);
            nodeCache.start(true);
            nodeCache.getListenable().addListener(
                    new NodeCacheListener() {
                        public void nodeChanged() throws Exception {
                            synchronized (lock) {
                                logger.info("服务端监听到节点: " + ipServicePath + " 变化, for service:" + service);
                                generalRegister(service, version, ip, port, weight);
                            }
                        }
                    }
            );
        } catch (Exception e) {
            logger.error("nodeCache start exception:",e);
            throw new ThriftException("nodeCache start exception:", e);
        }
    }


    /**
     * 通用注册方法
     * @param service 服务名
     * @param version 版本号
     * @param ip 服务地址,IP
     * @param port 服务端口
     * @param weight 权重信息,格式为1-10的整数字符串形式,例如"5"
     *
     * 获取当前zk中组别配置,如果和本地不同,则删除zk中旧组别下的注册地址,在新组别下注册
     */
    public void generalRegister(String service, String version, String ip, String port, String weight) {
        logger.info("开始注册ServiceLocate, 服务: " + service);
        // 如果zk尚未启动,则启动
        if (zkClient.getState() == CuratorFrameworkState.LATENT) {
            zkClient.start();
        }
        String ipPort = ip + ":" + port;

        // 获取组别
        String ipInstancePath = getInstanceGroupPath(localInstance, ip);
        String groupJson = null;
        String group = "";
        try {
            createGroupNodeIfNotExists(localInstance, ip);
            groupJson = new String(zkClient.getData().forPath(ipInstancePath));
            GroupConfig groupConfig = JSON.parseObject(groupJson, GroupConfig.class);
            if (groupConfig == null || groupConfig.getGroup() == null) {
                throw new ThriftException("获取到错误的分组配置,分组注册不会进行,请检查配置内容是否正确");
            }
            group = groupConfig.getGroup();
        } catch (Exception e) {
            logger.error("获取组别失败, 按默认组别执行", e);
            group = defaultGroup;
        }
        logger.info("注册到group: " + group);

        // 注册
        RegisterConfig registerConfig = new RegisterConfig();
        registerConfig.setWeight(weight);
        registerConfig.setGroup(group);
        String groupWeightString = JSON.toJSONString(registerConfig);
        try {
            String serviceLocatePath = getServiceLocatePath(service);

            // 创建当前服务定位存储节点,结构:
            // serviceLocatePath
            //    |-- address1(服务定位存储节点,临时节点)
            //    |-- address2(服务定位存储节点,临时节点)
            if (zkClient.checkExists().forPath(serviceLocatePath + "/" + ipPort) != null) {
                // 已经存在则修改Data
                zkClient.setData().forPath(serviceLocatePath + "/" + ipPort, groupWeightString.getBytes());
            } else {
                // 不存在则创建
                // 使用creatingParentContainersIfNeeded创建服务定位根目录(固定)
                zkClient.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.EPHEMERAL)
                        .forPath(serviceLocatePath + "/" + ipPort, groupWeightString.getBytes());
            }

            // 切换当前注册组别
            currentRegisterGroup = group;
            logger.info("服务: " + service + " 注册到group: " + group + " 成功");
        } catch (Exception e) {
            logger.error("zk分组注册异常:", e);
            throw new ThriftException("zk分组注册异常:", e);
        }
    }

    /**
     * 如果本地服务在ZK中不存在分组注册信息,则创建一个分组信息节点
     */
    private void createGroupNodeIfNotExists(String localInstance, String ip) throws Exception {
        if (zkClient.getState() == CuratorFrameworkState.LATENT) {
            zkClient.start();
        }

        String serviceGroupPath = getInstanceGroupPath(localInstance, ip);

        GroupConfig groupConfig = new GroupConfig();
        groupConfig.setGroup(defaultGroup);
        groupConfig.setWeight(defaultWeight);

        if (zkClient.checkExists().forPath(serviceGroupPath) == null) {
            logger.info("创建group_config,localInstance:" + localInstance + ",ip:" + ip);
            zkClient.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .forPath(serviceGroupPath, JSON.toJSONString(groupConfig).getBytes());
        }
    }

二 基于zk的服务自动发现


服务自动发现.png

1.使用NodeCache监听zk节点,也就是服务的变化;
2.使用NodePathChild监听子节点,也就是ip:port的变化;

public void afterPropertiesSet() throws Exception {
        logger.info("Provider初始化开始");
        // 本机IP获取
        if (thriftServerIpResolve == null) {
            thriftServerIpResolve = new ThriftServerIpLocalNetworkResolve();
        }
        String ipAddress = thriftServerIpResolve.getServerIp(publicIpOnly);
        if (StringUtils.isEmpty(ipAddress)) {
            throw new ThriftException("can not find server ip...");
        }
        logger.info("Provider创建ServiceLocate监听器. targetService:" + targetService);
        buildServiceLocateListener();
        cachedPath.start(StartMode.POST_INITIALIZED_EVENT);
        countDownLatch.await();
        logger.info("Provider创建GroupConfig监听器,localInstance:" + localInstance);
        buildInstanceGroupListener(ipAddress);
        logger.info("Provider初始化完成");
    }

    private void buildInstanceGroupListener(final String ip) {
        // 如果zk尚未启动,则启动
        if (zkClient.getState() == CuratorFrameworkState.LATENT) {
            zkClient.start();
        }

        final String serviceGroupPath = getInstanceGroupPath(localInstance, ip);
        try {
            // 如果不存在则先创建节点
            createGroupNodeIfNotExists(localInstance, ip);
            groupNodeCache = new NodeCache(zkClient, serviceGroupPath, false);
            groupNodeCache.getListenable().addListener(
                    new NodeCacheListener() {
                        public void nodeChanged() throws Exception {
                            synchronized (lock) {
                                groupNodeCache.rebuild();
                                createGroupNodeIfNotExists(localInstance, ip);
                                rebuildGroup();
                            }
                        }
                    }
            );
            groupNodeCache.start(true);
            rebuildGroup();
        } catch (Exception e) {
            logger.error("nodeCache start exception:",e);
            throw new ThriftException("nodeCache start exception:", e);
        }
    }

    /**
     * 如果本地服务在ZK中不存在分组注册信息,则创建一个分组信息节点
     */
    private void createGroupNodeIfNotExists(String localInstance, String ip) throws Exception {
        if (zkClient.getState() == CuratorFrameworkState.LATENT) {
            zkClient.start();
        }

        String serviceGroupPath = getInstanceGroupPath(localInstance, ip);

        GroupConfig groupConfig = new GroupConfig();
        groupConfig.setGroup(defaultGroup);
        groupConfig.setWeight(defaultWeight);

        if (zkClient.checkExists().forPath(serviceGroupPath) == null) {
            zkClient.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .forPath(serviceGroupPath, JSON.toJSONString(groupConfig).getBytes());
        }
    }

    private void rebuildGroup() throws UnsupportedEncodingException {
        String data = new String(groupNodeCache.getCurrentData().getData(), "utf-8");
        logger.info("客户端监听到分组变化,当前内容 " + data + " 进行同步");
        GroupConfig groupConfig = JSON.parseObject(data, GroupConfig.class);
        if (groupConfig == null || groupConfig.getGroup() == null) {
            logger.error("分组数据错误,缓存区域不会变更,请查看分组配置区数据,localInstance:" + localInstance
                    + "targetService:" + targetService + ",data: " + data);
            return;
        }
        currentGroup = groupConfig.getGroup();
    }

    private void buildServiceLocateListener() throws Exception {
        // 如果zk尚未启动,则启动
        if (zkClient.getState() == CuratorFrameworkState.LATENT) {
            zkClient.start();
        }

        // 服务地址监听
        // 寻找目标分组
        final String serviceLocatePath = getServiceLocatePath(targetService);
        cachedPath = new PathChildrenCache(zkClient, serviceLocatePath, true);
        PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                PathChildrenCacheEvent.Type eventType = event.getType();
                switch (eventType) {
                    case CONNECTION_RECONNECTED:
                        logger.info("Connection is reconection.");
                        break;
                    case CONNECTION_SUSPENDED:
                        logger.info("Connection is suspended.");
                        break;
                    case CONNECTION_LOST:
                        logger.warn("Connection error,waiting...");
                        // 这个return很讲究,ZK挂掉后,目的是当ZK挂掉后,不影响本地缓存
                        return;
                    case INITIALIZED:
                        //  countDownLatch.countDown();
                        logger.warn("Connection init ...");
                        break;
                    default:
                }
                // 任何节点的数据变动,都会rebuild,此处为一个"简单的"做法.
                cachedPath.rebuild();
                synchronized (lock) {
                    rebuild();
                }
                countDownLatch.countDown();
            }
        };

        cachedPath.getListenable().addListener(childrenCacheListener);
    }

    protected void rebuild() {
        logger.info("即将更新本地地址缓存, localService: " + localInstance + ", targetService: " + targetService);
        List<ChildData> children = cachedPath.getCurrentData();
        if (children == null || children.isEmpty()) {
            // 有可能所有的thrift server都与zookeeper断开了链接
            // 但是有可能,thrift client与thrift server之间的网络是良好的
            // 因此此处是否需要清空container,是需要多方面考虑的.
            container.clear();
            trace.clear();
            ipPortQueue.clear();
            logger.error("在注册服务区无法找到子节点");
            return;
        }

        String path = null;
        Map<String, List<InetSocketAddress>> currentMap = new HashMap<String, List<InetSocketAddress>>();
        try {
            for (ChildData data : children) {
                path = data.getPath();
                String address = new String(path.getBytes(), "utf-8");
                String[] parts = address.split("/");
                String ipPort = parts[parts.length-1];
                String jsonString = new String(data.getData(), "utf-8");
                RegisterConfig registerConfig = JSON.parseObject(jsonString, RegisterConfig.class);
                if (registerConfig.getWeight() == null) {
                    throw new ThriftException("获取权重失败");
                }
                String weight = registerConfig.getWeight();
                String group = registerConfig.getGroup();

                // 当前InetAddress列表
                List<InetSocketAddress> addressList = transfer(weight, ipPort);

                // 添加到容器currentMap
                if (!currentMap.containsKey(group)) {
                    currentMap.put(group, new ArrayList<InetSocketAddress>());
                }
                List<InetSocketAddress> groupList = currentMap.get(group);
                groupList.addAll(addressList);
                currentMap.put(group, groupList);
            }

            trace.clear();
            container.clear();
            ipPortQueue.clear();
            for (Map.Entry<String, List<InetSocketAddress>> entry : currentMap.entrySet()) {
                String group = entry.getKey();
                List<InetSocketAddress> current = entry.getValue();
                Collections.shuffle(current);

                // 先组装到备份容器
                if (!trace.containsKey(group)) {
                    trace.put(group, new HashSet<InetSocketAddress>());
                }
                Set<InetSocketAddress> traceGroup = trace.get(group);
                traceGroup.addAll(current);

                // 组装容器
                if (!container.containsKey(group)) {
                    container.put(group, new ArrayList<InetSocketAddress>());
                }
                List<InetSocketAddress> groupContainer = container.get(group);
                groupContainer.addAll(current);

                // 组装队列
                if (!ipPortQueue.containsKey(group)) {
                    ipPortQueue.put(group, new LinkedList<InetSocketAddress>());
                }
                Queue<InetSocketAddress> groupQueue = ipPortQueue.get(group);
                groupQueue.addAll(current);
            }


            logger.info("分组缓存重建完毕");
            for (Map.Entry<String, List<InetSocketAddress>> entry : currentMap.entrySet()) {
                logger.info("group:" + entry.getKey() + ", target:" + entry.getValue());
            }
        } catch (Exception e) {
            logger.error("重建缓存失败" + e.getMessage());
            throw new ThriftException("重建缓存失败", e);
        }
    }



    /**
     * 根据权重分配初始化"IP:PORT"集合
     * @param weight 权重字符串,例如"5"
     * @param ipPort 例如10.0.0.1:9050
     * @return 根据权重信息返回类似10.0.0.1:9050集合
     */
    private List<InetSocketAddress> transfer(String weight, String ipPort) {
        List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();
        InetAddress ipAddress = null;
        try {
            ipAddress = InetAddress.getByName(ipPort.split(":")[0]);
        } catch (UnknownHostException e) {
            logger.error("获取IP地址失败:" + e.getMessage());
        }
        int port = Integer.parseInt(ipPort.split(":")[1]);
        InetSocketAddress inetSocketAddress = new InetSocketAddress(ipAddress, port);
        for (int i = 0; i < Integer.parseInt(weight); i++) {
            result.add(inetSocketAddress);
        }
        return result;
    }


    /**
     * 根据权重情况随机获取IP:PORT
     * 取当前分组的队列
     */
    @Override
    public synchronized InetSocketAddress selector() {
        Queue<InetSocketAddress> currentQueue = ipPortQueue.get(currentGroup);
        List<InetSocketAddress> currentContainer = container.get(currentGroup);
        Set<InetSocketAddress> currentTrace = trace.get(currentGroup);
        if (currentQueue == null || currentQueue.isEmpty()) {
            if (currentContainer != null && !currentContainer.isEmpty()) {
                currentQueue.addAll(currentContainer);
            } else if(currentTrace != null && !currentTrace.isEmpty()) {
                synchronized (lock) {
                    currentContainer.addAll(currentTrace);
                    Collections.shuffle(currentContainer);
                    currentQueue.addAll(currentContainer);
                }
            }
        }

        if (currentQueue == null || currentQueue.size() == 0) {
            logger.error("找不到可用服务,localInstance:" + localInstance + "目标服务:" + targetService);
            throw new ThriftException("找不到可用服务,localInstance:" + localInstance + "目标服务:" + targetService);
        }
        return currentQueue.poll();
    }

    @Override
    public boolean validateGroup(String group, InetSocketAddress address) {
        // 当前trace中是否存在该address若无则排除
        Set<InetSocketAddress> cTrace = trace.get(currentGroup);
        if (cTrace == null) {
            logger.error("找不到可用服务,Trace为空");
            return false;
        }
        return cTrace.contains(address);
    }

    @Override
    public String getGroup() {
        return currentGroup;
    }

    @Override
    public List<InetSocketAddress> findServerAddressList() {
        List<InetSocketAddress> currentContainer = container.get(currentGroup);
        return Collections.unmodifiableList(currentContainer);
    }

    @Override
    public String getService() {
        return targetService;
    }

    @Override
    public String getServiceUrl() {
        return "";
    }

    public void close(){
        try {
            cachedPath.close();
            groupNodeCache.close();
        } catch (IOException e) {
        }
        zkClient.close();
    }

三 客户端jdk代理实现客户端代理调用远程服务,客户端代理交给GenericObjectPool实现创建,销毁。

@Override
    public void afterPropertiesSet() throws Exception {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        // 加载Iface接口
        objectClass = classLoader.loadClass(serverAddressProvider.getService() + "$Iface");
        // 加载Client.Factory类
        Class<TServiceClientFactory<TServiceClient>> fi = (Class<TServiceClientFactory<TServiceClient>>) classLoader.loadClass(serverAddressProvider.getService() + "$Client$Factory");
        TServiceClientFactory<TServiceClient> clientFactory = fi.newInstance();
             // 实现客户端连接池
        ThriftClientPoolFactory clientPool = new ThriftClientPoolFactory(serverAddressProvider, clientFactory, callback);
        GenericObjectPool.Config poolConfig = new GenericObjectPool.Config();
        poolConfig.maxActive = maxActive;
        poolConfig.maxIdle = 1;
        poolConfig.minIdle = 0;
        poolConfig.minEvictableIdleTimeMillis = idleTime;
        poolConfig.timeBetweenEvictionRunsMillis = idleTime * 2L;
        poolConfig.testOnBorrow=true;
        poolConfig.testOnReturn=false;
        poolConfig.testWhileIdle=false;
        pool = new GenericObjectPool<TServiceClient>(clientPool, poolConfig);
                // 创建客户端代理,实现远程调用,异常,超时重试机制
        proxyClient = Proxy.newProxyInstance(classLoader, new Class[] { objectClass }, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // 记录调用开始时间t1
                long t1 = System.currentTimeMillis();

                // 调用远程的目标方法,重试3次
                Object reObject = null;
                Exception reException = null;
                TServiceClient client = null;
                boolean success = false;
                int MAX_RETRY = 3, retry = 0;
                while (++retry <= MAX_RETRY) {
                    client = pool.borrowObject();
                    boolean flag = true;
                    try {
                        reObject = method.invoke(client, args);
                    } catch (Exception e) {
                        flag = false;
                        reException = e;
                        logger.info("retry:"+e.getMessage(),e);
                    } finally {
                        logger.info("retry time:"+retry);
                        if (flag) {
                            pool.returnObject(client);
                        } else {
                            pool.invalidateObject(client);
                        }
                    }

                    // 执行成功,退出循环
                    if (flag) {
                        success = true;
                        break;
                    }

                    // 只有超时类异常进行重试
                    boolean needRetry = false;
                    if (reException != null && reException instanceof InvocationTargetException) {
                        Throwable cause1 = reException.getCause();
                        if (cause1 != null && cause1 instanceof TTransportException) {
                            Throwable cause2 = cause1.getCause();
                            if (cause2 != null && (cause2 instanceof SocketTimeoutException || cause2 instanceof ConnectTimeoutException)) {
                                if (method.getName() != null && method.getName().startsWith("get")) {
                                    logger.info("timeout needRetry set true");
                                    needRetry = true;
                                }
                            }
                        }
                    }

                    if (!needRetry)
                        break;
                }

                // 记录调用结束时间t2
                long t2 = System.currentTimeMillis();
                printLog(client, method, (t2 - t1), success, retry > MAX_RETRY ? MAX_RETRY : retry);

                if (!success) {
                    if (reException instanceof InvocationTargetException)
                        throw ((InvocationTargetException) reException).getTargetException();
                    else
                        throw reException;
                } else {
                    return reObject;
                }
            }
        });
    }

3.1 客户端连接池的实现

// 客户端销毁
@Override
    public void destroyObject(TServiceClient client) throws Exception {
        if (callback != null) {
            try {
                callback.destroy(client);
            } catch (Exception e) {
                logger.warn("destroyObject:{}", e);
            }
        }
        clientGroupMap.remove(client);
        clientAddressMap.remove(client);
        logger.info("destroyObject:{}", client);
        TTransport pin = client.getInputProtocol().getTransport();
        pin.close();
        TTransport pout = client.getOutputProtocol().getTransport();
        pout.close();
    }
// 客户端创建
@Override
    public TServiceClient makeObject() throws Exception {
        InetSocketAddress address = serverAddressProvider.selector();
        String group = serverAddressProvider.getGroup();
        if (address == null) {
            new ThriftException("No provider available for remote service");
        }

        TTransport transport;
        TProtocol protocol;
        if (StringUtils.isEmpty(serverAddressProvider.getServiceUrl())) {
            // 如果serviceUrl是空,则采用TFramedTransport,适用于Java的thrift服务
            // socket超时30s,connect超时5s
            TSocket tsocket = new TSocket(address.getHostName(), address.getPort(), 10000, 5000);
            transport = new TFramedTransport(tsocket);
            protocol = new TBinaryProtocol(transport);
        } else {
            // 如果serviceUrl不空,则采用THttpClient的transport,适用于php的thrift服务
            String url = "";
            try {
                url = "http://" + address.getHostName() + ":" + address.getPort()
                        + serverAddressProvider.getServiceUrl();
            } catch (NullPointerException e) {
                if (address == null) {
                    logger.error("address is null");
                }
                if (serverAddressProvider == null) {
                    logger.error("serverAddressProvider is null");
                }
                throw e;
            }
            transport = new THttpClient(url);
            protocol = new TBinaryProtocol(transport);
        }

        TServiceClient client = this.clientFactory.getClient(protocol);
        clientGroupMap.put(client, group);
        clientAddressMap.put(client, address);
        transport.open();
        if (callback != null) {
            try {
                callback.make(client);
            } catch (Exception e) {
                logger.warn("makeObject:{}", e);
            }
        }
        return client;
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,607评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,047评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,496评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,405评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,400评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,479评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,883评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,535评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,743评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,544评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,612评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,309评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,881评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,891评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,136评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,783评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,316评论 2 342

推荐阅读更多精彩内容