聊聊elasticsearch的TransportProxyClient

本文主要研究一下elasticsearch的TransportProxyClient

TransportProxyClient

elasticsearch-6.4.3-sources.jar!/org/elasticsearch/client/transport/TransportProxyClient.java

final class TransportProxyClient {

    private final TransportClientNodesService nodesService;
    private final Map<Action, TransportActionNodeProxy> proxies;

    TransportProxyClient(Settings settings, TransportService transportService,
                                TransportClientNodesService nodesService, List<GenericAction> actions) {
        this.nodesService = nodesService;
        Map<Action, TransportActionNodeProxy> proxies = new HashMap<>();
        for (GenericAction action : actions) {
            if (action instanceof Action) {
                proxies.put((Action) action, new TransportActionNodeProxy(settings, action, transportService));
            }
        }
        this.proxies = unmodifiableMap(proxies);
    }

    public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends
        ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(final Action<Request, Response, RequestBuilder> action,
                                                                              final Request request, ActionListener<Response> listener) {
        final TransportActionNodeProxy<Request, Response> proxy = proxies.get(action);
        assert proxy != null : "no proxy found for action: " + action;
        nodesService.execute((n, l) -> proxy.execute(n, request, l), listener);
    }
}
  • TransportProxyClient的构造器接收Settings、TransportService、TransportClientNodesService、List<GenericAction>四个参数
  • TransportProxyClient的构造器会根据actions来给每个action创建TransportActionNodeProxy,并放入到名为proxies的map中
  • TransportProxyClient主要是提供了execute方法,该方法从proxies取出对应的TransportActionNodeProxy,然后通过TransportClientNodesService的execute方法来执行proxy.execute方法

TransportActionNodeProxy

elasticsearch-6.4.3-sources.jar!/org/elasticsearch/action/TransportActionNodeProxy.java

public class TransportActionNodeProxy<Request extends ActionRequest, Response extends ActionResponse> extends AbstractComponent {

    private final TransportService transportService;
    private final GenericAction<Request, Response> action;
    private final TransportRequestOptions transportOptions;

    public TransportActionNodeProxy(Settings settings, GenericAction<Request, Response> action, TransportService transportService) {
        super(settings);
        this.action = action;
        this.transportService = transportService;
        this.transportOptions = action.transportOptions(settings);
    }

    public void execute(final DiscoveryNode node, final Request request, final ActionListener<Response> listener) {
        ActionRequestValidationException validationException = request.validate();
        if (validationException != null) {
            listener.onFailure(validationException);
            return;
        }
        transportService.sendRequest(node, action.name(), request, transportOptions,
            new ActionListenerResponseHandler<>(listener, action::newResponse));
    }
}
  • TransportActionNodeProxy的构造器要求输入Settings、GenericAction、TransportService三个参数;TransportActionNodeProxy提供了execute方法,它的方法参数要求输入DiscoveryNode、Request、ActionListener,该方法主要是对ActionListener包装为ActionListenerResponseHandler,然后调用transportService.sendRequest

Nodes

TransportClientNodesService Nodes

elasticsearch-6.4.3-sources.jar!/org/elasticsearch/client/transport/TransportClientNodesService.java

final class TransportClientNodesService extends AbstractComponent implements Closeable {

    private final TimeValue nodesSamplerInterval;

    private final long pingTimeout;

    private final ClusterName clusterName;

    private final TransportService transportService;

    private final ThreadPool threadPool;

    private final Version minCompatibilityVersion;

    // nodes that are added to be discovered
    private volatile List<DiscoveryNode> listedNodes = Collections.emptyList();

    private final Object mutex = new Object();

    private volatile List<DiscoveryNode> nodes = Collections.emptyList();
    // Filtered nodes are nodes whose cluster name does not match the configured cluster name
    private volatile List<DiscoveryNode> filteredNodes = Collections.emptyList();

    private final AtomicInteger tempNodeIdGenerator = new AtomicInteger();

    private final NodeSampler nodesSampler;

    private volatile ScheduledFuture nodesSamplerFuture;

    private final AtomicInteger randomNodeGenerator = new AtomicInteger(Randomness.get().nextInt());

    private final boolean ignoreClusterName;

    private volatile boolean closed;

    private final TransportClient.HostFailureListener hostFailureListener;

    //......

