前言:为了呼应上篇文章总结部分提出的点。这一篇文章决定来个实战Demo。需求就是使用Spooling Directory Source监听一个文件夹下面新增的文件,然后上传到HDFS中,这需要大家先安装好Hadoop。(ps:我的环境是伪分布式,我有个问题请教各位大佬,我namenode,datanode都能通过jps查看。而且也可以使用hadoop client,为什么localhost:50070还是访问不了。网上搜都是重新格式化namenode,但是还是不行。有知道的大佬请帮一下我,先谢)。
一、Spooling Directory Source介绍
Spooling Directory Source通过监听某个目录下的新增文件,并将文件的内容读取出来,实现日志信息的收集。实际使用中会结合log4j进行使用。被传输结束的文件会修改后缀名,添加.completed后缀(可以自定义)。
这里需要注意两点:
- 放入监控目录后,文件不能被修改,否则抛异常;
- 监控目录下不能有子目录;
二、log4j配置
我使用log4j的DailyRollingFileAppender去每分钟生成一个日志到配置的目录下,代码如下:
#输出信息到文件
log4j.appender.file = org.apache.log4j.DailyRollingFileAppender
#这个是生成日志文件的目录及文件名
log4j.appender.file.File = /Users/jsj/eclipse-workspace/log4j/src/main/java/testlog.log
log4j.appender.file.Append = true
#每分钟产生一个日志文件
#当前的文件名是testlog.log,前面分钟产生的文件是这种命名形式testlog.log.2018-08-20-18-16。
log4j.appender.file.DatePattern = '.'yyyy-MM-dd-HH-mm
log4j.appender.file.layout = org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern = [%-5p] %-d{yyyy-MM-dd HH:mm:ss} %m%n
注意这里日志生成的目录不要和Spooling Directory监控的目录一样。因为产生日志时会不断Append到当前这一分钟的日志文件,我们刚刚还说Spooling Directory监控目录下的文件不能修改。那怎么办呢?
- 可以通过shell脚本定时把文件通过mv 或 cp命令弄到Spooling Directory的监控文件夹下。
But,为了降低学习曲线,防止大家看到shell望而却步,我决定不采用这种方式。只是简单的先让程序生成一些文件,然后直接通过shell一次性把所有文件放进去。
三、Java生成日志
日志的内容(不含log4j中的配置)为:
0a58f82b-ff6f-4feb-abe2-7c6ac9a0c24d####ERH####qhp####6677062
格式为:用户ID--县号--镇号--收入
public class Main {
public static void main(String[] args) throws Exception {
Thread thread = new Thread(new GenerateRecord());
thread.start();
}
}
class GenerateRecord extends Thread {
private final Logger log = Logger.getLogger(GenerateRecord.class);
public void run() {
while (true) {
// 随机产生一个用户uuid
UUID userId = UUID.randomUUID();
System.out.println(userId.toString().length());
// 产生一个随机的用户总资产
int num = (int) (Math.random() * 10000000) + 100000;
// 产生一个随意的县名
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 3; i++) {
char a = (char) (Math.random() * (90 - 65) + 65);
sb.append(a);
}
String county = sb.toString();
// 产生一个随机的镇名
StringBuilder sb1 = new StringBuilder();
for (int i = 0; i < 3; i++) {
char a = (char) (Math.random() * (122 - 97) + 97);
sb1.append(a);
}
String town = sb1.toString();
// 生成日志
log.info(userId + "####" + county + "####" + town + "####" + num);
// 停1秒钟
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
在几分钟后停掉程序,在终端输入cd /Users/jsj/eclipse-workspace/log4j/src/main/java/
查看生成的文件 ls -1
,如下:
四、Flume配置
这部分是大家关注的重点。(不懂的配置可以去官网看)
在flume安装目录的conf/flume.conf
下加入如下代码:
# my application flume configuration
#agent2是我们给agent起的名字
agent2.sources=source2
agent2.sinks=sink2
agent2.channels=channel2
#Spooling Directory
#set source2
#设置type为spooldir,这个值是flume给定的alias
agent2.sources.source2.type=spooldir
#设置监控目录,注意和前面log4j的目录不同
agent2.sources.source2.spoolDir=/Users/jsj/eclipse-workspace/logs
agent2.sources.source2.channels=channel2
agent2.sources.source2.fileHeader = false
#set sink2
agent2.sinks.sink2.type=hdfs
agent2.sinks.sink2.hdfs.path=hdfs://localhost:9000/flume
agent2.sinks.sink2.hdfs.fileType=DataStream
agent2.sinks.sink2.hdfs.writeFormat=TEXT
agent2.sinks.sink2.hdfs.rollInterval=60
agent2.sinks.sink2.channel=channel2
#设置存储到HDFS后文件的前缀
agent2.sinks.sink2.hdfs.filePrefix=%Y-%m-%d
#set channel2
#设置内存通道
agent2.channels.channel2.type=memory
agent2.channels.channel2.capacity=10000
agent2.channels.channel2.transactionCapacity=1000
agent2.channels.channel2.keep-alive=30
可以,现在启动flume-ng。首先进到 bin 目录下,输入./flume-ng agent -c ../conf -f ../conf/flume.conf -Dflume.root.logger=INFO,console -n agent2
启动成功如下图:
接下来开另一个终端,将刚才生成的日志文件拷贝到Spooling Directory监控目录下。输入如下命令:
cp /Users/jsj/eclipse-workspace/log4j/src/main/java/testlog.log* /Users/jsj/eclipse-workspace/logs
此时flume的终端会嗖嗖嗖的刷日志,我截下来几条,主要是打开文件,对正在处理的文件改名为.tmp后缀,上传到HDFS后把HDFS上文件的.tmp删掉,本地的监控目录下文件加.COMPLETED后缀。
这时候我们去HDFS上检查一下:新开个终端输入hadoop fs -ls /flume
。发现生成了比我们文件数多的多的文件,原来只有11个,现在有62个文件。
为什么呢?你是不是
有问题去官网看文档啊。发现在HDFS SINK下有这样一个配置。
意思是说,这个属性你不配默认是10条记录一个文件。那我们看下HDFS上的文件是不是10条记录。输入
hadoop fs -cat /flume/2018-08-20.1534766919112
查看下文件内容,没错就是10条。这下真相大白了。这又体现了看官方文档的重要性。剩下的事情就交给你们了。
五、总结
本文接Flume学习系列(一),实现了一个具体的Demo,练习了flume的配置文件的编写,以及查找问题的方法。