Spark 第一个scala 程序

1、环境准备

1、JDK配置,Scala 配置 。目前教程环境用的是 hadopp2.6-CDH5.6.0、spark 2.1.0 、jdk 1.7u51、scala2.11.8 。
2、Scala 下载地址 https://www.scala-lang.org/download/all.html 迅雷下载速度更快(PS: 这不是打广告)。
3、 安装Scala 、JDK并配置环境变量(jdk 1.8 也是可以的、scala 要和spark 保持一致,因为可能我们会修改spark源码的需求)
4、IDEA scala 插件下载 。如果没有scala插件 ,IDEA 不能新建 scala 类 和对象 。(如果在IDEA 插件中心下载太慢了可以,查看版本后到 官网下载,或者用迅雷下载 官网: https://plugins.jetbrains.com/idea
5、测试数据存放百度网盘 https://pan.baidu.com/s/1xju3QodxC-abpWADifigyA 密码微信联系我

image

2、IDEA创建项目

1、打开IDE :file - new - project (搭建maven 项目并勾选 create from archetype ,并且选择 scala IDEA 自动帮你配置scala 大部分依赖省不少事)
image
2、 设置maven 坐标 GAV
image
3)、 配置项目maven 版本 设置以及本地仓库地址 (在spark 源码编译对maven 有要求,为了避免出现莫名问题 所以这里指定较高mean版本)
image
image
4、 添加Scala 到IDEA 中
image.png
5、 第一个scala Object
image.png

3、编码阶段

1、业务需求,根据用户日志统计出各服务调用次数日志文件为csv 格式如下:


image.png
2、添加依赖
        <!-- Spark-SQL 依赖-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <!--开源 ip 工具类 依赖根据IP计算出城市,已经安装到本地了, 需要的可以到github 上面找-->
        <dependency>
            <groupId>com.ggstar</groupId>
            <artifactId>ipdatabase</artifactId>
            <version>1.0</version>
        </dependency>
        <!-- ip 工具类 需要读取excel 中ip信息 依赖-->
        <dependency>
            <groupId>org.apache.poi</groupId>
            <artifactId>poi-ooxml</artifactId>
            <version>3.14</version>
        </dependency>
        <!-- ip 工具类依赖 需要读取excel-->
        <dependency>
            <groupId>org.apache.poi</groupId>
            <artifactId>poi</artifactId>
            <version>3.14</version>
        </dependency>
        <!-- json工具类依赖-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
3、程序代码
package cn.harsons.mbd

import java.util.Locale

import com.alibaba.fastjson.util.TypeUtils
import com.ggstar.util.ip.IpHelper
import org.apache.commons.lang.time.FastDateFormat
import org.apache.spark.sql.{SaveMode, SparkSession}

/**
  *
  * @author lyb
  * @date 2020/3/9 0009
  */
object UserLogStatApp {

  private val format: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss Z", Locale.ENGLISH)

  def main(args: Array[String]): Unit = {
    // 拿到Spark Session  在本地开发时先设置local 模式发布正式在修改对应的模式
    val session = SparkSession.builder().appName("UserLogStatApp").master("local[2]").getOrCreate()
    // 注册函数,根据ip 获取城市  注册函数可以在Spark SQL 中使用  注意后面必须要使用 空格和下划线" _"
    val city = session.udf.register("getCity", getCity _)
    // 注册函数 时间转换
    val formatTime = session.udf.register("getTime", convertDate _)
    // 注册函数 分割URL 得到用户调用的模块
    val moduleType = session.udf.register("getModuleType", getModuleType _)
    // 以cvs 方式读取文件,cvs 分隔符为; (默认",") 从第一个参数里面读取 并且转成 DataFrame。
    val frame = session.read.option("delimiter", ";").csv(args(0)).toDF
    // 这里面进行过滤, 清洗
    //  剔除第9列值不等于200 或者第四列为空
    // 查询 第一列的值 并且命名为 url 后面语法大体与SQL 类似
    // moduleType 为上面注册的方法主要是分割URL 得到调用的服务路径,city  formatTime  都一样原理
    val result = frame.filter(frame.col("_c8") === 200).filter(frame.col("_c3").isNotNull)
      .select(
        frame.col("_c1").as("url")
        , moduleType(frame.col("_c1")).as("server")
        , frame.col("_c0").as("ip")
        , city(frame.col("_c0")).as("address")
        , frame.col("_c3").as("userId")
        , frame.col("_c4").as("userName")
        , frame.col("_c5").as("browserName")
        , formatTime(frame.col("_c2")).as("time"))
    //  显示分组统计后的结果,这里可以把结果集输出到 HDFS 或者JDBC show 显示50行结果集
    result.groupBy("server").count().orderBy("count").show(50, false)
    //这里大体意思是 coalesce 指定文件大小,作用分区大小。 model  模式 指定为覆盖 partitionBy 根据什么分区
    // 这里用的是地址 和用户分区  输出文件为JSON
    // 这里也可以指定目录 在json 方法中
    result.coalesce(1).write.mode(SaveMode.Overwrite).partitionBy("address", "userName")
      .json(args(1))
    //关闭session
    session.stop()
  }