    public TransportClientNodesService addTransportAddresses(TransportAddress... transportAddresses) {
        synchronized (mutex) {
            if (closed) {
                throw new IllegalStateException("transport client is closed, can't add an address");
            }
            List<TransportAddress> filtered = new ArrayList<>(transportAddresses.length);
            for (TransportAddress transportAddress : transportAddresses) {
                boolean found = false;
                for (DiscoveryNode otherNode : listedNodes) {
                    if (otherNode.getAddress().equals(transportAddress)) {
                        found = true;
                        logger.debug("address [{}] already exists with [{}], ignoring...", transportAddress, otherNode);
                        break;
                    }
                }
                if (!found) {
                    filtered.add(transportAddress);
                }
            }
            if (filtered.isEmpty()) {
                return this;
            }
            List<DiscoveryNode> builder = new ArrayList<>(listedNodes);
            for (TransportAddress transportAddress : filtered) {
                DiscoveryNode node = new DiscoveryNode("#transport#-" + tempNodeIdGenerator.incrementAndGet(),
                        transportAddress, Collections.emptyMap(), Collections.emptySet(), minCompatibilityVersion);
                logger.debug("adding address [{}]", node);
                builder.add(node);
            }
            listedNodes = Collections.unmodifiableList(builder);
            nodesSampler.sample();
        }
        return this;
    }

    public TransportClientNodesService removeTransportAddress(TransportAddress transportAddress) {
        synchronized (mutex) {
            if (closed) {
                throw new IllegalStateException("transport client is closed, can't remove an address");
            }
            List<DiscoveryNode> listNodesBuilder = new ArrayList<>();
            for (DiscoveryNode otherNode : listedNodes) {
                if (!otherNode.getAddress().equals(transportAddress)) {
                    listNodesBuilder.add(otherNode);
                } else {
                    logger.debug("removing address [{}] from listed nodes", otherNode);
                }
            }
            listedNodes = Collections.unmodifiableList(listNodesBuilder);
            List<DiscoveryNode> nodesBuilder = new ArrayList<>();
            for (DiscoveryNode otherNode : nodes) {
                if (!otherNode.getAddress().equals(transportAddress)) {
                    nodesBuilder.add(otherNode);
                } else {
                    logger.debug("disconnecting from node with address [{}]", otherNode);
                    transportService.disconnectFromNode(otherNode);
                }
            }
            nodes = Collections.unmodifiableList(nodesBuilder);
            nodesSampler.sample();
        }
        return this;
    }

    //......

}
  • TransportClientNodesService定义了三个关于DiscoveryNode的List属性,分别是listedNodes、nodes、filteredNodes
  • addTransportAddresses方法会更新listedNodes,然后调用nodesSampler.sample()更新nodes及filteredNodes;removeTransportAddress方法会更新listedNodes,nodes,然后调用nodesSampler.sample()更新nodes及filteredNodes
  • listedNodes即为通过addTransportAddresses方法添加的node(一般是通过配置文件指定的clusterNodes);nodesSampler.sample()方法会对listedNodes进行进一步检测,比如将clusterName不是当前配置的clusterName的放到filteredNodes,剩下的再进行连接的建立,成功的放到nodes里头

TransportClient Nodes

elasticsearch-6.4.3-sources.jar!/org/elasticsearch/client/transport/TransportClient.java

public abstract class TransportClient extends AbstractClient {

    private final TransportClientNodesService nodesService;

    private final TransportProxyClient proxy;

    //......

    /**
     * Returns the current connected transport nodes that this client will use.
     * <p>
     * The nodes include all the nodes that are currently alive based on the transport
     * addresses provided.
     */
    public List<DiscoveryNode> connectedNodes() {
        return nodesService.connectedNodes();
    }

    /**
     * The list of filtered nodes that were not connected to, for example, due to
     * mismatch in cluster name.
     */
    public List<DiscoveryNode> filteredNodes() {
        return nodesService.filteredNodes();
    }

    /**
     * Returns the listed nodes in the transport client (ones added to it).
     */
    public List<DiscoveryNode> listedNodes() {
        return nodesService.listedNodes();
    }

    /**
     * Adds a transport address that will be used to connect to.
     * <p>
     * The Node this transport address represents will be used if its possible to connect to it.
     * If it is unavailable, it will be automatically connected to once it is up.
     * <p>
     * In order to get the list of all the current connected nodes, please see {@link #connectedNodes()}.
     */
    public TransportClient addTransportAddress(TransportAddress transportAddress) {
        nodesService.addTransportAddresses(transportAddress);
        return this;
    }

