Flink基础使用

一flink 基础使用

此处主要介绍两部分基础内容案例,

  • 使用wiki作为连接器,读取日志数据发送到kafka队列
  • 读取socket流, 实现单词计数功能
  • 代码提交jar包到远程服务器

1.1 使用wiki流 ,发送数据到kafka

首先是创建maven工程,maven pom文件如下:

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_2.11</artifactId>
      <version>1.3.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.11</artifactId>
      <version>1.3.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.11</artifactId>
      <version>1.3.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-wikiedits_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>

其中flink-connector-wikiedits_2.11是用于操作wiki上数据使用的Wikipedia 连接器.
接下来是应用程序的说明,首先创建FLink 的SteamingExecutionEnvironment变量(如果是批处理任务的话则创建ExecutionEnvironment对象)它用于设置程序所需要的执行参数,读取创建资源文件。

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

通过env添加资源,用于读取wiki ipc 日志的资源

val edits:DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource())

相当于建立了一根桥梁, 打开flink与wiki的桥梁,

val keyedEdits:KeyedStream[WikipediaEditEvent, String] = edits.keyBy({ (event: WikipediaEditEvent) => {
      event.getUser
    }
    })

上面是获取一个wiki的流

val secondR = keyedEdits
      .timeWindow(Time.seconds(5))

      .fold(WikiModel("","",0L,"","","",0))(getWikiEvent)
      .map(res => res.user)
      .addSink(new FlinkKafkaProducer010[String]("kason-pc:9092","wiki-result",new SimpleStringSchema()))

首先创建了一个model,WikiModel

case class WikiModel(user: String,title: String,time: Long, summary: String, diffUrl: String, channel: String, byteDiff: Int)

之后将getWikiEvent方法传入,为了获取数据封装成对象,来看看此基本方法

def getWikiEvent(data: WikiModel, event: WikipediaEditEvent): WikiModel = {
    val user: String = event.getUser
    val title: String = event.getTitle
    val time: Long = event.getTimestamp
    val summary: String = event.getSummary
    val diffUrl: String = event.getDiffUrl
    val channel: String = event.getChannel
    val byteDiff: Int = event.getByteDiff

    WikiModel(user,title,time,summary,diffUrl,channel,byteDiff)
  }

