Elasticsearch 获取写入Doc shardID的源码分析

前言

平时在研究ES的分布式Doc(文档)写入操作时,我们已经知道对将要写入的Doc,ES首先会计算其应该写入到索引的哪个分片,然后在根据集群metaData中的路由信息判断此分片所在的ES节点,最后将写入请求发送到这个节点并完成最终的写入操作。写入流程说明如下:

接下来我们主要研究步骤2中谈到的(节点使用文档的_id确定文档属于分片0)这个地方的源码实现,看看ES内部确定一个Doc应该被写入的Shard的具体实现逻辑

源码分析

当前ES版本为5.6.16,确定待写入Doc的Shard编号的主要代码部分如下:

1. # TransportBulkAction.java

protected void doRun() throws Exception {
    final ClusterState clusterState = observer.setAndGetObservedState();
    if (handleBlockExceptions(clusterState)) {
        return;
    }
    final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);
    MetaData metaData = clusterState.metaData();
 
   Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
   for (int i = 0; i < bulkRequest.requests.size(); i++) {
    DocWriteRequest request = bulkRequest.requests.get(i);
    if (request == null) {
        continue;
    }
    String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
    // 根据路由,找出doc写入的目标shard id
    ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();
    List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
    shardRequests.add(new BulkItemRequest(i, request));
    }   
}

    
1. # OperationRouting.java
public ShardIterator indexShards(ClusterState clusterState, String index, String id, @Nullable String routing) {
    return shards(clusterState, index, id, routing).shardsIt();
}

protected IndexShardRoutingTable shards(ClusterState clusterState, String index, String id, String routing) {
    int shardId = generateShardId(indexMetaData(clusterState, index), id, routing);
    return clusterState.getRoutingTable().shardRoutingTable(index, shardId);
}

static int generateShardId(IndexMetaData indexMetaData, @Nullable String id, @Nullable String routing) {
    final String effectiveRouting;
    final int partitionOffset;

    if (routing == null) {
        assert(indexMetaData.isRoutingPartitionedIndex() == false) : "A routing value is required for gets from a partitioned index";
        effectiveRouting = id;
    } else {
        effectiveRouting = routing;
    }

    if (indexMetaData.isRoutingPartitionedIndex()) {
        partitionOffset = Math.floorMod(Murmur3HashFunction.hash(id), indexMetaData.getRoutingPartitionSize());
    } else {
        // we would have still got 0 above but this check just saves us an unnecessary hash calculation
        partitionOffset = 0;
    }

    return calculateScaledShardId(indexMetaData, effectiveRouting, partitionOffset);
}

private static int calculateScaledShardId(IndexMetaData indexMetaData, String effectiveRouting, int partitionOffset) {
    final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset;

    // we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size
    // of original index to hash documents
    return Math.floorMod(hash, indexMetaData.getRoutingNumShards()) / indexMetaData.getRoutingFactor();
} 

TransportBulkAction类的doRun()方法中,ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId(); 这行代码获取最终Doc的ShardID信息。clusterService.operationRouting()方法返回OperationRouting对象,然后紧接着调用其indexShards(...)方法,接着进入shards(...)方法,最后可看到int shardId = generateShardId(indexMetaData(clusterState, index), id, routing); 这行代码。这里最终得到分片编号shardId,所以我们重点关注的逻辑就在generateShardId(...)方法中。generateShardId(...)方法接受indexMetaData(索引元数据)、id(文档Doc的id号,即为此次写入请求的id号)、routing(写入时自定义的routing信息)。下面我们重点看下generateShardId(...)方法内部的逻辑。

static int generateShardId(IndexMetaData indexMetaData, @Nullable String id, @Nullable String routing) {
    final String effectiveRouting;
    final int partitionOffset;

    if (routing == null) {
        assert(indexMetaData.isRoutingPartitionedIndex() == false) : "A routing value is required for gets from a partitioned index";
        effectiveRouting = id;
    } else {
        effectiveRouting = routing;
    }

    if (indexMetaData.isRoutingPartitionedIndex()) {
        partitionOffset = Math.floorMod(Murmur3HashFunction.hash(id), indexMetaData.getRoutingPartitionSize());
    } else {
        // we would have still got 0 above but this check just saves us an unnecessary hash calculation
        partitionOffset = 0;
    }

    return calculateScaledShardId(indexMetaData, effectiveRouting, partitionOffset);
}

首先方法内声明了String类型的effectiveRouting与int类型的partitionOffset。ES在Doc写入操作时通常有两种方式:一种是指定routing,另一种是不指定routing。

