由于要实时读取redis的AOF文件,但是flume的taildir source在监控文件的时候,如果文件的inode变化了,那么会出现重复读取数据的情况,这里可以通过修改flume taildir源码解决,只针对读一个文件的情况。
- 去flume官网下载flume源码下载
- 解压后在idea中打开如下
配置好maven,到flume-ng-source中找到ReliableTaildirEventReader
- 找到updateTailFiles方法
/**
* Update tailFiles mapping if a new file is created or appends are detected
* to the existing file.
*/
public List<Long> updateTailFiles(boolean skipToEnd) throws IOException {
pass
...
...
for (TaildirMatcher taildir : taildirCache) {
long inode = getInode(f);
TailFile tf = tailFiles.get(inode);
//判断是否是新文件,inode或者文件名不同就认为是新文件
if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {
long startPos = skipToEnd ? f.length() : 0;
System.out.println(tf);
if (tf != null) {
inode = tf.getInode();
}
//找到这行,startPos是读取文件的位置,当有新文件时会从0开始读
//tf = openFile(f, headers, inode, startPos);
//改成⤵️,f.length()是此时读到的位置
tf = openFile(f, headers, inode, f.length());
} else {
继续找到TaildirSource类
private String toPosInfoJson() {
@SuppressWarnings("rawtypes")
List<Map> posInfos = Lists.newArrayList();
for (Long inode : existingInodes) {
TailFile tf = reader.getTailFiles().get(inode);
//这里会将新的inode写到位置文件中
//posInfos.add(ImmutableMap.of("inode", inode, "pos", tf.getPos(), "file", tf.getPath()));
//改成⤵️,inode=0
posInfos.add(ImmutableMap.of("inode", 0, "pos", tf.getPos(), "file", tf.getPath()));
}
return new Gson().toJson(posInfos);
}
右侧找到package打包,或者进入到项目目录用mvn package打包,复制target中的flume-taildir-source-1.8.0.jar到flume中的lib下即可,这样即使文件的inode变化,也可以继续读
另外,我是把taildir这个包又复制了一份,这样在flume的配置中a1.sources.r1.type = org.apache.flume.source.taildir2.TaildirSource
直接指定修改后的类名,这样不会影响原来的TAILDIR