Elasticsearch Client Source Code Analysis

Background

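This post gives a brief overview of how the Elasticsearch client starts up. The client here connects to the ES cluster over TCP (the transport client), and the analysis is based on Elasticsearch 2.3.4.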


Initialization

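To connect to an ES cluster over TCP, you need to specify a few parameters:

  • cluster.name: the name of the ES cluster
  • the IP addresses and ports of the initial (seed) nodes, one or more
  • client.transport.sniff: whether the client may sniff the cluster state and add the addresses of every node in the cluster, so you no longer have to list all node addresses by hand

The initialization code looks like this: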

Settings settings = Settings.settingsBuilder().put("cluster.name", ES_CLUSTER_NAME).put("client.transport.sniff", true).build();
client = TransportClient.builder().settings(settings).build();
for (String esAddressPerNode : ES_ADDRESS.split("\\,")) {
   try {
      // add a seed node
      client = ((TransportClient) client).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(esAddressPerNode), ES_PORT));
   } catch (UnknownHostException e) {
      log.warn("client.addTransportAddress error", e);
   }
}
log.info("connectedNodes:" + ((TransportClient)client).connectedNodes());
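The loop above splits a comma-separated address string and resolves each entry with the same port. Here is a minimal, stdlib-only sketch of that parsing step. It assumes a hypothetical helper named `SeedAddresses.parse`, which is not part of the ES API. Like the original loop, it logs and skips hosts it cannot resolve, and it also skips blank entries.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

class SeedAddresses {
    // Hypothetical helper (not an ES API): "host1,host2" plus one shared port -> socket addresses.
    static List<InetSocketAddress> parse(String csv, int port) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String raw : csv.split(",")) {
            String host = raw.trim();
            if (host.isEmpty()) {
                // InetAddress.getByName("") would silently resolve to loopback, so skip blanks.
                continue;
            }
            try {
                // IP literals are parsed locally; host names trigger a DNS lookup.
                result.add(new InetSocketAddress(InetAddress.getByName(host), port));
            } catch (UnknownHostException e) {
                // Same policy as the original loop: log and move on to the next seed.
                System.err.println("skipping unresolvable host " + host + ": " + e.getMessage());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parse("10.0.0.1,, 10.0.0.2", 9300));
    }
}
```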

ES builds the client with the builder pattern. Let's step into build() to see how it constructs the TransportClient:

public TransportClient build() {
    Settings settings = InternalSettingsPreparer.prepareSettings(this.settings);
    settings = settingsBuilder()
            .put(NettyTransport.PING_SCHEDULE, "5s") // enable by default the transport schedule ping interval
            .put(settings)
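            .put("network.server", false) // whether to start as a Netty server
            .put("node.client", true) // this node is a client node; an ES cluster also has three other node types: master, data and ingest (ingest arrives only in 5.0 and preprocesses documents before indexing)
            .put(CLIENT_TYPE_SETTING, CLIENT_TYPE) // client type = transport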
            .build();
 
    PluginsService pluginsService = new PluginsService(settings, null, null, pluginClasses);
    this.settings = pluginsService.updatedSettings();
 
    Version version = Version.CURRENT;
 
    final ThreadPool threadPool = new ThreadPool(settings); // initialize the client thread pool; each kind of ES request gets its own executor, which isolates them to some degree
    NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
 
    boolean success = false;
    try {
        // register the modules the client needs
        ModulesBuilder modules = new ModulesBuilder();
        modules.add(new Version.Module(version));
        // plugin modules must be added here, before others or we can get crazy injection errors...
        for (Module pluginModule : pluginsService.nodeModules()) {
            modules.add(pluginModule);
        }
        modules.add(new PluginsModule(pluginsService));
        modules.add(new SettingsModule(this.settings));
        modules.add(new NetworkModule(namedWriteableRegistry));
        modules.add(new ClusterNameModule(this.settings));
        modules.add(new ThreadPoolModule(threadPool));
        modules.add(new TransportModule(this.settings, namedWriteableRegistry));
        modules.add(new SearchModule() {
            @Override
            protected void configure() {
                // noop
            }
        });
        modules.add(new ActionModule(true));
        modules.add(new ClientTransportModule());
        modules.add(new CircuitBreakerModule(this.settings));
 
        pluginsService.processModules(modules);
 
        Injector injector = modules.createInjector();
        // use Guice to create the TransportService
        final TransportService transportService = injector.getInstance(TransportService.class);
        transportService.start(); // start the TransportService; effectively calls TransportService.doStart(), covered below
        transportService.acceptIncomingRequests(); // release the CountDownLatch that holds back incoming requests
 
        TransportClient transportClient = new TransportClient(injector); // instantiate and return the client
        success = true;
        return transportClient;
    } finally {
        if (!success) {
            ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
        }
    }
}
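Note the ordering in build(). The ping schedule is put before the user settings, so it acts as a default the user can override. network.server, node.client and the client type are put after the user settings, so they always win. Below is a minimal sketch of that last-write-wins merge. It uses a plain LinkedHashMap rather than ES's Settings builder. The key strings "transport.ping_schedule" and "client.type" are our assumption about what NettyTransport.PING_SCHEDULE and CLIENT_TYPE_SETTING resolve to.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class SettingsOrderSketch {
    // Hypothetical stand-in for the settings builder: a later put() overwrites an earlier one.
    static Map<String, String> buildClientSettings(Map<String, String> userSettings) {
        Map<String, String> merged = new LinkedHashMap<>();
        // 1. Default the user may override, so it goes in *before* the user settings.
        merged.put("transport.ping_schedule", "5s"); // assumed key name
        // 2. User-supplied settings.
        merged.putAll(userSettings);
        // 3. Values the transport client always forces, so they go in *after* the user settings.
        merged.put("network.server", "false");
        merged.put("node.client", "true");
        merged.put("client.type", "transport"); // assumed key name
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> user = new LinkedHashMap<>();
        user.put("cluster.name", "my-cluster");
        user.put("transport.ping_schedule", "10s"); // overrides the default
        user.put("node.client", "false");           // overridden back to "true"
        System.out.println(buildClientSettings(user));
    }
}
```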
  
TransportService.class {
    protected void doStart() {
        ...
        transport.start();  // here transport is a NettyTransport, so next we look at what NettyTransport does on startup
        ...
    }
}
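acceptIncomingRequests() counts down a CountDownLatch that starts at 1, so incoming requests wait until the service is fully wired. Below is a minimal sketch of that one-shot gate, using the hypothetical class name RequestGateSketch. As we understand it, the ES code waits on the latch without a timeout. This sketch uses a bounded wait instead, so the demo cannot hang.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class RequestGateSketch {
    // Count starts at 1: every waiter blocks until open() brings it to 0, and it never closes again.
    private final CountDownLatch latch = new CountDownLatch(1);

    // Plays the role of acceptIncomingRequests().
    void open() {
        latch.countDown();
    }

    // Hypothetical request path: true once the gate is open, false on timeout or interrupt.
    boolean awaitOpen(long timeoutMillis) {
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        RequestGateSketch gate = new RequestGateSketch();
        System.out.println("before open: " + gate.awaitOpen(50)); // prints "before open: false" after ~50 ms
        gate.open();
        System.out.println("after open: " + gate.awaitOpen(50));  // prints "after open: true"
    }
}
```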
  
NettyTransport.class {
    protected void doStart() {
        boolean success = false;
        try {
            clientBootstrap = createClientBootstrap(); // create the Netty client bootstrap
            if (settings.getAsBoolean("network.server", true)) { // the client is only a Netty client: network.server=false was set above, so the server-side setup here is skipped
                ...
                ...
            }
            success = true;
        } finally {
            if (success == false) {
                doStop();
            }
        }
    }
  
    private ClientBootstrap createClientBootstrap() {
        if (blockingClient) {
            clientBootstrap = new ClientBootstrap(new OioClientSocketChannelFactory(Executors.newCachedThreadPool(daemonThreadFactory(settings, TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX))));
        } else { // NIO mode is the default
            int bossCount = settings.getAsInt("transport.netty.boss_count", 1);
            clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(
                Executors.newCachedThreadPool(daemonThreadFactory(settings, TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX)),
                bossCount,
                new NioWorkerPool(Executors.newCachedThreadPool(daemonThreadFactory(settings, TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX)), workerCount),
                new HashedWheelTimer(daemonThreadFactory(settings, "transport_client_timer"))));
        }
        ... // Netty bootstrap options, applied in both the blocking and NIO cases
        return clientBootstrap; // no local port has been bound yet at this point
    }
}
// finally, the TransportClient is instantiated
TransportClient.class {
    private TransportClient(Injector injector) {
        super(injector.getInstance(Settings.class), injector.getInstance(ThreadPool.class), injector.getInstance(Headers.class));
        this.injector = injector;
        nodesService = injector.getInstance(TransportClientNodesService.class);
        proxy = injector.getInstance(TransportProxyClient.class);
    }
}

Adding nodes

As shown earlier, seed nodes are added through TransportClient.addTransportAddress(TransportAddress transportAddress). Let's look at the details:

TransportClient.class {
    private final TransportClientNodesService nodesService;
    private final TransportProxyClient proxy;
    public TransportClient addTransportAddress(TransportAddress transportAddress) {
        nodesService.addTransportAddresses(transportAddress);
        return this;
    }
}
  
// next, the implementation in TransportClientNodesService
TransportClientNodesService.class {
    public TransportClientNodesService addTransportAddresses(TransportAddress... transportAddresses) {
        synchronized (mutex) {
            List<TransportAddress> filtered = new ArrayList<>(transportAddresses.length);
            for (TransportAddress transportAddress : transportAddresses) {
                boolean found = false;
                for (DiscoveryNode otherNode : listedNodes) {
                    if (otherNode.address().equals(transportAddress)) {
                        found = true;
                        logger.debug("address [{}] already exists with [{}], ignoring...", transportAddress, otherNode);
                        break;
                    }
                }
                if (!found) {
                    filtered.add(transportAddress);
                }
            }
            if (filtered.isEmpty()) {
                return this;
            }
            List<DiscoveryNode> builder = new ArrayList<>();
            builder.addAll(listedNodes());
            for (TransportAddress transportAddress : filtered) {
                DiscoveryNode node = new DiscoveryNode("#transport#-" + tempNodeIdGenerator.incrementAndGet(), transportAddress, minCompatibilityVersion);
                logger.debug("adding address [{}]", node);
                builder.add(node);
            }
            listedNodes = Collections.unmodifiableList(builder);
            // so far this only registers the seed nodes; next the client starts sniffing the other nodes in the cluster
            nodesSampler.sample();
        }
        return this;
    }
}
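addTransportAddresses() is a copy-on-write update. Under a mutex, it drops addresses that are already present, appends the rest with temporary "#transport#-N" ids, and publishes a fresh unmodifiable list. Readers can then iterate listedNodes without locking. Below is a minimal sketch that uses plain strings in place of TransportAddress and DiscoveryNode. The Node class is hypothetical, and the final sampling step is left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

class ListedNodesSketch {
    // Hypothetical minimal node: a temporary id plus its address.
    static final class Node {
        final String id;
        final String address;

        Node(String id, String address) {
            this.id = id;
            this.address = address;
        }
    }

    private final Object mutex = new Object();
    private final AtomicLong tempNodeIdGenerator = new AtomicLong();
    // Writers replace the whole list under the mutex; volatile makes the new snapshot visible to readers.
    private volatile List<Node> listedNodes = Collections.emptyList();

    List<Node> listedNodes() {
        return listedNodes;
    }

    void addTransportAddresses(String... addresses) {
        synchronized (mutex) {
            List<String> filtered = new ArrayList<>();
            for (String address : addresses) {
                boolean found = false;
                for (Node existing : listedNodes) {
                    if (existing.address.equals(address)) {
                        found = true; // already listed: ignore it
                        break;
                    }
                }
                if (!found) {
                    filtered.add(address);
                }
            }
            if (filtered.isEmpty()) {
                return;
            }
            List<Node> builder = new ArrayList<>(listedNodes);
            for (String address : filtered) {
                builder.add(new Node("#transport#-" + tempNodeIdGenerator.incrementAndGet(), address));
            }
            listedNodes = Collections.unmodifiableList(builder);
            // The real service calls nodesSampler.sample() at this point to sniff the rest of the cluster.
        }
    }
}
```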
  
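// now the implementation of nodesSampler.sample(); with client.transport.sniff=true it calls SniffNodesSampler.doSample() (otherwise a SimpleNodeSampler is used)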
class SniffNodesSampler extends NodeSampler {
 
    @Override
    protected void doSample() {
        // listedNodes: the nodes the user specified at initialization
        // nodes: the cluster nodes found in the previous sampling round
        Set<DiscoveryNode> nodesToPing = Sets.newHashSet();
        for (DiscoveryNode node : listedNodes) {
            nodesToPing.add(node);
        }
        for (DiscoveryNode node : nodes) {
            nodesToPing.add(node);
        }
 
        final CountDownLatch latch = new CountDownLatch(nodesToPing.size());
        final ConcurrentMap<DiscoveryNode, ClusterStateResponse> clusterStateResponses = ConcurrentCollections.newConcurrentMap();
        for (final DiscoveryNode listedNode : nodesToPing) {
            threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        if (!transportService.nodeConnected(listedNode)) { // not yet connected to this node
                            try {
                                if (nodes.contains(listedNode)) {
                                    logger.trace("connecting to cluster node [{}]", listedNode);
                                    transportService.connectToNode(listedNode); // full connection: five groups of channels, one per request type
                                } else {
                                    logger.trace("connecting to listed node (light) [{}]", listedNode);
                                    transportService.connectToNodeLight(listedNode); // light connection: a single channel shared by all request types
                                }
                            } catch (Exception e) {
                                logger.debug("failed to connect to node [{}], ignoring...", e, listedNode);
                                latch.countDown();
                                return;
                            }
                        }
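                        // ask each node for its cluster state. Because local(true) is set, the receiving node answers from its own local copy; without it the request would be forwarded to the master, the only node that can modify and publish the cluster state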
                        transportService.sendRequest(listedNode, ClusterStateAction.NAME,
                                headers.applyTo(Requests.clusterStateRequest().clear().nodes(true).local(true)),
                                TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout).build(),
                                new BaseTransportResponseHandler<ClusterStateResponse>() {
 
                                    @Override
                                    public ClusterStateResponse newInstance() {
                                        return new ClusterStateResponse();
                                    }
 
                                    @Override
                                    public String executor() {
                                        return ThreadPool.Names.SAME;
                                    }
 
                                    @Override
                                    public void handleResponse(ClusterStateResponse response) {
                                        clusterStateResponses.put(listedNode, response);
                                        latch.countDown();
                                    }
 
                                    @Override
                                    public void handleException(TransportException e) {
                                        logger.info("failed to get local cluster state for {}, disconnecting...", e, listedNode);
                                        transportService.disconnectFromNode(listedNode);
                                        latch.countDown();
                                    }
                                });
                    } catch (Throwable e) {
                        logger.info("failed to get local cluster state info for {}, disconnecting...", e, listedNode);
                        transportService.disconnectFromNode(listedNode);
                        latch.countDown();
                    }
                }
            });
        }
 
        try {
            latch.await();
        } catch (InterruptedException e) {
            return;
        }
 
        HashSet<DiscoveryNode> newNodes = new HashSet<>(); // the refreshed node list (only data nodes are collected below)
        HashSet<DiscoveryNode> newFilteredNodes = new HashSet<>(); // nodes that do not belong to this cluster
        for (Map.Entry<DiscoveryNode, ClusterStateResponse> entry : clusterStateResponses.entrySet()) {
            if (!ignoreClusterName && !clusterName.equals(entry.getValue().getClusterName())) {
                logger.warn("node {} not part of the cluster {}, ignoring...", entry.getValue().getState().nodes().localNode(), clusterName);
                newFilteredNodes.add(entry.getKey());
                continue;
            }
            for (ObjectCursor<DiscoveryNode> cursor : entry.getValue().getState().nodes().dataNodes().values()) {
                newNodes.add(cursor.value);
            }
        }
 
        nodes = validateNewNodes(newNodes); // validate the new nodes: connect to any not yet connected, and drop those that fail
        filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes));
        // at this point, adding the seed nodes has also sniffed the other nodes in the cluster
    }
  
    // TransportClientNodesService also schedules a task when it is constructed, to periodically refresh the local view of cluster nodes; every 5s by default
    class ScheduledNodeSampler implements Runnable {
        @Override
        public void run() {
            try {
                nodesSampler.sample(); // calls SniffNodesSampler.doSample() shown above
            } catch (Exception e) {
                logger.warn("failed to sample", e);
            }
        }
    }
}
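doSample() follows a scatter-gather pattern. It runs one task per node and sizes a CountDownLatch to the number of nodes. Replies go into a concurrent map, and the final merge ignores any node that reports a different cluster name. Below is a minimal sketch of this pattern. It assumes a hypothetical synchronous `fetch` function in place of transportService.sendRequest(...), and a hypothetical StateReply type in place of ClusterStateResponse.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

class SniffSketch {
    // Hypothetical stand-in for ClusterStateResponse: cluster name plus the data nodes it lists.
    static final class StateReply {
        final String clusterName;
        final List<String> dataNodes;

        StateReply(String clusterName, List<String> dataNodes) {
            this.clusterName = clusterName;
            this.dataNodes = dataNodes;
        }
    }

    // Scatter: one task per node. Gather: wait for every task, then merge the replies.
    static Set<String> sniff(Set<String> nodesToPing, String clusterName,
                             Function<String, StateReply> fetch, ExecutorService pool) {
        CountDownLatch latch = new CountDownLatch(nodesToPing.size());
        Map<String, StateReply> replies = new ConcurrentHashMap<>();
        for (String node : nodesToPing) {
            pool.execute(() -> {
                try {
                    replies.put(node, fetch.apply(node));
                } catch (RuntimeException e) {
                    // The real code logs and disconnects from the node here.
                } finally {
                    latch.countDown(); // count down on every path, or await() below never returns
                }
            });
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Collections.emptySet(); // the real code just returns, keeping the previous node list
        }
        Set<String> newNodes = new HashSet<>();
        for (StateReply reply : replies.values()) {
            if (!clusterName.equals(reply.clusterName)) {
                continue; // a node from another cluster: ignore what it reports
            }
            newNodes.addAll(reply.dataNodes);
        }
        return newNodes;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        Map<String, StateReply> fake = Map.of(
                "seed-a", new StateReply("prod", List.of("d1", "d2")),
                "seed-b", new StateReply("other", List.of("x1")));
        Set<String> found = sniff(Set.of("seed-a", "seed-b", "seed-c"), "prod", node -> {
            StateReply r = fake.get(node);
            if (r == null) {
                throw new IllegalStateException("connect failed: " + node); // seed-c is "down"
            }
            return r;
        }, pool);
        pool.shutdown();
        System.out.println(found);
    }
}
```

Because `fetch` is synchronous here, a `finally` block guarantees exactly one count-down per task. The real code must count down separately on each path (connect failure, response, exception), because the response arrives asynchronously in a handler. If any path were missed, latch.await() would be left waiting.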

Now let's look more closely at the two ways the client connects to a node, "light" and "full":

  • Light connection: used mainly for the few seed nodes the client connects to at startup, in order to sniff the other nodes in the cluster
protected NodeChannels connectToChannelsLight(DiscoveryNode node) {
    InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
    ChannelFuture connect = clientBootstrap.connect(address);
    connect.awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
    if (!connect.isSuccess()) {
        throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connect.getCause());
    }
    Channel[] channels = new Channel[1];
    channels[0] = connect.getChannel();
    channels[0].getCloseFuture().addListener(new ChannelCloseListener(node));
    return new NodeChannels(channels, channels, channels, channels, channels);
}
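  • Full connection: connections are split by request type mainly because large requests tend to take a long time and could hold up ordinary requests, so some resource isolation is needed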
protected void connectToChannels(NodeChannels nodeChannels, DiscoveryNode node) {
    // open five groups of connections, one per request type
    ChannelFuture[] connectRecovery = new ChannelFuture[nodeChannels.recovery.length]; //size=2
    ChannelFuture[] connectBulk = new ChannelFuture[nodeChannels.bulk.length]; //size=3
    ChannelFuture[] connectReg = new ChannelFuture[nodeChannels.reg.length]; //size=6
    ChannelFuture[] connectState = new ChannelFuture[nodeChannels.state.length]; //size=1
    ChannelFuture[] connectPing = new ChannelFuture[nodeChannels.ping.length]; //size=1
    InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
    for (int i = 0; i < connectRecovery.length; i++) {
        connectRecovery[i] = clientBootstrap.connect(address);
    }
    for (int i = 0; i < connectBulk.length; i++) {
        connectBulk[i] = clientBootstrap.connect(address);
    }
    for (int i = 0; i < connectReg.length; i++) {
        connectReg[i] = clientBootstrap.connect(address);
    }
    for (int i = 0; i < connectState.length; i++) {
        connectState[i] = clientBootstrap.connect(address);
    }
    for (int i = 0; i < connectPing.length; i++) {
        connectPing[i] = clientBootstrap.connect(address);
    }
 
    ...
}
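A light connection passes the same one-element array for all five request types. A full connection opens a separate group of channels per type. Below is a minimal sketch of how a request type could map to a channel, with strings standing in for Netty channels. Two details are our assumption about NodeChannels.channel(type), not a copy of it: the round-robin pick within a group, and the single counter shared across types.

```java
import java.util.EnumMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

class NodeChannelsSketch {
    enum Type { RECOVERY, BULK, REG, STATE, PING }

    // Default group sizes from the comments above, in Type order: recovery, bulk, reg, state, ping.
    private static final int[] FULL_SIZES = { 2, 3, 6, 1, 1 };

    // Strings stand in for Netty Channel objects.
    private final Map<Type, String[]> byType = new EnumMap<>(Type.class);
    private final AtomicInteger counter = new AtomicInteger();

    private NodeChannelsSketch() {
    }

    // Light: every type points at the same one-element array, i.e. one shared channel.
    static NodeChannelsSketch light(String channel) {
        NodeChannelsSketch nc = new NodeChannelsSketch();
        String[] shared = { channel };
        for (Type t : Type.values()) {
            nc.byType.put(t, shared);
        }
        return nc;
    }

    // Full: a separate group of channels per request type.
    static NodeChannelsSketch full(String node) {
        NodeChannelsSketch nc = new NodeChannelsSketch();
        for (Type t : Type.values()) {
            String[] group = new String[FULL_SIZES[t.ordinal()]];
            for (int i = 0; i < group.length; i++) {
                group[i] = node + "/" + t.name().toLowerCase(Locale.ROOT) + "-" + i;
            }
            nc.byType.put(t, group);
        }
        return nc;
    }

    // Assumed selection rule: rotate through the group for the given type.
    String channel(Type type) {
        String[] group = byType.get(type);
        return group[Math.floorMod(counter.getAndIncrement(), group.length)];
    }

    public static void main(String[] args) {
        NodeChannelsSketch full = full("node-1");
        System.out.println(full.channel(Type.BULK)); // node-1/bulk-0
        System.out.println(full.channel(Type.BULK)); // node-1/bulk-1
        NodeChannelsSketch light = light("node-1/only");
        System.out.println(light.channel(Type.BULK) + " " + light.channel(Type.STATE));
    }
}
```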

Load balancing

The client picks the target node from the fully connected node list in round-robin fashion:

private final AtomicInteger randomNodeGenerator = new AtomicInteger();
  
public <Response> void execute(NodeListenerCallback<Response> callback, ActionListener<Response> listener) {
    List<DiscoveryNode> nodes = this.nodes;
    int index = getNodeNumber();
    DiscoveryNode node = nodes.get((index) % nodes.size());
    ...
}
  
private int getNodeNumber() {
    int index = randomNodeGenerator.incrementAndGet();
    if (index < 0) {
        index = 0;
        randomNodeGenerator.set(0);
    }
    return index;
}
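getNodeNumber() resets the counter to 0 once incrementAndGet() overflows into negative values, so the modulo never sees a negative index. Below is a minimal generic sketch of the same selector (RoundRobinSketch is hypothetical). When the node list is empty, we believe the real client throws NoNodeAvailableException; this sketch throws a plain IllegalStateException instead.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinSketch<T> {
    private final AtomicInteger counter = new AtomicInteger();

    // Same idea as getNodeNumber(): after overflow, restart from 0 so the index is never negative.
    private int nextIndex() {
        int index = counter.incrementAndGet();
        if (index < 0) {
            index = 0;
            counter.set(0);
        }
        return index;
    }

    T pick(List<T> nodes) {
        if (nodes.isEmpty()) {
            // Stand-in for the NoNodeAvailableException the real client reports.
            throw new IllegalStateException("no nodes available");
        }
        return nodes.get(nextIndex() % nodes.size());
    }

    // Demo/test hook only: start the counter at an arbitrary value, e.g. Integer.MAX_VALUE.
    void seed(int value) {
        counter.set(value);
    }

    public static void main(String[] args) {
        RoundRobinSketch<String> rr = new RoundRobinSketch<>();
        List<String> nodes = List.of("a", "b", "c");
        for (int i = 0; i < 4; i++) {
            System.out.print(rr.pick(nodes) + " "); // prints: b c a b
        }
        System.out.println();
    }
}
```

An alternative is Math.floorMod(counter.getAndIncrement(), nodes.size()). It needs no reset branch, because floorMod never returns a negative result for a positive divisor.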

Summary

That is a rough walk-through of how the ES client starts up. If anything here is wrong, corrections are welcome.