说明此方法只是为了讲wiki流的每一个新获得的数据转换成WikiModel对象
addSink(new FlinkKafkaProducer010[String]("kason-pc:9092","wiki-result",new SimpleStringSchema())这个是为了将数据发送到kafka里面, 此时会自动创建topic为wiki-result的kafka topic,最后通过env.execute()来执行应用程序,开启每隔5s钟一个窗口。

结果如下

image.png

1.2 socket流单词计数

其基本情况就是使用linux下的nc -l 9000在9000端口上发送数据, flink通过读取此端口的数据,处理10s间隔的窗口,对此窗口的数据进行单词计数(注意只对此窗口时间内的数据)
其ubuntu端的发送数据截屏如下

image.png

flink程序如下:

package quickstart
import java.sql.Date
import java.text.SimpleDateFormat

import org.apache.flink.api.scala._
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time

object SocketWindowWordCount {

  def main(args: Array[String]) : Unit = {

    val simple = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
    // the port to connect to
    val port: Int = try {
      ParameterTool.fromArgs(args).getInt("port")
    } catch {
      case e: Exception => {
        System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
        return
      }
    }

    // get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment


    // get input data by connecting to the socket
    val text: DataStream[String] = env.socketTextStream("kason-pc", port, '\n')

    // parse the data, group it, window it, and aggregate the counts
    val windowCounts = text
      .flatMap { w => w.split("\\s") }
      .map { w => WordWithCount(w, 1, simple.format(new Date(System.currentTimeMillis()))) }
      .keyBy("word")
      .timeWindow(Time.seconds(10), Time.seconds(1))
      .sum("count")

    // print the results with a single thread, rather than in parallel
    windowCounts.print().setParallelism(1)

    env.execute("Socket Window WordCount")
  }

  // Data type for words with count
  case class WordWithCount(word: String, count: Long, time: String)
}

此处获取的结果如下图:

image.png

这里分析一下输出的结果, 上面的代码是每隔1s处理10s socket发送过来的数据,所以开始时输入的zk zk ll 当1s时间到来时, 处理的数据只有zk zk ll所以此时输出应该是zk,2 ll,1, 之后又输入zk kk pp, 然后当1s到来时, 需要处理的数据就是zk zk ll zk ll pp了, 此时结果输出应该是zk 3 ll 2 pp 1, 然后又输入了zk ll kk 所以1s后处理的数据就变成了 zk zk ll zk ll pp zk ll kk 所以数据输出为zk 4 ll 3 pp 1 kk 1, 然后时间过到11s时, 窗口10s内的数据就变成了 zk ll pp zk ll kk,所以结果变成了zk 2 ll 2 pp 1 kk 1, 然后时间过到12s时, 窗口10s内就只剩下zk ll kk了, 此时结果输出就是zk 1 ll 1 kk1, 然后时间过到13时, 窗口10s内没有数据了,此时也就没有输出了。
此处需要介绍一下flink的这个时间窗口,timeWindow
所谓的TIme Window实际上就是根据时间对数据流进行分组的, 也可以认为是根据固定时间进行切割数据, timeWindow有两种类型: 翻滚时间窗口, 滑动时间窗口。

  • 翻滚时间窗口:
    翻滚窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口。可以认为是数据不能跨窗口, 此时相当于只给timeWIndow传入一个时间参数。比如一个例子:我们需要统计每一分钟中用户购买的商品的总数,需要将用户的行为事件按每一分钟进行切分,这种切分被成为翻滚时间窗口(Tumbling Time Window), 比如你调整本例子的timeWindow函数为timeWindow(Time.seconds(1))
    此时通过nc -l 9000每隔一秒发送数据:
image.png

结果如下:

image.png
  • 滑动时间窗口
    对于某些应用,它们需要的窗口是不间断的,需要平滑地进行窗口聚合。比如,我们可以每30秒计算一次最近一分钟用户购买的商品总数。这种窗口我们称为滑动时间窗口(Sliding Time Window)。在滑窗中,一个元素可以对应多个窗口, 也就是数据可以跨窗口, 如本例
image.png

1.3 代码提交jar包到远程服务器

首先打包可执行程序为jar包, 本例子使用的maven

<build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <!--<testSourceDirectory>src/test/scala</testSourceDirectory>-->
    <plugins>
    <plugin>
      <groupId>org.scala-tools</groupId>
      <artifactId>maven-scala-plugin</artifactId>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>testCompile</goal>
          </goals>
        </execution>
      </executions>
      <configuration>
        <scalaVersion>${scala.version}</scalaVersion>
        <args>
          <arg>-target:jvm-1.5</arg>
        </args>
      </configuration>
    </plugin>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <appendAssemblyId>false</appendAssemblyId>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass>quickstart.SocketWindowWordCount</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>assembly</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

    </plugins>
  </build>

然后修改应用程序代码

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createRemoteEnvironment("kason-pc",6123,"/home/kason/workspace/BigdataComponents/target/BigdataComponents-1.0-SNAPSHOT.jar")

应用提交之后IDEA 显示

image.png

登录kason-pc:8081 查看RUNNING jOBS

image.png

根据JOB Name 来找到你提交的程序, 点进去

image.png

查看应用程序输出结果:
到log日志目录中去查看, 结果如下, 和本地调试结果一致:

日志结果.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,132评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,802评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,566评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,858评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,867评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,695评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,064评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,705评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,915评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,677评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,796评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,432评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,041评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,992评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,223评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,185评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,535评论 2 343

推荐阅读更多精彩内容