ES Rest/Rpc流程
Rpc-Server端处理流程
调试流程
- 在RpcServer的消息处理入口的地方放置断点Netty4MessageChannelHandler
- 启动一个TransportClient,发送一个查询请求,看断点能否命中
- 1个查询请求后,发现断点经常命中
- 发现有三类消息:Liveness,HandShake和真正的业务消息
- 从客户端日志可以看到indices:data/read/get这样的消息
- 在服务端设置条件断点action.contains("indices:data/read/get")
调试代码
@Test
public void test1(){
try {
Settings settings = Settings.builder().put("cluster.name", "distribution_run").build();
TransportClient client = new PreBuiltTransportClient(settings)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
GetResponse response = client.prepareGet("website", "blog", "AWMHVaesXwpGuvHHNcVg").execute().actionGet();
//输出结果
System.out.println(response.getSourceAsString());
//关闭client
client.close();
} catch (UnknownHostException e) {
}
//QueryBuilder qb1 = QueryBuilders.matchAllQuery();
}
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>5.6.9</version>
</dependency>
<dependency>
<groupId>org.appenders.log4j</groupId>
<artifactId>log4j2-elasticsearch-core</artifactId>
<version>1.1.1</version>
</dependency>
<?xml version="1.0" encoding="UTF-8"?>
<configuration status="OFF">
<appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
</appenders>
<loggers>
<logger name="com.relin.HelloLog4j" level="error" additivity="false">
<appender-ref ref="Console"/>
</logger>
<root level="trace">
<appender-ref ref="Console"/>
</root>
</loggers>
</configuration>
代码分析
- ES为不同类型的操作提供不同类型的线程池,总共有15中,调试上述代码使用的SANE,对应的线程池执行器是DIRECT_EXECUTOR_SERVICE
- 线程池所有类型的定义以及类型和执行器的对应关系都在ThreadPool类中
- TransportClient请求和对应Transport*Action的对应关系是怎么样的
- TransportClient发送业务请求都会带一个类似于indices:data/read/get的消息
- Transport*Action注入的时候都会带一个ActionName的属性名
@Inject
public TransportGetAction(Settings settings, ClusterService clusterService, TransportService transportService,
IndicesService indicesService, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, GetAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
GetRequest::new, ThreadPool.Names.GET);
this.indicesService = indicesService;
}
public static final GetAction INSTANCE = new GetAction();
public static final String NAME = "indices:data/read/get";