Kafka Stream简单示例(四)---定义更通用的Serde

本篇是在《Kafka Stream简单示例(一)》《Kafka Stream简单示例(二)---聚合 Aggregation--统计总和》 以及《 Kafka Stream简单示例(三)---自定义Serde》基础上成文的,建议先阅读前三篇,以便清楚上下文关系需求背景。

第三篇 《Kafka Stream简单示例(三)---自定义Serde》中,我们自定义了Statistic类的Serializer和Deserializer。现实中我们可能需要多个类都支持序列化和反序列,能否有泛型的Serializer和Deserializer,直接放入自己的类就可以完成工作?答案是,如果你的类,是POJO类型的,使用泛型JsonPOJOSerializer和JsonPOJODeserializer就可以。

注意:示例中的代码只是展示流程,非生产代码,仅供参考,由此导致的问题本人概不负责。

官方文档在这里,我用是kafka 1.0. 所以连接也是1.0版本的文档。 http://kafka.apache.org/10/documentation/streams/developer-guide/datatypes.html

项目需求

统计一分钟内(固定时间窗口Tumbling Window)内温度的总和与平均值。类似的还有,最大值,最小值。

主要流程和代码

完整的代码在这里,欢迎加星和fork。 谢谢!

一个结果中必须同时含有总和与平均值,于是我们设计一个简单数据结构

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Statistics {
    private Long avg;
    private Long sum;
    private Long count;
}

细心的人会发现本篇中的Statistic比《 Kafka Stream简单示例(三)---自定义Serde》中的Statistic多一个@NoArgsConstructor注解,这是因为我们后面使用反序列化是需要生成Statistics对象(使用默认的无参构造函数生成)。 因此需要添加@NoArgsConstructor注解。

根据Serdes的要求,我们必须提供对应的Serializer和Deserializer。


SerdesClass.png

我们需要实现JsonPOJOSerializer和JsonPOJODeserializer。仍然才考LongSerializer和LongDeserializer的实现, 我们实现了StatisticsSerializer和StatisticsDeserializer。
首先是序列化实现JsonPOJOSerializer

package com.yq.generic;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.errors.SerializationException;

import java.util.Map;

/**
 * 这个是官方例子的copy, 版权归官方。copy到本地是为了让我的例子也运行起来
 * https://github.com/apache/kafka/blob/1.0/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java
 * className: JsonPOJOSerializer
 *
 */


public class JsonPOJOSerializer<T> implements Serializer<T> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Default constructor needed by Kafka
     */
    public JsonPOJOSerializer() {
    }

    @Override
    public void configure(Map<String, ?> props, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null)
            return null;

        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Error serializing JSON message", e);
        }
    }

    @Override
    public void close() {
    }

}


其次是反序列化实现JsonPOJODeserializer

package com.yq.generic;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.errors.SerializationException;

import java.util.Map;

/**
 * 这个是官方例子的copy, 版权归官方。copy到本地是为了让我的例子也运行起来
 * https://github.com/apache/kafka/blob/1.0/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java
 * className: JsonPOJOSerializer
 *
 */
public class JsonPOJODeserializer<T> implements Deserializer<T> {
    private ObjectMapper objectMapper = new ObjectMapper();

    private Class<T> tClass;

    /**
     * Default constructor needed by Kafka
     */
    public JsonPOJODeserializer() {
    }

    @SuppressWarnings("unchecked")
    @Override
    public void configure(Map<String, ?> props, boolean isKey) {
        tClass = (Class<T>) props.get("JsonPOJOClass");
    }

    @Override
    public T deserialize(String topic, byte[] bytes) {
        if (bytes == null)
            return null;

        T data;
        try {
            data = objectMapper.readValue(bytes, tClass);
        } catch (Exception e) {
            throw new SerializationException(e);
        }

        return data;
    }

    @Override
    public void close() {

    }
}


最后是我们的主流程。
第一步,我们需要先定义

final Serializer<Statistics> statisticsSerializer = new JsonPOJOSerializer<>();
        serdeProps.put("JsonPOJOClass", Statistics.class);
        statisticsSerializer.configure(serdeProps, false);

        final Deserializer<Statistics> statisticsDeserializer = new JsonPOJODeserializer<>();
        serdeProps.put("JsonPOJOClass", Statistics.class);
        statisticsDeserializer.configure(serdeProps, false);

        final Serde<Statistics> statisticsSerde = Serdes.serdeFrom(statisticsSerializer, statisticsDeserializer);

第二步。 就像SerDe内置的Serdes.Long()或者 Serdes.String(), 可以直接使用statisticsSerde。

KTable的格式是 KTable<Windowed<String>, Statistics>。 aggregate函数的初始值和返回都是Statistics类型, 结果存储的格式Materialized.<String, Statistics, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-temp-stream-store")
.withValueSerde(statisticsSerde) , 也是Statistics类型。

