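Summary: Spark SQL, TiDB
Preparing dependencies
The MySQL connector driver mysql-connector-java is required, and the upsert operation needs one additional third-party dependency: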
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.36</version>
</dependency>
<dependency>
    <groupId>com.dounine</groupId>
    <artifactId>spark-sql-datasource</artifactId>
    <version>1.0.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
        </exclusion>
    </exclusions>
</dependency>
Code notes
First, create a TiDB table with a primary key:
CREATE TABLE `test` (
  `a` int(11) NOT NULL,
  `b` int(11) DEFAULT NULL,
  PRIMARY KEY (`a`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
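(1) Full refresh (overwrite)
Attach the driver jar and test in the shell:
spark2-shell --jars mysql-connector-java-5.1.36.jar
The example code follows. The option "truncate" -> "true" must be set; otherwise Spark drops the table and recreates it automatically from the DataFrame schema, which causes problems such as incorrect column types or a missing primary key.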
import org.apache.spark.sql.{SaveMode, SparkSession}

object test {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("yarn")
      .appName("test")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    val df = Seq((1, 9), (2, 3)).toDF("a", "b")

    // Overwrite the table contents; "truncate" keeps the existing table definition.
    df.write.format("jdbc").options(Map(
      "url" -> "jdbc:mysql://172.20.3.78:4000/fin_operation",
      "dbtable" -> "test",
      "user" -> "username",
      "password" -> "password",
      "driver" -> "com.mysql.jdbc.Driver",
      "truncate" -> "true"))
      .mode(SaveMode.Overwrite).save()
  }
}
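To make the effect of the truncate option concrete, here is a minimal sketch of the statements an overwrite roughly issues before the rows are inserted. It is a simplified illustration, not Spark's actual implementation; the object name OverwritePlan and its statements helper are hypothetical and exist only for this example.

```scala
object OverwritePlan {
  // Hypothetical helper: lists the preparatory statements for an overwrite.
  // Simplified on purpose; the real writer also handles dialect-specific cases.
  def statements(table: String, truncate: Boolean): Seq[String] =
    if (truncate) {
      // The existing definition (column types, primary key) is preserved.
      Seq(s"TRUNCATE TABLE $table")
    } else {
      // The table is dropped and recreated from the DataFrame schema,
      // so the original column types and the primary key are lost.
      Seq(
        s"DROP TABLE $table",
        s"CREATE TABLE $table (<columns inferred from the DataFrame schema>)"
      )
    }

  def main(args: Array[String]): Unit = {
    println(statements("test", truncate = true).mkString("; "))
    println(statements("test", truncate = false).mkString("; "))
  }
}
```

With truncate enabled, the table created by the DDL earlier in this post keeps its PRIMARY KEY (`a`), which the upsert in the next section relies on.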
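(2) Upsert: update existing rows, insert new ones
Attach both jars and test in the shell:
spark2-shell --jars mysql-connector-java-5.1.36.jar,spark-sql-datasource-1.0.1.jar
The example code follows. The options are essentially the same as for the regular JDBC data source.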
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.jdbc2.JDBCSaveMode

object test {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("yarn")
      .appName("test")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    val df = Seq((1, 9), (2, 3)).toDF("a", "b")

    // Write through the third-party jdbc2 data source in Update (upsert) mode.
    df.write.format("org.apache.spark.sql.execution.datasources.jdbc2").options(
      Map(
        "savemode" -> JDBCSaveMode.Update.toString,
        "driver" -> "com.mysql.jdbc.Driver",
        "url" -> "jdbc:mysql://172.20.3.78:4000/fin_operation",
        "user" -> "username",
        "password" -> "password",
        "dbtable" -> "test",
        "useSSL" -> "false",
        "showSql" -> "false"
      )
    ).save()
  }
}
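For orientation, an upsert against a MySQL-compatible database such as TiDB is commonly expressed as INSERT ... ON DUPLICATE KEY UPDATE. The sketch below builds such a statement for the test table. Whether the jdbc2 data source generates exactly this SQL is an assumption, since the post does not show it; the object name UpsertSql and its build helper are hypothetical.

```scala
object UpsertSql {
  // Hypothetical helper: builds a MySQL-style upsert with JDBC placeholders.
  // Assumes at least one non-key column, otherwise there is nothing to update.
  def build(table: String, columns: Seq[String], keyColumns: Set[String]): String = {
    val nonKeys = columns.filterNot(keyColumns.contains)
    require(nonKeys.nonEmpty, "need at least one non-key column to update")
    val cols = columns.map(c => s"`$c`").mkString(", ")
    val placeholders = columns.map(_ => "?").mkString(", ")
    // On a key conflict, overwrite each non-key column with the incoming value.
    val updates = nonKeys.map(c => s"`$c` = VALUES(`$c`)").mkString(", ")
    s"INSERT INTO `$table` ($cols) VALUES ($placeholders) ON DUPLICATE KEY UPDATE $updates"
  }

  def main(args: Array[String]): Unit =
    println(build("test", Seq("a", "b"), Set("a")))
}
```

The statement only behaves as an upsert when the target table has a primary key or unique index, which is why the table was created with PRIMARY KEY (`a`).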
The data to be inserted is:
scala> df.show()
+---+---+
| a| b|
+---+---+
| 1| 9|
| 2| 3|
+---+---+
After the insert, the TiDB table contains:
MySQL [fin_operation]> select * from test;
+---+------+
| a | b |
+---+------+
| 1 | 9 |
| 2 | 3 |
+---+------+
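Next, build two more rows, one whose primary key conflicts with an existing row and one with a new id that does not, to test whether the upsert works: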
val df = Seq((1, 21), (3, 4)).toDF("a", "b")
After running the code again, the TiDB table contains:
MySQL [fin_operation]> select * from test;
+---+------+
| a | b |
+---+------+
| 1 | 21 |
| 2 | 3 |
| 3 | 4 |
+---+------+
3 rows in set (0.00 sec)
The upsert works as expected: the row with a = 1 was updated and the row with a = 3 was inserted.
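The two writes above can be replayed in memory to check the expected outcome. This is a minimal sketch that models the table as a Map keyed by the primary key a; it involves no Spark or TiDB, and the object name UpsertSemantics is hypothetical.

```scala
object UpsertSemantics {
  // Models the table as primary key `a` -> value `b`.
  // A row whose key already exists replaces the old value; a new key is added.
  def upsert(table: Map[Int, Int], rows: Seq[(Int, Int)]): Map[Int, Int] =
    rows.foldLeft(table) { case (acc, (a, b)) => acc.updated(a, b) }

  def main(args: Array[String]): Unit = {
    val afterFirstWrite = upsert(Map.empty, Seq((1, 9), (2, 3)))
    val afterSecondWrite = upsert(afterFirstWrite, Seq((1, 21), (3, 4)))
    // Print the rows ordered by primary key.
    afterSecondWrite.toSeq.sorted.foreach { case (a, b) => println(s"$a\t$b") }
  }
}
```

The final state holds the rows (1, 21), (2, 3) and (3, 4), matching the query result shown above.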