为Kafka的每条消息加上MessageID

由于一些因素限制(无法粘帖代码,都是我手敲的),代码里没有异常处理,自己实现时请注意异常处理

简介

之前提过我在公司内部做的是一个消息平台,为各个应用提供消息服务。前端时间有个领导给了个需求,能不能自动为所有的消息加上一个消息ID的属性,可以全局唯一定义一个消息。

听到时候的第一个反应是,消息体的序列化反序列化方法都可以自定义了,为什么不在自己的消息体里加MessageID?

领导的说法是因为这样所有客户端在使用的时候还要修改自己的消息体,多不方便啊

EMMMMMM。。好吧,你是领导。。。

然后我转念一想,在不破坏消息体的前提下,给每个消息添加一个消息ID,感觉很有趣啊,于是报着干脆玩一下的心理预期,开始干活。

基本思路

在Kafka客户端中,会根据用户选择的序列化方式实例化序列化器(Serializer)和反序列化器(Deserializer),要实现这个功能,基本思路是在用户定义的序列化方式上包上一层自己的序列化方法。

最终实现的效果是:

//Producer Demo
public class ProducerDemo{
    @Test
    public void sendTest(){
        KafkaConfigProperties prop = new KafkaConfigProperties("producer.properties");
        ABCProducer<String,ABCMessage<String>> producer = new ABCProducer<>(prop);
        ProducerRecord<String,ABCMessage<String>> record = new ProducerRecord<String,ABCMessage<String>>("testTopic",null,new ABCMessage<String>("hello world!"));
        producer.send(record);
    }
}

//Consumer Demo
public class ConsumerDemo{
    @Test
    public void receiveTest(){
        KafkaConfigProperties prop = new KafkaConfigProperties("consumer.properties");
        ABCConsumer<String,ABCMessage<String>> consumer = new ABCConsumer<>(prop);
        consumer.subscribe(Collections.singletonList("testTopic"));
        
        ConsumerRecords<String,ABCMessage<String>> records = consumer.poll(10000);
        for (ConsumerRecord<String,ABCMessage<String>> record:records){
            
            System.out.println(record.getMessageId()+" "+record.getvalue());
        }
        
    }
}

而配置的配置文件仍然可以是原生的那些

bootstrap.servers=192.168.0.1:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
acks=all

究其原因,就是在通过对原生接口的修改,侵入了原生的kafka-client的api,本文只实现了在用户序列化方式外面包上一层,其实还可以实现很多其他的能力。

序列化位置的选择

对一个kafka消息来说,ProducerRecord里写死了很多属性:

public class ProducerRecord<K, V> {
    private final String topic;
    private final Integer partition;
    private final K key;
    private final V value;
    private final Long timestamp;
}

这些类型都是private final的,其中topic,partition都是用户指定的,不能修改,key直接决定了patition,可能有业务含义,timestamp和value是可以稍微动一下的,但是timestamp是long类型,不好修改,最终message ID还是要加在value上。

value这个类型是用户指定的,且指定了序列化方法,在发送的时候发送的都是byte[]类型,所以考虑在用户指定的序列化器做完序列化以后,在前面加上自己的序列后的messageID,重新组合成一个新的byte[]再发往kafka。

整个过程是这样的:


侵入序列化方法

设计思路

我们需要自定义properties(KafkaConfigProperties),自定义producer(ABCProducer),自定义一个新的Message类型(ABCMessage),自定义一个序列化和反序列化方式(ABCMessageSerializer,ABCMessageDeserializer)。

每个组件的用途是这样的:

  1. KafkaConfigProperties :继承Properties,用于从用户给的properties中进行value.serializer的修改,此外还能加上一些强制鉴权的能力、参数校验等等。。
  2. ABCProducer:继承KafkaProducer,用于在父类KafkaProducer初始化以后,将用户定义的序列化器传入ABCMessageSerializer
  3. ABCMessage:包装上MessageID,给生产者和消费者对等的消息发送和接收的概念
  4. ABCMessageSerializer:对数据进行拆分,并将消息的message ID进行基本序列化,将用户的value根据用户的序列化方式进行序列化。

这样设计的原因是,kafka的客户端封装得太紧密,所以找了半天没找到漏洞,最后只能通过反射来破坏KafkaProducer中的valueSerializer的private修饰,从而在子类中对其进行方法调用。

实现代码

KafkaConfigProperties

根据配置文件,判断是否需要启动这个功能。如果启动,就更改一下value.serializer配置或者value.deserializer配置

public class KafkaConfigProperties extends Properties {
    private boolean getBooleanProperties(String key, String defaultValue){
        return Boolean.valueOf(this.getproperty(key,defaultValue));
    }
    
    public KafkaConfigProperties(Properties prop){
        super();
        this.putAll(prop);
        configMessageId();
    }
    
    private boolean messageIdEnable;
    
    public boolean isMessageIdEnable() {
        return messageIdEnable;
    }
    
    private void configMessageId() {
        //默认不使用messageID,需要显式配置
         messageIdEnable = getBooleanProperties("messageid.enable","false");
        if (messageIdEnable){
            if (this.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)){
                this.put("user.value.serializer",this.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
                this.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"com.abc.ABCMessageSerializer");
            }
            if (this.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)){ 
                this.put("user.value.deserializer",this.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
                this.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"com.abc.ABCMessageDeserializer");
                
            }
        }
        
    }
}

ABCMessage

每次Mesasge的生成都会通过MessageUtils来自动生成一个messageId,建议固定长度,这里暂时用UUID,36位。

public class ABCMessage<V> {
    private String messageId;
    private V value;
    public ABCMessage(String messageId, V value) {
        this.messageId = messageId;
        this.value = value;
    }
    public ABCMessage(V value) {
        this(MessageUtils.generateMessageId(),value);
    }
    public String getMessageId() {
        return messageId;
    }
    public void setMessageId(String messageId) {
        this.messageId = messageId;
    }
    public V getValue() {
        return value;
    }
    public void setValue(V value) {
        this.value = value;
    }
}

ABCProducer

这里用到了反射,将父类中的valueSeriazlier获取到,然后执行里面的方法

public class ABCProducer<K,V> extends KafkaProducer<K,V>{
    public ABCProducer(KafkaConfigProperties prop) {
        super(prop);
        if (prop.isMessageIdEnable()){
            Field field = this.getClass().getSuperClass().getDeclareField("valueSerializer");
            field.setAccessible(true);
            ABCMessageSerializer serializer = (ABCMessageSerializer) field.get(this);
            serializer.setUserSerializer(prop.get("user.value.serializer").toString());
        }
     }
}

ABCMessageSerializer

public class ABCMessageSerializer implements Serializer {
    private Serializer valueSerializer;
    private Map configs;
    
    @Override
    public void configure(Map configs,boolean isKey){
        this.configs = configs;
    }
    
    @Override
    public byte[] seriazlize(String topic, Object data){
        if (data instanceof ABCMessage){
            byte sep = 0;
            ABCMessage message = (ABCMessage) data;
            byte[] joinByte = EncryptUtils.join(sep,message.getMessageId().getBytes("UTF-8"),
            valueSerializer.serialize(topic,message.getValue()));
            return joinByte;
        }
    } 
    
    @Override
    public void close(){}
    
    public void setUserSerializer(String userValueSerializer){
        HashMap<String,Object> config = new HashMap<>();
        config.put("value.serializer",Class.forName(userValueSerializer));
        ABCMessageConfig config = new ABCMessageConfig(configProp);
        this.valueSerializer = config.getConfiguredInstance("value.serializer",Serializer.class);
        this.valueSerializer.configure(this.configs,false);
    }
}

这段代码里有几个需要注意的地方:

  1. serialize方法里,进行了一个messageId的序列化操作,message.getMessageId().getBytes("UTF-8"),建议加上UTF-8;
  2. EncryptUtils.join这个方法是我前文中对kafka的鉴权模式进行自己修改的时候用到的,作用是字符串拼接。拼接的两个字符串之间加上分隔符,也就是sep,使用的是0这个byte,不拼接也行其实;
  3. setUserSerializer方法中用到了ABCMessageConfig,这个Config继承自AbstractConfig这个抽象类,仅仅只是继承,没有任何的重写。主要用途是模仿ProducerConfig和ConsumerConfig中对valuerSeriazlier的实例化方式。这里就不上代码了。
  4. 记得构造好以后的valueSerializer需要调用一下configure方法。

我在一开始的时候想在configure方法里加上指定用户的value.serializer,这样就可以免除后面setuserSerializer这个方法的麻烦,但是发现传入configure方法的configs经过了一层过滤(ProducerConfig构造后过滤了),我们在properties中加入的user.value.serializer无法被识别。所以只好用后续添加的方式。

还能做什么

在这个方法里,破坏了valueSerializer的private修饰,我们还能通过破坏其它的修饰来更改KafaProducer的方法,这样修改而不是直接修改KafkaProduer源码的好处是可以直接升级Kafka客户端的API而不需要重新编译新版本的kafka-clients源码,升级更方便。

下一步可以试试让用户加上自定义的messageId,但是这样的话不如让用户自己在value里定义了吧。。。

╮(╯╰)╭

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 195,980评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,422评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,130评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,553评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,408评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,326评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,720评论 3 386
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,373评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,678评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,722评论 2 312
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,486评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,335评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,738评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,009评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,283评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,692评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,893评论 2 335

推荐阅读更多精彩内容