Kafka 中使用 Avro 序列化组件(三):Confluent Schema Registry

1. schema 注册表

无论是使用传统的Avro API自定义序列化类和反序列化类还是使用Twitter的Bijection类库实现Avro的序列化与反序列化,这两种方法都有一个缺点:在每条Kafka记录里都嵌入了schema,这会让记录的大小成倍地增加。但是不管怎样,在读取记录时仍然需要用到整个 schema,所以要先找到 schema。有没有什么方法可以让数据共用一个schema?

我们遵循通用的结构模式并使用"schema注册表"来达到目的。"schema注册表"的原理如下:

把所有写入数据需要用到的 schema 保存在注册表里,然后在记录里引用 schema 的 ID。负责读取数据的应用程序使用 ID 从注册表里拉取 schema 来反序列化记录。序列化器和反序列化器分别负责处理 schema 的注册和拉取。

schema注册表并不属于Kafka,现在已经有一些开源的schema 注册表实现。比如本文要讨论的Confluent Schema Registry。

2. 案例说明

现有 schema 文件 user.json,其中内容如下:

{
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name",  "type": "string"},
        {"name": "age", "type": "int"}
    ]
}

需求:把这个 schema 中的内容注册到 Confluent Schema Registry 中,Kafka Producer 和 Kafka Consumer 通过识别 Confluent Schema Registry 中的 schema 内容来序列化和反序列化。

3. 实操步骤

(1) 启动 Confluent Schema Registry 服务

  • Confluent 下载地址:https://www.confluent.io/download/,我这里使用confluent-oss-4.1.1-2.11.tar.gz
  • 下载好后上传到服务器,解压即可用
  • 进入confluent-4.1.1/etc/schema-registry/目录下,修改schema-registry.properties文件,内容及注释如下:
# Confluent Schema Registry 服务的访问IP和端口
listeners=http://192.168.42.89:8081

# Kafka集群所使用的zookeeper地址,如果不配置,会使用Confluent内置的Zookeeper地址(localhost:2181)
kafkastore.connection.url=192.168.42.89:2181/kafka-1.1.0-cluster

# Kafka集群的地址(上一个参数和这个参数配置一个就可以了)
# kafkastore.bootstrap.servers=192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094

# 存储 schema 的 topic
kafkastore.topic=_schemas

# 其余保持默认即可
  • 启动 Confluent Schema Registry
[root@confluent confluent-4.1.1]# bin/schema-registry-start etc/schema-registry/schema-registry.properties
# 省略一些内容......
[2018-06-22 16:10:26,442] INFO Server started, listening for requests... (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:45)

(2) 注册 User 的 schema 注册到对应的 topic 下

  • 首先把原来的 schema 文件加上 "schema" 标记
{
    "schema": "{
        "type": "record",
        "name": "User",
        "fields": [
            {"name": "id", "type": "int"},
            {"name": "name",  "type": "string"},
            {"name": "age", "type": "int"}
        ]
    }"
}
  • 部分"需要转义:
{
    "schema": "{
        \"type\": \"record\",
        \"name\": \"User\",
        \"fields\": [
            {\"name\": \"id\", \"type\": \"int\"},
            {\"name\": \"name\",  \"type\": \"string\"},
            {\"name\": \"age\", \"type\": \"int\"}
        ]
    }"
}
  • 注册 schema 的命令如下
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '' \ 
http://192.168.42.89:8081/subjects/dev3-yangyunhe-topic001-value/versions

说明:
<1> ''之间需要填写schema字符串
<2> 我用来测试的 topic 为 dev3-yangyunhe-topic001,而且我只对 Kafka 的 value 进行 avro 的序列化,所以注册的地址为http://192.168.42.89:8081/subjects/dev3-yangyunhe-topic001-value/versions
<3> http://192.168.42.89:8081需要根据自己的配置进行修改

  • 把转义后 schema 填充到 --data ''的两个单引号中
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"id\", \"type\": \"int\"}, {\"name\": \"name\",  \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}"}' \
http://192.168.42.89:8081/subjects/dev3-yangyunhe-topic001-value/versions
  • 注册成功会返回这个 schema 的 ID
{"id":102}

(3) 在 maven 工程中引入 Confluent Schema Registry 相关的 jar 包

这些 jar 包在 maven 仓库中下载不到,需要自己手动添加到集群中,confluent-4.1.1 解压后,其 share/java/目录下有 confluent 各个组件的 jar 包:

我们需要 confluent-common 目录下的common-config-4.1.1.jarcommon-utils-4.1.1.jar和全部以jackson开头的 jar 包以及 kafka-serde-tools 目录下的kafka-schema-registry-client-4.1.1.jarkafka-avro-serializer-4.1.1.jar,关于如何添加本地的 jar 包到 java 工程中,本文不再赘述。

(4) Kafka Producer 发送数据

package com.bonc.rdpe.kafka110.producer;

import java.util.Properties;
import java.util.Random;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * @Title ConfluentProducer.java 
 * @Description 使用Confluent实现的Schema Registry服务来发送Avro序列化后的对象
 * @Author YangYunhe
 * @Date 2018-06-25 10:49:19
 */
public class ConfluentProducer {
    
    public static final String USER_SCHEMA = "{\"type\": \"record\", \"name\": \"User\", " + 
            "\"fields\": [{\"name\": \"id\", \"type\": \"int\"}, " + 
            "{\"name\": \"name\",  \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}";
    
    public static void main(String[] args) throws Exception {
        
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 使用Confluent实现的KafkaAvroSerializer
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        // 添加schema服务的地址,用于获取schema
        props.put("schema.registry.url", "http://192.168.42.89:8081");

        Producer<String, GenericRecord> producer = new KafkaProducer<>(props);
        
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(USER_SCHEMA);
        
        Random rand = new Random();
        int id = 0;

        while(id < 100) {
            id++;
            String name = "name" + id;
            int age = rand.nextInt(40) + 1;
            GenericRecord user = new GenericData.Record(schema);
            user.put("id", id);
            user.put("name", name);
            user.put("age", age);
            
            ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("dev3-yangyunhe-topic001", user);
            
            producer.send(record);
            Thread.sleep(1000);
        }

        producer.close();

    }

}

(5) Kafka Consumer 消费数据

package com.bonc.rdpe.kafka110.consumer;

import java.util.Collections;
import java.util.Properties;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * @Title ConfluentConsumer.java
 * @Description 使用Confluent实现的Schema Registry服务来消费Avro序列化后的对象
 * @Author YangYunhe
 * @Date 2018-06-25 11:42:21
 */
public class ConfluentConsumer {

    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
        props.put("group.id", "dev3-yangyunhe-group001");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 使用Confluent实现的KafkaAvroDeserializer
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        // 添加schema服务的地址,用于获取schema
        props.put("schema.registry.url", "http://192.168.42.89:8081");
        KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Collections.singletonList("dev3-yangyunhe-topic001"));

        try {
            while (true) {
                ConsumerRecords<String, GenericRecord> records = consumer.poll(1000);
                for (ConsumerRecord<String, GenericRecord> record : records) {
                    GenericRecord user = record.value();
                    System.out.println("value = [user.id = " + user.get("id") + ", " + "user.name = "
                            + user.get("name") + ", " + "user.age = " + user.get("age") + "], "
                            + "partition = " + record.partition() + ", " + "offset = " + record.offset());
                }
            }
        } finally {
            consumer.close();
        }
    }
}

(6) 测试结果

Kafka Consumer 的控制台输出内容如下:

value = [user.id = 1, user.name = name1, user.age = 20], partition = 1, offset = 696
value = [user.id = 2, user.name = name2, user.age = 27], partition = 0, offset = 696
value = [user.id = 3, user.name = name3, user.age = 35], partition = 2, offset = 695
value = [user.id = 4, user.name = name4, user.age = 7], partition = 1, offset = 697
value = [user.id = 5, user.name = name5, user.age = 34], partition = 0, offset = 697

......
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,596评论 18 139
  • 姓名:周小蓬 16019110037 转载自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw阅读 34,701评论 13 425
  • 背景介绍 Kafka简介 Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下: 以时间复杂度为O...
    高广超阅读 12,816评论 8 167
  • 一位美女CEO,写的一篇三十感言。句句名言,虽然是写给女生的,但是,男生也更应该看看。 事业篇 1无论是打工还是创...
    空心菜_b963阅读 378评论 1 4
  • 一直以为,自己是一个很谨慎的人,之前的工作,无论大事小事都会做的点滴不漏,直到自己认为没有缺陷,才会罢休。 ...
    小博么尼阅读 290评论 0 0