    /**
     * Adds a list of transport addresses that will be used to connect to.
     * <p>
     * The Node this transport address represents will be used if its possible to connect to it.
     * If it is unavailable, it will be automatically connected to once it is up.
     * <p>
     * In order to get the list of all the current connected nodes, please see {@link #connectedNodes()}.
     */
    public TransportClient addTransportAddresses(TransportAddress... transportAddress) {
        nodesService.addTransportAddresses(transportAddress);
        return this;
    }

    /**
     * Removes a transport address from the list of transport addresses that are used to connect to.
     */
    public TransportClient removeTransportAddress(TransportAddress transportAddress) {
        nodesService.removeTransportAddress(transportAddress);
        return this;
    }

    //......
}
  • TransportClient提供了connectedNodes、filteredNodes、listedNodes方法,可以看到它们内部都是调用的TransportClientNodesService对应的方法;从注释上可以看到,connectedNodes返回的是当前已经建立连接的nodes,供client端使用;filteredNodes返回的是因为clusterName不匹配导致被过滤掉的nodes,这些nodes不会被client使用;listedNodes返回的是通过addTransportAddresses添加的nodes

NodeSampler

ScheduledNodeSampler

elasticsearch-6.4.3-sources.jar!/org/elasticsearch/client/transport/TransportClientNodesService.java

    TransportClientNodesService(Settings settings, TransportService transportService,
                                       ThreadPool threadPool, TransportClient.HostFailureListener hostFailureListener) {
        super(settings);
        this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
        this.transportService = transportService;
        this.threadPool = threadPool;
        this.minCompatibilityVersion = Version.CURRENT.minimumCompatibilityVersion();

        this.nodesSamplerInterval = TransportClient.CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL.get(this.settings);
        this.pingTimeout = TransportClient.CLIENT_TRANSPORT_PING_TIMEOUT.get(this.settings).millis();
        this.ignoreClusterName = TransportClient.CLIENT_TRANSPORT_IGNORE_CLUSTER_NAME.get(this.settings);

        if (logger.isDebugEnabled()) {
            logger.debug("node_sampler_interval[{}]", nodesSamplerInterval);
        }

        if (TransportClient.CLIENT_TRANSPORT_SNIFF.get(this.settings)) {
            this.nodesSampler = new SniffNodesSampler();
        } else {
            this.nodesSampler = new SimpleNodeSampler();
        }
        this.hostFailureListener = hostFailureListener;
        this.nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, new ScheduledNodeSampler());
    }

    //......

    class ScheduledNodeSampler implements Runnable {
        @Override
        public void run() {
            try {
                nodesSampler.sample();
                if (!closed) {
                    nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, this);
                }
            } catch (Exception e) {
                logger.warn("failed to sample", e);
            }
        }
    }

    //......
  • TransportClientNodesService的构造器里头会根据settings的client.transport.sniff配置(默认是false)来判断是创建SniffNodesSampler还是SimpleNodeSampler,通过threadPool注册一个调度任务,每隔nodesSamplerInterval执行ScheduledNodeSampler;ScheduledNodeSampler实现了Runnable接口,其fun方法主要是调用nodesSampler.sample(),之后只要TransportClientNodesService没有close,则会继续注册调度任务,并更新nodesSamplerFuture

NodeSampler

elasticsearch-6.4.3-sources.jar!/org/elasticsearch/client/transport/TransportClientNodesService.java

    abstract class NodeSampler {
        public void sample() {
            synchronized (mutex) {
                if (closed) {
                    return;
                }
                doSample();
            }
        }

        protected abstract void doSample();

        /**
         * Establishes the node connections. If validateInHandshake is set to true, the connection will fail if
         * node returned in the handshake response is different than the discovery node.
         */
        List<DiscoveryNode> establishNodeConnections(Set<DiscoveryNode> nodes) {
            for (Iterator<DiscoveryNode> it = nodes.iterator(); it.hasNext(); ) {
                DiscoveryNode node = it.next();
                if (!transportService.nodeConnected(node)) {
                    try {
                        logger.trace("connecting to node [{}]", node);
                        transportService.connectToNode(node);
                    } catch (Exception e) {
                        it.remove();
                        logger.debug(() -> new ParameterizedMessage("failed to connect to discovered node [{}]", node), e);
                    }
                }
            }

            return Collections.unmodifiableList(new ArrayList<>(nodes));
        }
    }
  • NodeSampler是个抽象类,它定义了sample方法,其内部是调用定义的抽象方法doSample;NodeSampler还提供了establishNodeConnections方法,它通过transportService.nodeConnected(node)来判断node是否是connected的,如果不是则会通过transportService.connectToNode(node)再尝试连接一次,如果抛异常则将该节点移除掉,最后返回这次检测是connected的nodes;它有两个子类,分别是SimpleNodeSampler、SniffNodesSampler