  def convertDate(date: String) = {
    format.format(TypeUtils.castToDate(date))
  }

  def getCity(ip: String) = {
    IpHelper.findRegionByIp(ip)
  }

  def getModuleType(url: String) = {
    if (url != null && url.contains("/")) {
      val str = url.replaceAll("//", "/")
      str.substring(str.indexOf("hscp/") + 5).split("/")(0)
    } else {
      ""
    }
  }

}

3、启动参数设置(IDEA 中可以通过program arguments 来设置 输入文件 和输出文件,注意参数之间用空格分开)
image.png
4、成功输出结果
image.png
5、可能出现的错误

1、Caused by: com.fasterxml.jackson.databind.JsonMappingException: Incompatible Jackson version: 2.9.9-3
原因 调试发现这是由于默认的jackson-databind版本太高导致。

报错代码:


image.png

解决方案:

       <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.6.6</version>
        </dependency>

2、java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
由于window 下 没有配置好hadoop 环境变量导致。
解决方案:http://down2.opdown.com:8019/opdown/winutilsmaster.opdown.com.zip 配置好环境变量,重启计算机。

6、数据保存到MYSQL的代码如下(记得添加MYSQL驱动)
package cn.harsons.mbd

import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Locale

import com.alibaba.fastjson.util.TypeUtils
import com.ggstar.util.ip.IpHelper
import org.apache.commons.lang.time.FastDateFormat
import org.apache.spark.sql.{Dataset, Row, SparkSession}

import scala.collection.mutable.ListBuffer

/**
  *
  * @author liyabin
  * @date 2020/3/9 0009
  */
object UserStatSaveApp {

  private val format: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss Z", Locale.ENGLISH)

  def main(args: Array[String]): Unit = {
    // 拿到Spark Session  在本地开发时先设置local 模式发布正式在修改对应的模式
    val session = SparkSession.builder().appName("UserLogStatApp").master("local[2]").getOrCreate()
    // 注册函数,根据ip 获取城市  注册函数可以在Spark SQL 中使用  注意后面必须要使用 空格和下划线" _"
    val city = session.udf.register("getCity", getCity _)
    // 注册函数 时间转换
    val formatTime = session.udf.register("getTime", convertDate _)
    // 注册函数 分割URL 得到用户调用的模块
    val moduleType = session.udf.register("getModuleType", getModuleType _)
    // 以cvs 方式读取文件,cvs 分隔符为; (默认",") 取程序传入的第一个参数当做文件地址读取 并且转成 DataFrame。
    val frame = session.read.option("delimiter", ";").csv(args(0)).toDF
    // 这里面进行过滤, 清洗
    // 如果 剔除第9列值不等于200 或者第四列为空
    // 查询 第一列的值 并且命名为 url 后面语法大体与SQL 类似
    // moduleType 为上面注册的方法主要是分割URL 得到调用的服务路径,city  formatTime  都一样原理
    val result = frame.filter(frame.col("_c8") === 200).filter(frame.col("_c3").isNotNull)
      .select(
        frame.col("_c1").as("url")
        , moduleType(frame.col("_c1")).as("server")
        , frame.col("_c0").as("ip")
        , city(frame.col("_c0")).as("address")
        , frame.col("_c3").as("userId")
        , frame.col("_c4").as("userName")
        , frame.col("_c5").as("browserName")
        , formatTime(frame.col("_c2")).as("time"))
    // 保存统计结果
    readToServerStat(result.groupBy("server").count().orderBy("count"))
    //清洗后的数据写入数据库,也可以写入HDFS 以及任何HADOOP 支持的路径 。这次案例写入MYSQL数据库
    readToUserLog(result)
    //关闭session
    session.stop()
  }