指定routing的写入方式类似为:
POST my_index/doc?routing=tony
{
"name": "tony",
"age": 10
}
routing的设置可以使得写入的数据分布到当前索引下的具体的某些分片中,引入routing机制也是为了更好的搜索性能,使得遍历的分片范围可以进一步的缩小;当然同时要面临着数据分布倾斜的风险。在routing机制下ES提供了一个有意义的设置项index.routing_partition_size,此参数在索引创建时结合着routing一起使用。其意义是使得写入的数据能够集中的落入到routing_partition_size个分片集合中。比如索引my_index包含3个分片,若此时routing_partition_size的值设为2,那经过routing写入到my_index的数据只会落入其中的两个分片,而另一个会处于闲置状态。ES官网指出routing_partition_size的值通常设置为大于1且小于number_of_shards。

当写入时不带有routing机制(对应到代码routing==null, effectiveRouting=id),此时数据会经过hash(doc_id) % number_primary_shards的方式均匀的写入到各个主分片中;通过routing机制写入,想要达到数据分布均匀,则上一种计算公式就不能满足条件了,需要结合doc_id以及routing值重新计算。只是平时大部分的时候我们在写入ES时并没有指定routing,在ES内部处理上默认会把doc_id当做_routing,因此我们对hash(doc_id) % number_primary_shards这个公式比较熟悉。带有routing的写入,effectiveRouting被赋予routing值。接下来代码中会判断当前索引是否设置了routing_partition_size选项,若存在则partitionOffset = hash(doc_id) % routing_partition_size值,否则partitionOffset=0。接着到了calculateScaledShardId(...)方法,方法如下:

private static int calculateScaledShardId(IndexMetaData indexMetaData, String effectiveRouting, int partitionOffset) {
    final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset;

    // we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size
    // of original index to hash documents
    return Math.floorMod(hash, indexMetaData.getRoutingNumShards()) / indexMetaData.getRoutingFactor();
}

方法中两行代码,本质上对应着ES官网上的公式shard_num = (hash(_routing) + hash(_id) % routing_partition_size) % num_primary_shards;hash(_routing) + hash(_id) % routing_partition_size等价于Murmur3HashFunction.hash(effectiveRouting) + partitionOffset,然后再对num_primary_shards做取模运算。整个公式的意思即_routing字段用于计算索引内的分片子集,然后_id用于选择该分片子集内的一个分片。这样就完整的结合了routing与doc_id信息计算出具体的分片编号。认真分析代码我们会发现以下两点可疑的地方:

  • 取模计算使用的是indexMetaData.getRoutingNumShards这个值,而不是number_of_primary对应的值
  • 取模计算后接着又做了除以indexMetaData.getRoutingFactor的除法运算

为啥此处要这样做呢?经过代码注释与实践发现,这个其实是包含索引shrink功能的计算方法。索引shrink允许我们将一个索引由比如原来的8个分片,shrink成为4、2、1三种数量的分片索引,是一个比较有用的功能。关于factor,这里做个简单的说明,比如数字6,存在6、3、2、1四个因子(Factor)。从索引shrink的角度看factor,比如8个shards同时存在4、2、1三个factor(8意义不大),所以indexMetaData.getRoutingFactor的值获取的就是这个因子数。另外这里一个重要的知识点是假定一个包含m(偶数)个分片的索引A,经过shrink之后(假定shrink为m/2个分片,自然factor=2)变为索引B,但此时索引B的getRoutingNumShards值依然为m,而非m/2。有了这个知识点作铺垫之后,我们就理解了为啥整个公式的计算结果后面要除以indexMetaData.getRoutingFactor的值了。因为公式中除数getRoutingNumShards没有做同步的减小,因此中间的计算结果需要同步除以getRoutingFactor的值。通常索引的getRoutingFactor的值默认为1,这个能够理解,因为通常索引都是没有做shrink操作的。到此,我们就分析完了shardID的整个计算过程了,计算的本质没有变化,因为要考虑routing以及shrink的功能,所以计算公式稍微变得复杂了些。

小结

到此结合着代码,我们分析完了ES内部计算一个将要写入的Doc对应的分片编号的整个过程。计算的本质当然是为了使得数据能够均匀的分布在满足条件的每个分片上。为了友好的支持其他的功能,计算会综合考虑到其他的一些影响因素,比如shrink,routing。但计算的本质没有发生变化。对于routing与shrink功能,文章中没有贴出具体的详细的实践步骤,这块希望大家后面动手实践起来,同时也结合着代码一起研究起来,一起学习ES,一起进步。

引用
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,236评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,867评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,715评论 0 340
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,899评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,895评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,733评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,085评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,722评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,025评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,696评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,816评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,447评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,057评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,009评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,254评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,204评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,561评论 2 343