1 Kafka
1.1 Kafka Source
// Kafka consumer configuration
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
// note: a consumer needs *deserializers*, not serializers
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
// Kafka source consuming topic "sensor" as plain strings
val kafkaSource = new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties)
env.addSource(kafkaSource)
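These snippets assume an existing StreamExecutionEnvironment named env. As a point of reference, a minimal sketch of a complete job built around the source above (import paths for SimpleStringSchema vary slightly between Flink versions; printing the stream is just for illustration):

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object KafkaSourceJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "consumer-group")
    // same source as above, consuming topic "sensor"
    val kafkaSource = new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties)
    env.addSource(kafkaSource).print()
    env.execute("kafka-source-job")
  }
}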
1.2 Kafka Sink
val sink = new FlinkKafkaProducer011[String](
  "localhost:9092",         // broker list
  "topic1",                 // target topic
  new SimpleStringSchema()) // serialization schema
datastream.addSink(sink)
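FlinkKafkaProducer011 also offers a constructor that takes a Properties object instead of a plain broker string, which helps when the producer needs more settings than bootstrap.servers. A sketch with the same topic and schema ("retries" is just an example of such a setting):

val producerProps = new Properties()
producerProps.setProperty("bootstrap.servers", "localhost:9092")
producerProps.setProperty("retries", "3") // example of an extra producer setting
val sink = new FlinkKafkaProducer011[String]("topic1", new SimpleStringSchema(), producerProps)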
2 RabbitMQ
2.1 RabbitMQ Source
val connectionConfig = new RMQConnectionConfig.Builder()
  .setHost("localhost")
  .setPort(5000)
  ...
  .build
val stream = env
  .addSource(new RMQSource[String](
    connectionConfig,         // RabbitMQ connection configuration
    "queueName",              // RabbitMQ queue name
    true,                     // use correlation IDs (required for exactly-once)
    new SimpleStringSchema))
  .setParallelism(1)          // exactly-once also requires a non-parallel source
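The true flag enables correlation-id based de-duplication; together with checkpointing and the non-parallel source set above, this is what gives the RabbitMQ source exactly-once semantics. Checkpointing is enabled on the environment:

env.enableCheckpointing(5000) // checkpoint every 5 s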
2.2 RabbitMQ Sink
val connectionConfig = new RMQConnectionConfig.Builder()
  .setHost("localhost")
  .setPort(5000)
  ...
  .build
stream.addSink(new RMQSink[String](
  connectionConfig,   // RabbitMQ connection configuration
  "queueName",        // RabbitMQ queue name
  new SimpleStringSchema))
3 ElasticSearch
3.1 ES Sink
// Define the HTTP hosts of the Elasticsearch cluster
val httpHosts = new java.util.ArrayList[HttpHost]
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"))
httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"))
val esSinkBuilder = new ElasticsearchSink.Builder[String](
  httpHosts,
  new ElasticsearchSinkFunction[String] {
    override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
      val json = new java.util.HashMap[String, String]
      json.put("data", element)
      // build an IndexRequest and hand it to the RequestIndexer
      val rqst: IndexRequest = Requests.indexRequest
        .index("my-index")
        .`type`("my-type")
        .source(json)
      indexer.add(rqst)
    }
  }
)
input.addSink(esSinkBuilder.build)
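The sink buffers index requests and sends them in bulk; the Java variant in 3.2 tunes this via setBulkFlushMaxActions. The same knob exists on this builder, e.g. to flush after every element while testing:

esSinkBuilder.setBulkFlushMaxActions(1)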
3.2 ES Sink (writing to a password-protected ES cluster, Java)
// Read the ES address from the configuration
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost(parameterTool.getRequired("ELASTICSEARCH_HOST"), 9200, "http"));
// Create the Elasticsearch sink builder
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
    httpHosts,
    new ElasticsearchSinkFunction<String>() {
        public IndexRequest createIndexRequest(String element) {
            Map<String, Object> json = new HashMap<>();
            // strip the leading "<id>|^|" prefix (the id field plus the 3-character delimiter)
            String line = element.substring(element.split("\\|\\^\\|")[0].length() + 3);
            KafkaEvent kafkaEvent = new KafkaEvent().fromString(line);
            json.put("dts_id", kafkaEvent.getDid().toString());
            json.put("business_time", kafkaEvent.getBt());
            json.put("protocol_id", kafkaEvent.getPi());
            json.put("user_view_status", kafkaEvent.getUvs().longValue());
            return Requests.indexRequest()
                .id(element.split("\\|\\^\\|")[0])
                .index(parameterTool.getRequired("ES_INDEX_NAME"))
                .type(parameterTool.getRequired("ES_INDEX_TYPE"))
                .source(json);
        }

        @Override
        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
            indexer.add(createIndexRequest(element));
        }
    }
);
// Maximum number of actions to buffer before a bulk flush
esSinkBuilder.setBulkFlushMaxActions(parameterTool.getInt("ELASTICSEARCH_BULK_FLUSH_MAX_ACTIONS", 40));
esSinkBuilder.setRestClientFactory(
    new RestClientFactory() {
        @Override
        public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(parameterTool.getRequired("ELASTICSEARCH_NAME"),
                    parameterTool.getRequired("ELASTICSEARCH_PASSWD")));
            // Customize the HTTP client so every request carries the credentials
            restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                    httpClientBuilder.disableAuthCaching();
                    return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                }
            })/*.setMaxRetryTimeoutMillis(2000)*/;
        }
    }
);
resulted.addSink(esSinkBuilder.build());
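KafkaEvent is a user-defined class that is not shown here. Purely for illustration, a hypothetical minimal Scala equivalent inferred from the getters used above (the actual field order inside the |^|-delimited string is unknown):

// Hypothetical sketch only: field names/types inferred from getDid/getBt/getPi/getUvs
case class KafkaEvent(did: Long, bt: String, pi: String, uvs: Long)
object KafkaEvent {
  def fromString(line: String): KafkaEvent = {
    val f = line.split("\\|\\^\\|") // assumed field order, for illustration
    KafkaEvent(f(0).toLong, f(1), f(2), f(3).toLong)
  }
}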
4 File
4.1 File Source
env.readTextFile(inputPath)
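readTextFile reads the file once and the source then finishes. To keep watching the path for new data, readFile with PROCESS_CONTINUOUSLY can be used instead; a sketch (1000 is the scan interval in milliseconds):

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode

val format = new TextInputFormat(new Path(inputPath))
// re-scans inputPath every second; modified files are re-read in full
val stream = env.readFile(format, inputPath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000L)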
4.2 File Sink
val sink = StreamingFileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
.build()
stream.addSink(sink)
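The sink rolls part files according to a rolling policy; a sketch of setting one explicitly with DefaultRollingPolicy (builder method names here are from Flink 1.10+; older versions use DefaultRollingPolicy.create()):

import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy

val sink = StreamingFileSink
  .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
  .withRollingPolicy(
    DefaultRollingPolicy.builder()
      .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))  // roll at least every 15 min
      .withInactivityInterval(TimeUnit.MINUTES.toMillis(5)) // or after 5 min without data
      .withMaxPartSize(128 * 1024 * 1024)                   // or once a part reaches 128 MB
      .build())
  .build()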
5 JDBC
5.1 JDBC Sink
env.fromElements(...)
    .addSink(JdbcSink.exactlyOnceSink(
        "insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
        (ps, t) -> {
            ps.setInt(1, t.id);
            ps.setString(2, t.title);
            ps.setString(3, t.author);
            ps.setDouble(4, t.price);
            ps.setInt(5, t.qty);
        },
        JdbcExecutionOptions.builder().build(),
        JdbcExactlyOnceOptions.defaults(),
        () -> {
            // create the XA-capable data source (embedded Derby here)
            EmbeddedXADataSource ds = new EmbeddedXADataSource();
            ds.setDatabaseName("my_db");
            return ds;
        });
env.execute();
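exactlyOnceSink requires an XA-capable DataSource and two-phase commit; when at-least-once is acceptable, the plain JdbcSink.sink variant is simpler. A Scala sketch, with Book standing in for the record type used above and placeholder connection details:

import java.sql.PreparedStatement
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}

case class Book(id: Int, title: String, author: String, price: Double, qty: Int)

val sink = JdbcSink.sink[Book](
  "insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
  new JdbcStatementBuilder[Book] {
    override def accept(ps: PreparedStatement, t: Book): Unit = {
      ps.setInt(1, t.id)
      ps.setString(2, t.title)
      ps.setString(3, t.author)
      ps.setDouble(4, t.price)
      ps.setInt(5, t.qty)
    }
  },
  JdbcExecutionOptions.builder().withBatchSize(100).build(), // flush every 100 rows
  new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
    .withUrl("jdbc:derby:memory:my_db") // placeholder URL
    .withDriverName("org.apache.derby.jdbc.EmbeddedDriver")
    .build())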
6 Redis
6.1 Redis Sink
// CaseClass is a user-defined case class for the input records
val conf = new FlinkJedisPoolConfig.Builder()
  .setHost("localhost")
  .setPort(6379)
  .build()
datastream.addSink(new RedisSink[CaseClass](conf, new SinkRedisMapper))
// Define a RedisMapper
class SinkRedisMapper extends RedisMapper[CaseClass] {
  // Redis command to use: HSET <hash name> <field> <value>
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET, "t_temp")
  }
  // field (key) to write
  override def getKeyFromData(t: CaseClass): String = t.key
  // value to write
  override def getValueFromData(t: CaseClass): String = t.value
}
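CaseClass itself is left to the reader; a minimal hypothetical definition that satisfies the mapper above (it only reads key and value):

// Hypothetical: only the two fields the mapper accesses
case class CaseClass(key: String, value: String)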