SpringBoot 与 Kafka(一)

Kafka简介

Apache Kafka 是一款开源的分布式和容错的流式信息处理系统,最早是由LinkedIn开发的,后来提交到了Apache软件基金。整个系统是用Scala和Java编写的.
这篇文章会介绍如何用springboot 与kafka做一个简单的收发代码样例,在第二篇中会写一些更为复杂的列子。

Kafka的安装

由于Kafka是分布式的消息系统,这边需要使用Apache Zookeeper来管理分布式的集群,当然Kafka安装包里自带Zookeeper的依赖包,也可以直接从Kafka里启动Zookeeper. 但是我们演示的更为像实际应用场景, 这边我们找两台Linux服务器.

这里我用的是Ubuntu 16.04(ubuntu-xenial)

因为Kafka和Zookeeper都需要Java的运行环境,所以两台机器都需要先安装最新的jdk

apt-get update
apt-get install default-jdk

安装Zookeeper

然后我们在一台机器上安装zookeeper

apt-get install zookeeperd

装完后zookeeper应该默认监听2181端口,查看zookeeper是否运作正常

telnet [zookeeper ip] 2181

输入status , 会显示zookeeper的信息比如下图,说明zookeeper工作正常

安装Kafka

接着我们在另外一台机器上装上Kafka

#下载
wget "http://www-us.apache.org/dist/kafka/0.11.0.0/kafka_2.12-0.11.0.0.tgz"

#解压到kafka文件夹
mkdir kafka
tar -xvzf kafka_2.12-0.11.0.0.tgz --directory kafka --strip-components 1
# 到解压目录
cd kafka

kafka目录中有

  • /config 所有相关的配置文件
  • /lib Kafka的依赖包
  • /bin Kafka的启动脚本

现在我们需要启动Kafka,由于Kafka是一个分布式的消息系统,为了方便演示这边我们先启动默认一个节点。在Kafka里面每个Topic是可以有多个partitions的,为了方便暂时规定默认每个Topic只有一个partition.

在启动Kafka先在配置文件中修改连接zookeeper的地址, 用vim打开位置文件

vi config/server.properties

找到 zookeeper.connect=localhost:2181
修改为 zookeeper.connect=[zookeeper ip]:2181, 别的配置我们暂时先不动,保存并退出。

启动kafka

bin/kafka-server-start.sh config/server.properties

看到以下类似信息

INFO Registered broker 0 at path /brokers/ids/0 with addresses
INFO [Kafka Server 0], started (kafka.server.KafkaServer)

说明kafka启动成功了

Kafka的运作原理

安装完了zookeeper和kafka后这边介绍一下kafka的一些简单的运作原理。
Kafka中有几个概念比较重要:

  • Producer: 发送消息者
  • Consumer: 消息消费者
  • Consumer Group: 消费者团体
  • Broker: Kafka处理消息的节点,一个集群可以包含多个Brokers
  • Topic : 发送和消费消息主题
  • Partition: 储存Topic消息的分区, 每个Topic可以有多个Partitions
  • Partition Replica: 每个消息分区的副本, 每个Partition可以有多个副本,一般是2-3个

下面看一下Kafka的物理架构


Kafka物理架构
  • Producer是通过连接Broker来发送消息的
  • Producer是向某个Topic来传递送消息的
  • 发送到Topic中的消息是可以给不同Broker处理的
  • Consumer是连接到某个Broker来监听订阅的Topic的
  • Brokers是通过Zookeeper来进行管理的,并互相通知各自的运行情况
  • 如果Producer或Consumer连接到的Broker没有相关的Topic的,那么消息会自动路由到相关的Broker, 下一次Producer或者Consumer会自动记住相关的Broker
  • 在发送消息后,每个消息会依次排列到每个Partition
  • 消息是可以通过配置来决定要在Partition上保留多久
  • 每个消费者可以从不同队列位置来开始消费消息,并且可以重复消费
Kafka逻辑架构
  • 每个Topic可以有多个Partitions
  • 每个Partition是分布在不同Brokers上的
  • 如果一个Topic有多个Partitions, 那么Consumer收到的消息不能保证有序

这边只是对Kafka一个基本收发消息做一个简单描述,在第二篇会有更为具体的原理解释

Kafka使用的代码实现

Producer的创建

这边使用spring IO中的kafka的依赖带的KafkaTemplate

@Component
public class Producer {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic , String payload) {
        kafkaTemplate.send(topic, payload);
    }
}

topic如果不存在的话kafka server会自动创建的。

这边需要对KafkaTemplate 进行一些配置因为需要序列化key/value,配置如下

@Configuration
public class KafkaProducerConfig {
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

其中bootstrapServers 是producer要发送的kafka server地址。

这边要注意的是bootstrapServers必须是domain name(比如 xxx.com:9092), 直接写IP地址的话可能会导致消息发送后没有响应,如果是本地测试可以用localhost:9092, 或者修改hosts文件。

Consumer的创建

创建Consumer

@Component
public class Consumer {

    @KafkaListener(topics = "${kafka.topic.testtopic}")
    public void receive(String payload) {
        System.out.println(payload);
    }
}

当Consumer消费消息时需要反序列化,所以也要进行一些配置如下

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.topic.testtopic}")
    private String topic;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, topic);
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }
}

演示代码

以上演示代码可以到 https://github.com/dreamcatchernick/spring-boot-samples 的spring-boot-kafka 目录下载并运行

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,968评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,601评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,220评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,416评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,425评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,144评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,432评论 3 401
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,088评论 0 261
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,586评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,028评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,137评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,783评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,343评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,333评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,559评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,595评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,901评论 2 345

推荐阅读更多精彩内容