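Introduction
Flink has a dedicated Redis Sink for writing data to Redis. The connector is published under the Apache Bahir group ID.
Dependency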
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
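Classes
The Redis Sink provides classes for sending data to Redis. The sink can talk to three kinds of Redis environments, each with its own configuration class:
Class | Scenario | Notes |
---|---|---|
FlinkJedisPoolConfig | Single Redis server | Suitable for local development and testing |
FlinkJedisClusterConfig | Redis Cluster | |
FlinkJedisSentinelConfig | Redis Sentinel | |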
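The three configuration classes in the table can be built roughly as follows. This is a minimal sketch, assuming the builder methods of the Bahir connector (setHost, setPort, setNodes, setMasterName, setSentinels). The object name RedisConfigExamples, the addresses, the ports, and the master name "mymaster" are placeholders chosen for illustration, not values from this article.

```scala
import java.net.InetSocketAddress
import java.util.{HashSet => JHashSet}

import org.apache.flink.streaming.connectors.redis.common.config.{
  FlinkJedisClusterConfig, FlinkJedisPoolConfig, FlinkJedisSentinelConfig
}

// Hypothetical holder object; building a config does not open a connection.
object RedisConfigExamples {

  // Single Redis server (local development or tests).
  val poolConfig: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
    .setHost("localhost")
    .setPort(6379)
    .build()

  // Redis Cluster: the builder expects a java.util.Set of node addresses.
  // The address below is a placeholder.
  private val clusterNodes = new JHashSet[InetSocketAddress]()
  clusterNodes.add(new InetSocketAddress("127.0.0.1", 7000))

  val clusterConfig: FlinkJedisClusterConfig = new FlinkJedisClusterConfig.Builder()
    .setNodes(clusterNodes)
    .build()

  // Redis Sentinel: a master name plus a java.util.Set of "host:port" strings.
  // Both values below are placeholders.
  private val sentinels = new JHashSet[String]()
  sentinels.add("127.0.0.1:26379")

  val sentinelConfig: FlinkJedisSentinelConfig = new FlinkJedisSentinelConfig.Builder()
    .setMasterName("mymaster")
    .setSentinels(sentinels)
    .build()
}
```

Any of the three objects can be passed as the first argument of the RedisSink constructor, so switching environments should only require swapping the config.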
Usage
The core of the Redis Sink is RedisMapper, an interface. To use the sink, write your own mapper class that implements the interface's three methods.
RedisMapper
public interface RedisMapper<T> extends Function, Serializable {

    /**
     * Chooses the Redis data structure and, where the command needs one, the name of the key.
     * The data structure is selected through RedisCommand.
     * Returns descriptor which defines data type.
     *
     * @return data type descriptor
     */
    RedisCommandDescription getCommandDescription();

    /**
     * Supplies the key half of the key/value pair written to Redis
     * (for HSET, the field stored inside the hash).
     * Extracts key from data.
     *
     * @param data source data
     * @return key
     */
    String getKeyFromData(T data);

    /**
     * Supplies the value half of the key/value pair written to Redis.
     * Extracts value from data.
     *
     * @param data source data
     * @return value
     */
    String getValueFromData(T data);
}
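The three methods above can be implemented along these lines for a Redis hash. This is a sketch, not the connector's own example: the WordCount record type, the class name WordCountHashMapper, and the hash name "word_counts" are hypothetical. It assumes that for HSET the connector uses the additional key of RedisCommandDescription as the hash name, the result of getKeyFromData as the field, and the result of getValueFromData as the value.

```scala
import org.apache.flink.streaming.connectors.redis.common.mapper.{
  RedisCommand, RedisCommandDescription, RedisMapper
}

// Hypothetical record type used only for this illustration.
case class WordCount(word: String, count: Long)

class WordCountHashMapper extends RedisMapper[WordCount] {

  // HSET with the additional key "word_counts": every record goes into one hash.
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.HSET, "word_counts")

  // The hash field.
  override def getKeyFromData(data: WordCount): String = data.word

  // The hash value; the interface only accepts strings, so the count is converted.
  override def getValueFromData(data: WordCount): String = data.count.toString
}
```

Under those assumptions, a record WordCount("flink", 3) would be written as HSET word_counts flink 3.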
RedisCommand
The RedisCommand passed to RedisCommandDescription selects the data structure. Data types map to Redis commands as follows:
Data Type | Redis Command [Sink] |
---|---|
HASH | HSET |
LIST | RPUSH, LPUSH |
SET | SADD |
PUBSUB | PUBLISH |
STRING | SET |
HYPER_LOG_LOG | PFADD |
SORTED_SET | ZADD |
SORTED_SET | ZREM |
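As an illustration of the SORTED_SET row, a ZADD mapper might look like the sketch below. The PlayerScore type, the class name LeaderboardMapper, and the key "leaderboard" are hypothetical. The sketch assumes that for ZADD the connector takes the sorted-set name from the additional key, the member from getKeyFromData, and the score from getValueFromData; it is worth checking this against the connector version in use.

```scala
import org.apache.flink.streaming.connectors.redis.common.mapper.{
  RedisCommand, RedisCommandDescription, RedisMapper
}

// Hypothetical record type used only for this illustration.
case class PlayerScore(player: String, score: Double)

class LeaderboardMapper extends RedisMapper[PlayerScore] {

  // SORTED_SET commands need an additional key: the name of the sorted set.
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.ZADD, "leaderboard")

  // Assumed to become the sorted-set member.
  override def getKeyFromData(data: PlayerScore): String = data.player

  // Assumed to become the score; it has to be a string that parses as a number.
  override def getValueFromData(data: PlayerScore): String = data.score.toString
}
```

Swapping RedisCommand.ZADD for another command from the table is how the same mapper shape targets a different Redis data structure.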
Demo
package com.yljphp.demo.sink

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.flink.util.StringUtils

object RedisBasicApp extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)

  // Read comma-separated text from a local socket.
  val text: DataStream[String] = env.socketTextStream("localhost", 9000)

  // Drop blank lines, split on commas, and pair every token with the Redis key "ylj_test_1".
  val data: DataStream[(String, String)] = text
    .filter(!StringUtils.isNullOrWhitespaceOnly(_))
    .flatMap(_.split(","))
    .map(("ylj_test_1", _))
    .map { x =>
      println(x) // debug output of each (key, value) pair
      x
    }

  // Connection settings for a single local Redis server.
  val config: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
    .setHost("localhost")
    .setPort(6379)
    .build()

  val redisSink = new RedisSink[(String, String)](config, new MyRedisMapper)
  data.addSink(redisSink)

  env.execute("sink_demo")
}

class MyRedisMapper extends RedisMapper[(String, String)] {
  // SADD adds each value to a Redis set. The second argument is the "additional key",
  // which matters for HASH and SORTED_SET commands; for SADD it appears to be ignored
  // and the set name comes from getKeyFromData instead.
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.SADD, "ylj_test_2")
  }

  // First tuple element: the Redis key ("ylj_test_1").
  override def getKeyFromData(data: (String, String)): String = data._1

  // Second tuple element: the member added to the set.
  override def getValueFromData(data: (String, String)): String = data._2
}