Steps
Logback's AppenderBase and UnsynchronizedAppenderBase
First, a logback configuration snippet:
<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <Pattern>%d [%-5level][%t][%c][%X{tenant}][%X{requestId}] %m%n</Pattern>
        </encoder>
    </appender>
    <appender name="logfile" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <FileNamePattern>${LOG_HOME}/xxx-%d{yyyy-MM-dd}.log</FileNamePattern>
        </rollingPolicy>
        <layout class="ch.qos.logback.classic.PatternLayout">
            <pattern>[%d{HH:mm:ss:SSS}][%5p][%c:%L] %m%n</pattern>
        </layout>
    </appender>
    <!-- <appender name="kafka" class="com.xxx.util.logback.UnblockedKafkaAppender"> -->
    <!--     <needFilter>true</needFilter> -->
    <!--     <includingPackage>com.xxx.mirc.redis.dubbo</includingPackage> -->
    <!-- </appender> -->
    <root level="info">
        <!-- <appender-ref ref="kafka"/> -->
        <appender-ref ref="logfile"/>
        <!-- <appender-ref ref="STDOUT"/> -->
    </root>
</configuration>
The appender elements above are the logback handler classes being configured. Here the built-in ConsoleAppender and RollingFileAppender are used; you can also extend logback with a custom appender (the commented-out kafka appender).
Logback ships two abstract base classes, AppenderBase and UnsynchronizedAppenderBase, that provide this extension point. Let's look at their (abridged) source:
abstract public class UnsynchronizedAppenderBase<E> extends ContextAwareBase implements Appender<E> {
    private ThreadLocal<Boolean> guard = new ThreadLocal<Boolean>();

    public void doAppend(E eventObject) {
        // ...
    }

    abstract protected void append(E eventObject);
}

abstract public class AppenderBase<E> extends ContextAwareBase implements Appender<E> {
    private boolean guard = false;

    public synchronized void doAppend(E eventObject) {
        // ...
    }

    abstract protected void append(E eventObject);
}
In fact, the two classes share most of their code. Both record Status entries, check whether the event passes the filters attached to the Appender, and only then call the subclass's append method: the template method pattern. The difference is that AppenderBase's doAppend is synchronized, while UnsynchronizedAppenderBase avoids the lock and keeps its re-entrancy guard in a ThreadLocal instead.
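Condensed, the doAppend template in both classes looks roughly like this (a simplified sketch of the logback source; status counting and exception handling are omitted):

// UnsynchronizedAppenderBase: the guard lives in a ThreadLocal, so no lock is taken
public void doAppend(E eventObject) {
    if (Boolean.TRUE.equals(guard.get())) return;  // re-entrant call coming from append() itself
    try {
        guard.set(Boolean.TRUE);
        if (!started) return;                      // not started: a WarnStatus is reported instead
        if (getFilterChainDecision(eventObject) == FilterReply.DENY) return;
        this.append(eventObject);                  // template method: the subclass does the real work
    } finally {
        guard.set(Boolean.FALSE);
    }
}

// AppenderBase: the whole method is synchronized, so a plain boolean guard is enough
public synchronized void doAppend(E eventObject) {
    if (guard) return;
    try {
        guard = true;
        // same started/filter checks as above, then:
        this.append(eventObject);
    } finally {
        guard = false;
    }
}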
Here is a custom appender that ships log events to Kafka:
public class UnblockedKafkaAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {

    BaseKafkaProducer<LogBackKafkaVo> producer;
    private static Set<String> includeSet = new HashSet<String>();
    private String includingPackage;
    private String kafkaBrokerPath;
    private boolean needFilter = true;

    public boolean isNeedFilter() {
        return needFilter;
    }

    @Override
    protected void append(ILoggingEvent eventObject) {
        // optionally drop events whose logger name does not match the configured packages
        if (needFilter) {
            boolean flag = false;
            if (CollectionUtils.isNotEmpty(includeSet)) {
                for (String regex : includeSet) {
                    if (eventObject.getLoggerName().matches(regex)) {
                        flag = true;
                        break;
                    }
                }
            }
            if (!flag)
                return;
        }
        LogBackKafkaVo vo = new LogBackKafkaVo().build(eventObject);
        if (producer != null)
            try {
                producer.sendMsg(vo);
            } catch (Exception e) {
                e.printStackTrace();
            }
    }

    @Override
    public void start() {
        super.start();
        // initialize the producer asynchronously so logback startup is not blocked
        new Thread(new Runnable() {
            @Override
            public void run() {
                initProducer();
            }
        }).start();
    }

    private void initProducer() {
        // wait until the kafka configuration file becomes available
        while (!FileReaderUtils.existsFile("kafka.properties")) {
            try {
                Thread.sleep(50L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        if (needFilter) {
            if (StringUtils.isBlank(includingPackage))
                return;
            // turn each configured package prefix into a logger-name regex
            for (String s : includingPackage.split(",")) {
                includeSet.add(s + ".*");
            }
        }
        producer = new LogBackLoggerProducer();
        try {
            producer.kafkaProducerConfig = producer.initConfig(kafkaBrokerPath);
            producer.setProducer_type(KafkaConstant.ASYNC);
            producer.init();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
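One note: logback injects the <needFilter>, <includingPackage> and broker path values from the configuration through plain JavaBean setters, so the appender also needs the corresponding setters (a minimal sketch matching the fields above; they are not shown in the listing):

public void setNeedFilter(boolean needFilter) {
    this.needFilter = needFilter;
}

public void setIncludingPackage(String includingPackage) {
    this.includingPackage = includingPackage;
}

public void setKafkaBrokerPath(String kafkaBrokerPath) {
    this.kafkaBrokerPath = kafkaBrokerPath;
}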
Kafka operations (using the producer side as the example)
First, the low-level Kafka operation class:
public class ProducerProxy<T1, T2> {

    private Map<Integer, Producer<T1, T2>> prodMap = new HashMap<Integer, Producer<T1, T2>>();
    private ConcurrentLinkedQueue<Producer<T1, T2>> queue = new ConcurrentLinkedQueue<Producer<T1, T2>>();

    public ProducerProxy(ProducerConfig producerConfig, int size) {
        // create a fixed-size pool of producers
        for (int i = 0; i < size; i++) {
            Producer<T1, T2> producer = new Producer<T1, T2>(producerConfig);
            prodMap.put(i, producer);
            queue.offer(producer);
        }
    }

    public void send(List<KeyedMessage<T1, T2>> messages) {
        if (prodMap.isEmpty())
            throw new IllegalStateException("prodMap can not be empty");
        // pick one of the pooled producers at random
        int i = java.util.concurrent.ThreadLocalRandom.current().nextInt(prodMap.size());
        prodMap.get(i).send(messages);
    }

    public void send(KeyedMessage<T1, T2> msg) {
        if (prodMap.isEmpty())
            throw new IllegalStateException("prodMap can not be empty");
        int i = java.util.concurrent.ThreadLocalRandom.current().nextInt(prodMap.size());
        prodMap.get(i).send(msg);
    }

    public void close() {
        for (Producer<T1, T2> prod : prodMap.values())
            prod.close();
    }

    public Producer<T1, T2> pollProducer() {
        return queue.poll();
    }
}
The point of ProducerProxy is to hold several underlying producers and pick one at random for each send, spreading load across them.
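A minimal usage sketch (broker list, topic and pool size are placeholder values; the properties follow the old Kafka 0.8 producer API used above):

Properties props = new Properties();
props.put("metadata.broker.list", "broker1:9092,broker2:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
ProducerConfig config = new ProducerConfig(props);

// a pool of 3 underlying producers; each send picks one of them at random
ProducerProxy<String, String> proxy = new ProducerProxy<String, String>(config, 3);
proxy.send(new KeyedMessage<String, String>("log-topic", "key-1", "hello kafka"));
proxy.close();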
public class AbstractKafkaProducer {

    protected static ProducerProxy<String, String> producer;
    protected static ProducerProxy<byte[], byte[]> byteProducer;
    protected static Map<String, AsyncKafkaMessageProducer> kafkaMessageCacheMap = new HashMap<String, AsyncKafkaMessageProducer>(2);
    static Map<String, AbstractKafkaProducer> mapProducer = new HashMap<String, AbstractKafkaProducer>(2);

    private static AbstractKafkaProducer abstractKafkaProducer = new AbstractKafkaProducer();

    public static AbstractKafkaProducer getInstance() {
        return abstractKafkaProducer;
    }

    protected AbstractKafkaProducer() {
        if (!mapProducer.isEmpty())
            return;
        // register the concrete producers by serializer type
        mapProducer.put(KafkaConstant.STRING, StringKafkaProducer.getInstance());
    }

    public void setProducer(ProducerProxy<String, String> producer) {
        AbstractKafkaProducer.producer = producer;
    }

    public void setByteProducer(ProducerProxy<byte[], byte[]> byteProducer) {
        AbstractKafkaProducer.byteProducer = byteProducer;
    }

    // dispatch to the producer registered for the given serializer type
    public void sendMsg(String prodtype, String serializerType, Object msg, String topic, String... key) throws IOException {
        mapProducer.get(serializerType).sendMsg(prodtype, msg, topic, key);
    }

    protected void sendMsg(String prodtype, Object msg, String topic, String... key) throws IOException {
    }
}
AbstractKafkaProducer is the shared dispatch class. On top of it, a base producer class is provided:
public abstract class BaseKafkaProducer<T> implements KafkaProducer<T> {

    public static ProducerProxy<String, String> getProducer() {
        return producer;
    }

    public static ProducerProxy<byte[], byte[]> getByteProducer() {
        return byteProducer;
    }

    protected Properties initProducer(KafkaProducerConfig kafkaProducerConfig, String... home) {
        final Properties props = new Properties();
        props.put(KafkaConstant.Producer.metadata_broker_list, kafkaProducerConfig.getMetadata_broker_list());
        ...
        return props;
    }

    /**
     * Send a message event.
     *
     * @throws Exception
     */
    @Override
    public boolean sendMsg(final T t, String... key) throws Exception {
        if (t == null) {
            return false;
        }
        try {
            resetTopic();
            String type = producer_type;
            Object o = generateMsg(t);
            // route to the byte or string producer depending on the configured serializer
            AbstractKafkaProducer.getInstance().sendMsg(
                    type,
                    KafkaConstant.BYTEENCODER.equals(produce_serilize_class)
                            ? KafkaConstant.BYTE : KafkaConstant.STRING,
                    o, topic, key);
            return true;
        } catch (final Exception e) {
            logger.error("send msg to jump mq exception:", e);
            throw e;
        } catch (final Error e) {
            logger.error("send msg to jump mq error:", e);
            throw e;
        }
    }

    public void init() {
    }

    protected abstract void resetTopic();

    protected Object generateMsg(T t) {
        return t;
    }
}
Exposing it for use (as a Spring bean):
@Configuration
public class DubboKafkaProducerConfiguration {

    @Bean(name = "dubboLoggerProducer")
    public DubboLoggerProducer dubboLoggerProducer() throws IOException {
        BaseKafkaProducer<DubboInvokeDetail> dubboProducer = new DubboLoggerProducer();
        try {
            dubboProducer.setProducer_type(KafkaConstant.ASYNC);
            dubboProducer.init();
            KafkaDubboUtil.setLogSender((KafkaProducer<DubboInvokeDetail>) dubboProducer);
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            return null;
        }
        return (DubboLoggerProducer) dubboProducer;
    }
}
ES operations
On the Elasticsearch side, it is enough to write a Kafka consumer program that reads the log messages and indexes them into ES, as sketched below.
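A minimal consumer sketch, assuming the Java Kafka consumer API and the Elasticsearch low-level REST client; the topic name, broker and ES addresses, and the index/type names are placeholders:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.http.HttpHost;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RestClient;

public class LogbackKafkaToEs {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");
        props.put("group.id", "logback-es-writer");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Collections.singletonList("logback-topic"));

        RestClient es = RestClient.builder(new HttpHost("es-host", 9200, "http")).build();

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                // each record value is the LogBackKafkaVo serialized as JSON
                Request index = new Request("POST", "/logback-2017/com.xxx.util.logback.LogBackKafkaVo");
                index.setJsonEntity(record.value());
                es.performRequest(index);
            }
        }
    }
}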
Result
{
    "_index": "logback-2017",
    "_type": "com.xxx.util.logback.LogBackKafkaVo",
    "_id": "hacyp0pdrvtt",
    "_version": 1,
    "_score": 1,
    "_source": {
        "argumentArray": "[]/r/n",
        "callerDataArray": "",
        "formattedMessage": "不存在memberno[]",
        "level": "INFO",
        "loggerContextVO": "LoggerContextVO{name='default', propertyMap={HOSTNAME=wx-test}, birthTime=1495017602946}",
        "loggerName": "com.xxx.Object",
        "shardId": 25,
        "status": 0,
        "threadName": "ConsumeMessageThread_10",
        "timeStamp": "2017-05-17 18:52:26"
    }
}