最近由于项目需要开始捣鼓项目组日志系统,日志系统中日志打印采用的是log4j2,同时需要支持公司日志采集需求,所以也会用到公司扩展的appender。之前做过的项目有有日志打印的功能也是采用log4j2组件。这篇文章主要是对这个组件的各个属性进行下总结,同时介绍下如何基于log4j2的Appender自定义Appender,满足项目需求。后边也会陆续的对log4j2的原理、性能等进行分析......
虽然log4j2自带了很多种Appender,通过这些Appender可以将日志输出到文件、DB、ES,但是有时候需要自定义Appender来满足业务需求,将日志输出到指定的位置。例如log4j2就没有为Scribe添加appender。
配置文件节点解析
(1). 根节点Configuration有两个属性:status和monitorinterval,有两个子节点:Appenders和Loggers(表明可以定义多个Appender和Logger)。
status用来指定log4j本身的打印日志的级别。monitorinterval用于指定log4j自动重新配置的监测间隔时间,单位是s,最小是5s,例如:monitorInterval=”600” 指log4j2每隔600秒(10分钟),自动监控该配置文件是否有变化,如果变化,则自动根据文件内容重新配置。
(2).Appenders节点,常见的有三种子节点:Console、RollingFile、File。
Console节点用来定义输出到控制台的Appender。
name:指定Appender的名字。
target:SYSTEM_OUT 或 SYSTEM_ERR,一般只设置默认:SYSTEM_OUT。
PatternLayout:输出格式,不设置默认为:%m%n。
如:
RollingFile节点用来定义超过指定大小自动删除旧的创建新的的Appender。
name:指定Appender的名字。
fileName:指定输出日志的目的文件带全路径的文件名。
PatternLayout:输出格式,不设置默认为:%m%n。
filePattern:指定新建日志文件的名称格式。
Policies:指定滚动日志的策略,就是什么时候进行新建日志文件输出日志。
TimeBasedTriggeringPolicy:Policies子节点,基于时间的滚动策略,interval属性用来指定多久滚动一次,>默认是1 hour。modulate=true用来调整时间:比如现在是早上3am,interval是4,那么第一次滚动是在>4am,接着是8am,12am...而不是7am。
SizeBasedTriggeringPolicy:Policies子节点,基于指定文件大小的滚动策略,size属性用来定义每个日志文>件的大小。
DefaultRolloverStrategy:用来指定同一个文件夹下最多有几个日志文件时开始删除最旧的,创建新的(通过>max属性)。
如:
File节点用来定义输出到指定位置的文件的Appender。
name:指定Appender的名字。fileName:指定输出日志的目的文件带全路径的文件名。
PatternLayout:输出格式,不设置默认为:%m%n。
相对RollingFile来说File在项目中用的比较少。
(3).Loggers节点,常见的有两种:Root和Logger。Root节点用来指定项目的根日志,如果没有单独指定Logger,那么就会默认使用该Root日志输出
level:日志输出级别,共有8个级别,按照从低到高为:All < Trace < Debug < Info < Warn < Error < Fatal >< OFF。
AppenderRef:Root的子节点,用来指定该日志输出到哪个Appender。
Logger节点用来单独指定日志的形式,比如要为指定包下的class指定不同的日志级别等。
level:日志输出级别,共有8个级别,按照从低到高为:All < Trace < Debug < Info < Warn < Error < Fatal < OFF。
name:用来指定该Logger所适用的类或者类所在的包全路径,继承自Root节点。
AppenderRef:Logger的子节点,用来指定该日志输出到哪个Appender,如果没有指定,就会默认继承自Root。如果指定了,那么会在指定的这个Appender和Root的Appender中都会输出,此时我们可以设置Logger的additivity="false"只在自定义的Appender中进行输出。
(4).关于日志level
共有8个级别,按照从低到高为:All < Trace < Debug < Info < Warn < Error < Fatal < OFF。All:最低等级的,用于打开所有日志记录.
Trace:是追踪,就是程序推进以下,你就可以写个trace输出,所以trace应该会特别多,不过没关系,我们>可以设置最低日志级别不让他输出。
Debug:指出细粒度信息事件对调试应用程序是非常有帮助的。
Info:消息在粗粒度级别上突出强调应用程序的运行过程。
Warn:输出警告及warn以下级别的日志。
Error:输出错误信息日志。
Fatal:输出每个严重的错误事件将会导致应用程序的退出的日志。
OFF:最高等级的,用于关闭所有日志记录。
程序会打印高于或等于所设置级别的日志,设置的日志等级越高,打印出来的日志就越少。
<EsAppender name="elasticsearchAppender"
cluster="mycluster"
host="127.0.0.1"
port="9300"
index="${sys:service.type}"
type="${sys:service.id}"
ignoreExceptions="false"
maxActionsPerBulkRequest="100"
maxConcurrentBulkRequests="1"
maxVolumePerBulkRequest="10m"
flushInterval="5s">
<PatternLayout>
<pattern>${pattern}</pattern>
</PatternLayout>
</EsAppender>
package com.dav.elasticsearch.utils.log4j2;
import com.dfssi.common.Dates;
import com.dfssi.common.JavaOps;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.message.MapMessage;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import java.io.Serializable;
import java.net.InetAddress;
import java.util.Date;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@Plugin(name = "EsAppender", category = "Core", elementType = "appender", printObject = true)
public class EsAppender extends AbstractAppender {
private Config config;
private TransportClient client;
private volatile BulkProcessor bulkProcessor;
protected EsAppender(Config config, Filter filter,
Layout<? extends Serializable> layout) {
super(config.name, filter, layout);
this.config = config;
LOGGER.info(String.format("Generated EsAppender: \n\t %s ", config));
initEsClient();
}
private void initEsClient(){
try {
Map<String, String> esConf = config.esConf;
String value = esConf.get("client.transport.sniff");
boolean sniff = false;
esConf.remove("client.transport.sniff");
if("true".equalsIgnoreCase(value))sniff = true;
value = esConf.get("client.transport.ping_timeout");
String pingTimeout = "120s";
esConf.remove("client.transport.ping_timeout");
if(value != null) pingTimeout = value;
Settings.Builder builder = Settings.builder()
.put("client.transport.sniff", sniff)
.put("client.transport.ping_timeout", pingTimeout)
//.put("client.transport.ignore_cluster_name", true) // 忽略集群名字验证, 打开后集群名字不对也能连接上
.put("cluster.name", config.cluster);
Properties properties = new Properties();
properties.putAll(esConf);
builder.putProperties(esConf, key -> {return key;});
Settings settings = builder.build();
client = new PreBuiltTransportClient(settings);
Set<Map.Entry<String, Integer>> entries = config.hostAndPorts.entries();
for (Map.Entry<String, Integer> entry : entries) {
client.addTransportAddress(
new TransportAddress(InetAddress.getByName(entry.getKey()), entry.getValue()));
}
} catch (Exception e) {
throw new IllegalArgumentException(String.format("构建es客户端失败:\n\t %s", config), e);
}
}
private void initBulkProcessor() {
bulkProcessor = BulkProcessor.builder(
client,
new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId,
BulkRequest request) {
}
@Override
public void afterBulk(long executionId,
BulkRequest request,
BulkResponse response) {
if (response.hasFailures()) {
LOGGER.error(String.format("executionId = %s, FailureMessage = %s",
executionId, response.buildFailureMessage()));
}
}
@Override
public void afterBulk(long executionId,
BulkRequest request,
Throwable failure) {
LOGGER.error(String.format("executionId = %s", executionId), failure);
}
})
// 5k次请求执行一次bulk
.setBulkActions(config.maxActionsPerBulkRequest)
// 1mb的数据刷新一次bulk
.setBulkSize(config.getMaxVolumePerBulkRequest())
//固定60s必须刷新一次
.setFlushInterval(config.getFlushInterval())
// 并发请求数量, 0不并发, 1并发允许执行
.setConcurrentRequests(config.maxConcurrentBulkRequests)
// 设置退避, 100ms后执行, 最大请求3次
.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(200), 5))
.build();
}
@Override
public void append(LogEvent event) {
if (event.getLevel().isMoreSpecificThan(config.level)) {
//写入es
}
}
public void writeToEs(LogEvent event){
String index = config.indexCreateRule.getIndexName(config.index, new Date(event.getTimeMillis()));
Map<String, Object> record = Maps.newHashMap();
if (event.getMessage() instanceof MapMessage) {
record.putAll(((MapMessage)event.getMessage()).getData());
} else {
record.put("message", event.getMessage().getFormattedMessage());
}
record.put("name", config.name);
record.put("host", config.publisher);
record.put("ti", event.getTimeMillis());
record.put("", Dates.long2Str(event.getTimeMillis(),"yyyy-MM-dd HH:mm:ss"));
record.put("level", event.getLevel().toString());
record.put("logger", event.getLoggerName());
record.put("loggerFQDN", event.getLoggerFqcn());
if (event.getMarker() !=null ) {
record.put("marker", event.getMarker().toString());
};
/* record.put("thread", ievent.threadName);
if (event.getSource()!=null) {
record.put("stack",event.getSource().toString());
};
if (event.getThrown()!=null) {
record.put("throw",convThrowable(event.getThrown()));
};*/
record.put("context", event.getContextMap());
IndexRequestBuilder d= client.prepareIndex(index, config.type).setSource(record);
}
@PluginFactory
public static synchronized EsAppender createAppender(
@PluginAttribute(value = "name", defaultString = "elasticsearchAppender") String name,
@PluginAttribute(value = "publisher", defaultString = "default") String publisher,
@PluginAttribute(value = "servers", defaultString = "localhost:9300") String servers,
@PluginAttribute(value = "cluster", defaultString = "es") String cluster,
@PluginAttribute(value = "esConf") String esConf,
@PluginAttribute(value = "index", defaultString ="logstash") String index,
@PluginAttribute(value = "indexCreateRule", defaultString = "none") String indexCreateRule,
@PluginAttribute(value = "type", defaultString = "logs") String type,
@PluginAttribute(value = "maxActionsPerBulkRequest", defaultInt = 100) int maxActionsPerBulkRequest,
@PluginAttribute(value = "maxConcurrentBulkRequests", defaultInt = 1) int maxConcurrentBulkRequests,
@PluginAttribute(value = "maxVolumePerBulkRequest", defaultString = "10m") String maxVolumePerBulkRequest,
@PluginAttribute(value = "flushInterval", defaultString = "5s") String flushInterval,
@PluginAttribute(value = "ignoreExceptions", defaultBoolean = false) boolean ignoreExceptions,
@PluginAttribute(value = "level") Level level,
@PluginElement("Filters") Filter filter) {
if(level == null)level = Level.INFO;
Config config = new Config();
config.name = name;
config.publisher = publisher;
config.cluster = cluster;
config.index = index;
config.indexCreateRule = IndexCreateRule.getRule(indexCreateRule);
config.type = type;
config.maxActionsPerBulkRequest = maxActionsPerBulkRequest;
config.maxConcurrentBulkRequests = maxConcurrentBulkRequests;
config.maxVolumePerBulkRequest = maxVolumePerBulkRequest;
config.flushInterval = flushInterval;
config.ignoreExceptions = ignoreExceptions;
config.level = level;
config.hostAndPorts = HashMultimap.create();
String[] split = servers.split(",");
try {
String[] kv;
for(String hp : split){
kv = hp.split(":");
if(kv.length == 2){
config.hostAndPorts.put(kv[0], Integer.parseInt(kv[1]));
}
}
} catch (NumberFormatException e) {
throw new IllegalArgumentException(String.format("servers:%s格式有误, 示例:host1:port1,host2:port2", servers), e);
}
config.esConf = Maps.newHashMap();
if(esConf != null){
split = esConf.split(",");
String[] kv;
for(String hp : split) {
kv = hp.split(":");
if(kv.length == 2){
config.esConf.put(kv[0], kv[1]);
}
}
}
return new EsAppender(config,filter,null);
}
private static class Config {
String name;
String publisher;
String cluster;
Map<String, String> esConf;
HashMultimap<String, Integer> hostAndPorts;
String index;
IndexCreateRule indexCreateRule;
String type;
int maxActionsPerBulkRequest;
int maxConcurrentBulkRequests;
String maxVolumePerBulkRequest;
String flushInterval;
boolean ignoreExceptions;
Level level;
public ByteSizeValue getMaxVolumePerBulkRequest(){
long kb = JavaOps.byteStringAsKb(maxVolumePerBulkRequest);
return new ByteSizeValue(kb, ByteSizeUnit.KB);
}
public TimeValue getFlushInterval(){
long sec = JavaOps.timeStringAsSec(flushInterval);
return TimeValue.timeValueSeconds(sec);
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("Config{");
sb.append("name='").append(name).append('\'');
sb.append(", hostAndPorts=").append(hostAndPorts);
sb.append(", cluster='").append(cluster).append('\'');
sb.append(", index='").append(index).append('\'');
sb.append(", indexCreateRule=").append(indexCreateRule);
sb.append(", type='").append(type).append('\'');
sb.append(", maxActionsPerBulkRequest=").append(maxActionsPerBulkRequest);
sb.append(", maxConcurrentBulkRequests=").append(maxConcurrentBulkRequests);
sb.append(", maxVolumePerBulkRequest='").append(maxVolumePerBulkRequest).append('\'');
sb.append(", flushInterval='").append(flushInterval).append('\'');
sb.append(", ignoreExceptions=").append(ignoreExceptions);
sb.append(", level=").append(level);
sb.append('}');
return sb.toString();
}
}
public static void main(String[] args) {
long l = JavaOps.byteStringAsKb("10M");
System.out.println(l);
}
}