0.前言
如果想更新索引的某条文档数据,可以通过如下几种方式:
(1)构造document的完整field和数据,然后使用Elasticsearch的Index API重新创建索引
curl -XPUT 'http://localhost:9200/twitter/tweet/1' -d '{
"user" : "kimchy",
"post_date" : "2009-11-15T14:12:12",
"message" : "trying out Elasticsearch"
}'
(2)构造document的部分field和数据,然后使用Elasticsearch 的Update API 更新索引
curl -XPOST 'localhost:9200/twitter/tweet/1/_update' -d '{
"doc" : {
"user" : "new_name"
},
"doc_as_upsert" : true
}'
本篇文章讨论的是第二种方式,该请求对应的url pattern为 /{index}/{type}/{id}/_update
,且仅支持Post 请求
1 请求Handler
update url对应的handler为RestUpdateAction,支持的参数如下:
retry_on_conflict
: 控制在最终抛出异常之前重试update的次数。
routing
: routing用于将更新请求路由到正确的分片上,并在更新的文档不存在时为upsert请求设置routing。不能用于更新已经存在文档的routing。
parent
: parent用于将更新请求路由到正确的分片上,并在更新的文档不存在时为upsert请求设置parent。不能用于更新已经存在文档的parent。
timeout
: 等待shard变为可用的超时时间。
consistency
: 索引/删除操作的写入一致性。
refresh
: 在操作发生后立即刷新相关的主分片和副本分片(而不是整个索引),以便更新的文档立即显示在搜索结果中。
fields
: 返回更新文档中的相关字段。指定_source返回完整更新的source。
version & version_type
: update API在内部使用Elasticsearch的版本控制支持,以确保在更新期间文档不会更改。
解析完请求参数后,会调用client.update()更新索引,传入的action参数值为UpdateAction.INSTANCE,它绑定的action为TransportUpdateAction类。
public class ActionModule extends AbstractModule {
@Override
protected void configure() {
registerAction(UpdateAction.INSTANCE,TransportUpdateAction.class);
}
}
因此,在执行TransportAction.execute() 时,会执行TransportUpdateAction.doExecute()方法,即为处理请求的Action入口
2 处理 Action
在更新索引前,使用shouldAutoCreate()方法判断是否需要创建索引
public boolean shouldAutoCreate(String index, ClusterState state) {
// action.auto_create_index 是 false
// 不再继续检查, 不创建索引
if (!needToCheck) {
return false;
}
// 如果索引或者别名中已经包含了index
if (state.metaData().hasConcreteIndex(index)) {
return false;
}
// action.auto_create_index 是 false
if (globallyDisabled) {
return false;
}
// matches not set, default value of "true"
// action.auto_create_index 是 null 或者是 true 或者是 false
if (matches == null) {
return true;
}
// 正则条件判断index是否满足action.auto_create_index
for (int i = 0; i < matches.length; i++) {
char c = matches[i].charAt(0);
if (c == '-') {
if (Regex.simpleMatch(matches2[i], index)) {
return false;
}
} else if (c == '+') {
if (Regex.simpleMatch(matches2[i], index)) {
return true;
}
} else {
if (Regex.simpleMatch(matches[i], index)) {
return true;
}
}
}
return false;
}
①如果需要创建索引,则,调用TransportCreateIndexAction.execute()方法创建索引,然后执行innerExecute()方法更新文档
②否则直接调用innerExecute()更新文档
在更新文档前,需要获取文档索引所在的主分片信息,然后请请求发送到对应分片的节点上,执行shardOperation()
class AsyncSingleAction {
protected boolean doStart() throws ElasticsearchException {
// 集群的节点信息
nodes = observer.observedState().nodes();
try {
// 获取要操作的primary shard
shardIt = shards(observer.observedState(), internalRequest);
} catch (Throwable e) {
}
// this transport only make sense with an iterator that returns a single shard routing (like primary)
// 由于一个存在的文档, 必然最多只需要一个主分片
assert shardIt.size() == 1;
internalRequest.request().shardId = shardIt.shardId().id();
// 如果shard 所在的节点为当前节点
if (shard.currentNodeId().equals(nodes.localNodeId())) {
internalRequest.request().beforeLocalFork();
try {
// 使用线程池方式执行操作
threadPool.executor(executor).execute(new Runnable() {
@Override
public void run() {
try {
// 抽象方法, 需要子类实现
shardOperation(internalRequest, listener);
} catch (Throwable e) {
// ...
}
}
});
} catch (Throwable e) {
// ...
}
} else {
// 如果是远程节点, 需要将请求发送到对应节点
DiscoveryNode node = nodes.get(shard.currentNodeId());
transportService.sendRequest(node, actionName, internalRequest.request(), transportOptions(), new BaseTransportResponseHandler<Response>() {
@Override
public void handleResponse(Response response) {
listener.onResponse(response);
}
}
}
3 执行请求
执行索引operation的过程,主要是
①先构造索引请求,即先获取已经存在的文档信息
②merge已有文档和待更新的数据,或者执行请求中的脚本,获取完整的doc信息
③执行TransportIndexAction.execute() 创建索引重新创建文档,或者TransportDeleteAction.execute() 删除文档
/**
* shard 操作逻辑
* @param request InternalRequest
* @param listener ActionListener
* @param retryCount retryCount 重试次数
* @throws ElasticsearchException Elasticsearch 异常
*/
protected void shardOperation(final InternalRequest request, final ActionListener<UpdateResponse> listener, final int retryCount) throws ElasticsearchException {
// 获取index 对应的service
IndexService indexService = indicesService.indexServiceSafe(request.concreteIndex());
// 根据shard id 获取对应的IndexShard
IndexShard indexShard = indexService.shardSafe(request.request().shardId());
// 对更新请求进行转换, 获取最终要更新的索引信息
final UpdateHelper.Result result = updateHelper.prepare(request.request(), indexShard);
switch (result.operation()) {
case UPSERT:
// 构造索引请求, 将result.action()中的type id routing 和source 拷贝到index request 对象中
IndexRequest upsertRequest = new IndexRequest((IndexRequest)result.action(), request.request());
// we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
final BytesReference upsertSourceBytes = upsertRequest.source();
// 执行TransportIndexAction.execute(), 创建索引文档
indexAction.execute(upsertRequest, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse response) {
// 处理请求
});
break;
case INDEX:
IndexRequest indexRequest = new IndexRequest((IndexRequest)result.action(), request.request());
// we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
final BytesReference indexSourceBytes = indexRequest.source();
indexAction.execute(indexRequest, new ActionListener<IndexResponse>() {
// 处理请求
});
break;
case DELETE:
DeleteRequest deleteRequest = new DeleteRequest((DeleteRequest)result.action(), request.request());
deleteAction.execute(deleteRequest, new ActionListener<DeleteResponse>() {
// 执行请求
});
break;
case NONE:
// ...
break;
default:
throw new ElasticsearchIllegalStateException("Illegal operation " + result.operation());
}
}
在上面代码中,final UpdateHelper.Result result = updateHelper.prepare(request.request(), indexShard); 主要用来对更新请求进行转换, 获取最终要更新的索引信息
合并待更新数据和已经存在的数据的策略,主要是覆盖和补充更新的方式:
/**
将提供的数据变更(changes)更新到source中
1 已经存在source的字段将会被提供的变更数据(chanes)覆盖掉
2 不存source在的字段会被提供的变更数据(changes)补充更新
**/
public static boolean update(Map<String, Object> source, Map<String, Object> changes, boolean checkUpdatesAreUnequal) {
boolean modified = false;
for (Map.Entry<String, Object> changesEntry : changes.entrySet()) {
if (!source.containsKey(changesEntry.getKey())) {
// safe to copy, change does not exist in source
source.put(changesEntry.getKey(), changesEntry.getValue());
modified = true;
continue;
}
Object old = source.get(changesEntry.getKey());
if (old instanceof Map && changesEntry.getValue() instanceof Map) {
// recursive merge maps
modified |= update((Map<String, Object>) source.get(changesEntry.getKey()),
(Map<String, Object>) changesEntry.getValue(), checkUpdatesAreUnequal && !modified);
continue;
}
// update the field
source.put(changesEntry.getKey(), changesEntry.getValue());
if (modified) {
continue;
}
if (!checkUpdatesAreUnequal) {
modified = true;
continue;
}
modified = !Objects.equal(old, changesEntry.getValue());
}
return modified;
}
4 处理异常
在更新lucene索引时,会先检查获取的文档版本和索引中当前文档版本是否冲突,
版本类型分为如下4种:
INTERNAL
EXTERNAL
EXTERNAL_GTE
FORCE
对于每种类型,都有不同的判断冲突的标准
private void innerIndex(Index index) throws IOException {
synchronized (dirtyLock(index.uid())) {
// 获取当前版本号 currentVersion
final long currentVersion;
VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes());
// 如果version map 中没有拿到当前version, 则需要从reader 中获取当前version
if (versionValue == null) {
currentVersion = loadCurrentVersionFromIndex(index.uid());
} else {
// 判断版本是否待删除
if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > engineConfig.getGcDeletesInMillis()) {
currentVersion = Versions.NOT_FOUND; // deleted, and GC
} else {
currentVersion = versionValue.version();
}
}
// 更新后的Version
long updatedVersion;
// 待更新索引的版本号
long expectedVersion = index.version();
// 使用当前version 和待更新索引version 判断是否存在版本冲突
// 判断条件为:
// INTERNAL 为当前版本和待更新版本不一致
// EXTERNAL 为当前版本大于等于待更新版本
// EXTERNAL_GTE 为当前版本大于待更新版本
// FORCE 为待更新版本未指定
if (index.versionType().isVersionConflictForWrites(currentVersion, expectedVersion)) {
if (index.origin() == Operation.Origin.RECOVERY) {
return;
} else {
throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, expectedVersion);
}
}
// INTERNAL 如果当前版本为未找到或者未设置, 则为1, 否则为当前版本+1
// EXTERNAL 待更新索引版本号
// EXTERNAL_GTE 待更新索引版本号
// FORCE 待更新索引版本号
updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
index.updateVersion(updatedVersion);
// 当前不存在文档版本, 则为create
if (currentVersion == Versions.NOT_FOUND) {
// document does not exists, we can optimize for create
index.created(true);
if (index.docs().size() > 1) {
indexWriter.addDocuments(index.docs(), index.analyzer());
} else {
indexWriter.addDocument(index.docs().get(0), index.analyzer());
}
} else {
// 已经存在文档版本, 则update
if (versionValue != null) {
index.created(versionValue.delete()); // we have a delete which is not GC'ed...
}
if (index.docs().size() > 1) {
indexWriter.updateDocuments(index.uid(), index.docs(), index.analyzer());
} else {
indexWriter.updateDocument(index.uid(), index.docs().get(0), index.analyzer());
}
}
// 增加translog
Translog.Location translogLocation = translog.add(new Translog.Index(index));
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion, translogLocation));
indexingService.postIndexUnderLock(index);
}
}
在更新文档时,如果发生VersionConflictEngineException或者DocumentAlreadyExistsException,则会重新执行shardOperation进行重试,最大默认重试次数为3次