SimpleNodeSampler

elasticsearch-6.4.3-sources.jar!/org/elasticsearch/client/transport/TransportClientNodesService.java

    class SimpleNodeSampler extends NodeSampler {

        @Override
        protected void doSample() {
            HashSet<DiscoveryNode> newNodes = new HashSet<>();
            ArrayList<DiscoveryNode> newFilteredNodes = new ArrayList<>();
            for (DiscoveryNode listedNode : listedNodes) {
                try (Transport.Connection connection = transportService.openConnection(listedNode, LISTED_NODES_PROFILE)){
                    final PlainTransportFuture<LivenessResponse> handler = new PlainTransportFuture<>(
                        new FutureTransportResponseHandler<LivenessResponse>() {
                            @Override
                            public LivenessResponse read(StreamInput in) throws IOException {
                                LivenessResponse response = new LivenessResponse();
                                response.readFrom(in);
                                return response;
                            }
                        });
                    transportService.sendRequest(connection, TransportLivenessAction.NAME, new LivenessRequest(),
                        TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout).build(),
                        handler);
                    final LivenessResponse livenessResponse = handler.txGet();
                    if (!ignoreClusterName && !clusterName.equals(livenessResponse.getClusterName())) {
                        logger.warn("node {} not part of the cluster {}, ignoring...", listedNode, clusterName);
                        newFilteredNodes.add(listedNode);
                    } else {
                        // use discovered information but do keep the original transport address,
                        // so people can control which address is exactly used.
                        DiscoveryNode nodeWithInfo = livenessResponse.getDiscoveryNode();
                        newNodes.add(new DiscoveryNode(nodeWithInfo.getName(), nodeWithInfo.getId(), nodeWithInfo.getEphemeralId(),
                            nodeWithInfo.getHostName(), nodeWithInfo.getHostAddress(), listedNode.getAddress(),
                            nodeWithInfo.getAttributes(), nodeWithInfo.getRoles(), nodeWithInfo.getVersion()));
                    }
                } catch (ConnectTransportException e) {
                    logger.debug(() -> new ParameterizedMessage("failed to connect to node [{}], ignoring...", listedNode), e);
                    hostFailureListener.onNodeDisconnected(listedNode, e);
                } catch (Exception e) {
                    logger.info(() -> new ParameterizedMessage("failed to get node info for {}, disconnecting...", listedNode), e);
                }
            }

            nodes = establishNodeConnections(newNodes);
            filteredNodes = Collections.unmodifiableList(newFilteredNodes);
        }
    }
  • SimpleNodeSampler的doSample方法会对nodes进行更进一步的存活检测,主要是发送LivenessRequest,如果能成功返回LivenessResponse,则判断clusterName是否一致,不一致的添加到newFilteredNodes,最后赋值给filteredNodes;一致的添加到newNodes中,最后通过establishNodeConnections方法建立连接并移除连接失败的node(重试一次)最后赋值给nodes

SniffNodesSampler

