1、在官方文档中查看自定义
2、pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.gujm</groupId>
<artifactId>FlumeCustom</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-sdk -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-sdk</artifactId>
<version>1.8.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.8.0</version>
</dependency>
</dependencies>
</project>
3、编写自己的sink
package top.gujm.flume;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
public class LoggerSink extends AbstractSink implements Configurable {
public Status process() throws EventDeliveryException {
Status status = Status.READY;
Channel channel = null;
Transaction tran = null;
try {
channel = getChannel();
tran = channel.getTransaction();
tran.begin();
for (int i = 0; i < 100; i++){
Event event = channel.take();
if(event == null) {
status = Status.BACKOFF;
break;
}else {
//header
Map<String, String> map = new HashMap<String, String>();
map.put("abc", "123");
map.put("abd", "125");
map.put("abe", "122");
event.setHeaders(map);
Map<String, String> headers = event.getHeaders();
System.out.println("{");
if(headers.size() > 0){
System.out.println("\theader: {");
Set<Map.Entry<String, String>> entries = headers.entrySet();
int j = 0;
for (Map.Entry<String, String> entry : entries){
System.out.print((j==0)? "" : ",");
System.out.println();
System.out.print("\t\t"+entry.getKey()+": "+ entry.getValue()+"");
}
System.out.println();
System.out.println("\t},");
}
//body
String body = new String(event.getBody());
System.out.println("\tbody: "+body+"");
System.out.println("}");
}
}
tran.commit();
}catch (Exception e){
e.printStackTrace();
if(tran != null)
tran.commit();
}finally {
if(tran != null)
tran.close();
}
return status;
}
public void configure(Context context) {
}
}
4、 编写自己的source
package top.gujm.flume;
import jdk.nashorn.internal.runtime.linker.InvokeByName;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractPollableSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
public class LoggerSource extends AbstractPollableSource implements Configurable {
private static final Logger logger = LoggerFactory
.getLogger(LoggerSource.class);
private int count = 0;
private int sleep = 0;
public LoggerSource(){
}
protected Status doProcess() throws EventDeliveryException {
logger.info("开始发送event,睡眠时间:"+sleep);
Status status = Status.READY;
count++;
try {
Event event = new SimpleEvent();
Map<String, String> map = new HashMap<String, String>();
map.put("event_number", count+"");
map.put("timestamp", new Date().getTime()+"");
map.put("hostname", InetAddress.getLocalHost().getHostAddress());
event.setHeaders(map);
String body = "event: " + count;
event.setBody(body.getBytes());
getChannelProcessor().processEvent(event);
Thread.sleep(sleep);
}catch (Exception e){
e.printStackTrace();
status = Status.BACKOFF;
}
return status;
}
protected void doConfigure(Context context) throws FlumeException {
sleep = context.getInteger(Contracts.SLEEP, Contracts.SLEEP_DEFAULT);
}
protected void doStart() throws FlumeException {
}
protected void doStop() throws FlumeException {
}
private static class Contracts{
//睡眠时间
public static String SLEEP = "sleep";
public static int SLEEP_DEFAULT = 1000;
}
}
5、打包测试,使用maven
将打包的jar包上传至flume的lib文件夹即可
6、编写conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = top.gujm.flume.LoggerSource
a1.sources.r1.sleep = 2000
a1.sources.r1.port = 44444
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.type = top.gujm.flume.LoggerSink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1