The previous article analyzed how Elasticsearch turns a received RESTful request into an index-creation task; this one analyzes how that task is executed. As discussed earlier, the run method is implemented in TieBreakingPrioritizedRunnable, so task execution necessarily enters that method first. Let's trace the process step by step starting from its run function.
TieBreakingPrioritizedRunnable
public void run() {
    synchronized (this) {
        // mark the task as started. This is needed for synchronization with the timeout handling
        // see #scheduleTimeout()
        started = true;
        FutureUtils.cancel(timeoutFuture);
    }
    runAndClean(runnable);
}
First, started is set to true under the lock to mark the task as running, and any pending timeout future is cancelled; this is the synchronization point with scheduleTimeout mentioned in the comment.
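To make that handshake concrete, here is a minimal, self-contained sketch of the same started/timeout pattern; the class and method names are illustrative, not the real Elasticsearch ones.

import java.util.concurrent.*;

class TimeoutAwareRunnable implements Runnable {
    // single shared timer thread, enough for the sketch
    private static final ScheduledExecutorService TIMER = Executors.newSingleThreadScheduledExecutor();

    private Runnable runnable;                 // the wrapped task
    private ScheduledFuture<?> timeoutFuture;  // pending timeout, if any
    private boolean started = false;

    TimeoutAwareRunnable(Runnable runnable) {
        this.runnable = runnable;
    }

    // mirrors scheduleTimeout: only arm the timeout if the task has not started yet
    void scheduleTimeout(long millis, Runnable onTimeout) {
        synchronized (this) {
            if (started == false) {
                timeoutFuture = TIMER.schedule(onTimeout, millis, TimeUnit.MILLISECONDS);
            }
        }
    }

    @Override
    public void run() {
        synchronized (this) {
            started = true;                    // from now on the timeout must not be armed
            if (timeoutFuture != null) {
                timeoutFuture.cancel(false);   // and a pending one is cancelled
            }
        }
        try {
            runnable.run();
        } finally {
            runnable = null;                   // drop references, as runAndClean does below
            timeoutFuture = null;
        }
    }
}

Whichever of run() and scheduleTimeout() takes the lock first wins the race. With that in mind, execution proceeds to runAndClean: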
private void runAndClean(Runnable run) {
    try {
        run.run();
    } finally {
        runnable = null;
        timeoutFuture = null;
    }
}
This calls run.run(), where run is the runnable member of TieBreakingPrioritizedRunnable. So what type of object is that runnable? Recall that before a command is executed, it is first wrapped; that wrapping happens in the wrapRunnable function of PrioritizedEsThreadPoolExecutor, which is what produces the TieBreakingPrioritizedRunnable. Let's look at wrapRunnable again.
protected Runnable wrapRunnable(Runnable command) {
    if (command instanceof PrioritizedRunnable) {
        if ((command instanceof TieBreakingPrioritizedRunnable)) {
            return command;
        }
        Priority priority = ((PrioritizedRunnable) command).priority();
        return new TieBreakingPrioritizedRunnable(super.wrapRunnable(command), priority, insertionOrder.incrementAndGet());
    } else if (command instanceof PrioritizedFutureTask) {
        return command;
    } else { // it might be a callable wrapper...
        if (command instanceof TieBreakingPrioritizedRunnable) {
            return command;
        }
        return new TieBreakingPrioritizedRunnable(super.wrapRunnable(command), Priority.NORMAL, insertionOrder.incrementAndGet());
    }
}
Note that when the TieBreakingPrioritizedRunnable is constructed, the command is wrapped once more through super.wrapRunnable. Since this wrapRunnable belongs to PrioritizedEsThreadPoolExecutor, whose parent class is EsThreadPoolExecutor, that call lands in EsThreadPoolExecutor's wrapRunnable.
EsThreadPoolExecutor
protected Runnable wrapRunnable(Runnable command) {
    return contextHolder.preserveContext(command);
}
This calls the preserveContext function on contextHolder, which is a ThreadContext object.
ThreadContext
public Runnable preserveContext(Runnable command) {
    if (command instanceof ContextPreservingAbstractRunnable) {
        return command;
    }
    if (command instanceof ContextPreservingRunnable) {
        return command;
    }
    if (command instanceof AbstractRunnable) {
        return new ContextPreservingAbstractRunnable((AbstractRunnable) command);
    }
    return new ContextPreservingRunnable(command);
}
What is ultimately returned here is a ContextPreservingRunnable. So, back in TieBreakingPrioritizedRunnable's runAndClean method, the run object is this ContextPreservingRunnable.
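Before reading the real implementation, here is a minimal sketch of the capture-and-restore pattern it implements, using a plain ThreadLocal instead of Elasticsearch's ThreadContext; all names are illustrative.

class ContextCapturingRunnable implements Runnable {
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    private final Runnable in;
    private final String capturedContext;    // snapshot taken on the submitting thread

    ContextCapturingRunnable(Runnable in) {
        this.in = in;
        this.capturedContext = CONTEXT.get();
    }

    @Override
    public void run() {
        String previous = CONTEXT.get();     // stash whatever the pool thread had
        CONTEXT.set(capturedContext);        // restore the submitter's context
        try {
            in.run();
        } finally {
            CONTEXT.set(previous);           // undo, so the pool thread stays clean
        }
    }
}

The real class implements the same idea on top of ThreadContext, with extra handling for shutdown: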
ContextPreservingRunnable
public void run() {
    boolean whileRunning = false;
    try (ThreadContext.StoredContext ignore = stashContext()) {
        ctx.restore();
        whileRunning = true;
        in.run();
        whileRunning = false;
    } catch (IllegalStateException ex) {
        if (whileRunning || threadLocal.closed.get() == false) {
            throw ex;
        }
        // if we hit an ISE here we have been shutting down
        // this comes from the threadcontext and barfs if
        // our threadpool has been shutting down
    }
}
After setting up the thread context, this simply calls run on the in object. The in member is the command that was wrapped at the very beginning, i.e. the UpdateTask object, which is a subclass of BatchedTask.
BatchedTask
public void run() {
    runIfNotProcessed(this);
}
TaskBatcher
void runIfNotProcessed(BatchedTask updateTask) {
    // if this task is already processed, it shouldn't execute other tasks with same batching key that arrived later,
    // to give other tasks with different batching key a chance to execute.
    if (updateTask.processed.get() == false) {
        final List<BatchedTask> toExecute = new ArrayList<>();
        final Map<String, List<BatchedTask>> processTasksBySource = new HashMap<>();
        synchronized (tasksPerBatchingKey) {
            LinkedHashSet<BatchedTask> pending = tasksPerBatchingKey.remove(updateTask.batchingKey);
            if (pending != null) {
                for (BatchedTask task : pending) {
                    if (task.processed.getAndSet(true) == false) {
                        logger.trace("will process {}", task);
                        toExecute.add(task);
                        processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task);
                    } else {
                        logger.trace("skipping {}, already processed", task);
                    }
                }
            }
        }
        if (toExecute.isEmpty() == false) {
            final String tasksSummary = processTasksBySource.entrySet().stream().map(entry -> {
                String tasks = updateTask.describeTasks(entry.getValue());
                return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
            }).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
            run(updateTask.batchingKey, toExecute, tasksSummary);
        }
    }
}
Here the method first works out which tasks need to be executed: under the tasksPerBatchingKey lock it drains every pending task that shares the trigger task's batching key and has not yet been processed, then calls run to execute them as a single batch.
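The pattern is worth isolating. A minimal sketch of the same batching idea might look like the following; the names are illustrative, not the Elasticsearch ones.

import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;

class SimpleBatcher {
    static class Task {
        final Object batchingKey;
        final AtomicBoolean processed = new AtomicBoolean(false);
        Task(Object batchingKey) { this.batchingKey = batchingKey; }
    }

    private final Map<Object, LinkedHashSet<Task>> tasksPerKey = new HashMap<>();

    synchronized void submit(Task task) {
        tasksPerKey.computeIfAbsent(task.batchingKey, k -> new LinkedHashSet<>()).add(task);
    }

    void runIfNotProcessed(Task trigger) {
        if (trigger.processed.get()) {
            return;                                        // someone else already batched us
        }
        List<Task> toExecute = new ArrayList<>();
        synchronized (this) {
            LinkedHashSet<Task> pending = tasksPerKey.remove(trigger.batchingKey);
            if (pending != null) {
                for (Task task : pending) {
                    if (task.processed.getAndSet(true) == false) {
                        toExecute.add(task);               // claim each task exactly once
                    }
                }
            }
        }
        if (toExecute.isEmpty() == false) {
            executeBatch(trigger.batchingKey, toExecute);
        }
    }

    void executeBatch(Object batchingKey, List<Task> batch) {
        // in Elasticsearch this becomes one cluster-state update for the whole batch
    }
}

The payoff is that many queued tasks sharing one batching key collapse into a single cluster-state computation. In Elasticsearch, the run overload that receives the drained batch is implemented by MasterService's inner Batcher class: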
Batcher
protected void run(Object batchingKey, List<? extends BatchedTask> tasks, String tasksSummary) {
    ClusterStateTaskExecutor<Object> taskExecutor = (ClusterStateTaskExecutor<Object>) batchingKey;
    List<UpdateTask> updateTasks = (List<UpdateTask>) tasks;
    runTasks(new TaskInputs(taskExecutor, updateTasks, tasksSummary));
}
Batcher extends TaskBatcher and overrides its abstract run method, so execution enters Batcher's run. The batchingKey is first cast to a taskExecutor; the batchingKey is in fact the IndexCreationTask object created at the very beginning. A TaskInputs object is then constructed and MasterService's runTasks method is invoked.
MasterService
protected void runTasks(TaskInputs taskInputs) {
    final String summary = taskInputs.summary;
    if (!lifecycle.started()) {
        logger.debug("processing [{}]: ignoring, master service not started", summary);
        return;
    }
    logger.debug("processing [{}]: execute", summary);
    // first obtain the current cluster state
    final ClusterState previousClusterState = state();
    // if the local node is no longer the master and the task may only run on the master, refuse to execute
    if (!previousClusterState.nodes().isLocalNodeElectedMaster() && taskInputs.runOnlyWhenMaster()) {
        logger.debug("failing [{}]: local node is no longer master", summary);
        taskInputs.onNoLongerMaster();
        return;
    }
    long startTimeNS = currentTimeInNanos();
    // this is where the tasks are actually executed; the computed result comes back via TaskOutputs
    TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState, startTimeNS);
    taskOutputs.notifyFailedTasks();
    if (taskOutputs.clusterStateUnchanged()) {
        taskOutputs.notifySuccessfulTasksOnUnchangedClusterState();
        TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
        logger.debug("processing [{}]: took [{}] no change in cluster state", summary, executionTime);
        warnAboutSlowTaskIfNeeded(executionTime, summary);
    } else {
        ClusterState newClusterState = taskOutputs.newClusterState;
        if (logger.isTraceEnabled()) {
            logger.trace("cluster state updated, source [{}]\n{}", summary, newClusterState);
        } else if (logger.isDebugEnabled()) {
            logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), summary);
        }
        try {
            ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(summary, newClusterState, previousClusterState);
            // new cluster state, notify all listeners
            final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
            if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
                String nodeSummary = nodesDelta.shortSummary();
                if (nodeSummary.length() > 0) {
                    logger.info("{}, reason: {}", summary, nodeSummary);
                }
            }
            logger.debug("publishing cluster state version [{}]", newClusterState.version());
            try {
                clusterStatePublisher.accept(clusterChangedEvent, taskOutputs.createAckListener(threadPool, newClusterState));
            } catch (Discovery.FailedToCommitClusterStateException t) {
                final long version = newClusterState.version();
                logger.warn(
                    (Supplier<?>) () -> new ParameterizedMessage(
                        "failing [{}]: failed to commit cluster state version [{}]", summary, version),
                    t);
                taskOutputs.publishingFailed(t);
                return;
            }
            taskOutputs.processedDifferentClusterState(previousClusterState, newClusterState);
            try {
                taskOutputs.clusterStatePublished(clusterChangedEvent);
            } catch (Exception e) {
                logger.error(
                    (Supplier<?>) () -> new ParameterizedMessage(
                        "exception thrown while notifying executor of new cluster state publication [{}]",
                        summary),
                    e);
            }
            TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
            logger.debug("processing [{}]: took [{}] done publishing updated cluster state (version: {}, uuid: {})", summary,
                executionTime, newClusterState.version(),
                newClusterState.stateUUID());
            warnAboutSlowTaskIfNeeded(executionTime, summary);
        } catch (Exception e) {
            TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
            final long version = newClusterState.version();
            final String stateUUID = newClusterState.stateUUID();
            final String fullState = newClusterState.toString();
            logger.warn(
                (Supplier<?>) () -> new ParameterizedMessage(
                    "failed to publish updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}",
                    executionTime,
                    version,
                    stateUUID,
                    summary,
                    fullState),
                e);
            // TODO: do we want to call updateTask.onFailure here?
        }
    }
}
Let's first look at the calculateTaskOutputs function:
public TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState, long startTimeNS) {
    ClusterTasksResult<Object> clusterTasksResult = executeTasks(taskInputs, startTimeNS, previousClusterState);
    ClusterState newClusterState = patchVersions(previousClusterState, clusterTasksResult);
    return new TaskOutputs(taskInputs, previousClusterState, newClusterState, getNonFailedTasks(taskInputs, clusterTasksResult),
        clusterTasksResult.executionResults);
}
Then step into the executeTasks function:
protected ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs, long startTimeNS, ClusterState previousClusterState) {
    ClusterTasksResult<Object> clusterTasksResult;
    try {
        List<Object> inputs = taskInputs.updateTasks.stream().map(tUpdateTask -> tUpdateTask.task).collect(Collectors.toList());
        clusterTasksResult = taskInputs.executor.execute(previousClusterState, inputs);
        if (previousClusterState != clusterTasksResult.resultingState &&
            previousClusterState.nodes().isLocalNodeElectedMaster() &&
            (clusterTasksResult.resultingState.nodes().isLocalNodeElectedMaster() == false)) {
            throw new AssertionError("update task submitted to MasterService cannot remove master");
        }
    } catch (Exception e) {
        ...
    }
    ...
    return clusterTasksResult;
}
Some code is omitted from this function. The tasks are executed by calling the executor member of taskInputs, and this executor is in fact the IndexCreationTask object. Two arguments are passed in: the previous cluster state, and the inputs list, whose elements are likewise the IndexCreationTask object. Let's first look at the class hierarchy of IndexCreationTask.
static class IndexCreationTask extends AckedClusterStateUpdateTask<ClusterStateUpdateResponse>
public abstract class AckedClusterStateUpdateTask<Response> extends ClusterStateUpdateTask implements AckedClusterStateTaskListener
public abstract class ClusterStateUpdateTask implements ClusterStateTaskConfig, ClusterStateTaskExecutor<ClusterStateUpdateTask>, ClusterStateTaskListener
As you can see, IndexCreationTask is at once a task, a listener, a config, and an executor. Recall ClusterService's submitStateUpdateTask function:
public <T extends ClusterStateTaskConfig & ClusterStateTaskExecutor<T> & ClusterStateTaskListener>
void submitStateUpdateTask(String source, T updateTask) {
    submitStateUpdateTask(source, updateTask, updateTask, updateTask, updateTask);
}

public <T> void submitStateUpdateTask(String source, T task,
                                      ClusterStateTaskConfig config,
                                      ClusterStateTaskExecutor<T> executor,
                                      ClusterStateTaskListener listener) {
    submitStateUpdateTasks(source, Collections.singletonMap(task, listener), config, executor);
}
When this function is invoked, the last four arguments are all the very same IndexCreationTask object!
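The signature relies on a Java intersection bound. A minimal sketch of the same trick, with illustrative interface names rather than the Elasticsearch ones:

import java.util.List;

interface TaskConfig { }
interface TaskExecutor<T> { void execute(List<T> tasks); }
interface TaskListener { void onFailure(Exception e); }

// one object that can play every role at once, like IndexCreationTask does
class MultiRoleTask implements TaskConfig, TaskExecutor<MultiRoleTask>, TaskListener {
    @Override public void execute(List<MultiRoleTask> tasks) { /* do the work */ }
    @Override public void onFailure(Exception e) { /* report the failure */ }
}

class SubmitService {
    // the bound T extends TaskConfig & TaskExecutor<T> & TaskListener guarantees
    // that updateTask can be passed as config, executor and listener simultaneously
    <T extends TaskConfig & TaskExecutor<T> & TaskListener> void submit(String source, T updateTask) {
        submit(source, updateTask, updateTask, updateTask, updateTask);
    }

    <T> void submit(String source, T task, TaskConfig config, TaskExecutor<T> executor, TaskListener listener) {
        // queue the task; all four arguments may be the very same object
    }
}

Back to the main thread of analysis: following the executor's execute method, execution first enters the execute method of ClusterStateUpdateTask.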
ClusterStateUpdateTask
public final ClusterTasksResult<ClusterStateUpdateTask> execute(ClusterState currentState, List<ClusterStateUpdateTask> tasks) throws Exception {
    ClusterState result = execute(currentState);
    return ClusterTasksResult.<ClusterStateUpdateTask>builder().successes(tasks).build(result);
}
This delegates to the single-state execute overload, which lands in IndexCreationTask's own execute method.
IndexCreationTask
public ClusterState execute(ClusterState currentState) throws Exception {
    Index createdIndex = null;
    String removalExtraInfo = null;
    IndexRemovalReason removalReason = IndexRemovalReason.FAILURE;
    try {
        ...
        // obtain the request's recoverFromIndex; this property came up in the previous article -- look back there for how it is set
        final Index recoverFromIndex = request.recoverFrom();
        // settings of the target index
        Settings.Builder indexSettingsBuilder = Settings.builder();
        // now, put the request settings, so they override templates
        // metadata builder for the target index
        final IndexMetaData.Builder tmpImdBuilder = IndexMetaData.builder(request.index());
        final int routingNumShards;
        if (recoverFromIndex == null) {
            Settings idxSettings = indexSettingsBuilder.build();
            routingNumShards = IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.get(idxSettings);
        } else {
            assert IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.exists(indexSettingsBuilder.build()) == false
                : "index.number_of_routing_shards should be present on the target index on resize";
            final IndexMetaData sourceMetaData = currentState.metaData().getIndexSafe(recoverFromIndex);
            // the target index's routingNumShards must be taken from the source index
            routingNumShards = sourceMetaData.getRoutingNumShards();
        }
        // remove the setting it's temporary and is only relevant once we create the index
        indexSettingsBuilder.remove(IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.getKey());
        tmpImdBuilder.setRoutingNumShards(routingNumShards);
        if (recoverFromIndex != null) {
            assert request.resizeType() != null;
            // preparation work: validate the parameters and copy some settings from the source index onto the target index.
            // if the source index has not been set to block writes, an error is raised at this step.
            prepareResizeIndexSettings(
                currentState, mappings.keySet(), indexSettingsBuilder, recoverFromIndex, request.index(), request.resizeType());
        }
        final Settings actualIndexSettings = indexSettingsBuilder.build();
        tmpImdBuilder.settings(actualIndexSettings);
        if (recoverFromIndex != null) {
            /*
             * We need to arrange that the primary term on all the shards in the shrunken index is at least as large as
             * the maximum primary term on all the shards in the source index. This ensures that we have correct
             * document-level semantics regarding sequence numbers in the shrunken index.
             */
            final IndexMetaData sourceMetaData = currentState.metaData().getIndexSafe(recoverFromIndex);
            final long primaryTerm =
                IntStream
                    .range(0, sourceMetaData.getNumberOfShards())
                    .mapToLong(sourceMetaData::primaryTerm)
                    .max()
                    .getAsLong();
            for (int shardId = 0; shardId < tmpImdBuilder.numberOfShards(); shardId++) {
                tmpImdBuilder.primaryTerm(shardId, primaryTerm);
            }
        }
        // Set up everything, now locally create the index to see that things are ok, and apply
        final IndexMetaData tmpImd = tmpImdBuilder.build();
        ActiveShardCount waitForActiveShards = request.waitForActiveShards();
        if (waitForActiveShards == ActiveShardCount.DEFAULT) {
            waitForActiveShards = tmpImd.getWaitForActiveShards();
        }
        if (waitForActiveShards.validate(tmpImd.getNumberOfReplicas()) == false) {
            throw new IllegalArgumentException("invalid wait_for_active_shards[" + request.waitForActiveShards() +
                "]: cannot be greater than number of shard copies [" +
                (tmpImd.getNumberOfReplicas() + 1) + "]");
        }
        // create the index here (on the master) to validate it can be created, as well as adding the mapping
        // this creates the IndexService corresponding to the index and adds it to the indices map of IndicesService
        final IndexService indexService = indicesService.createIndex(tmpImd, Collections.emptyList());
        createdIndex = indexService.index();
        // now add the mappings
        final IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(request.index())
            .settings(actualIndexSettings)
            .setRoutingNumShards(routingNumShards);
        // at this point the index is in the open state
        indexMetaDataBuilder.state(request.state());
        final IndexMetaData indexMetaData;
        try {
            indexMetaData = indexMetaDataBuilder.build();
        } catch (Exception e) {
            removalExtraInfo = "failed to build index metadata";
            throw e;
        }
        indexService.getIndexEventListener().beforeIndexAddedToCluster(indexMetaData.getIndex(),
            indexMetaData.getSettings());
        // add the target index's IndexMetaData to the indices member of the MetaData object;
        // that member is a map maintaining the metadata of every index in the cluster
        MetaData newMetaData = MetaData.builder(currentState.metaData())
            .put(indexMetaData, false)
            .build();
        logger.info("[{}] creating index, cause [{}], templates {}, shards [{}]/[{}], mappings {}",
            request.index(), request.cause(), templateNames, indexMetaData.getNumberOfShards(),
            indexMetaData.getNumberOfReplicas(), mappings.keySet());
        ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
        if (!request.blocks().isEmpty()) {
            for (ClusterBlock block : request.blocks()) {
                blocks.addIndexBlock(request.index(), block);
            }
        }
        blocks.updateBlocks(indexMetaData);
        // apply the new MetaData to the cluster, changing the cluster state so that it contains the target index
        ClusterState updatedState = ClusterState.builder(currentState).blocks(blocks).metaData(newMetaData).build();
        // the cluster state built above contains the target index's metadata, but the routing table does not yet
        // contain its routing information; this step tries to update the routing table
        if (request.state() == State.OPEN) {
            RoutingTable.Builder routingTableBuilder = RoutingTable.builder(updatedState.routingTable())
                .addAsNew(updatedState.metaData().index(request.index()));
            updatedState = allocationService.reroute(
                ClusterState.builder(updatedState).routingTable(routingTableBuilder.build()).build(),
                "index [" + request.index() + "] created");
        }
        removalExtraInfo = "cleaning up after validating index on master";
        removalReason = IndexRemovalReason.NO_LONGER_ASSIGNED;
        return updatedState;
    } finally {
        if (createdIndex != null) {
            // Index was already partially created - need to clean up
            indicesService.removeIndex(createdIndex, removalReason, removalExtraInfo);
        }
    }
}
This function is very long; many details have been cut here, leaving only the more important steps. It first fills in a number of settings and validates their legality, then creates the index through IndicesService's createIndex function.
IndicesService
public synchronized IndexService createIndex(
        final IndexMetaData indexMetaData, final List<IndexEventListener> builtInListeners) throws IOException {
    ensureChangesAllowed();
    if (indexMetaData.getIndexUUID().equals(IndexMetaData.INDEX_UUID_NA_VALUE)) {
        throw new IllegalArgumentException("index must have a real UUID found value: [" + indexMetaData.getIndexUUID() + "]");
    }
    final Index index = indexMetaData.getIndex();
    if (hasIndex(index)) {
        throw new ResourceAlreadyExistsException(index);
    }
    List<IndexEventListener> finalListeners = new ArrayList<>(builtInListeners);
    final IndexEventListener onStoreClose = new IndexEventListener() {
        @Override
        public void onStoreClosed(ShardId shardId) {
            indicesQueryCache.onClose(shardId);
        }
    };
    finalListeners.add(onStoreClose);
    finalListeners.add(oldShardsStats);
    final IndexService indexService =
        createIndexService(
            "create index",
            indexMetaData,
            indicesQueryCache,
            indicesFieldDataCache,
            finalListeners,
            indexingMemoryController);
    boolean success = false;
    try {
        indexService.getIndexEventListener().afterIndexCreated(indexService);
        indices = newMapBuilder(indices).put(index.getUUID(), indexService).immutableMap();
        success = true;
        return indexService;
    } finally {
        if (success == false) {
            indexService.close("plugins_failed", true);
        }
    }
}
Here a few index-level listeners are registered, and execution moves into the createIndexService function.
private synchronized IndexService createIndexService(final String reason,
                                                     IndexMetaData indexMetaData,
                                                     IndicesQueryCache indicesQueryCache,
                                                     IndicesFieldDataCache indicesFieldDataCache,
                                                     List<IndexEventListener> builtInListeners,
                                                     IndexingOperationListener... indexingOperationListeners) throws IOException {
    final IndexSettings idxSettings = new IndexSettings(indexMetaData, this.settings, indexScopeSetting);
    logger.debug("creating Index [{}], shards [{}]/[{}] - reason [{}]",
        indexMetaData.getIndex(),
        idxSettings.getNumberOfShards(),
        idxSettings.getNumberOfReplicas(),
        reason);
    final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry);
    for (IndexingOperationListener operationListener : indexingOperationListeners) {
        indexModule.addIndexOperationListener(operationListener);
    }
    pluginsService.onIndexModule(indexModule);
    for (IndexEventListener listener : builtInListeners) {
        indexModule.addIndexEventListener(listener);
    }
    return indexModule.newIndexService(
        nodeEnv,
        xContentRegistry,
        this,
        circuitBreakerService,
        bigArrays,
        threadPool,
        scriptService,
        client,
        indicesQueryCache,
        mapperRegistry,
        indicesFieldDataCache,
        namedWriteableRegistry
    );
}
The IndexService object is finally created here. The details of index creation are not discussed for now; for the moment it suffices to know that in Elasticsearch every index corresponds to one IndexService object, every index has a UUID, and the IndexService for each index is maintained in the indices member of IndicesService, which is simply a map. In the createIndex function, after the IndexService is created, the following statement adds the newly created index to the indices map:
indices = newMapBuilder(indices).put(index.getUUID(), indexService).immutableMap();
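This is a copy-on-write pattern: readers get an immutable snapshot without locking, and every mutation builds a fresh map under the lock (createIndex is synchronized) and swaps the reference. A minimal sketch of the same idea, with illustrative names:

import java.util.*;

class IndicesRegistry {
    // uuid -> IndexService; volatile so readers always see a complete snapshot
    private volatile Map<String, Object> indices = Collections.emptyMap();

    synchronized void register(String uuid, Object indexService) {
        Map<String, Object> copy = new HashMap<>(indices);
        copy.put(uuid, indexService);
        indices = Collections.unmodifiableMap(copy);   // publish the new snapshot
    }

    Object get(String uuid) {
        return indices.get(uuid);                      // lock-free read
    }
}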
Back in IndexCreationTask's execute function: once the index service has been created and the cluster state changed, the routing table information must be updated. Let's analyze in detail how the routing table is built, since this matters for understanding the split process. First, the routing table of the updated cluster state is obtained; note that at this point it does not yet contain routing information for the target index. Then a routing-table builder is obtained, and the new index is added to the routing table through its addAsNew method.
RoutingTable.Builder
public Builder addAsNew(IndexMetaData indexMetaData) {
    if (indexMetaData.getState() == IndexMetaData.State.OPEN) {
        IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetaData.getIndex())
            .initializeAsNew(indexMetaData);
        add(indexRoutingBuilder);
    }
    return this;
}
The parameter passed in is the target index's IndexMetaData. Since the target index is in the open state at this point, a new IndexRoutingTable.Builder is created for it, and its initializeAsNew method is called.
IndexRoutingTable.Builder
public Builder initializeAsNew(IndexMetaData indexMetaData) {
    return initializeEmpty(indexMetaData, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
}

private Builder initializeEmpty(IndexMetaData indexMetaData, UnassignedInfo unassignedInfo) {
    assert indexMetaData.getIndex().equals(index);
    if (!shards.isEmpty()) {
        throw new IllegalStateException("trying to initialize an index with fresh shards, but already has shards created");
    }
    for (int shardNumber = 0; shardNumber < indexMetaData.getNumberOfShards(); shardNumber++) {
        ShardId shardId = new ShardId(index, shardNumber);
        final RecoverySource primaryRecoverySource;
        if (indexMetaData.inSyncAllocationIds(shardNumber).isEmpty() == false) {
            // we have previous valid copies for this shard. use them for recovery
            primaryRecoverySource = StoreRecoverySource.EXISTING_STORE_INSTANCE;
        } else if (indexMetaData.getResizeSourceIndex() != null) {
            // this is a new index but the initial shards should merged from another index
            primaryRecoverySource = LocalShardsRecoverySource.INSTANCE;
        } else {
            // a freshly created index with no restriction
            primaryRecoverySource = StoreRecoverySource.EMPTY_STORE_INSTANCE;
        }
        IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId);
        for (int i = 0; i <= indexMetaData.getNumberOfReplicas(); i++) {
            boolean primary = i == 0;
            indexShardRoutingBuilder.addShard(ShardRouting.newUnassigned(shardId, primary,
                primary ? primaryRecoverySource : PeerRecoverySource.INSTANCE, unassignedInfo));
        }
        shards.put(shardNumber, indexShardRoutingBuilder.build());
    }
    return this;
}
Here, for each shard of the target index, the recovery source of its primary is determined first. Because a split records a resize source index on the target's metadata (getResizeSourceIndex() is non-null), primaryRecoverySource is LocalShardsRecoverySource.INSTANCE. This makes the picture clear: every primary shard of the target index is recovered from a local shard, so wherever a source shard lives, the target shards split from it start on the same machine. Relocation may happen later, but at creation time the target index's primary shards are guaranteed to sit on the same machines as the corresponding source shards. After the primary's recovery source is determined, the replicas' recovery type is set to PeerRecoverySource.INSTANCE. Note that at this step only the recovery type of each shard copy has been decided; which node each copy ultimately lands on has not been assigned yet. Finally, the routing table of each shard is put into the shards member. Control then returns to addAsNew, which adds the new index's routing table to the cluster routing table. The routing-table hierarchy is therefore:
RoutingTable->IndexRoutingTable->IndexShardRoutingTable
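A rough sketch of that nesting, with simplified field types (the real classes carry much more state):

import java.util.*;

class ShardRoutingInfo {
    boolean primary;
    String currentNodeId;        // null while unassigned
    // recovery source, allocation id, state ...
}

class IndexShardRoutingTableSketch {
    int shardId;
    List<ShardRoutingInfo> copies = new ArrayList<>();   // primary + replicas
}

class IndexRoutingTableSketch {
    String index;
    Map<Integer, IndexShardRoutingTableSketch> shards = new HashMap<>();
}

class RoutingTableSketch {
    Map<String, IndexRoutingTableSketch> indicesRouting = new HashMap<>();
}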
Finally, look again at IndexCreationTask's execute method: after the routing table has been generated, allocationService's reroute method is called.
AllocationService
public ClusterState reroute(ClusterState clusterState, String reason) {
    return reroute(clusterState, reason, false);
}

protected ClusterState reroute(final ClusterState clusterState, String reason, boolean debug) {
    RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
    // shuffle the unassigned nodes, just so we won't have things like poison failed shards
    routingNodes.unassigned().shuffle();
    RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState,
        clusterInfoService.getClusterInfo(), currentNanoTime());
    allocation.debugDecision(debug);
    reroute(allocation);
    if (allocation.routingNodesChanged() == false) {
        return clusterState;
    }
    return buildResultAndLogHealthChange(clusterState, allocation, reason);
}
This goes straight into the private reroute overload:
private void reroute(RoutingAllocation allocation) {
    assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See deassociateDeadNodes";
    // now allocate all the unassigned to available nodes
    if (allocation.routingNodes().unassigned().size() > 0) {
        removeDelayMarkers(allocation);
        gatewayAllocator.allocateUnassigned(allocation);
    }
    shardsAllocator.allocate(allocation);
    assert RoutingNodes.assertShardStats(allocation.routingNodes());
}
At this point shardsAllocator's allocate method is called to allocate all the shards. The allocation details are not expanded on here; instead, let's focus on the ResizeAllocationDecider class.
ResizeAllocationDecider
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
    final UnassignedInfo unassignedInfo = shardRouting.unassignedInfo();
    if (unassignedInfo != null && shardRouting.recoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) {
        // we only make decisions here if we have an unassigned info and we have to recover from another index ie. split / shrink
        final IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index());
        Index resizeSourceIndex = indexMetaData.getResizeSourceIndex();
        assert resizeSourceIndex != null;
        if (allocation.metaData().index(resizeSourceIndex) == null) {
            return allocation.decision(Decision.NO, NAME, "resize source index [%s] doesn't exists", resizeSourceIndex.toString());
        }
        IndexMetaData sourceIndexMetaData = allocation.metaData().getIndexSafe(resizeSourceIndex);
        if (indexMetaData.getNumberOfShards() < sourceIndexMetaData.getNumberOfShards()) {
            // this only handles splits so far.
            return Decision.ALWAYS;
        }
        // select the source shard
        ShardId shardId = IndexMetaData.selectSplitShard(shardRouting.id(), sourceIndexMetaData, indexMetaData.getNumberOfShards());
        // find the routing entry of the source shard's active primary
        ShardRouting sourceShardRouting = allocation.routingNodes().activePrimary(shardId);
        if (sourceShardRouting == null) {
            return allocation.decision(Decision.NO, NAME, "source primary shard [%s] is not active", shardId);
        }
        if (node != null) { // we might get called from the 2 param canAllocate method..
            if (node.node().getVersion().before(ResizeAction.COMPATIBILITY_VERSION)) {
                return allocation.decision(Decision.NO, NAME, "node [%s] is too old to split a shard", node.nodeId());
            }
            // return YES only if the node being considered is the one holding the source shard's primary
            if (sourceShardRouting.currentNodeId().equals(node.nodeId())) {
                return allocation.decision(Decision.YES, NAME, "source primary is allocated on this node");
            } else {
                return allocation.decision(Decision.NO, NAME, "source primary is allocated on another node");
            }
        } else {
            return allocation.decision(Decision.YES, NAME, "source primary is active");
        }
    }
    return super.canAllocate(shardRouting, node, allocation);
}
When deciding whether a shard can be allocated to a node, Elasticsearch consults many AllocationDeciders; the shard may be placed on a node only if every decider agrees. In the split scenario, the decisive one is ResizeAllocationDecider: it forces every primary shard of the target index onto the same node as the corresponding primary shard of the source index. For example, if the primary of source shard 0 is on node 3, then the primaries of all target shards split from shard 0 must also be on node 3.
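To make the shard mapping concrete, here is a hedged, self-contained illustration of which source shard a target shard recovers from during a split. The helper mirrors what IndexMetaData.selectSplitShard computes, under the assumption that the target shard count is an exact multiple of the source shard count (which a split requires):

public class SplitShardMapping {

    // target shard t maps back to source shard t / (numTargetShards / numSourceShards)
    static int selectSourceShard(int targetShardId, int numSourceShards, int numTargetShards) {
        int routingFactor = numTargetShards / numSourceShards;
        return targetShardId / routingFactor;
    }

    public static void main(String[] args) {
        // splitting a 2-shard index into 4 shards: targets 0 and 1 come from source
        // shard 0 and must start on its node; targets 2 and 3 come from source shard 1
        for (int target = 0; target < 4; target++) {
            System.out.println("target shard " + target + " <- source shard "
                    + selectSourceShard(target, 2, 4));
        }
    }
}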
Once the routing information for all shards has been decided, index creation is complete. Control returns to the calculateTaskOutputs function, which produces the new cluster state and wraps it in a TaskOutputs object returned to MasterService's runTasks method. There, a ClusterChangedEvent is constructed, and the cluster-state change is published to the rest of the cluster through clusterStatePublisher.
At this point the index has been created. But where does the data behind the index come from? The next article will answer that question.