import java.util.Properties
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.mutable
object Test01 {
def main(args: Array[String]): Unit = {
val session: SparkSession = SparkSession.builder()
.master("local")
.appName(this.getClass.getSimpleName)
.getOrCreate()
// dataMask(session, "D:\\testLogs\\userinfo.txt", "D:\\testLogs\\userinfo")
// datainfoToJson(session, "D:\\testLogs\\userinfo", "D:\\testLogs\\idRules.txt", "D:\\testLogs\\userinfoToJson")
datainfoToMysql(session,"D:\\testLogs\\userinfoToJson","countprovince")
session.close()
session.stop()
}
// 1.使用spark读取文本文件,将身份证号脱敏,格式为: 620402********2111,姓名脱敏,为: 黄**,并将脱敏后的身份证和姓名保存为userInfo.parquet文件(只需要脱敏后的身份证号和姓名)
def dataMask(session: SparkSession, input: String, output: String): Unit = {
val sc: SparkContext = session.sparkContext
// 350211199006033016|1|7|黄测佳|10|2|||1|||||2018-10-26|2018-10-26|||
// 1)、读取数据
val rdd: RDD[String] = sc.textFile(input)
// 2)、引入隐式
import session.implicits._
val rdd1: RDD[(String, String)] = rdd.map(t => {
// 3)、切分数据
val str: mutable.ArrayOps[String] = t.split("\\|")
// 4)、过滤脏数据(只需要格式正确的数据)
if (str.size == 15 && !str(0).isEmpty && !str(3).isEmpty && (str(0).size == 15 || str(0).size == 18)) {
// 5)、身份证号脱敏(中间八位替换为“*”)
val id: String = str(0).replace(str(0).substring(6, 14), "********")
var star = ""
// 6)、姓名脱敏(保留姓氏,名替换为“*”)
for (i <- 1 to str(3).trim.size - 1) star += "*"
val name: String = str(3).trim.substring(0, 1) + star
// 7)、数据有效输出,无效用(无效信息)替代
(id, name)
} else
("无效信息", "无效信息")
})
// 8)、写出为parquet格式的数据
rdd1.toDF("idnumber", "uname").write.parquet(output)
}
//2.读取userInfo.parquet文件,统计每个人的省份,并将身份证号idnumber,姓名uname和省份信息province保存成userInfo.json格式文件
def datainfoToJson(session: SparkSession, inputParquet: String, inputIdRules: String, output: String): Unit = {
// 1)、读取parquet数据:(350211********3016,黄**)
val userinfo: RDD[(String, String)] = session.read.parquet(inputParquet).rdd.map(t => (t.getString(0), t.getString(1)))
// 2)、 读取文本数据: (福建350000)
val idRules: Array[(String, String)] = session.read.textFile(inputIdRules).rdd.map(t => {
// 3)、过滤如果不是数字为省份,是数字为编号
(t.filterNot(_.isDigit), t.filter(_.isDigit))
// 4)、RDD中不能操作RDD,通过collect操作为Array(小型文件)
}).collect()
// 5)、引入隐式
import session.implicits._
val df: DataFrame = userinfo.map(t => {
// 6)、根据需求身份证号前三位和编号对比,得到省份信息(相当于二嵌套循环过滤数据)
val tuples: Array[(String, String)] = idRules.filter(f => f._2.substring(0, 2) == t._1.substring(0, 2))
// 7)、如果数据无效使用(未知)替代,否则输出数据
if (tuples.isEmpty) {
(t._1, t._2, "未知")
} else {
(t._1, t._2, tuples(0)._1)
}
}).toDF("idnumber", "uname", "province")
// 8)、写出为json格式数据
df.write.json(output)
}
// 3.统计各省份人次,(同一身份证多次出现则多次计算),按人次降序保存到sql的count_province表中, 使用配置文件载入数据库配置
def datainfoToMysql(session:SparkSession,inputJson:String,tableName:String): Unit ={
// 1)、读取json数据
val frame: DataFrame = session.read.json(inputJson)
// 2)、创建临时表
frame.createTempView("count_province")
// 3)、打印schema信息
frame.printSchema()
// 4)、获取mysql连接相关参数
val param: ConnectionProperties = getPropertiesParams()
// 5)、通过sql语句查询:省份、人次,并排序
val df: DataFrame = session.sql("select count(uname) as count,province from count_province group by province order by count desc")
// 6)、通过jdbc写出
df.write.mode("append").jdbc(param.url, tableName, param.pro)
println("Saving data to a JDBC source win!")
}
// 获取配置文件中的相关参数信息
def getPropertiesParams() = {
val config: Config = ConfigFactory.load("jdbcMysql.properties")
val url: String = config.getString("MYSQL_URL")
val user: String = config.getString("MYSQL_USER")
val password: String = config.getString("MYSQL_PASSWORD")
val table: String = config.getString("MYSQL_DBTABLE")
val driver: String = config.getString("MYSQL_DRIVER")
val pro: Properties = new Properties()
pro.setProperty("user", user)
pro.setProperty("password", password)
pro.setProperty("driver", driver)
ConnectionProperties(url, user, password, table, driver, pro)
}
}
case class ConnectionProperties(url: String, user: String, password: String, table: String, driver: String, pro: Properties)
case class coutProvince(province: String, count: Long)
ComprehensivePractice
最后编辑于 :
©著作权归作者所有,转载或内容合作请联系作者
- 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
- 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
- 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
推荐阅读更多精彩内容
- 下文写于两年前,一直未公开,现在也换公司了,公开发布。 (一)需求本质 用户需求本质来源于人的需求本质,根据马斯洛...