Troubleshooting an Apache HttpClient Connection Pool Leak
Background
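The business system is mainly a data aggregation and management platform. One of its features synchronizes all resources (called the "full sync" below).
During synchronization, data is requested through Feign backed by Apache HttpClient. I wrote this request wrapper at the time to fit the business APIs.
Feign version: 10.10.1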
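Where the problem showed up
- In production, the full sync (20+ tasks) hung after a little more than half of the tasks had run. The problem never appeared in the test environment.
Sync interface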
public interface SyncHelper {
    Order syncOrder();
    void syncAllAccount();
    void syncSingleAccount(Long accountId);
    // Per-task timeout in seconds, used by the full sync
    long getTimeOut();
    default boolean enableSync() {
        return true;
    }
}
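Full-sync implementation
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.concurrent.*;

@Slf4j
@Component
@RequiredArgsConstructor
public class SyncAccountResourceListener {
    // Constructor-injected via Lombok's @RequiredArgsConstructor
    private final List<SyncHelper> helpers;
    // Single-thread executor: the sync tasks run one after another
    private static final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
            .setDaemon(false)
            .setNameFormat("h3c-sync-resource-%d")
            .build()
    );

    public void sync(Long accountId) {
        for (SyncHelper helper : helpers) {
            String className = helper.getClass().getSimpleName();
            if (Thread.currentThread().isInterrupted()) {
                log.error("[{}] sync task interrupted,account:[{}]", className, accountId);
                continue;
            }
            Future<?> future = EXECUTOR.submit(() -> helper.syncSingleAccount(accountId));
            try {
                future.get(helper.getTimeOut(), TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException | TimeoutException e) {
                log.error("[{}] sync error,account:[{}]", className, accountId, e);
            } finally {
                // Interrupts the task if it is still running
                future.cancel(true);
            }
        }
    }
}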
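Troubleshooting steps
I wanted to resolve the problem as quickly as possible. The sync progress list in the system showed that every run got stuck in the same sync class, so I skimmed that class's code, but found nothing that could cause an infinite loop.
Trying to reproduce
- Ran the full sync in the test environment: no problem (the request/response logs and the SQL logs looked normal), and all sync tasks completed.
- Ran unit tests against the sync class that got stuck, many times over: no problem either.
At this point the problem was even more puzzling. It could be reproduced neither in the test environment nor in local unit tests, so why did it happen in production?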
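Deadlock
At first I did not look for deadlocks, because most sync tasks do not use multiple threads.
Possible causes
- Multithreading: the full sync runs its tasks on a single-thread executor. If a task times out but the timeout handling does not actually stop it, every task after it is blocked.
- A colleague's sync code used multiple threads in a way that was not very sound, similar to the following: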
List<SysDept> deptList = ......
List<CompletableFuture<CmdbUsageReport>> futureList = new ArrayList<>();
deptList.forEach(t -> futureList.add(
CompletableFuture.supplyAsync(() -> {
// time-consuming task
return report;
}, ioPool)));
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[]{})).join();
List<CmdbUsageReport> reportList = futureList.stream().map(CompletableFuture::join).filter(Objects::nonNull).collect(toList());
// ......
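The first possible cause above can be illustrated with a minimal JDK-only sketch. The class and method names are hypothetical, and the 2-second task stands in for a stuck sync task: a task on a single-thread executor swallows interrupts, the caller times out and calls `future.cancel(true)`, and the next task still has to wait for the stuck one to finish.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SingleThreadTimeoutDemo {

    /** Returns how long (ms) the next task had to wait after the first one "timed out". */
    static long waitForNextTaskMillis() throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Hypothetical sync task that swallows interrupts and keeps running for ~2 s
            Future<?> stubborn = executor.submit(() -> {
                long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
                while (System.nanoTime() < end) {
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException ignored) {
                        // interrupt swallowed, so cancel(true) has no lasting effect
                    }
                }
            });
            try {
                stubborn.get(100, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                stubborn.cancel(true); // only interrupts the worker thread
            }
            long start = System.nanoTime();
            // The next task is queued behind the still-running one on the only worker
            executor.submit(() -> { }).get();
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("next task waited ~" + waitForNextTaskMillis() + " ms");
    }
}
```

`cancel(true)` only requests an interrupt; whether the single worker thread becomes free again depends entirely on the task reacting to it.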
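Since the investigation was now about threads, I went straight to analysis tools to see what was actually happening.
Analyzing thread states
Use Alibaba's Arthas or VisualVM to inspect the state of the sync task thread.
Start Arthas and attach it to the target process: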
java -jar arthas-boot.jar
thread --all
List brief information about all threads.
The full sync runs on a single-thread executor whose thread has a custom name, `h3c-sync-resource-0` (thread ID 250, state `WAITING`).
thread 250
Show the details (stack trace) of that thread:
"h3c-sync-resource-0" - Thread t@195
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <66bb3d00> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at org.apache.http.pool.AbstractConnPool.getPoolEntryBlocking(AbstractConnPool.java:393)
at org.apache.http.pool.AbstractConnPool.access$300(AbstractConnPool.java:70)
at org.apache.http.pool.AbstractConnPool$2.get(AbstractConnPool.java:253)
- locked <637b83f5> (a org.apache.http.pool.AbstractConnPool$2)
at org.apache.http.pool.AbstractConnPool$2.get(AbstractConnPool.java:198)
at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:306)
at org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:282)
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:190)
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
at feign.httpclient.ApacheHttpClient.execute(ApacheHttpClient.java:83)
at feign.SynchronousMethodHandler.executeAndDecode(SynchronousMethodHandler.java:119)
at feign.SynchronousMethodHandler.invoke(SynchronousMethodHandler.java:89)
at feign.ReflectiveFeign$FeignInvocationHandler.invoke(ReflectiveFeign.java:100)
at com.sun.proxy.$Proxy360.osAggregates(Unknown Source)
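The stack trace shows the key frames: the thread is stuck inside a Feign request, and looking closer, it is blocked inside Apache HttpClient in what looks like a deadlock:
AbstractConnPool.java:393, in org.apache.http.pool.AbstractConnPool.getPoolEntryBlocking
Reading that code shows that the thread is waiting, with no deadline, for a free connection from the HttpClient connection pool.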
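To make this failure mode concrete, here is a toy model of a bounded pool built on a `Semaphore`. It is not HttpClient's implementation, and `ToyConnPool` is a hypothetical name: a lease that is never released exhausts the pool, and the next lease can only wait. In the stack trace above the wait is `ConditionObject.await()` without a deadline, which is why the thread never wakes up; the toy uses a timeout so that the demo terminates.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Toy model of a bounded connection pool (NOT HttpClient's real implementation). */
public class ToyConnPool {
    private final Semaphore permits;

    ToyConnPool(int maxTotal) {
        this.permits = new Semaphore(maxTotal);
    }

    /** Lease a "connection"; returns false if none became free within the timeout. */
    boolean lease(long timeoutMs) throws InterruptedException {
        return permits.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
    }

    /** Return a "connection" to the pool. */
    void release() {
        permits.release();
    }

    public static void main(String[] args) throws InterruptedException {
        ToyConnPool pool = new ToyConnPool(1); // comparable to setMaxConnTotal(1)
        boolean first = pool.lease(100);       // leased but never released: a leak
        boolean second = pool.lease(100);      // pool exhausted, so this times out
        System.out.println(first + " " + second); // prints "true false"
    }
}
```

In HttpClient 4.x a comparable bound on the lease wait can be set with `RequestConfig.custom().setConnectionRequestTimeout(...)`. That turns a silent hang into an exception, but it does not fix the leak itself.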
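Tracing the source code
First, look at the simple example on the official HttpClient website (httpcomponents-client-quickstart), which shows the following basic usage. Note that this quickstart is written against HttpClient 5.x (`getCode()`); the 4.x API used by Feign's Apache HttpClient adapter differs slightly (for example `getStatusLine()`), but the rule about releasing the response is the same: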
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
HttpGet httpGet = new HttpGet("http://httpbin.org/get");
// The underlying HTTP connection is still held by the response object
// to allow the response content to be streamed directly from the network socket.
// In order to ensure correct deallocation of system resources
// the user MUST call CloseableHttpResponse#close() from a finally clause.
// Please note that if response content is not fully consumed the underlying
// connection cannot be safely re-used and will be shut down and discarded
// by the connection manager.
try (CloseableHttpResponse response1 = httpclient.execute(httpGet)) {
System.out.println(response1.getCode() + " " + response1.getReasonPhrase());
HttpEntity entity1 = response1.getEntity();
// do something useful with the response body
// and ensure it is fully consumed
EntityUtils.consume(entity1);
}
}
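- The official example uses Java 7 try-with-resources, so the client and the response are released automatically once they are no longer used.
- The comments in the example also say that for the connection to be safely reused, the response content must be fully consumed (for example with `EntityUtils.consume`) and the stream closed.
- So the blocked thread was most likely caused by response content that was not consumed and released in time.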
Tracing the Feign source code
Now look at roughly how Feign's Apache HttpClient adapter turns an HttpClient response into a Feign response. The body is converted in `feign.httpclient.ApacheHttpClient.toFeignBody`:
Response.Body toFeignBody(HttpResponse httpResponse) {
final HttpEntity entity = httpResponse.getEntity();
if (entity == null) {
return null;
}
return new Response.Body() {
@Override
public Integer length() {
// With Transfer-Encoding: chunked the content length is unknown (negative), so this always returns null
return entity.getContentLength() >= 0 && entity.getContentLength() <= Integer.MAX_VALUE
? (int) entity.getContentLength()
: null;
}
@Override
public boolean isRepeatable() {
return entity.isRepeatable();
}
@Override
public InputStream asInputStream() throws IOException {
// Simply hands out the underlying InputStream
return entity.getContent();
}
@SuppressWarnings("deprecation")
@Override
public Reader asReader() throws IOException {
return new InputStreamReader(asInputStream(), UTF_8);
}
@Override
public Reader asReader(Charset charset) throws IOException {
Util.checkNotNull(charset, "charset should not be null");
return new InputStreamReader(asInputStream(), charset);
}
@Override
public void close() throws IOException {
// Releases the resources (and with them the pooled connection)
EntityUtils.consume(entity);
}
};
}
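HttpClient reports a negative content length (typically -1) when the length is unknown, as it is for chunked responses. A standalone copy of the `length()` ternary above (the class name is hypothetical) shows how that becomes `null` on the Feign side:

```java
public class FeignBodyLength {

    /** Mirrors the ternary in toFeignBody#length(): unknown or oversized lengths become null. */
    static Integer feignLength(long contentLength) {
        return contentLength >= 0 && contentLength <= Integer.MAX_VALUE
                ? (int) contentLength
                : null;
    }

    public static void main(String[] args) {
        System.out.println(feignLength(42)); // prints 42
        System.out.println(feignLength(-1)); // prints null, as for Transfer-Encoding: chunked
    }
}
```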
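Because the response content must be fully consumed before the connection can return to the pool and be reused, `close()` delegates to `org.apache.http.util.EntityUtils.consume`, which looks roughly like this: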
public static void consume(final HttpEntity entity) throws IOException {
if (entity == null) {
return;
}
if (entity.isStreaming()) {
// Content is still streaming from the connection: get the stream
final InputStream inStream = entity.getContent();
// Closing the stream releases the resources
if (inStream != null) {
inStream.close();
}
}
}
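`EntityUtils.consume` consumes the response content so that the connection can be safely reused. The call flow is listed below; it is worth reading if you are interested, but it is not discussed at length here: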
org.apache.http.util.EntityUtils.consume
org.apache.http.impl.execchain.ResponseEntityProxy.getContent (wraps the content in an EofSensorInputStream that releases the connection automatically)
org.apache.http.conn.EofSensorInputStream.close
org.apache.http.conn.EofSensorInputStream.checkClose
org.apache.http.impl.execchain.ResponseEntityProxy.streamClosed
org.apache.http.impl.execchain.ResponseEntityProxy.releaseConnection
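The idea behind this chain, releasing the connection when the stream is closed, can be sketched with a plain `FilterInputStream`. This is a toy model rather than HttpClient's class: `ReleasingInputStream` and its release callback are hypothetical, and the real `EofSensorInputStream` also reacts to reaching the end of the stream.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy stand-in for EofSensorInputStream: runs a release callback once, on close(). */
public class ReleasingInputStream extends FilterInputStream {
    private final Runnable onRelease;
    private boolean released;

    ReleasingInputStream(InputStream in, Runnable onRelease) {
        super(in);
        this.onRelease = onRelease;
    }

    @Override
    public void close() throws IOException {
        try {
            super.close();
        } finally {
            if (!released) {
                released = true;
                onRelease.run(); // e.g. hand the connection back to the pool
            }
        }
    }

    public static void main(String[] args) throws IOException {
        AtomicInteger releases = new AtomicInteger();
        InputStream body = new ReleasingInputStream(
                new ByteArrayInputStream("ok".getBytes(StandardCharsets.UTF_8)), releases::incrementAndGet);
        body.close();
        body.close(); // a second close does not release twice
        System.out.println(releases.get()); // prints 1
    }
}
```

If nobody ever calls `close()` (and the stream is never read to the end), the callback never runs, which is exactly how a pooled connection stays leased.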
feign.AsyncResponseHandler#handleResponse
void handleResponse(CompletableFuture<Object> resultFuture,
String configKey,
Response response,
Type returnType,
long elapsedTime) {
// copied fairly liberally from SynchronousMethodHandler
boolean shouldClose = true;
try {
if (logLevel != Level.NONE) {
// Log the response whenever the Feign log level is not NONE
response = logger.logAndRebufferResponse(configKey, logLevel, response,
elapsedTime);
}
if (Response.class == returnType) {
if (response.body() == null) {
resultFuture.complete(response);
} else if (response.body().length() == null
|| response.body().length() > MAX_RESPONSE_BUFFER_SIZE) {
// With Transfer-Encoding: chunked, length() is always null, so the finally block below does not close the body
shouldClose = false;
resultFuture.complete(response);
} else {
// Ensure the response body is disconnected
// Copy the InputStream into a byte[] (closing the stream) and store it back in the response
final byte[] bodyData = Util.toByteArray(response.body().asInputStream());
resultFuture.complete(response.toBuilder().body(bodyData).build());
}
} else if (response.status() >= 200 && response.status() < 300) {
if (isVoidType(returnType)) {
resultFuture.complete(null);
} else {
final Object result = decode(response, returnType);
shouldClose = closeAfterDecode;
resultFuture.complete(result);
}
} else if (decode404 && response.status() == 404 && !isVoidType(returnType)) {
final Object result = decode(response, returnType);
shouldClose = closeAfterDecode;
resultFuture.complete(result);
} else {
resultFuture.completeExceptionally(errorDecoder.decode(configKey, response));
}
} catch (final IOException e) {
if (logLevel != Level.NONE) {
logger.logIOException(configKey, logLevel, e, elapsedTime);
}
resultFuture.completeExceptionally(errorReading(response.request(), response, e));
} catch (final Exception e) {
resultFuture.completeExceptionally(e);
} finally {
if (shouldClose) {
ensureClosed(response.body());
}
}
}
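For a method whose return type is `Response`, the branch above boils down to a small predicate. The sketch below is hypothetical and ignores the logging step (which may already have replaced the body with a buffered copy); the value 8192 for `MAX_RESPONSE_BUFFER_SIZE` is an assumption taken from the Feign 10.x sources, so check it against your version.

```java
public class RawResponseCloseCheck {

    // Assumption: mirrors Feign's MAX_RESPONSE_BUFFER_SIZE (8192 bytes in the 10.x sources)
    static final long MAX_RESPONSE_BUFFER_SIZE = 8192L;

    /**
     * For a Feign method returning Response: true if handleResponse hands the body back
     * still streaming (shouldClose = false), so the caller must close it.
     */
    static boolean callerMustClose(boolean hasBody, Integer length) {
        if (!hasBody) {
            return false; // nothing left to release
        }
        // Unknown (chunked) or large bodies are not buffered and not closed by Feign
        return length == null || length > MAX_RESPONSE_BUFFER_SIZE;
    }

    public static void main(String[] args) {
        System.out.println(callerMustClose(true, 512));  // prints false: buffered and closed
        System.out.println(callerMustClose(true, null)); // prints true: chunked, caller must close
    }
}
```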
feign.slf4j.Slf4jLogger.logAndRebufferResponse
protected Response logAndRebufferResponse(String configKey,
Level logLevel,
Response response,
long elapsedTime)
throws IOException {
if (logger.isDebugEnabled()) {
// Only when DEBUG is enabled is the parent method called to log (and rebuffer) the response
return super.logAndRebufferResponse(configKey, logLevel, response, elapsedTime);
}
return response;
}
feign.Logger.logAndRebufferResponse
protected Response logAndRebufferResponse(String configKey,
Level logLevel,
Response response,
long elapsedTime)
throws IOException {
String reason =
response.reason() != null && logLevel.compareTo(Level.NONE) > 0 ? " " + response.reason()
: "";
int status = response.status();
log(configKey, "<--- HTTP/1.1 %s%s (%sms)", status, reason, elapsedTime);
if (logLevel.ordinal() >= Level.HEADERS.ordinal()) {
// Log headers (and the body) at HEADERS level or above
for (String field : response.headers().keySet()) {
for (String value : valuesOrEmpty(response.headers(), field)) {
log(configKey, "%s: %s", field, value);
}
}
int bodyLength = 0;
if (response.body() != null && !(status == 204 || status == 205)) {
// HTTP 204 No Content "...response MUST NOT include a message-body"
// HTTP 205 Reset Content "...response MUST NOT include an entity"
if (logLevel.ordinal() >= Level.FULL.ordinal()) {
log(configKey, ""); // CRLF
}
// Key point: the InputStream is read into a byte[] and then closed, which releases the resources
byte[] bodyData = Util.toByteArray(response.body().asInputStream());
bodyLength = bodyData.length;
if (logLevel.ordinal() >= Level.FULL.ordinal() && bodyLength > 0) {
log(configKey, "%s", decodeOrDefault(bodyData, UTF_8, "Binary data"));
}
log(configKey, "<--- END HTTP (%s-byte body)", bodyLength);
// The original response.body() has been consumed, so store the buffered copy back in the response
return response.toBuilder().body(bodyData).build();
} else {
log(configKey, "<--- END HTTP (%s-byte body)", bodyLength);
}
}
return response;
}
feign.Util.toByteArray
public static byte[] toByteArray(InputStream in) throws IOException {
checkNotNull(in, "in");
try {
ByteArrayOutputStream out = new ByteArrayOutputStream();
copy(in, out);
return out.toByteArray();
} finally {
// Close the Closeable to release the resources
ensureClosed(in);
}
}
Tracing Feign's Slf4jLogger (`feign/Logger.java:84`) shows that `feign.Logger#logAndRebufferResponse` reads the `body` once.
At `HEADERS` level or above (`logLevel.ordinal() >= Level.HEADERS.ordinal()`), `byte[] bodyData = Util.toByteArray(response.body().asInputStream());` copies the data into memory and then closes the original `InputStream`.
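The "copy, close, and store back" step can be reproduced with the JDK alone. `RebufferDemo` is a hypothetical name; its `toByteArray` follows the same idea as `feign.Util.toByteArray` (copy everything, then always close the source in `finally`), written with a plain copy loop.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class RebufferDemo {

    /** Same idea as feign.Util.toByteArray: copy everything, then always close the source. */
    static byte[] toByteArray(InputStream in) throws IOException {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[8192];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            return out.toByteArray();
        } finally {
            in.close(); // with HttpClient, this is what returns the connection to the pool
        }
    }

    public static void main(String[] args) throws IOException {
        final boolean[] closed = {false};
        InputStream network = new ByteArrayInputStream("{\"ok\":true}".getBytes(StandardCharsets.UTF_8)) {
            @Override
            public void close() throws IOException {
                closed[0] = true;
                super.close();
            }
        };
        byte[] body = toByteArray(network);
        // The buffered copy can be re-read, like the rebuilt Response body
        InputStream replay = new ByteArrayInputStream(body);
        System.out.println(closed[0] + " " + body.length + " " + (replay.read() == '{')); // prints "true 11 true"
    }
}
```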
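Reflections on the source code
After this analysis, it turns out that in most cases the response content is released automatically:
- Logging on (Feign log level at or above `HEADERS`, and the SLF4J logger at `DEBUG` or finer): the original `response.body()` is copied into memory and its `InputStream` is released.
- Logging off (Feign log level below `HEADERS`, or the logger above `DEBUG`): the response content is not released in these cases:
  - `Response.class == returnType && (response.body().length() == null || response.body().length() > MAX_RESPONSE_BUFFER_SIZE)`
  - `closeAfterDecode` is false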
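The "logging on" condition can be written as a small predicate. This is a hypothetical sketch: the `Level` enum copies the declaration order of `feign.Logger.Level`, and the boolean stands in for `logger.isDebugEnabled()`; it ignores the extra checks in `logAndRebufferResponse` (non-null body, status not 204/205).

```java
public class LoggerRebufferCheck {

    // Same declaration order as feign.Logger.Level
    enum Level { NONE, BASIC, HEADERS, FULL }

    /**
     * True if Slf4jLogger plus Logger.logAndRebufferResponse copy the body into a byte[]
     * (closing the original stream): SLF4J DEBUG must be enabled and the Feign level >= HEADERS.
     */
    static boolean rebuffers(Level feignLevel, boolean slf4jDebugEnabled) {
        return slf4jDebugEnabled && feignLevel.ordinal() >= Level.HEADERS.ordinal();
    }

    public static void main(String[] args) {
        System.out.println(rebuffers(Level.FULL, false)); // production (FULL + INFO): false
        System.out.println(rebuffers(Level.BASIC, true)); // DEBUG but below HEADERS: false
        System.out.println(rebuffers(Level.FULL, true));  // test/dev (FULL + DEBUG): true
    }
}
```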
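Verifying the hypothesis
In production, the Feign log level is FULL (above HEADERS), but the logger is enabled only at INFO. I tried the following steps to reproduce the problem:
- Build an ApacheHttpClient with the connection pool reduced to the minimum: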
CLIENT = new ApacheHttpClient(
HttpClientBuilder.create()
.setSSLSocketFactory(
new SSLConnectionSocketFactory(trustAllSslSocketFactory(),
(hostname, session) -> true)
)
// Maximum connections per route
.setMaxConnPerRoute(1)
// Maximum total connections
.setMaxConnTotal(1)
.setDefaultRequestConfig(
RequestConfig.custom()
.setConnectTimeout(CONNECT_TIMEOUT)
.setSocketTimeout(REQ_TIMEOUT)
.build()
)
.useSystemProperties()
.build());
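- Local unit test of the sync with the logger at INFO and the Feign log level at or above HEADERS: the sync task deadlocks after a while.
- Local unit test with the logger at DEBUG and the Feign log level below HEADERS: the sync task deadlocks after a while.
- Local unit test with the logger at DEBUG and the Feign log level at or above HEADERS (the configuration of the test and development environments): no deadlock.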
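Together, the unit tests and the source analysis make the cause obvious: the response content was not fully consumed and released, so the pooled connection could not be safely reused.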
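Tracing the root cause
Looking back at the source of `feign.AsyncResponseHandler#handleResponse` analyzed above: `closeAfterDecode` defaults to true, so the response content is left unreleased only in this case: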
Response.class == returnType && (response.body().length() == null || response.body().length() > MAX_RESPONSE_BUFFER_SIZE)
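So I searched the project code for Feign methods that return `Response` and whose responses have no body length.
Then I remembered that before calling the relevant APIs, the code asynchronously verifies and refreshes the token. Verifying the token does not need the response body, so that method returned `Response` and the caller only checked the status code. The key code:
Feign interface for token verification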
@RequestLine("GET /sys/identity/v2/tokens")
@Headers({"X-Subject-Token: {token}"})
Response verifyToken(@Param("token") String token);
Token verification code
@Override
public boolean verifyToken(H3CClientConfig config, String token) {
IdentityApi identityApi = Feigns.h3c(IdentityApi.class, config);
Response response = identityApi.verifyToken(token);
return response.status() == HttpStatus.OK.value();
}
Request/response log
GET https://19.50.81.200:8100/sys/identity/v2/tokens
HTTP/1.1 200 OK
Date: Sun, 05 Sep 2021 02:55:09 GMT
Content-Type: application/json; charset=UTF-8
Transfer-Encoding: chunked
Connection: keep-alive
X-Subject-Token: eyJjdHkiOiJKV1QiLCJlbmMiOiJBMTI4Q0JDLUhTMjU2IiwiYWxnIjoiZGlyIn0..yWaex8RVEvBvw1D-kk_LMQ.IwiMpoRWSPsUZiEpr09tL7WDZ1-vIRZqFsGqQq2CV4wBp6S8mBIhICI3Ce2sE_TLA_A2oX_NnMpAf5D4C_DwunJaiJ3lnD51Sg1bxWao_gXnPS7JdfpyaRXY-rtPMaxs-0FisUuyVlKfQh3Ab8t3WsCLzU9Yz7sQ367CKtW1z32ttafrWRlotLN0y7XX3ZRz7Ttznm2cZ5Ae79MEPQF1-hbKiGoz4B8kR1NRgeL-arlpa8qtgERYEEtr-VtJgydDpylusItc_uOtPqwEh0HAgYQjJovF75pej5WlCgdzYVQMr08OGT0JnBrReWYxl0h2P0xxZQtNcM2d0T54TebvvRhQKRyywvasQ064FS4B4mGN-8E3TZkxSfSfr4OWZ1Nmwpr3xFGBOSVpKf5-AufCoXPW3yGu3vFSpCahoKq01n9_gd4UbKLE82Cwou4uZf4VMZ7A7hOAdWYo_geb1bTzLUyTdDSUgbS8XiiYCOpaA4euv409ELE22U77F940M2DO2y8lbaDuk4iAv3QIp5gCGg.9pzTvRPM-FAMa-17a2J5kQ
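The code above does nothing with the `Response` except check its status code, but the request/response log shows that this response is different: it has no `Content-Length` header and carries `Transfer-Encoding: chunked`.
About HTTP chunked transfer encoding
A chunked response carries the header `Transfer-Encoding: chunked`. Its length is not known in advance, so there is no `Content-Length` header. The body is sent as a series of chunks, the last of which has length 0, so the message ends with `0\r\n\r\n`.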
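A minimal decoder shows why the total length only becomes known at the zero-length last chunk. This is a simplified, hypothetical sketch: it assumes ASCII content and ignores chunk extensions and trailers, and the "Wiki"/"pedia" body is just an illustration.

```java
public class ChunkedDecodeDemo {

    /** Minimal decoder for a chunked body (ASCII only, no chunk extensions or trailers). */
    static String decodeChunked(String raw) {
        StringBuilder body = new StringBuilder();
        int pos = 0;
        while (true) {
            int lineEnd = raw.indexOf("\r\n", pos);
            int size = Integer.parseInt(raw.substring(pos, lineEnd).trim(), 16); // chunk size is hex
            pos = lineEnd + 2;
            if (size == 0) {
                break; // last chunk: "0\r\n" followed by the final "\r\n"
            }
            body.append(raw, pos, pos + size);
            pos += size + 2; // skip the chunk data and its trailing CRLF
        }
        return body.toString();
    }

    public static void main(String[] args) {
        String raw = "4\r\nWiki\r\n5\r\npedia\r\n0\r\n\r\n";
        System.out.println(decodeChunked(raw)); // prints Wikipedia
    }
}
```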
This is exactly the leak condition `Response.class == returnType && (response.body().length() == null || response.body().length() > MAX_RESPONSE_BUFFER_SIZE)`.
If such a `Response` is not released promptly, the pooled connection cannot be safely reused.
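Summary
The root cause was a leaked `InputStream` that kept the pooled connection from being released: the Feign method returned `Response`, but the caller did not wrap it in try-with-resources, so `close` was never called.
Keys to troubleshooting this problem:
- Being able to debug and analyze the Feign source code
- Knowing the common methods of application performance analysis
- Understanding how the production and test environments differ; keep their configurations as close as possible so that problems surface early
Solution
When a Feign method returns `Response`, release it promptly with Java 7 try-with-resources or try-finally.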
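With Feign itself, the token check becomes `try (Response response = identityApi.verifyToken(token)) { return response.status() == HttpStatus.OK.value(); }`, since `feign.Response` implements `Closeable`. To keep the example runnable without Feign, the sketch below uses a hypothetical `FakeResponse` stand-in and checks that try-with-resources closes it even though only the status code is read.

```java
import java.io.Closeable;

public class VerifyTokenFixDemo {

    /** Hypothetical stand-in for feign.Response (which implements Closeable). */
    static class FakeResponse implements Closeable {
        final int status;
        boolean closed;

        FakeResponse(int status) {
            this.status = status;
        }

        int status() {
            return status;
        }

        @Override
        public void close() {
            closed = true; // the real Response closes its body (and frees the connection) here
        }
    }

    /** Same shape as verifyToken, but the response is always closed. */
    static boolean verifyToken(FakeResponse response) {
        try (FakeResponse r = response) {
            return r.status() == 200;
        }
    }

    public static void main(String[] args) {
        FakeResponse response = new FakeResponse(200);
        System.out.println(verifyToken(response) + " " + response.closed); // prints "true true"
    }
}
```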