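Work required me to look into Flume plugin development, and I got a breakpoint-debugging environment working.
Our company uses CDH 5.10.1, which ships Flume 1.6. Its documentation and source are available at the link below.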
http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.10.1/index.html
Here is a direct link to the Flume 1.6 source matching the CDH version (it took me quite a while to find):
https://git-wip-us.apache.org/repos/asf?p=flume.git;a=commit;h=refs/tags/release-1.6.0
Error on the first build:
[ERROR] Failed to execute goal on project flume-hdfs-sink: Could not resolve dependencies for project org.apache.flume.flume-ng-sinks:flume-hdfs-sink:jar:1.5.0: Failure to find org.apache.hadoop:hadoop-test:jar:2.4.0 in http://repo1.maven.org/maven2 was cached in the local repository, resolution will not be reattempted until the update interval of repo1.maven.org has elapsed or updates are forced -> [Help 1]
Fix: add the following inside <repositories> in the pom.xml in the Flume root directory:
<repository>
  <id>p2.jfrog.org</id>
  <url>http://p2.jfrog.org/libs-releases</url>
  <releases>
    <enabled>true</enabled>
  </releases>
  <snapshots>
    <enabled>false</enabled>
  </snapshots>
</repository>
After running mvn clean install, the flume-ng-dist module produces the packaged distribution:
flume-ng-dist\target\apache-flume-1.6.0-bin\apache-flume-1.6.0-bin\conf
[Only the Windows procedure is covered here]
Prepare the Flume configuration file
In the flume-ng-dist\target\apache-flume-1.6.0-bin\apache-flume-1.6.0-bin\conf folder, create a file named test-header.conf.
Contents of test-header.conf:
a1.channels = c1
a1.sources = r1
a1.sinks = k1
#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=50000
#source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = E:\\flumefile
a1.sources.r1.batchSize= 100
a1.sources.r1.inputCharset = UTF-8
a1.sources.r1.fileHeader = true
a1.sources.r1.fileHeaderKey = file_path
a1.sources.r1.basenameHeader = true
a1.sources.r1.basenameHeaderKey = file_name
## source interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = cookieid is (.*) and ip is (.*)
a1.sources.r1.interceptors.i1.serializers = s1 s2
a1.sources.r1.interceptors.i1.serializers.s1.name = cookieid
a1.sources.r1.interceptors.i1.serializers.s2.name = ip
#sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
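The regex_extractor interceptor configured above runs the regex against each event body and stores capture group 1, 2, ... as headers named by serializers s1, s2 (here cookieid and ip). The stdlib-only sketch below reproduces just that matching step so a regex can be tried before it goes into the config. It is not Flume's RegexExtractorInterceptor: it skips the serializer classes, and the use of find() (rather than matches()) is my understanding of the built-in behavior. The class name RegexExtractorSketch is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexExtractorSketch {
    // Same regex and serializer names as interceptor i1 in test-header.conf
    private static final Pattern REGEX = Pattern.compile("cookieid is (.*) and ip is (.*)");
    private static final String[] NAMES = {"cookieid", "ip"};

    // Returns the headers that capture groups 1..n would produce; empty if nothing matches
    static Map<String, String> extract(String body) {
        Map<String, String> headers = new LinkedHashMap<>();
        Matcher m = REGEX.matcher(body);
        if (m.find()) {
            for (int i = 0; i < NAMES.length && i < m.groupCount(); i++) {
                headers.put(NAMES[i], m.group(i + 1));
            }
        }
        return headers;
    }

    public static void main(String[] args) {
        // prints {cookieid=c_1, ip=127.0.0.1}
        System.out.println(extract("cookieid is c_1 and ip is 127.0.0.1"));
    }
}
```

With the test line used for debugging (cookieid is c_1 and ip is 127.0.0.1), the logger sink should show these two headers on the event.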
Flume startup method 1:
Prepare a .bat file to launch Flume on Windows:
In the flume-ng-dist\target\apache-flume-1.6.0-bin\apache-flume-1.6.0-bin\bin folder, create a file named flume-ng_windows.bat:
setlocal
:: Flume configuration file (in the conf folder)
set CONF_FILE=test-header.conf
::set agent name
set AGENT_NAME=a1
:: FLUME_HOME = parent folder of this bin folder
for %%i in ("%~dp0..") do set "FLUME_HOME=%%~fi"
::Dflume.monitoring.type=ganglia
:: Remote debugging on port 8000; suspend=y makes the JVM wait until a debugger attaches.
:: Put :: in front of the next line to start without debugging.
set DEBUG_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,address=8000,server=y,suspend=y
"%JAVA_HOME%\bin\java.exe" -Xms512m -Xmx512m %DEBUG_OPTS% "-Dlog4j.configuration=file:///%FLUME_HOME%\conf\log4j.properties" -Dflume.root.logger=INFO,console -cp "%FLUME_HOME%\lib\*;%FLUME_HOME%\plugins.d\KolazzInterceptor\lib\*" org.apache.flume.node.Application -f "%FLUME_HOME%\conf\%CONF_FILE%" -n %AGENT_NAME%
pause
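Flume startup method 2 (recommended):
---------- Command-line startup (recommended), start. In this Flume version the PowerShell launcher has an issue; it is solved, and the fix is given below. ----------
Flume 1.6 has a bug here, as shown in the figure.
First, in apache-flume-1.6.0-bin\bin\flume-ng.ps1, replace the part at line 322 shown in the figure with the following: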
$pluginTmp1 = (@(Get-ChildItem "$plugin\*\lib") -join "\*"";""")
if( "$pluginTmp1" -ne "" ) {
    $javaClassPath="$javaClassPath;""" + $pluginTmp1 + "\*"";"
}
$pluginTmp2 = (@(Get-ChildItem "$plugin\*\libext") -join "\*"";""")
if( "$pluginTmp2" -ne "" ) {
    $javaClassPath="$javaClassPath;""" + $pluginTmp2 + "\*"";"
}
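The patch only appends plugin directories to the classpath when there is something to append. Flume's plugins.d convention is that each plugin folder may contain lib (the plugin's jars) and libext (its dependency jars). As a rough illustration of the classpath this aims for, here is my own Java re-expression (not the script's code; the class PluginClasspathSketch is hypothetical) that adds <plugin>\lib\* and <plugin>\libext\* only for subfolders that exist.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

public class PluginClasspathSketch {
    // Subfolders of a plugins.d\<plugin> folder that hold jars
    private static final String[] JAR_DIRS = {"lib", "libext"};

    // Returns "<plugin>\lib\*" and "<plugin>\libext\*" for every plugin folder,
    // skipping subfolders that do not exist.
    static List<String> pluginEntries(Path pluginsDir) throws IOException {
        List<String> entries = new ArrayList<>();
        if (!Files.isDirectory(pluginsDir)) {
            return entries; // no plugins.d folder: nothing to add
        }
        try (DirectoryStream<Path> plugins =
                     Files.newDirectoryStream(pluginsDir, Files::isDirectory)) {
            for (Path plugin : plugins) {
                for (String sub : JAR_DIRS) {
                    Path dir = plugin.resolve(sub);
                    if (Files.isDirectory(dir)) {
                        // "*" is a classpath wildcard, so append it as text, not as a Path
                        entries.add(dir + File.separator + "*");
                    }
                }
            }
        }
        return entries;
    }

    public static void main(String[] args) throws IOException {
        Path pluginsDir = Paths.get(args.length > 0 ? args[0] : "plugins.d");
        System.out.println(String.join(File.pathSeparator, pluginEntries(pluginsDir)));
    }
}
```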
Then copy conf\flume-env.ps1.template to a new file named flume-env.ps1 and set $JAVA_OPTS="-Xmx100m -Xdebug -Xrunjdwp:transport=dt_socket,address=8000,server=y,suspend=y" in it.
Run the following command from the bin directory:
flume-ng.cmd agent --conf ../conf --conf-file ../conf/test-header.conf --name a1 -property flume.root.logger=INFO,console
---------- Command-line startup, end ----------
Enable Flume remote debugging
Make sure flume-ng_windows.bat starts the JVM with remote debugging on port 8000:
-Xdebug -Xrunjdwp:transport=dt_socket,address=8000,server=y,suspend=y
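If a breakpoint is never hit, one thing worth checking is whether these flags actually reached the Flume JVM. A minimal sketch, assuming you are willing to call it from your own plugin code (for example from an interceptor's initialize()): it reads the JVM's input arguments through the standard RuntimeMXBean and looks for the JDWP agent in either its old (-Xrunjdwp) or newer (-agentlib:jdwp) form. The class DebugFlagCheck is hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.util.List;

public class DebugFlagCheck {
    // True if any JVM argument enables the JDWP debug agent
    static boolean jdwpEnabled(List<String> jvmArgs) {
        for (String arg : jvmArgs) {
            if (arg.startsWith("-Xrunjdwp") || arg.startsWith("-agentlib:jdwp")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Arguments passed to this JVM, e.g. -Xms512m, -Xrunjdwp:...
        List<String> jvmArgs = ManagementFactory.getRuntimeMXBean().getInputArguments();
        System.out.println("JDWP enabled: " + jdwpEnabled(jvmArgs));
    }
}
```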
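In IDEA, add a Remote debug configuration
and set its port to 8000.
Run flume-ng_windows.bat to start Flume.
The console will pause at this point. This is expected: suspend=y makes the JVM wait for a debugger. Now start the debug configuration you just created in IDEA (start Flume from the command line first, then start the IDEA debug session).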
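This walkthrough uses an interceptor plugin as the example.
First, find RegexExtractorInterceptor.java in the Flume source and set a breakpoint at the location shown in the figure.
Because the source is a spooldir source, drop a file whose content is cookieid is c_1 and ip is 127.0.0.1 into the E:\flumefile folder.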
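The file can also be dropped in programmatically. Flume's documentation warns that the spooldir source prints an error and stops processing if a file is written to after it appears in the spool directory, or if a file name is reused. This sketch therefore writes the content to a temporary file next to the spool folder and renames it in with a single atomic move, using a fresh name on each run. The class SpoolDropper and the test-<millis>.log naming are hypothetical; E:\flumefile is the spoolDir from test-header.conf.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

public class SpoolDropper {
    // Writes content to a temp file in the spool folder's parent, then moves it into
    // the spool folder with one rename so Flume never sees a half-written file.
    static Path drop(Path spoolDir, String fileName, String content) throws IOException {
        Path parent = spoolDir.toAbsolutePath().getParent();
        Path tmp = Files.createTempFile(parent, "flume-drop-", ".tmp");
        Files.write(tmp, content.getBytes(StandardCharsets.UTF_8));
        Path target = spoolDir.resolve(fileName);
        // ATOMIC_MOVE needs tmp and target on the same file store (same drive on Windows)
        return Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path spoolDir = Paths.get(args.length > 0 ? args[0] : "E:\\flumefile");
        // Unique name per run, because the spooldir source rejects a reused file name
        String name = "test-" + System.currentTimeMillis() + ".log";
        System.out.println(drop(spoolDir, name, "cookieid is c_1 and ip is 127.0.0.1\n"));
    }
}
```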
The breakpoint is now hit:
From here on, go ahead and develop your own Flume plugins.
Demo plugin
Sample source log data:
2018-07-11 13:00:07.122|LOG_INFO |dev14|recv_term|13922222222|{"port":"30011","pro":"808","ver":"2013","buss":"anxin","thread":"140543769519872","connid":"15131","len",220}|7E 02 00 00 CF 01 39 22 22 22 22 87 10 08 00 08 20 00 0C 00 01 02 11 49 C1 06 C6 BB CC 00 00 00 00 00 00 18 07 11 13 00 05 01 04 00 00 00 14 02 02 00 00 03 02 00 00 04 02 00 00 25 04 80 00 00 00 2A 02 00 00 2B 04 00 7A 00 B7 30 01 05 31 01 00 20 02 FF FF 21 02 FF FF 22 02 FF FF 23 02 FF FF 24 04 00 00 00 00 26 04 FF FF FF FF 28 01 00 3A 01 00 3B 04 FF FF FF FF 3F 04 FF FF FF FF 40 04 FF FF FF FF 42 02 00 EF 43 02 FF FF 44 02 FF FF 45 02 FF FF 46 02 FF FF 47 02 FF FF 48 02 FF FF 38 04 00 00 0F 12 37 01 00 E1 30 01 01 FF 02 01 05 03 01 00 04 04 00 00 0F 12 05 04 FF FF FF FF 06 04 FF FF FF FF 08 01 02 09 02 FF FF 0A 02 FF FF 0B 02 FF FF 0C 01 51 0D 01 FF 7B 7E
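For a line like this, the KolazzInterceptor below matches the body against seven |-separated fields, parses field 1 with formatter (default yyyy-MM-dd HH:mm:ss.SSS) and writes it to a header reformatted with dest_formatter (default yyyyMMdd), falling back to the current time if parsing fails. The sketch below reproduces that computation with java.time (Java 8+) instead of SimpleDateFormat; this is a swapped-in alternative, chosen because DateTimeFormatter is immutable and thread-safe and can be kept in a static field. The class LogTimeSketch is hypothetical, and the JSON and hex fields of the sample are abbreviated.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogTimeSketch {
    // Same 7-field pattern as KolazzInterceptor; only group 1 (the timestamp) is used
    private static final Pattern REGEX =
            Pattern.compile("(.*)\\|(.*)\\|(.*)\\|(.*)\\|(.*)\\|(.*)\\|(.*)");
    // Immutable, thread-safe formatters matching the interceptor's defaults
    private static final DateTimeFormatter IN = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
    private static final DateTimeFormatter OUT = DateTimeFormatter.ofPattern("yyyyMMdd");

    // Header value for one log line; current date if the line or its time is unusable
    static String headerValue(String line) {
        Matcher m = REGEX.matcher(line);
        if (m.matches()) {
            try {
                return LocalDateTime.parse(m.group(1), IN).format(OUT);
            } catch (DateTimeParseException e) {
                // fall through to the current time, as the interceptor does
            }
        }
        return LocalDateTime.now().format(OUT);
    }

    public static void main(String[] args) {
        String line = "2018-07-11 13:00:07.122|LOG_INFO |dev14|recv_term|13922222222|{}|7E 02 7E";
        System.out.println(headerValue(line)); // prints 20180711
    }
}
```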
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.kolazz.flume.interceptor;

import com.google.common.base.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Extracts the leading timestamp of each log line and writes it,
 * reformatted, into an event header.
 */
public class KolazzInterceptor implements Interceptor {
    private static final Logger logger = LoggerFactory.getLogger(KolazzInterceptor.class);

    // Header that receives the reformatted date
    private final String headerName;
    // Date pattern of the timestamp field in the log line
    private final String formatter;
    // Date pattern of the value written to the header
    private final String dest_formatter;
    // Seven '|'-separated fields; group 1 is the timestamp
    private final Pattern regex;

    /**
     * Only {@link KolazzInterceptor.Builder} can build me
     */
    protected KolazzInterceptor(Context context) {
        headerName = context.getString(Constants.HEADER_NAME, Constants.HEADER_NAME_DFLT);
        formatter = context.getString(Constants.FORMATTER, Constants.FORMATTER_DFLT);
        dest_formatter = context.getString(Constants.DEST_FORMATTER, Constants.DEST_FORMATTER_DFLT);
        regex = Pattern.compile("(.*)\\|(.*)\\|(.*)\\|(.*)\\|(.*)\\|(.*)\\|(.*)");
    }

    @Override
    public void initialize() {
        // no-op
    }

    /**
     * Scratch test of the date conversion; not called by Flume.
     */
    public static void main(String[] args) {
        try {
            SimpleDateFormat timeFormater = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
            String time = "2018-7-12 15:22:22.123";
            Date dscDt = timeFormater.parse(time);
            System.out.println(dscDt);
            SimpleDateFormat outformatter = new SimpleDateFormat("yyyyMMddHH");
            String dateString = outformatter.format(dscDt);
            System.out.println(dateString);
        } catch (ParseException e) {
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Modifies events in-place.
     */
    @Override
    public Event intercept(Event event) {
        try {
            Map<String, String> headers = event.getHeaders();
            String logTimeStr = "";
            Matcher matcher = regex.matcher(new String(event.getBody(), Charsets.UTF_8));
            if (matcher.matches()) {
                // group 1 is the leading timestamp field
                logTimeStr = matcher.group(1);
            }
            // SimpleDateFormat is not thread-safe, so a new instance is created per call
            SimpleDateFormat timeFormater = new SimpleDateFormat(formatter);
            Date dscDt = null;
            try {
                dscDt = timeFormater.parse(logTimeStr);
            } catch (ParseException e) {
                logger.warn("Cannot parse log time '{}', using current time", logTimeStr);
            }
            // If the time is invalid, use the current time
            if (dscDt == null) {
                dscDt = new Date();
            }
            SimpleDateFormat destFormat = new SimpleDateFormat(dest_formatter);
            String destDtStr = destFormat.format(dscDt);
            // Write to the configured header name
            headers.put(headerName, destDtStr);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        return event;
    }

    /**
     * Delegates to {@link #intercept(Event)} in a loop.
     * @param events batch of events, modified in place
     * @return the same list, with the date header added
     */
    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
        // no-op
    }

    /**
     * Builder which builds new instances of the KolazzInterceptor.
     */
    public static class Builder implements Interceptor.Builder {
        private Context context;

        @Override
        public Interceptor build() {
            return new KolazzInterceptor(context);
        }

        @Override
        public void configure(Context context) {
            this.context = context;
        }
    }

    public static class Constants {
        // Configuration keys
        public static final String HEADER_NAME = "kolazz_dt";
        public static final String FORMATTER = "formatter";
        public static final String DEST_FORMATTER = "dest_formatter";
        // Default values
        public static final String HEADER_NAME_DFLT = "kolazz_date";
        public static final String FORMATTER_DFLT = "yyyy-MM-dd HH:mm:ss.SSS";
        public static final String DEST_FORMATTER_DFLT = "yyyyMMdd";
    }
}
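To use this class, the interceptor's type in the agent config is set to the fully qualified name of its Builder, for example a1.sources.r1.interceptors.i1.type = org.kolazz.flume.interceptor.KolazzInterceptor$Builder, with the plugin jar placed in plugins.d\KolazzInterceptor\lib (the folder the startup .bat puts on the classpath). As I understand Flume 1.6, it then loads that class by name, creates it through its no-arg constructor, calls configure() with the i1.* properties (so formatter and dest_formatter arrive in the Context), and calls build(). This is why Builder must be a public static nested class. The self-contained sketch below mimics that lifecycle; Interceptor, Interceptor.Builder (taking a Map instead of Flume's Context) and PrefixInterceptor are hypothetical stand-ins, not Flume code.

```java
import java.util.HashMap;
import java.util.Map;

public class BuilderLifecycleSketch {
    // Hypothetical stand-in shaped like org.apache.flume.interceptor.Interceptor;
    // the real one works on Event / List<Event> and is configured from a Context.
    public interface Interceptor {
        String intercept(String body);

        interface Builder {
            void configure(Map<String, String> context);
            Interceptor build();
        }
    }

    // Hypothetical example plugin, structured like KolazzInterceptor
    public static class PrefixInterceptor implements Interceptor {
        private final String prefix;

        private PrefixInterceptor(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public String intercept(String body) {
            return prefix + body;
        }

        // public static with an implicit no-arg constructor, so it can be created reflectively
        public static class Builder implements Interceptor.Builder {
            private String prefix = "";

            @Override
            public void configure(Map<String, String> context) {
                prefix = context.getOrDefault("prefix", "");
            }

            @Override
            public Interceptor build() {
                return new PrefixInterceptor(prefix);
            }
        }
    }

    // Roughly the steps behind "interceptors.i1.type = <class>$Builder":
    // load the class by name, instantiate it, configure it, then build the interceptor.
    static Interceptor load(String builderClassName, Map<String, String> context)
            throws ReflectiveOperationException {
        Class<?> cls = Class.forName(builderClassName);
        Interceptor.Builder builder =
                (Interceptor.Builder) cls.getDeclaredConstructor().newInstance();
        builder.configure(context);
        return builder.build();
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        Map<String, String> context = new HashMap<>();
        context.put("prefix", "[i1] ");
        Interceptor i = load(PrefixInterceptor.Builder.class.getName(), context);
        System.out.println(i.intercept("hello")); // prints [i1] hello
    }
}
```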
Log output: