Version
Spark: 1.6.2
Hive: 1.1.0
First, let's look at the code:
import java.util.Properties

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext

object SparkSql_Hive_Mysql {
  def main(args: Array[String]): Unit = {
    val url = "jdbc:mysql://0.0.0.0:3306/data?characterEncoding=UTF-8&serverTimezone=CST"
    val tableName = "test"
    val prop = new Properties()
    prop.setProperty("user", "username")
    prop.setProperty("password", "pwd")
    prop.setProperty("useSSL", "false")

    val conf = new SparkConf()
      .setAppName("test")
      .setMaster("local[5]")
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)

    // Query the Hive table
    val sql = "SELECT * FROM test"
    val retDF = sqlContext.sql(sql)

    /**
     * Write the result set to the MySQL table.
     * SaveMode options:
     *   Overwrite - replace the existing table
     *   Append    - append to the existing table
     *   Ignore    - do nothing if the table already exists
     */
    retDF.write.mode(SaveMode.Overwrite).jdbc(url, tableName, prop)

    sc.stop()
  }
}
The write uses the built-in JDBC writer:
retDF.write.mode(SaveMode.Overwrite).jdbc(url, tableName, prop)
Now, the problem I ran into:
While the job was running, the number of MySQL connections exceeded the database's maximum connection limit.
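If you are not sure what the server-side limit actually is, you can check it with a plain JDBC query before tuning anything. This is a minimal sketch, not part of the original job; it assumes the same URL and credentials as the code above and that the MySQL JDBC driver is on the classpath:

import java.sql.DriverManager

object CheckMaxConnections {
  def main(args: Array[String]): Unit = {
    // Reuse whatever URL/credentials your Spark job uses (placeholders here).
    // Depending on the connector version you may need Class.forName("com.mysql.jdbc.Driver") first.
    val url = "jdbc:mysql://0.0.0.0:3306/data?characterEncoding=UTF-8&serverTimezone=CST"
    val conn = DriverManager.getConnection(url, "username", "pwd")
    try {
      // max_connections is the server-side cap the Spark JDBC writer can run into.
      val rs = conn.createStatement().executeQuery("SHOW VARIABLES LIKE 'max_connections'")
      while (rs.next()) {
        println(s"${rs.getString(1)} = ${rs.getString(2)}")
      }
    } finally {
      conn.close()
    }
  }
}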
Let's look at the source code of the underlying saveTable method:
def saveTable(
    df: DataFrame,
    url: String,
    table: String,
    properties: Properties) {
  val dialect = JdbcDialects.get(url)
  val nullTypes: Array[Int] = df.schema.fields.map { field =>
    getJdbcType(field.dataType, dialect).jdbcNullType
  }

  val rddSchema = df.schema
  val getConnection: () => Connection = createConnectionFactory(url, properties)
  val batchSize = properties.getProperty("batchsize", "1000").toInt
  df.foreachPartition { iterator =>
    // One JDBC connection is created per partition here.
    savePartition(getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect)
  }
}
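Because savePartition opens its own connection for every partition, the number of simultaneous MySQL connections is roughly the number of write tasks running at the same time. A quick way to see how many partitions (and therefore connections) the write will fan out to is the one-liner below; it is meant to be dropped into the main method above, right before the write, and reuses the retDF defined there:

// How many partitions will the JDBC write fan out to?
// With setMaster("local[5]") at most 5 of them run (and hold connections) concurrently.
println(s"partitions to write: ${retDF.rdd.getNumPartitions}")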
And the connections are all closed and released after use, as the savePartition cleanup shows:
} finally {
  if (!committed) {
    // The stage must fail. We got here through an exception path, so
    // let the exception through unless rollback() or close() want to
    // tell the user about another problem.
    if (supportsTransactions) {
      conn.rollback()
    }
    conn.close()
  } else {
    // The stage must succeed. We cannot propagate any exception close() might throw.
    try {
      conn.close()
    } catch {
      case e: Exception => logWarning("Transaction succeeded, but closing failed", e)
    }
  }
}
In other words, when the MySQL maximum connection limit cannot be raised, we can use Spark's built-in coalesce(numPartitions) method to reduce the number of partitions before the write. Fewer partitions means fewer MySQL connections are created, which avoids the failures caused by exceeding the maximum connection count.
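A minimal sketch of the adjusted write, using the retDF, url, tableName, and prop from the code above. The value 4 is only an example; pick a number that fits within your max_connections budget and whatever else is using the database:

// Shrink to a handful of partitions so at most that many JDBC
// connections are open at the same time during the write.
retDF.coalesce(4)
  .write
  .mode(SaveMode.Overwrite)
  .jdbc(url, tableName, prop)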
As for why I use the coalesce method rather than repartition, I explain that in another article.
If you're interested, click through and have a look! The link is here.