elasticsearch-6.4.3-sources.jar!/org/elasticsearch/client/transport/TransportClientNodesService.java

    class SniffNodesSampler extends NodeSampler {

        @Override
        protected void doSample() {
            // the nodes we are going to ping include the core listed nodes that were added
            // and the last round of discovered nodes
            Set<DiscoveryNode> nodesToPing = new HashSet<>();
            for (DiscoveryNode node : listedNodes) {
                nodesToPing.add(node);
            }
            for (DiscoveryNode node : nodes) {
                nodesToPing.add(node);
            }

            final CountDownLatch latch = new CountDownLatch(nodesToPing.size());
            final ConcurrentMap<DiscoveryNode, ClusterStateResponse> clusterStateResponses = ConcurrentCollections.newConcurrentMap();
            try {
                for (final DiscoveryNode nodeToPing : nodesToPing) {
                    threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {

                        /**
                         * we try to reuse existing connections but if needed we will open a temporary connection
                         * that will be closed at the end of the execution.
                         */
                        Transport.Connection connectionToClose = null;

                        void onDone() {
                            try {
                                IOUtils.closeWhileHandlingException(connectionToClose);
                            } finally {
                                latch.countDown();
                            }
                        }

                        @Override
                        public void onFailure(Exception e) {
                            onDone();
                            if (e instanceof ConnectTransportException) {
                                logger.debug(() -> new ParameterizedMessage("failed to connect to node [{}], ignoring...", nodeToPing), e);
                                hostFailureListener.onNodeDisconnected(nodeToPing, e);
                            } else {
                                logger.info(() -> new ParameterizedMessage(
                                        "failed to get local cluster state info for {}, disconnecting...", nodeToPing), e);
                            }
                        }

                        @Override
                        protected void doRun() throws Exception {
                            Transport.Connection pingConnection = null;
                            if (nodes.contains(nodeToPing)) {
                                try {
                                    pingConnection = transportService.getConnection(nodeToPing);
                                } catch (NodeNotConnectedException e) {
                                    // will use a temp connection
                                }
                            }
                            if (pingConnection == null) {
                                logger.trace("connecting to cluster node [{}]", nodeToPing);
                                connectionToClose = transportService.openConnection(nodeToPing, LISTED_NODES_PROFILE);
                                pingConnection = connectionToClose;
                            }
                            transportService.sendRequest(pingConnection, ClusterStateAction.NAME,
                                Requests.clusterStateRequest().clear().nodes(true).local(true),
                                TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE)
                                    .withTimeout(pingTimeout).build(),
                                new TransportResponseHandler<ClusterStateResponse>() {

                                    @Override
                                    public ClusterStateResponse newInstance() {
                                        return new ClusterStateResponse();
                                    }

                                    @Override
                                    public String executor() {
                                        return ThreadPool.Names.SAME;
                                    }

                                    @Override
                                    public void handleResponse(ClusterStateResponse response) {
                                        clusterStateResponses.put(nodeToPing, response);
                                        onDone();
                                    }

                                    @Override
                                    public void handleException(TransportException e) {
                                        logger.info(() -> new ParameterizedMessage(
                                                "failed to get local cluster state for {}, disconnecting...", nodeToPing), e);
                                        try {
                                            hostFailureListener.onNodeDisconnected(nodeToPing, e);
                                        } finally {
                                            onDone();
                                        }
                                    }
                                });
                        }
                    });
                }
                latch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }

            HashSet<DiscoveryNode> newNodes = new HashSet<>();
            HashSet<DiscoveryNode> newFilteredNodes = new HashSet<>();
            for (Map.Entry<DiscoveryNode, ClusterStateResponse> entry : clusterStateResponses.entrySet()) {
                if (!ignoreClusterName && !clusterName.equals(entry.getValue().getClusterName())) {
                    logger.warn("node {} not part of the cluster {}, ignoring...",
                            entry.getValue().getState().nodes().getLocalNode(), clusterName);
                    newFilteredNodes.add(entry.getKey());
                    continue;
                }
                for (ObjectCursor<DiscoveryNode> cursor : entry.getValue().getState().nodes().getDataNodes().values()) {
                    newNodes.add(cursor.value);
                }
            }

            nodes = establishNodeConnections(newNodes);
            filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes));
        }
    }
  • SniffNodesSampler的doSample方法首先将listedNodes及nodes合并为名为nodesToPing的Set,之后就挨个将nodesToPing的node放入到线程池异步执行检测,这里通过CountDownLatch来等待所有节点异步执行完毕
  • 异步线程池检测的逻辑是对node发送Requests.clusterStateRequest().clear().nodes(true).local(true)请求,如果成功则返回ClusterStateResponse,并添加到clusterStateResponses这个ConcurrentMap中
  • 之后遍历clusterStateResponses这个ConcurrentMap,clusterName不一致的node添加到newFilteredNodes,最后赋值给filteredNodes;clusterName一致的则遍历ClusterStateResponse.getState().nodes().getDataNodes().values(),将这些node添加到newNodes,最后通过establishNodeConnections方法建立连接并移除连接失败的node(重试一次)最后赋值给nodes