package com.yq.generic;


import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yq.customized.Statistics;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.WindowedDeserializer;
import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
import org.apache.kafka.streams.state.WindowStore;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 *  http://kafka.apache.org/10/documentation/streams/developer-guide/datatypes.html
 * 统计60秒内,温度值的最大值  topic中的消息格式为数字,30, 21或者{"temp":19, "humidity": 25}
 */
public class TemperatureAvgGenericSerDeDemo {
    private static final int TEMPERATURE_WINDOW_SIZE = 60;

    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-temp-avg");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> source = builder.stream("iot-temp");

        Map<String, Object> serdeProps = new HashMap<>();

        final Serializer<Statistics> statisticsSerializer = new JsonPOJOSerializer<>();
        serdeProps.put("JsonPOJOClass", Statistics.class);
        statisticsSerializer.configure(serdeProps, false);

        final Deserializer<Statistics> statisticsDeserializer = new JsonPOJODeserializer<>();
        serdeProps.put("JsonPOJOClass", Statistics.class);
        statisticsDeserializer.configure(serdeProps, false);

        final Serde<Statistics> statisticsSerde = Serdes.serdeFrom(statisticsSerializer, statisticsDeserializer);

        KTable<Windowed<String>, Statistics> max = source
                .selectKey(new KeyValueMapper<String, String, String>() {
                    @Override
                    public String apply(String key, String value) {
                        return "stat";
                    }
                })
                .groupByKey()
                .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(TEMPERATURE_WINDOW_SIZE)))
                .aggregate(
                        new Initializer<Statistics>() {
                            @Override
                            public Statistics apply() {
                                Statistics avgAndSum = new Statistics(0L,0L,0L);
                                return avgAndSum;
                            }
                        },
                        new Aggregator<String, String, Statistics>() {
                            @Override
                            public Statistics apply(String aggKey, String newValue, Statistics aggValue) {
                                //topic中的消息格式为{"temp":19, "humidity": 25}
                                System.out.println("aggKey:" + aggKey + ",  newValue:" + newValue + ", aggKey:" + aggValue);
                                Long newValueLong = null;
                                try {
                                    JSONObject json = JSON.parseObject(newValue);
                                    newValueLong = json.getLong("temp");
                                }
                                catch (ClassCastException ex) {
                                    try {
                                        newValueLong = Long.valueOf(newValue);
                                    }
                                     catch (NumberFormatException e) {
                                         System.out.println("Exception:" + e.getMessage());
                                         //异常返回原值
                                         return aggValue;
                                    }
                                }
                                catch (Exception e) {
                                    System.out.println("Exception:" + e.getMessage());
                                    //异常返回原值
                                    return aggValue;
                                }


                                aggValue.setCount(aggValue.getCount() + 1);
                                aggValue.setSum(aggValue.getSum() + newValueLong);
                                aggValue.setAvg(aggValue.getSum() / aggValue.getCount());

                                return aggValue;
                            }
                        },
                        Materialized.<String, Statistics, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-temp-stream-store")
                                .withValueSerde(statisticsSerde)
                );

        WindowedSerializer<String> windowedSerializer = new WindowedSerializer<>(Serdes.String().serializer());
        WindowedDeserializer<String> windowedDeserializer = new WindowedDeserializer<>(Serdes.String().deserializer(), TEMPERATURE_WINDOW_SIZE);
        Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeserializer);

        max.toStream().to("iot-temp-stat", Produced.with(windowedSerde, statisticsSerde));

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        final CountDownLatch latch = new CountDownLatch(1);


        Runtime.getRuntime().addShutdownHook(new Thread("streams-temperature-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

效果截图

图中已经有文字说明,结合代码能更清楚了解Kafka Stream。


GenericPOJOSer.png
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,126评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,254评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,445评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,185评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,178评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,970评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,276评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,927评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,400评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,883评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,997评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,646评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,213评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,204评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,423评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,423评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,722评论 2 345

推荐阅读更多精彩内容

  • 他是一个四川小伙,出生于1987年,目前已安家在浙江台州。他与妻子共同经营一家生产户外用品的企业,厂里有工人30人...
    从心处发阅读 174评论 1 2
  • 只要我喜欢这女孩,就没有什么婆媳矛盾。报纸上曾看到一个阿姨替儿子找对象时对记者说了这样的话。一语道破婆媳不好相处的...
    小雏菊_0af6阅读 205评论 0 0
  • 我们是在我本科暑假支教队里认识的,大家都不是最初在队里熟悉起来的伙伴,后来却意外的投缘,三个人有了小团体和友谊。一...
    番茄和圣女果的关系阅读 262评论 0 0
  • 很多时候需要简单的限制一下文本输入框的文本长度.写多了类似的代码之后,想把它封装一下一劳永逸.封装后将一行代码搞定...
    Fsn_soul阅读 393评论 0 0