一、TiSpark 概述
1.拓扑图
TiSpark
是将 Spark SQL
直接运行在分布式存储引擎 TiKV
上的 OLAP
解决方案
可以使用 TiDB 内建的统计信息来选择更优的计划任务,不仅可以使用 Spark 的暴力计算,而且还可以将计算下推 TiKV,让计算效率更佳。
减少运维成本,简化系统架构,使用同一个数据源(TiKV),减少对 ETL 的维护。但是可能会导致 AP 的业务影响到 TP 业务,对于资源的利用需要规划。
可以借助 TiSpark 项目可以在 TiDB 上使用 Spark 生态圈提供的多种工具进行数据处理,更好的低成本嵌入当前 AP 业务中。
TiSpark 深度整合了 Spark Catalyst 引擎, 可以对计算提供精确的控制,使 Spark 能够高效的读取 TiKV 中的数据,提供索引支持以实现高速的点查。
2.版本对应
Spark 需要和 TiSpark 版本一一对应,如果不满足版本需求,会使用不了。没有上下兼容,必须一一对应,Spark 2.0 和 2.2 版本和更低版本暂不支持。
二、 TiSpark 安装入门
1.ansible 安装
基础安装
1.配置 inventor.ini 配置文件
- 填写 spark 的 master IP 和 slave 的 IP
[spark_master]
10.0.1.9
[spark_slaves]
10.0.1.9
10.0.1.10
2.安装
部署 TiSpark:ansible-playbook deploy.yml --tags=tispark
3.启动
ansible-playbook start_spark.yml
扩容
1.关闭 spark
ansible-playbook stop_spark.yml
2.配置 inventor.ini 配置文件
扩容 10.0.1.11 服务器为新 slave 节点
[spark_master]
10.0.1.9
[spark_slaves]
10.0.1.9
10.0.1.10
10.0.1.11
3.安装
部署 TiSpark:ansible-playbook deploy.yml --tags=tispark
4.启动
ansible-playbook start_spark.yml
缩容
1.关闭 spark
ansible-playbook stop_spark.yml
2.配置 inventor.ini 配置文件
- 缩容掉 10.0.1.11 的节点
[spark_master]
10.0.1.9
[spark_slaves]
10.0.1.9
10.0.1.10
3.安装
部署 TiSpark:ansible-playbook deploy.yml --tags=tispark
4.启动
ansible-playbook start_spark.yml
2.现有 Spark 集群部署 TiSpark
注意
需要注意版本对应,如果 Spark 和 TiSpark 版本对应不上,可能会导致不可用。
需要注意重置环境变量,比如 hadoop_home 变量需要取消,否则会影响到 TiSpark 的使用
需要注意 Java 的版本,JDK 1.8+(下载地址),版本不可过高,以及 Scala 2.11
安装
已有 Spark 集群上运行 TiSpark,无需重启集群。可以使用 Spark 的 --jars 参数将 TiSpark (TiSpark jar 包下载地址)作为依赖引入:
spark-shell --jars $PATH/tispark-core-2.0-SNAPSHOT-jar-with-dependencies.jar
如果想 TiSpark 作为默认组件部署,只需要将 TiSpark 的 jar 包放进 Spark 集群每个节点的 jars 路径并重启 Spark 集群:
{SPARK_INSTALL_PATH}/jars
在实际操作过程中,我们推荐使用第一个方法,当 TiSpark 的 jar 包放入 Spark 集群每个节点,经常会因为重复下载 jar 包,导致使用异常。
3.安装常见问题
可以使用
ansible-playbook deploy.yml --tags=tispark
修改配置文件修改过配置文件记得重启 spark-shell
三、TiSpark 使用
1.常见使用方法介绍
Thrift Server 的 JDBC 连接
启动
对于 ansible 安装的 TiSpark,可以使用生成脚本启动,在 master spark 服务器 deploy/spark/sbin 下执行:
./start-thriftserver.sh
这时会发现 10000 端口启动
使用
例如运行 beeline 来连接并测试 Thrift JDBC/ODBC 服务器:
其中 beeline 的binary 可以在 spark/bin/ 目录下找到:
[tidb@xiaohou-vm5 bin]$ ./beeline
Beeline version 1.2.1.spark2 by Apache Hive
beeline> !connect jdbc:hive2://localhost:10000
Connecting to jdbc:hive2://localhost:10000
Enter username for jdbc:hive2://localhost:10000:
Enter password for jdbc:hive2://localhost:10000:
2019-01-27 00:53:49 INFO Utils:310 - Supplied authorities: localhost:10000
2019-01-27 00:53:49 INFO Utils:397 - Resolved authority: localhost:10000
2019-01-27 00:53:50 INFO HiveConnection:203 - Will try to open client transport with JDBC Uri: jdbc:hive2://localhost:10000
Connected to: Spark SQL (version 2.3.2)
Driver: Hive JDBC (version 1.2.1.spark2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://localhost:10000> show databases;
+---------------+--+
| databaseName |
+---------------+--+
| default |
| new |
| test |
| tpch_001 |
| tmp_test |
| mysql |
| tidb_loader |
+---------------+--+
7 rows selected (1.009 seconds)
这里举一个简单的例子,spark 自带了 beeline 客户端程序,我们可以使用它连接 JDBC 服务器,如果使用 beeline 客户端连接没有问题,代表着配置是成功的,这个时候我们就可以使用 java 代码连接了:jdbc:hive2://127.0.0.1:10000
。
也可以使用 deploy/spark/bin
目录下 spark-sql 启动 Spark SQL CLI,这个脚本主要对于本地开发比较有用。在共享的集群上,你应该使用JDBC服务器,让各用户通过beeline进行连接。
总结
如果想将 Spark Dataframe加载到TiDB中,可以使用 jdbc 或者 odbc 的方式加载,这里可以参考:TiSpark 用户指南
2.优化
a).Spark UI 界面解读
Spark-shell UI:默认是 4040 端口
Spark-master UI: 默认是 8080 端口,当且仅当集群模式有,local 模式是没有的。
Spark 可用资源确认
Workers : 他的数量表明当前 spark 可以将任务分发到多少机器,以及可分配资源(cores 以及 memory),如果机器资源总和是小于单个 TiDB 的话,是可能慢于 TiDB 的。
判断是否有数据倾斜
需要考虑是否有数据倾斜,导致有“木桶效应”,既 Spark 任务最终的执行时间取决于最后执行结束的 task 的运行时间。登录 Spark-shell UI,点开 States 标签页列出和提供当前spark上下文执行的所有 job 的当前 stage,当发现一个 job 的 States 有明显的时间差异过大的想象,可以点开 detail,查看更为详细的信息,根据 SQL 来查询数据分布,缺是否有数据倾斜的现象。
b).环境配置检查
两个重要的资源说明
检查分配计算 CPU 和内存是否过少:系统 50 G
的内存,executer
配置了 2G (配置少了),导致 Spark 可用资源较少,SQL 性能不理想。使用 ansible 部署的 TiSpark 默认分配的每个节点计算可利用的内存是 10G ,可用 CPU 为 5C,可以按照实际物理机器性能进行配置。配置在:tidb-ansible/conf/spark-env.yml
盘是否成为了瓶颈:SPARK_LOCAL_DIRS
,可在 tidb-ansible/conf/spark-env.yml
配置,Spark 默认配置是落盘在 tmp 目录下:第一需要确定 tmp 目录是否是 SSD 盘,可以使用 fio (fio -ioengine=psync -bs=128k -fdatasync=1 -thread -rw=write -size=10G -filename=/tmp/fio_write_test.txt -name=‘PingCAP’ -runtime=60
) 测试盘的性能,如果较差,可能会成为瓶颈。
也可以在 Spark SQL 执行过程中,使用 iostat (iostat -x 1
) 查看当前 io 是否已经使用过高
相关配置修改和说明
对于 TiSpark 而言,最重要的两个配置文件是:spark-env.sh
和 spark-defaults.conf
,可以使用 ansible 进行修改 tidb-ansible/conf/spark-env.yml
和 tidb-ansible/conf/spark-defaults.yml
两个配置文件,然后 deploy 后修改整个 TiSpark 集群的配置。
对于内存和 CPU 配置,可以按照实际物理配置分配,推荐比例:Memory:CPU=2:1
spark-env.sh 常见配置项
参数 | 说明 |
---|---|
SPARK_LOCAL_DIRS | 此节点上用于 shuffle 和 RDD 数据的存储目录,默认 /tmp |
SPARK_EXECUTOR_INSTANCES | 要启动的执行程序的数量,默认为 2,整个集群的 executor 数量为 worker 节点乘该配置 |
SPARK_EXECUTOR_CORES | 每个执行器的核心数,默认为 1 |
SPARK_EXECUTOR_MEMORY | 每个执行器的可用内存,默认为 2G |
SPARK_DRIVER_MEMORY | Spark应用程序 Application 所占的内存大小,默认是 2G,这里的Driver对应Yarn 中的 ApplicationMaster |
SPARK_WORKER_CORES | 每个Worker进程所需要的CPU核的数目 |
SPARK_WORKER_MEMORY | 每个Worker进程所需要的内存大小,默认为 2G |
SPARK_WORKER_INSTANCES | 每个Worker节点上运行Worker进程的数目,整个集群的 Worker 数量为 worker 节点乘该配置 |
SPARK_WORKER_DIR | Worker 进程的工作目录 |
spark-defaults.conf 常见配置
参数 | 说明 |
---|---|
spark.driver.memory | Driver 的内存配置,默认为 1G,在 Spark 程序中:SparkContext、DAGScheduler 和对应 rdd 的 Stage 切分都是运行在Driver端。如果写的程序有过多的步骤,切分出过多的Stage,这部分信息消耗的是Driver的内存,这个时候就需要调大Driver的内存。 |
spark.tispark.plan.allow_agg_pushdown | 允许聚合下推到 TiKV 节点,默认为 true,注意 TiKV 的集群压力 |
spark.tispark.plan.allow_index_double_read | 如果允许索引双读,默认为 false,这可能对TiKV造成很大压力 |
spark.tispark.index.scan_batch_size | 在批处理中有多少行键用于并发索引扫描,默认为 2000000,一般不需要调整 |
spark.tispark.plan.allow_index_read | 开启 index 下推,默认关闭,一般不需要调整,如果 TiKV 压力较小,可以开启。 |
c).SQL 调优
对于 Spark
的 SQL
调优,这里只是简单的介绍。
首先要学会看执行计划,比如:
scala> spark.sql("explain select * from parition where id=1").show(false)
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|plan |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|== Physical Plan ==
TiSpark CoprocessorRDD{[table: parition] , Columns: [id], [fname], [year], Residual Filter: Not(IsNull([id])), [[id] EQUAL 1], KeyRange: [[116,128,0,0,0,0,0,0,33,95,114,0,0,0,0,0,0,0,0], [116,128,0,0,0,0,0,0,33,95,115,0,0,0,0,0,0,0,0])}|
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
- 或者
scala> spark.sql("select * from parition where id=1").explain(true)
== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('id = 1)
+- 'UnresolvedRelation `test`.`parition`
== Analyzed Logical Plan ==
id: bigint, fname: string, year: bigint
Project [id#73L, fname#74, year#75L]
+- Filter (id#73L = cast(1 as bigint))
+- Relation[id#73L,fname#74,year#75L] TiDBRelation(com.pingcap.tikv.TiSession@3fc378f2,TiTableReference(test,parition,9223372036854775807),com.pingcap.tispark.MetaManager@61d9da53)
== Optimized Logical Plan ==
Filter (isnotnull(id#73L) && (id#73L = 1))
+- Relation[id#73L,fname#74,year#75L] TiDBRelation(com.pingcap.tikv.TiSession@3fc378f2,TiTableReference(test,parition,9223372036854775807),com.pingcap.tispark.MetaManager@61d9da53)
== Physical Plan ==
TiSpark CoprocessorRDD{[table: parition] , Columns: [id], [fname], [year], Residual Filter: Not(IsNull([id])), [[id] EQUAL 1], KeyRange: [[116,128,0,0,0,0,0,0,33,95,114,0,0,0,0,0,0,0,0], [116,128,0,0,0,0,0,0,33,95,115,0,0,0,0,0,0,0,0])}
关于举例这个情况,就是一个简单的点查。
总结
TiSpark
可以暴力计算,在大多数 AP
场景下是要比 TiDB
快的。
对于计算能力,一般来说分布式计算能力高于单节点计算能力。TiSpark
也是有 cop
和 root
的概念,会有部分计算下推到 TiKV 层。
TiSpark 的配置部分可以在线修改,比如开启 index 下推:
spark.conf.set("spark.tispark.plan.allow_index_read", "true")
四、总结
对于较重的 AP 场景,推荐使用 TiSpark 进行替代 TiDB。优势是可以暴力计算,优化了传统的系统架构,较少了对 ETL 的维护。但是需要注意 TiKV 的压力计算,由于 TP 和 AP 同用一个数据源,可能会发生 AP 相关较重的作业导致 TiKV 压力较高,影响线上 TP 业务。