  def readToServerStat(value: Dataset[Row]): Unit = {
    value.foreachPartition(partitionOfRecords => {
      val buffer = new ListBuffer[ServerBean]
      partitionOfRecords.foreach(row => {
        val server = row.getAs[String]("server")
        val count = row.getAs[Long]("count")
        buffer.append(ServerBean(server, count))
      })
      saveServerStat(buffer)
    })
  }

  def readToUserLog(value: Dataset[Row]): Unit = {
    value.foreachPartition(partitionOfRecords => {
      val buffer = new ListBuffer[UserLogBean]
      partitionOfRecords.foreach(row => {
        val url = row.getAs[String]("url")
        val address = row.getAs[String]("address")
        val server = row.getAs[String]("server")
        val ip = row.getAs[String]("ip")
        val userId = row.getAs[String]("userId")
        val userName = row.getAs[String]("userName")
        val browserName = row.getAs[String]("browserName")
        val time = row.getAs[String]("time")
        buffer.append(UserLogBean(server, url, ip, address, userId, userName, browserName, time))
      })
      saveUserLog(buffer)
    })
  }


  def saveServerStat(list: ListBuffer[ServerBean]) = {

    val connection = getConnection()
    connection.setAutoCommit(false)
    //todo
    val sql = "insert into server_stat(server,count) values (?,?) "
    val statement = connection.prepareStatement(sql)

    for (bean <- list) {
      statement.setString(1, bean.server)
      statement.setLong(2, bean.count)
      statement.addBatch()
    }
    statement.executeBatch() // 执行批量处理
    connection.commit() //手工提交
    release(connection, statement)
  }

  def saveUserLog(list: ListBuffer[UserLogBean]) = {
    val connection = getConnection()
    connection.setAutoCommit(false)
    val statement = connection.prepareStatement("insert into user_log(url,address,server,ip,userId,userName,browserName,time) values (?,?,?,?,?,?,?,?)")
    // todo
    for (bean <- list) {
      statement.setString(1, bean.url)
      statement.setString(2, bean.address)
      statement.setString(3, bean.server)
      statement.setString(4, bean.ip)
      statement.setString(5, bean.userId)
      statement.setString(6, bean.userName)
      statement.setString(7, bean.browserName)
      statement.setString(8, bean.time)
      statement.addBatch()
    }
    statement.executeBatch() // 执行批量处理
    connection.commit() //手工提交
    release(connection, statement)

  }

  /**
    * 转换日期
    *
    * @param date
    * @return
    */
  def convertDate(date: String) = {
    format.format(TypeUtils.castToDate(date))
  }

  /**
    * 根据ip计算出城市信息
    *
    * @param ip
    * @return
    */
  def getCity(ip: String) = {
    try {
      IpHelper.findRegionByIp(ip)
    } catch {
      case e: Exception => "未知"
    }
  }

  /**
    * 切割字符串
    *
    * @param url 用户请求路径
    * @return
    */
  def getModuleType(url: String) = {
    if (url != null && url.contains("/")) {
      val str = url.replaceAll("//", "/")
      str.substring(str.indexOf("hscp/") + 5).split("/")(0)
    } else {
      ""
    }
  }

  case class ServerBean(server: String, count: Long)

  case class UserLogBean(server: String, url: String, ip: String, address: String, userId: String
                         , userName: String, browserName: String, time: String)

  def getConnection() = {
    // 这里把数据库与地址写死,正式程序 可以改成配置式
    DriverManager.getConnection("jdbc:mysql://192.168.137.1:3306/spark?user=root&password=123456")
  }

  def release(connection: Connection, pstmt: PreparedStatement): Unit = {
    try {
      if (pstmt != null) {
        pstmt.close()
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (connection != null) {
        connection.close()
      }
    }
  }

}

下一章-程序打包

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342

推荐阅读更多精彩内容