一flink 基础使用
此处主要介绍两部分基础内容案例,
- 使用wiki作为连接器,读取日志数据发送到kafka队列
- 读取socket流, 实现单词计数功能
- 代码提交jar包到远程服务器
1.1 使用wiki流 ,发送数据到kafka
首先是创建maven工程,maven pom文件如下:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-wikiedits_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
其中flink-connector-wikiedits_2.11是用于操作wiki上数据使用的Wikipedia 连接器.
接下来是应用程序的说明,首先创建FLink 的SteamingExecutionEnvironment变量(如果是批处理任务的话则创建ExecutionEnvironment对象)它用于设置程序所需要的执行参数,读取创建资源文件。
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
通过env添加资源,用于读取wiki ipc 日志的资源
val edits:DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource())
相当于建立了一根桥梁, 打开flink与wiki的桥梁,
val keyedEdits:KeyedStream[WikipediaEditEvent, String] = edits.keyBy({ (event: WikipediaEditEvent) => {
event.getUser
}
})
上面是获取一个wiki的流
val secondR = keyedEdits
.timeWindow(Time.seconds(5))
.fold(WikiModel("","",0L,"","","",0))(getWikiEvent)
.map(res => res.user)
.addSink(new FlinkKafkaProducer010[String]("kason-pc:9092","wiki-result",new SimpleStringSchema()))
首先创建了一个model,WikiModel
case class WikiModel(user: String,title: String,time: Long, summary: String, diffUrl: String, channel: String, byteDiff: Int)
之后将getWikiEvent方法传入,为了获取数据封装成对象,来看看此基本方法
def getWikiEvent(data: WikiModel, event: WikipediaEditEvent): WikiModel = {
val user: String = event.getUser
val title: String = event.getTitle
val time: Long = event.getTimestamp
val summary: String = event.getSummary
val diffUrl: String = event.getDiffUrl
val channel: String = event.getChannel
val byteDiff: Int = event.getByteDiff
WikiModel(user,title,time,summary,diffUrl,channel,byteDiff)
}
说明此方法只是为了讲wiki流的每一个新获得的数据转换成WikiModel对象
addSink(new FlinkKafkaProducer010[String]("kason-pc:9092","wiki-result",new SimpleStringSchema())这个是为了将数据发送到kafka里面, 此时会自动创建topic为wiki-result的kafka topic,最后通过env.execute()来执行应用程序,开启每隔5s钟一个窗口。
结果如下
1.2 socket流单词计数
其基本情况就是使用linux下的nc -l 9000在9000端口上发送数据, flink通过读取此端口的数据,处理10s间隔的窗口,对此窗口的数据进行单词计数(注意只对此窗口时间内的数据)
其ubuntu端的发送数据截屏如下
flink程序如下:
package quickstart
import java.sql.Date
import java.text.SimpleDateFormat
import org.apache.flink.api.scala._
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
object SocketWindowWordCount {
def main(args: Array[String]) : Unit = {
val simple = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
// the port to connect to
val port: Int = try {
ParameterTool.fromArgs(args).getInt("port")
} catch {
case e: Exception => {
System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
return
}
}
// get the execution environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// get input data by connecting to the socket
val text: DataStream[String] = env.socketTextStream("kason-pc", port, '\n')
// parse the data, group it, window it, and aggregate the counts
val windowCounts = text
.flatMap { w => w.split("\\s") }
.map { w => WordWithCount(w, 1, simple.format(new Date(System.currentTimeMillis()))) }
.keyBy("word")
.timeWindow(Time.seconds(10), Time.seconds(1))
.sum("count")
// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1)
env.execute("Socket Window WordCount")
}
// Data type for words with count
case class WordWithCount(word: String, count: Long, time: String)
}
此处获取的结果如下图:
这里分析一下输出的结果, 上面的代码是每隔1s处理10s socket发送过来的数据,所以开始时输入的zk zk ll 当1s时间到来时, 处理的数据只有zk zk ll所以此时输出应该是zk,2 ll,1, 之后又输入zk kk pp, 然后当1s到来时, 需要处理的数据就是zk zk ll zk ll pp了, 此时结果输出应该是zk 3 ll 2 pp 1, 然后又输入了zk ll kk 所以1s后处理的数据就变成了 zk zk ll zk ll pp zk ll kk 所以数据输出为zk 4 ll 3 pp 1 kk 1, 然后时间过到11s时, 窗口10s内的数据就变成了 zk ll pp zk ll kk,所以结果变成了zk 2 ll 2 pp 1 kk 1, 然后时间过到12s时, 窗口10s内就只剩下zk ll kk了, 此时结果输出就是zk 1 ll 1 kk1, 然后时间过到13时, 窗口10s内没有数据了,此时也就没有输出了。
此处需要介绍一下flink的这个时间窗口,timeWindow
所谓的TIme Window实际上就是根据时间对数据流进行分组的, 也可以认为是根据固定时间进行切割数据, timeWindow有两种类型: 翻滚时间窗口, 滑动时间窗口。
- 翻滚时间窗口:
翻滚窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口。可以认为是数据不能跨窗口, 此时相当于只给timeWIndow传入一个时间参数。比如一个例子:我们需要统计每一分钟中用户购买的商品的总数,需要将用户的行为事件按每一分钟进行切分,这种切分被成为翻滚时间窗口(Tumbling Time Window), 比如你调整本例子的timeWindow函数为timeWindow(Time.seconds(1))
此时通过nc -l 9000每隔一秒发送数据:
结果如下:
- 滑动时间窗口
对于某些应用,它们需要的窗口是不间断的,需要平滑地进行窗口聚合。比如,我们可以每30秒计算一次最近一分钟用户购买的商品总数。这种窗口我们称为滑动时间窗口(Sliding Time Window)。在滑窗中,一个元素可以对应多个窗口, 也就是数据可以跨窗口, 如本例
1.3 代码提交jar包到远程服务器
首先打包可执行程序为jar包, 本例子使用的maven
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<!--<testSourceDirectory>src/test/scala</testSourceDirectory>-->
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
<args>
<arg>-target:jvm-1.5</arg>
</args>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>quickstart.SocketWindowWordCount</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>assembly</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
然后修改应用程序代码
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createRemoteEnvironment("kason-pc",6123,"/home/kason/workspace/BigdataComponents/target/BigdataComponents-1.0-SNAPSHOT.jar")
应用提交之后IDEA 显示
登录kason-pc:8081 查看RUNNING jOBS
根据JOB Name 来找到你提交的程序, 点进去
查看应用程序输出结果:
到log日志目录中去查看, 结果如下, 和本地调试结果一致: