STS(SparkThrfitServer)版本
- spark-3.2.1-bin-hadoop3.2
问题表现
- Spark UI 经常无响应
- STS 经常挂掉
问题分析
获取heap.hprof和gc.log
spark.driver.extraJavaOptions -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/home/spark/hbi/logs/ -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution -XX:+PrintHeapAtGC -XX:+PrintReferenceGC -XX:+PrintGCApplicationStoppedTime -Xloggc:/home/spark/hbi/logs/gc-%t.log
堆对象分析
- 工具:jprofiler
大对象
Hashtable引用链
ConcurrentHashMap引用链
GC趋势
- 工具:gcviewer
- 黄色:新生代
- 红色:老年代
- 黑色:full gc
结论
- 确定是内存泄漏了,而且内存大对象有两个
a. Hashtable
b. ConcurrentHashmap - 两个大对象都被该对象引用
a. SparkExecuteStatementOperation
STS架构
Spark Thrift Server大量复用了HiveServer2的代码。
HiveServer2的架构主要是通过ThriftCLIService监听端口,然后获取请求后委托给CLIService处理。CLIService又一层层的委托,最终交给OperationManager处理。OperationManager会根据请求的类型创建一个Operation的具体实现处理。比如Hive中执行sql的Operation实现是SQLOperation。
Spark Thrift Server做的事情就是实现自己的CLIService——SparkSQLCLIService,接着也实现了SparkSQLSessionManager以及SparkSQLOperationManager。另外还实现了一个处理sql的Operation——SparkExecuteStatementOperation。这样,当Spark Thrift Server启动后,对于sql的执行就会最终交给SparkExecuteStatementOperation了。
Spark Thrift Server其实就重写了处理sql的逻辑,其他的请求处理就完全复用HiveServer2的代码了。比如建表、删除表、建view等操作,全部使用的是Hive的代码。
推测原因
大对象引用者
- org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation
- 注:下文中SparkExecuteStatementOperation统称为SESO
SESO引用者
- org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1
- 这是个什么东西?
SESO$$anon$1
- 通过反编译,发现该类是SparkExecuteStatementOperation的内部类
- 工具:jd-gui
- $$anon$1通过$outer引用了SESO
import java.lang.invoke.SerializedLambda;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation;
import org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$;
import scala.Option;
import scala.util.control.NonFatal$;
public final class null implements Runnable {
private final ScheduledExecutorService timeoutExecutor$1;
public null(SparkExecuteStatementOperation $outer, ScheduledExecutorService timeoutExecutor$1) {}
public void run() {
try {
this.$outer.timeoutCancel();
} catch (Throwable throwable1) {
Throwable throwable2 = throwable1;
Option option = NonFatal$.MODULE$.unapply(throwable2);
} finally {
this.timeoutExecutor$1.shutdown();
}
}
}
SESO内部类源码
- 注意:timeout单位是TimeUnit.SECONDS
# 核心变量timeout
// If a timeout value `queryTimeout` is specified by users and it is smaller than
// a global timeout value, we use the user-specified value.
// This code follows the Hive timeout behaviour (See #29933 for details).
private val timeout = {
val globalTimeout = sqlContext.conf.getConf(SQLConf.THRIFTSERVER_QUERY_TIMEOUT)
if (globalTimeout > 0 && (queryTimeout <= 0 || globalTimeout < queryTimeout)) {
globalTimeout
} else {
queryTimeout
}
}
# 中间代码省略...
# 内部类创建逻辑
if (timeout > 0) {
val timeoutExecutor = Executors.newSingleThreadScheduledExecutor()
timeoutExecutor.schedule(new Runnable {
override def run(): Unit = {
try {
timeoutCancel()
} catch {
case NonFatal(e) =>
setOperationException(new HiveSQLException(e))
logError(s"Error cancelling the query after timeout: $timeout seconds")
} finally {
timeoutExecutor.shutdown()
}
}
}, timeout, TimeUnit.SECONDS)
}
原因推理
客户端
- 请求超时时间queryTimeout > 0
服务端
timeout默认等于客户端传过来的queryTimeout
timeout大于0时,会创建单线程的线程池,提交延迟任务,延迟任务在timeout时间后开始执行,执行完成后会停止线程池
a. 注意1:延迟任务作为SESO的内部类,会持有SESO的引用
b. 注意2:timeout单位是seconds
3.问题原因:如果timeout时间很长,而且sql执行时间很短,如果短时间内有大量的查询,那么这些线程在timeout时间内一直持有延迟任务的引用,也就是间接持有SESO对象的引用,这就是内存溢出的原因
验证推理
跟踪timeout传入链
- 找到入口方法
org.apache.hive.service.cli.CLIService#executeStatementAsync(org.apache.hive.service.cli.SessionHandle, java.lang.String, java.util.Map<java.lang.String,java.lang.String>, long)
监控命令
- 工具:arthas
- params[3]: timeout
# arthas watch 命令
watch org.apache.hive.service.cli.CLIService executeStatementAsync '{params}' 'params[3]>0' -x 2
提出疑问
- 为什么 "SELECT 1" 的timeout是60000s(16.7h)?
服务分析
哪个服务
- 通过STS日志里查找sessionId,定位服务所在节点IP,从而定位到传入queryTimeout=60000的服务
- 最终定位有问题的服务:hbi-query-engine
原有连接池配置
# 连接池配置
engine.ds.hive.read.type = com.alibaba.druid.pool.DruidDataSource
# 单位是seconds,也就是16.7小时(Druid在用"SELECT 1"探活时使用该值)
engine.ds.hive.read.pool.queryTimeout=60000
# 单位是mills,也就是60秒(Druid没有使用该配置)
engine.ds.hive.read.pool.validationTimeout=60000
改进后连接池配置
# 连接池配置
engine.ds.hive.read.type = com.zaxxer.hikari.HikariDataSource
# 单位是seconds,也就是5分钟
engine.ds.hive.read.pool.queryTimeout=300
# 单位是mills,也就是60秒(Hikari在用"SELECT 1"探活时使用该值)
engine.ds.hive.read.pool.validationTimeout=60000
DruidDataSource探测流程
调用栈
setQueryTimeout核心方法
HikariDataSource探测流程
调用栈
setQueryTimeout核心方法
解决方案
- 为了快速解决问题,目前采取方案1(或者方案2),方案3有时间再尝试
方案1
- 根据连接池不同,设置合理的queryTimeout、validationTimeout
a. 注意1:queryTimeout的单位是seconds
b. 注意2:validationTimeout单位是mills
方案2
- 在sts端配置globalTimeout,也就是spark.sql.thriftServer.queryTimeout
a. 注:如果globalTimeout小于queryTimeout,则以globalTimeout为准
方案3
- 源码修复STS
a. 思路:即时释放查询资源(线程池:timeoutExecutor)
b. 实现:timeoutExecutor作为SESO的字段,并在SESO对象close时,调用timeoutExecutor.shutdown()
优化效果
全堆大小
年轻代大小
老年代大小
工具列表
- jprofiler: 分析堆内存
- gcviewer: 分析垃圾回收
- gceasy: 分析垃圾回收 在线分析入口
- arthas: 分析方法调用
- jd-gui: 反编译
参考文档
- Hive内存溢出示例:https://www.jianshu.com/p/88626013b39f
- STS架构:https://www.jianshu.com/p/b719c6415411
- STS架构VS Kyuubi:https://blog.51cto.com/u_15259710/4797692
- Arthas watch:https://arthas.aliyun.com/doc/watch.html