1. Adding dependencies
The project is written in Java. The key parts of the pom.xml are shown below.
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.10.0</flink.version>
    <java.version>1.8</java.version>
    <scala.binary.version>2.11</scala.binary.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<dependencies>
    <!-- Apache Flink dependencies -->
    <!-- Scope is compile here so the job can run directly in the IDE; switch it to
         provided when building a JAR for cluster submission, so these dependencies
         are not packaged into the JAR file. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.14</version>
    </dependency>
</dependencies>
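To actually submit the job to a cluster, the project is usually packaged into a fat JAR. A minimal sketch of a maven-shade-plugin build section (this is not part of the original pom; the plugin version and main class are assumptions to adapt to your project):

```xml
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <!-- hypothetical entry point; replace with your own main class -->
                                <mainClass>streaming.SocketWordCount</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```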
2. Development steps
- Obtain an execution environment (batch or streaming)
- Load data through a source
- Apply operator transformations
- Emit data through a sink
- Call execute to run the job
3. Streaming demo
Simulate an input stream over a socket and count word occurrences.
package streaming;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import lombok.Data;

/**
 * @author pxl
 */
public class SocketWordCount {
    public static void main(String[] args) throws Exception {
        String hostname = "localhost";
        String delimiter = "\n";
        int port = 9000;
        // Obtain the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Connect to the socket and read the input stream
        DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);
        SingleOutputStreamOperator<Word> stream = text.flatMap(new FlatMapFunction<String, Word>() {
            @Override
            public void flatMap(String s, Collector<Word> out) {
                // Split on runs of whitespace and skip empty tokens
                String[] line = s.split("\\s+");
                for (String word : line) {
                    if (!word.isEmpty()) {
                        out.collect(new Word(word, 1));
                    }
                }
            }
        });
        // Key by the word field and sum the counts
        SingleOutputStreamOperator<Word> sum = stream.keyBy("word").sum("count");
        sum.print().setParallelism(1);
        env.execute("job is running");
    }

    @Data
    public static class Word {
        String word;
        int count;

        public Word() {
        }

        public Word(String word, int count) {
            this.word = word;
            this.count = count;
        }
    }
}
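The flatMap above hinges on how each line is tokenized. A self-contained sketch of the same splitting logic in plain Java (no Flink; class and method names are illustrative), useful for sanity-checking the whitespace regex:

```java
import java.util.ArrayList;
import java.util.List;

public class SplitDemo {
    // Mirrors the flatMap body: split a line on runs of whitespace,
    // dropping the empty token produced by leading whitespace.
    static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        for (String w : line.split("\\s+")) {
            if (!w.isEmpty()) {
                words.add(w);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        System.out.println(tokenize("hello  world hello"));  // [hello, world, hello]
    }
}
```

Note that splitting on "\\s" (without the +) would emit empty strings between consecutive spaces, which is why the streaming job splits on "\\s+" and filters empty tokens.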
Open a terminal and start a socket listener on port 9000 (for example with nc -lk 9000), then type some input:
Start the program; it prints output as follows:
4. Batch demo
Read a text file, tokenize it, and count word frequencies.
package batch;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * @author pxl
 */
public class FileWordCount {
    public static void main(String[] args) throws Exception {
        String filePath = "/Users/xiaolong/tmp/poem.txt";
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> file = env.readTextFile(filePath);
        DataSet<Tuple2<String, Integer>> data = file.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> out) throws Exception {
                // Split on non-word characters and emit (word, 1) for each non-empty token
                String[] words = s.split("\\W+");
                for (String word : words) {
                    if (word.trim().length() > 0) {
                        out.collect(new Tuple2<>(word.trim(), 1));
                    }
                }
            }
        }).groupBy(0).sum(1);
        data.print();
    }
}
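The groupBy(0).sum(1) pipeline is equivalent to the following plain-Java aggregation (a sketch for sanity-checking the \W+ splitter; no Flink involved, names are illustrative):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WordFreqDemo {
    // Same logic as the batch job: split on runs of non-word characters,
    // then accumulate a count per non-empty token.
    static Map<String, Integer> countWords(String text) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String word : text.split("\\W+")) {
            if (word.trim().length() > 0) {
                counts.merge(word.trim(), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countWords("to be, or not to be"));  // {to=2, be=2, or=1, not=1}
    }
}
```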
Output: