Kafka简介
Apache Kafka 是一款开源的分布式和容错的流式信息处理系统,最早是由LinkedIn开发的,后来提交到了Apache软件基金。整个系统是用Scala和Java编写的.
这篇文章会介绍如何用springboot 与kafka做一个简单的收发代码样例,在第二篇中会写一些更为复杂的列子。
Kafka的安装
由于Kafka是分布式的消息系统,这边需要使用Apache Zookeeper来管理分布式的集群,当然Kafka安装包里自带Zookeeper的依赖包,也可以直接从Kafka里启动Zookeeper. 但是我们演示的更为像实际应用场景, 这边我们找两台Linux服务器.
这里我用的是Ubuntu 16.04(ubuntu-xenial)
因为Kafka和Zookeeper都需要Java的运行环境,所以两台机器都需要先安装最新的jdk
apt-get update
apt-get install default-jdk
安装Zookeeper
然后我们在一台机器上安装zookeeper
apt-get install zookeeperd
装完后zookeeper应该默认监听2181端口,查看zookeeper是否运作正常
telnet [zookeeper ip] 2181
输入status , 会显示zookeeper的信息比如下图,说明zookeeper工作正常
安装Kafka
接着我们在另外一台机器上装上Kafka
#下载
wget "http://www-us.apache.org/dist/kafka/0.11.0.0/kafka_2.12-0.11.0.0.tgz"
#解压到kafka文件夹
mkdir kafka
tar -xvzf kafka_2.12-0.11.0.0.tgz --directory kafka --strip-components 1
# 到解压目录
cd kafka
kafka目录中有
- /config 所有相关的配置文件
- /lib Kafka的依赖包
- /bin Kafka的启动脚本
现在我们需要启动Kafka,由于Kafka是一个分布式的消息系统,为了方便演示这边我们先启动默认一个节点。在Kafka里面每个Topic是可以有多个partitions的,为了方便暂时规定默认每个Topic只有一个partition.
在启动Kafka先在配置文件中修改连接zookeeper的地址, 用vim打开位置文件
vi config/server.properties
找到 zookeeper.connect=localhost:2181
修改为 zookeeper.connect=[zookeeper ip]:2181, 别的配置我们暂时先不动,保存并退出。
启动kafka
bin/kafka-server-start.sh config/server.properties
看到以下类似信息
INFO Registered broker 0 at path /brokers/ids/0 with addresses
INFO [Kafka Server 0], started (kafka.server.KafkaServer)
说明kafka启动成功了
Kafka的运作原理
安装完了zookeeper和kafka后这边介绍一下kafka的一些简单的运作原理。
Kafka中有几个概念比较重要:
- Producer: 发送消息者
- Consumer: 消息消费者
- Consumer Group: 消费者团体
- Broker: Kafka处理消息的节点,一个集群可以包含多个Brokers
- Topic : 发送和消费消息主题
- Partition: 储存Topic消息的分区, 每个Topic可以有多个Partitions
- Partition Replica: 每个消息分区的副本, 每个Partition可以有多个副本,一般是2-3个
下面看一下Kafka的物理架构
- Producer是通过连接Broker来发送消息的
- Producer是向某个Topic来传递送消息的
- 发送到Topic中的消息是可以给不同Broker处理的
- Consumer是连接到某个Broker来监听订阅的Topic的
- Brokers是通过Zookeeper来进行管理的,并互相通知各自的运行情况
- 如果Producer或Consumer连接到的Broker没有相关的Topic的,那么消息会自动路由到相关的Broker, 下一次Producer或者Consumer会自动记住相关的Broker
- 在发送消息后,每个消息会依次排列到每个Partition
- 消息是可以通过配置来决定要在Partition上保留多久
- 每个消费者可以从不同队列位置来开始消费消息,并且可以重复消费
- 每个Topic可以有多个Partitions
- 每个Partition是分布在不同Brokers上的
- 如果一个Topic有多个Partitions, 那么Consumer收到的消息不能保证有序
这边只是对Kafka一个基本收发消息做一个简单描述,在第二篇会有更为具体的原理解释
Kafka使用的代码实现
Producer的创建
这边使用spring IO中的kafka的依赖带的KafkaTemplate
@Component
public class Producer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void send(String topic , String payload) {
kafkaTemplate.send(topic, payload);
}
}
topic如果不存在的话kafka server会自动创建的。
这边需要对KafkaTemplate 进行一些配置因为需要序列化key/value,配置如下
@Configuration
public class KafkaProducerConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
其中bootstrapServers 是producer要发送的kafka server地址。
这边要注意的是bootstrapServers必须是domain name(比如 xxx.com:9092), 直接写IP地址的话可能会导致消息发送后没有响应,如果是本地测试可以用localhost:9092, 或者修改hosts文件。
Consumer的创建
创建Consumer
@Component
public class Consumer {
@KafkaListener(topics = "${kafka.topic.testtopic}")
public void receive(String payload) {
System.out.println(payload);
}
}
当Consumer消费消息时需要反序列化,所以也要进行一些配置如下
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${kafka.topic.testtopic}")
private String topic;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, topic);
return new DefaultKafkaConsumerFactory<>(configProps);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
演示代码
以上演示代码可以到 https://github.com/dreamcatchernick/spring-boot-samples 的spring-boot-kafka 目录下载并运行