TransportClientNodesService.execute

elasticsearch-6.4.3-sources.jar!/org/elasticsearch/client/transport/TransportClientNodesService.java

final class TransportClientNodesService extends AbstractComponent implements Closeable {

    private final AtomicInteger randomNodeGenerator = new AtomicInteger(Randomness.get().nextInt());

    //......

    public <Response> void execute(NodeListenerCallback<Response> callback, ActionListener<Response> listener) {
        // we first read nodes before checking the closed state; this
        // is because otherwise we could be subject to a race where we
        // read the state as not being closed, and then the client is
        // closed and the nodes list is cleared, and then a
        // NoNodeAvailableException is thrown
        // it is important that the order of first setting the state of
        // closed and then clearing the list of nodes is maintained in
        // the close method
        final List<DiscoveryNode> nodes = this.nodes;
        if (closed) {
            throw new IllegalStateException("transport client is closed");
        }
        ensureNodesAreAvailable(nodes);
        int index = getNodeNumber();
        RetryListener<Response> retryListener = new RetryListener<>(callback, listener, nodes, index, hostFailureListener);
        DiscoveryNode node = retryListener.getNode(0);
        try {
            callback.doWithNode(node, retryListener);
        } catch (Exception e) {
            try {
                //this exception can't come from the TransportService as it doesn't throw exception at all
                listener.onFailure(e);
            } finally {
                retryListener.maybeNodeFailed(node, e);
            }
        }
    }

    private void ensureNodesAreAvailable(List<DiscoveryNode> nodes) {
        if (nodes.isEmpty()) {
            String message = String.format(Locale.ROOT, "None of the configured nodes are available: %s", this.listedNodes);
            throw new NoNodeAvailableException(message);
        }
    }

    private int getNodeNumber() {
        int index = randomNodeGenerator.incrementAndGet();
        if (index < 0) {
            index = 0;
            randomNodeGenerator.set(0);
        }
        return index;
    }

    public static class RetryListener<Response> implements ActionListener<Response> {
        private final NodeListenerCallback<Response> callback;
        private final ActionListener<Response> listener;
        private final List<DiscoveryNode> nodes;
        private final int index;
        private final TransportClient.HostFailureListener hostFailureListener;

        private volatile int i;

        RetryListener(NodeListenerCallback<Response> callback, ActionListener<Response> listener,
                      List<DiscoveryNode> nodes, int index, TransportClient.HostFailureListener hostFailureListener) {
            this.callback = callback;
            this.listener = listener;
            this.nodes = nodes;
            this.index = index;
            this.hostFailureListener = hostFailureListener;
        }

        @Override
        public void onResponse(Response response) {
            listener.onResponse(response);
        }

        @Override
        public void onFailure(Exception e) {
            Throwable throwable = ExceptionsHelper.unwrapCause(e);
            if (throwable instanceof ConnectTransportException) {
                maybeNodeFailed(getNode(this.i), (ConnectTransportException) throwable);
                int i = ++this.i;
                if (i >= nodes.size()) {
                    listener.onFailure(new NoNodeAvailableException("None of the configured nodes were available: " + nodes, e));
                } else {
                    try {
                        callback.doWithNode(getNode(i), this);
                    } catch(final Exception inner) {
                        inner.addSuppressed(e);
                        // this exception can't come from the TransportService as it doesn't throw exceptions at all
                        listener.onFailure(inner);
                    }
                }
            } else {
                listener.onFailure(e);
            }
        }

        final DiscoveryNode getNode(int i) {
            return nodes.get((index + i) % nodes.size());
        }

        final void maybeNodeFailed(DiscoveryNode node, Exception ex) {
            if (ex instanceof NodeDisconnectedException || ex instanceof NodeNotConnectedException) {
                hostFailureListener.onNodeDisconnected(node, ex);
            }
        }
    }

