1、环境准备
1、JDK配置,Scala 配置 。目前教程环境用的是 hadopp2.6-CDH5.6.0、spark 2.1.0 、jdk 1.7u51、scala2.11.8 。
2、Scala 下载地址 https://www.scala-lang.org/download/all.html 迅雷下载速度更快(PS: 这不是打广告)。
3、 安装Scala 、JDK并配置环境变量(jdk 1.8 也是可以的、scala 要和spark 保持一致,因为可能我们会修改spark源码的需求)
4、IDEA scala 插件下载 。如果没有scala插件 ,IDEA 不能新建 scala 类 和对象 。(如果在IDEA 插件中心下载太慢了可以,查看版本后到 官网下载,或者用迅雷下载 官网: https://plugins.jetbrains.com/idea)
5、测试数据存放百度网盘 https://pan.baidu.com/s/1xju3QodxC-abpWADifigyA 密码微信联系我
2、IDEA创建项目
1、打开IDE :file - new - project (搭建maven 项目并勾选 create from archetype ,并且选择 scala IDEA 自动帮你配置scala 大部分依赖省不少事)
2、 设置maven 坐标 GAV
3)、 配置项目maven 版本 设置以及本地仓库地址 (在spark 源码编译对maven 有要求,为了避免出现莫名问题 所以这里指定较高mean版本)
4、 添加Scala 到IDEA 中
5、 第一个scala Object
3、编码阶段
1、业务需求,根据用户日志统计出各服务调用次数日志文件为csv 格式如下:
2、添加依赖
<!-- Spark-SQL 依赖-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<!--开源 ip 工具类 依赖根据IP计算出城市,已经安装到本地了, 需要的可以到github 上面找-->
<dependency>
<groupId>com.ggstar</groupId>
<artifactId>ipdatabase</artifactId>
<version>1.0</version>
</dependency>
<!-- ip 工具类 需要读取excel 中ip信息 依赖-->
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
<version>3.14</version>
</dependency>
<!-- ip 工具类依赖 需要读取excel-->
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi</artifactId>
<version>3.14</version>
</dependency>
<!-- json工具类依赖-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
3、程序代码
package cn.harsons.mbd
import java.util.Locale
import com.alibaba.fastjson.util.TypeUtils
import com.ggstar.util.ip.IpHelper
import org.apache.commons.lang.time.FastDateFormat
import org.apache.spark.sql.{SaveMode, SparkSession}
/**
*
* @author lyb
* @date 2020/3/9 0009
*/
object UserLogStatApp {
private val format: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss Z", Locale.ENGLISH)
def main(args: Array[String]): Unit = {
// 拿到Spark Session 在本地开发时先设置local 模式发布正式在修改对应的模式
val session = SparkSession.builder().appName("UserLogStatApp").master("local[2]").getOrCreate()
// 注册函数,根据ip 获取城市 注册函数可以在Spark SQL 中使用 注意后面必须要使用 空格和下划线" _"
val city = session.udf.register("getCity", getCity _)
// 注册函数 时间转换
val formatTime = session.udf.register("getTime", convertDate _)
// 注册函数 分割URL 得到用户调用的模块
val moduleType = session.udf.register("getModuleType", getModuleType _)
// 以cvs 方式读取文件,cvs 分隔符为; (默认",") 从第一个参数里面读取 并且转成 DataFrame。
val frame = session.read.option("delimiter", ";").csv(args(0)).toDF
// 这里面进行过滤, 清洗
// 剔除第9列值不等于200 或者第四列为空
// 查询 第一列的值 并且命名为 url 后面语法大体与SQL 类似
// moduleType 为上面注册的方法主要是分割URL 得到调用的服务路径,city formatTime 都一样原理
val result = frame.filter(frame.col("_c8") === 200).filter(frame.col("_c3").isNotNull)
.select(
frame.col("_c1").as("url")
, moduleType(frame.col("_c1")).as("server")
, frame.col("_c0").as("ip")
, city(frame.col("_c0")).as("address")
, frame.col("_c3").as("userId")
, frame.col("_c4").as("userName")
, frame.col("_c5").as("browserName")
, formatTime(frame.col("_c2")).as("time"))
// 显示分组统计后的结果,这里可以把结果集输出到 HDFS 或者JDBC show 显示50行结果集
result.groupBy("server").count().orderBy("count").show(50, false)
//这里大体意思是 coalesce 指定文件大小,作用分区大小。 model 模式 指定为覆盖 partitionBy 根据什么分区
// 这里用的是地址 和用户分区 输出文件为JSON
// 这里也可以指定目录 在json 方法中
result.coalesce(1).write.mode(SaveMode.Overwrite).partitionBy("address", "userName")
.json(args(1))
//关闭session
session.stop()
}
def convertDate(date: String) = {
format.format(TypeUtils.castToDate(date))
}
def getCity(ip: String) = {
IpHelper.findRegionByIp(ip)
}
def getModuleType(url: String) = {
if (url != null && url.contains("/")) {
val str = url.replaceAll("//", "/")
str.substring(str.indexOf("hscp/") + 5).split("/")(0)
} else {
""
}
}
}
3、启动参数设置(IDEA 中可以通过program arguments 来设置 输入文件 和输出文件,注意参数之间用空格分开)
4、成功输出结果
5、可能出现的错误
1、Caused by: com.fasterxml.jackson.databind.JsonMappingException: Incompatible Jackson version: 2.9.9-3
原因 调试发现这是由于默认的jackson-databind版本太高导致。
报错代码:
解决方案:
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.6.6</version>
</dependency>
2、java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
由于window 下 没有配置好hadoop 环境变量导致。
解决方案:http://down2.opdown.com:8019/opdown/winutilsmaster.opdown.com.zip 配置好环境变量,重启计算机。
6、数据保存到MYSQL的代码如下(记得添加MYSQL驱动)
package cn.harsons.mbd
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Locale
import com.alibaba.fastjson.util.TypeUtils
import com.ggstar.util.ip.IpHelper
import org.apache.commons.lang.time.FastDateFormat
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import scala.collection.mutable.ListBuffer
/**
*
* @author liyabin
* @date 2020/3/9 0009
*/
object UserStatSaveApp {
private val format: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss Z", Locale.ENGLISH)
def main(args: Array[String]): Unit = {
// 拿到Spark Session 在本地开发时先设置local 模式发布正式在修改对应的模式
val session = SparkSession.builder().appName("UserLogStatApp").master("local[2]").getOrCreate()
// 注册函数,根据ip 获取城市 注册函数可以在Spark SQL 中使用 注意后面必须要使用 空格和下划线" _"
val city = session.udf.register("getCity", getCity _)
// 注册函数 时间转换
val formatTime = session.udf.register("getTime", convertDate _)
// 注册函数 分割URL 得到用户调用的模块
val moduleType = session.udf.register("getModuleType", getModuleType _)
// 以cvs 方式读取文件,cvs 分隔符为; (默认",") 取程序传入的第一个参数当做文件地址读取 并且转成 DataFrame。
val frame = session.read.option("delimiter", ";").csv(args(0)).toDF
// 这里面进行过滤, 清洗
// 如果 剔除第9列值不等于200 或者第四列为空
// 查询 第一列的值 并且命名为 url 后面语法大体与SQL 类似
// moduleType 为上面注册的方法主要是分割URL 得到调用的服务路径,city formatTime 都一样原理
val result = frame.filter(frame.col("_c8") === 200).filter(frame.col("_c3").isNotNull)
.select(
frame.col("_c1").as("url")
, moduleType(frame.col("_c1")).as("server")
, frame.col("_c0").as("ip")
, city(frame.col("_c0")).as("address")
, frame.col("_c3").as("userId")
, frame.col("_c4").as("userName")
, frame.col("_c5").as("browserName")
, formatTime(frame.col("_c2")).as("time"))
// 保存统计结果
readToServerStat(result.groupBy("server").count().orderBy("count"))
//清洗后的数据写入数据库,也可以写入HDFS 以及任何HADOOP 支持的路径 。这次案例写入MYSQL数据库
readToUserLog(result)
//关闭session
session.stop()
}
def readToServerStat(value: Dataset[Row]): Unit = {
value.foreachPartition(partitionOfRecords => {
val buffer = new ListBuffer[ServerBean]
partitionOfRecords.foreach(row => {
val server = row.getAs[String]("server")
val count = row.getAs[Long]("count")
buffer.append(ServerBean(server, count))
})
saveServerStat(buffer)
})
}
def readToUserLog(value: Dataset[Row]): Unit = {
value.foreachPartition(partitionOfRecords => {
val buffer = new ListBuffer[UserLogBean]
partitionOfRecords.foreach(row => {
val url = row.getAs[String]("url")
val address = row.getAs[String]("address")
val server = row.getAs[String]("server")
val ip = row.getAs[String]("ip")
val userId = row.getAs[String]("userId")
val userName = row.getAs[String]("userName")
val browserName = row.getAs[String]("browserName")
val time = row.getAs[String]("time")
buffer.append(UserLogBean(server, url, ip, address, userId, userName, browserName, time))
})
saveUserLog(buffer)
})
}
def saveServerStat(list: ListBuffer[ServerBean]) = {
val connection = getConnection()
connection.setAutoCommit(false)
//todo
val sql = "insert into server_stat(server,count) values (?,?) "
val statement = connection.prepareStatement(sql)
for (bean <- list) {
statement.setString(1, bean.server)
statement.setLong(2, bean.count)
statement.addBatch()
}
statement.executeBatch() // 执行批量处理
connection.commit() //手工提交
release(connection, statement)
}
def saveUserLog(list: ListBuffer[UserLogBean]) = {
val connection = getConnection()
connection.setAutoCommit(false)
val statement = connection.prepareStatement("insert into user_log(url,address,server,ip,userId,userName,browserName,time) values (?,?,?,?,?,?,?,?)")
// todo
for (bean <- list) {
statement.setString(1, bean.url)
statement.setString(2, bean.address)
statement.setString(3, bean.server)
statement.setString(4, bean.ip)
statement.setString(5, bean.userId)
statement.setString(6, bean.userName)
statement.setString(7, bean.browserName)
statement.setString(8, bean.time)
statement.addBatch()
}
statement.executeBatch() // 执行批量处理
connection.commit() //手工提交
release(connection, statement)
}
/**
* 转换日期
*
* @param date
* @return
*/
def convertDate(date: String) = {
format.format(TypeUtils.castToDate(date))
}
/**
* 根据ip计算出城市信息
*
* @param ip
* @return
*/
def getCity(ip: String) = {
try {
IpHelper.findRegionByIp(ip)
} catch {
case e: Exception => "未知"
}
}
/**
* 切割字符串
*
* @param url 用户请求路径
* @return
*/
def getModuleType(url: String) = {
if (url != null && url.contains("/")) {
val str = url.replaceAll("//", "/")
str.substring(str.indexOf("hscp/") + 5).split("/")(0)
} else {
""
}
}
case class ServerBean(server: String, count: Long)
case class UserLogBean(server: String, url: String, ip: String, address: String, userId: String
, userName: String, browserName: String, time: String)
def getConnection() = {
// 这里把数据库与地址写死,正式程序 可以改成配置式
DriverManager.getConnection("jdbc:mysql://192.168.137.1:3306/spark?user=root&password=123456")
}
def release(connection: Connection, pstmt: PreparedStatement): Unit = {
try {
if (pstmt != null) {
pstmt.close()
}
} catch {
case e: Exception => e.printStackTrace()
} finally {
if (connection != null) {
connection.close()
}
}
}
}