/********************************** Data deduplication **************************************/
// Start from the earliest offset when no offset has been recorded yet; otherwise resume
// from the recorded offset.
properties.setProperty("auto.offset.reset", "earliest");
// Disable the Kafka client's automatic offset commit so that offset committing is left to Flink.
properties.setProperty("enable.auto.commit", "false");
// The consumer below is configured from parameterTool.getProperties(), not from `properties`,
// so the two settings above would never reach it. Apply them to the object that is
// actually handed to the consumer. The `properties` mutations above are kept unchanged in
// case code outside this section reads them.
java.util.Properties consumerProperties = parameterTool.getProperties();
consumerProperties.setProperty("auto.offset.reset", "earliest");
consumerProperties.setProperty("enable.auto.commit", "false");
// Pipeline: raw Kafka lines -> parsed KafkaEvent -> keep events with tpp >= 360 -> key by (si, ci).
KeyedStream<KafkaEvent, Tuple> keyed = env.addSource(new FlinkKafkaConsumer011<String>(
        parameterTool.getRequired("input-topic"),
        new SimpleStringSchema(),
        consumerProperties
)).map(new MapFunction<String, KafkaEvent>() {
    /** Parses one Kafka record line into a {@link KafkaEvent}. */
    @Override
    public KafkaEvent map(String line) throws Exception {
        return KafkaEvent.fromString(line);
    }
}).filter(new FilterFunction<KafkaEvent>() {
    /** Keeps only events whose {@code tpp} is at least 360. */
    @Override
    public boolean filter(KafkaEvent value) throws Exception {
        return value.getTpp() >= 360;
    }
}).keyBy("si", "ci");
// Deduplicates events per key using a Bloom filter held in keyed state: an event is emitted
// (as "did^|uvs") only when its tno has not been seen before for the current key.
// A Bloom filter can report false positives, so a small share of genuinely new tno values
// may be dropped; it never reports false negatives.
SingleOutputStreamOperator<String> mapstatedata = keyed.process(new ProcessFunction<KafkaEvent, String>() {
    // Bloom filter of the tno values already seen for the current key.
    private transient ValueState<BloomFilter> bloomState;

    /**
     * Registers the keyed Bloom-filter state.
     *
     * @param parameters the function configuration (unused)
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<BloomFilter> bloomStateDescriptor =
                new ValueStateDescriptor<>("bloomState", BloomFilter.class);
        bloomState = getRuntimeContext().getState(bloomStateDescriptor);
    }

    /**
     * Emits {@code did + "^|" + uvs} for the first event carrying a given tno and drops
     * events whose tno the Bloom filter reports as possibly seen.
     *
     * @param value   the incoming event
     * @param context the process-function context (unused)
     * @param out     collector receiving the formatted record
     */
    @Override
    public void processElement(KafkaEvent value, Context context, Collector<String> out) throws Exception {
        String tno = value.getTno();
        // Read the state exactly once; every read may cost a state-backend lookup.
        BloomFilter bloomFilter = bloomState.value();
        if (bloomFilter == null) {
            // First event for this key: create a filter sized for 100000 expected insertions.
            bloomFilter = BloomFilter.create(Funnels.unencodedCharsFunnel(), 100000);
        }
        // mightContain == false means the value has definitely not been seen.
        if (bloomFilter.mightContain(tno)) {
            return;
        }
        bloomFilter.put(tno);
        bloomState.update(bloomFilter);
        // Build the output only for events that are actually emitted.
        String result = new StringBuilder()
                .append(value.getDid()).append("^|")
                .append(value.getUvs()).toString();
        out.collect(result);
    }
}).setParallelism(1);
/**
 * Assigns a running row number to every incoming record and keeps the counter in operator
 * list state so that numbering continues after a restore.
 *
 * <p>For each input {@code value} the function emits {@code (n, n + "|^|" + value)}, where
 * {@code n} starts at 1 and increases by one per record. The counter lives in this function
 * instance, so the numbering is only global when the operator runs as a single instance
 * (NOTE(review): confirm the operator is deployed with parallelism 1).
 */
public static class OperatorStateMap extends RichFlatMapFunction<String, Tuple2<Long, String>> implements CheckpointedFunction {
    // Managed operator state holding the counter written at each checkpoint.
    private transient ListState<Long> listState;
    // Last row number handed out by this instance; 0 means nothing has been emitted yet.
    long listElement = 0L;

    /**
     * Emits the next row number together with the record prefixed by that number.
     *
     * @param value     the incoming record
     * @param collector receives {@code (rowNumber, rowNumber + "|^|" + value)}
     */
    @Override
    public void flatMap(String value, Collector<Tuple2<Long, String>> collector) throws Exception {
        listElement++;
        collector.collect(new Tuple2<Long, String>(listElement, listElement + "|^|" + value));
    }

    /**
     * Writes the current counter into the managed list state when a checkpoint is taken.
     *
     * @param context the snapshot context (unused)
     * @throws Exception if the state cannot be updated
     */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Replace the previous snapshot content so the state holds exactly one counter.
        listState.clear();
        listState.add(listElement);
    }

    /**
     * Registers the managed list state and, when restoring, reloads the counter from it.
     *
     * @param context gives access to the operator state store and the restore flag
     * @throws Exception if the state cannot be obtained or read
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Long> listStateDescriptor = new ListStateDescriptor<>("checkPointedList",
                TypeInformation.of(new TypeHint<Long>() {
                }));
        listState = context.getOperatorStateStore().getListState(listStateDescriptor);
        if (context.isRestored()) {
            // The restored list may hold several counters (for example after rescaling).
            // Continue from the largest one so the counter never moves backwards.
            for (Long ele : listState.get()) {
                System.out.println("===============>" + ele);
                listElement = Math.max(listElement, ele);
            }
        }
    }
}
}