    //......
}
  • TransportClientNodesService提供的execute方法主要是做了两个事情,一个是对nodes节点进行客户端的负载均衡,一个是通过RetryListener对请求增加重试机制
  • ensureNodesAreAvailable方法首先确保nodes这个列表不为空,如果为空则抛出NoNodeAvailableException;之后通过getNodeNumber方法来确定index值,该方法使用randomNodeGenerator递增得到index,如果index大于等于0则返回,如果index小于0则重置randomNodeGenerator的值为0并返回0;这里randomNodeGenerator是AtomicInteger类型,其初始值为Randomness.get().nextInt()
  • RetryListener的构造器接收上一步计算出来的index值,它有一个i变量,初始为0,在onFailure的时候,如果是ConnectTransportException异常,则会进行重试,重试的时候首先将i递增,之后判断如果i>=nodes大小则停止重试,抛出NoNodeAvailableException,否则继续调用callback.doWithNode进行重试,重试时是通过getNode方法获取node,同时传入当前的listener;getNode方法采取的是(index + i) % nodes.size()来获取node的index,形成Round Robin的效果;对于RetryListener来说,内部重试时i会递增,对于execute方法来说,index值也是递增的,因而无论请求成功还是失败,对nodes的方法都形成Round Robin的效果

小结

  • TransportProxyClient主要是提供了execute方法,该方法从proxies取出对应的TransportActionNodeProxy,然后通过TransportClientNodesService的execute方法来执行proxy.execute方法;TransportActionNodeProxy提供了execute方法,它的方法参数要求输入DiscoveryNode、Request、ActionListener,该方法主要是对ActionListener包装为ActionListenerResponseHandler,然后调用transportService.sendRequest
  • TransportClientNodesService定义了三个关于DiscoveryNode的List属性,分别是listedNodes、nodes、filteredNodes;其中listedNodes是通过addTransportAddresses添加的nodes;nodes是当前已经建立连接的node列表,供client端使用;filteredNodes是因为clusterName不匹配导致被过滤掉的nodes,这些nodes不会被client使用
  • TransportClientNodesService的构造器里头会根据settings的client.transport.sniff配置(默认是false)来判断是创建SniffNodesSampler还是SimpleNodeSampler,通过threadPool注册一个调度任务,每隔nodesSamplerInterval执行ScheduledNodeSampler;ScheduledNodeSampler实现了Runnable接口,其fun方法主要是调用nodesSampler.sample(),之后只要TransportClientNodesService没有close,则会继续注册调度任务,并更新nodesSamplerFuture
  • NodeSampler是个抽象类,它定义了sample方法,其内部是调用定义的抽象方法doSample;NodeSampler还提供了establishNodeConnections方法,它通过transportService.nodeConnected(node)来判断node是否是connected的,如果不是则会通过transportService.connectToNode(node)再尝试连接一次,如果抛异常则将该节点移除掉,最后返回这次检测是connected的nodes;它有两个子类,分别是SimpleNodeSampler、SniffNodesSampler
  • SimpleNodeSampler的doSample方法会对nodes进行更进一步的存活检测,主要是发送LivenessRequest,如果能成功返回LivenessResponse,则判断clusterName是否一致,不一致的添加到newFilteredNodes,最后赋值给filteredNodes;一致的添加到newNodes中,最后通过establishNodeConnections方法建立连接并移除连接失败的node(重试一次)最后赋值给nodes
  • SniffNodesSampler的doSample方法首先将listedNodes及nodes合并为名为nodesToPing的Set,之后就挨个将nodesToPing的node放入到线程池异步执行检测,这里通过CountDownLatch来等待所有节点异步执行完毕;异步线程池检测的逻辑是对node发送Requests.clusterStateRequest().clear().nodes(true).local(true)请求,如果成功则返回ClusterStateResponse,并添加到clusterStateResponses这个ConcurrentMap中;之后遍历clusterStateResponses这个ConcurrentMap,clusterName不一致的node添加到newFilteredNodes,最后赋值给filteredNodes;clusterName一致的则遍历ClusterStateResponse.getState().nodes().getDataNodes().values(),将这些node添加到newNodes,最后通过establishNodeConnections方法建立连接并移除连接失败的node(重试一次)最后赋值给nodes
  • TransportClientNodesService提供的execute方法主要是做了两个事情,一个是对nodes节点进行客户端的负载均衡,一个是通过RetryListener对请求增加重试机制;其对nodes的负载均衡策略为Round Robin,而RetryListener只对ConnectTransportException异常进行重试,最大重试次数为nodes.size()-1

doc

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,053评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,527评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,779评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,685评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,699评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,609评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,989评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,654评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,890评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,634评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,716评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,394评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,976评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,950评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,191评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,849评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,458评论 2 342

推荐阅读更多精彩内容