这周来学习一点新的技术吧,感觉自己消息队列方面用的挺少的,所以这周学习下这方面的知识,我选择的是很流行的Kafka,今天抽时间就简单的了解一下。任何一门技术的学习都不是简简单单学习quick start就完事的,以前自己就容易犯这样的错误,感觉入个门就行,其实这样是错误的,入门只是开始。不过今天还是要先从quick start开始,先了解一些基本的概念,然后和spring boot整合一下,看看在项目中如何使用Kafka。
1、基本介绍
Kafka是一个分布式的流平台,主要有三个重要功能:
1、发布和订阅记录流,类似于消息队列或企业消息传递系统。
2、以容错的持久方式存储记录流
3、当记录流发生时处理它们。
Kafka通常用于两大类应用:
1、构建可在系统或应用程序之间可靠获取数据的实时流数据管道
2、构建转换或响应数据流的实时流应用程序
Kafka的核心API主要有四个:Producer API、Consumer API、Streams API、Connector API。根据字面意思也很好理解,就是生产者、消费者、流、连接器。
Producer:允许应用程序将记录流发布到一个或多个topics,关于什么是topics后面会讲到。
Consumer:则允许应用程序订阅一个或多个topics并处理生成的记录流。
Streams:允许应用程序充当流处理器,消费来自一个或多个topics的输入流,并生产出一个或多个输出topics的输出流,从而有效地将输入流转换为输出流。简单点理解可以认为它是一个转换器或者加工厂,将接收到输入流进行一定处理,然后再输出出去。
Connector:允许构建和运行可重用生产者或消费者,这些生产者和消费者将topics连接到现有应用程序或数据系统。
topics:topics是Kafka中很重要的一个概念,不管生产者生产数据还是消费者消费数据都离不开topcis。topics或者说主题是发布记录的类别或者别名,topics是一个多订阅者,也就是说一个topics可以有任意多个消费者订阅写入它的数据。
对于每一个topic,Kafka的集群都维护着一个分区日志文件,下面是官网的一张图片:
上面这个图片可以看出一个topic有3个分区,分别是partition0、partition1、partition2。每个分区都是一个有序的,不可变的记录序列,不断附加到结构化的提交日志中。并且分区中的每条记录都被分配一个称为offset的有序ID号,这个ID号唯一地标识分区中的每条记录。上图中显示的0、1、2....这些都是ID号,一个ID对应着一条数据,类似于数据库中的数据行。
Kafka集群通过使用一个可配置过期时间来持久化保留已发布的消息记录,无论这些记录是否被消费。比如,我设置的过期时间时两天,那么在记录发布之后的两天内我都是可以使用的,超过两天的话这些记录将会被清除掉。Kafka的性能在数据大小方面实际上是恒定的,也就是说性能和数据大小无关,因此长时间保存数据并不是一个问题。
实际上,基于每个消费者保留的唯一元数据就是该消费者在日志中的偏移量或位置。偏移量是由消费者控制的,通常情况下,消费者读取数据的时候会线性提高其偏移量,这个很好理解,因为偏移量本身就是线性增加的。但是,因为偏移量实际由消费者控制,所以其实消费者可以读取任意偏移量上的数据记录。
日志中的分区有几个作用:1、它们允许日志扩展到超出适合单个服务器的大小。每个单独的分区必须适合托管它的服务器,但主题可能有许多分区,因此它可以处理任意数量的数据。2、它们作为并行性的一个单元,更多的是在一点上。
关于理论的知识先了解到这里,主要了解它们的作用就行了,另外就是要多看官方的文档。下面试着启动Kafka并作一点简单操作。
2、安装和启动
首先到官网下载Kafka的压缩包,然后解压。因为Kafka的启动依赖于zookeeper的,当然这个zookeeper可以说Kafka自带的,也可以是你自己配置,有兴趣可以使用自己安装的zookeeper来启动,我就偷懒了。首先切换到Kafka的bin目录下,在这个目录下我们看到有很多的脚本文件,我们先启动zookeeper。
ypcfly@linux:~/ypcfly/kafka/kafka_2.11-2.1.0/bin$ ./zookeeper-server-start.sh
USAGE: ./zookeeper-server-start.sh [-daemon] zookeeper.properties
ypcfly@linux:~/ypcfly/kafka/kafka_2.11-2.1.0/bin$ ./zookeeper-server-start.sh ../config/zookeeper.properties
这里提示需要指定zookeeper的配置文件,Kafka的配置文件都是在config目录下,所以指定好配置文件再重启zookeeper。
接下来是启动Kafka的服务器,和zookeeper一样,也要指定Kafka server的配置文件。重新开一个窗口,执行下面的命令
./kafka-server-start.sh ../config/server.properties
这时候整个Kafka就启动成功了,我们先不着急进行操作,简单看下Kafka的配置文件,都有哪些比较重要的内容。
broker.id=0 # broker 可以近似的理解为Kafka服务器,broker中文为经纪人、代理人的意思
listeners=PLAINTEXT://:9092 #监听的端口号,默认9092
log.dirs=/tmp/kafka-logs #日志文件存储位置,可以指定多个,有逗号分隔
num.partitions=1 #每个topic分区数量,默认1
log.retention.hours=168 # 日志保留时间,也就是过期策略,默认168小时
除了这些以外还有一些配置,就不再详细说明了。下面我们创建一个topic,并使用生产者和消费者来对记录进行操作。先创建一个topic,并指定相关参数
./kafka-topics.sh --create --zookeeper localhost:2181 --topic my_topic --replication-factor 1 --partitions 1
下面解释下命令中各参数的意思:--zookeeper,应该不需要解释了吧,就是zookeeper的服务地址和端口号。--create表示创建topic(还有delete),--topic后面是创建的topic的名称,这里我创建的时候有个警告,说不能使用"."和"_",不过只是警告,但是最好还是不要用这两个符号。--partitions后跟着这个topic创建的分区的数量。--replication-factor后面跟着是每一个分区副本的数量。通过list命令可以查看topic的数量
./kafka-topics.sh --list --zookeeper localhost:2181
接下来使用一个生产者向上面创建的topic发送数据,然后使用消费者接受topic的数据。先创建生产者,再创建消费者
./kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic #生产者
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic --from-beginning #消费者
因为我本地命令模式运行消费者有点问题,不知道为什么我生产者已经发出消息了,但是执行消费的命令控制台什么不显示,不知道是不是Kafka出错了,昨天刚安装好的时候还是可以的,清理了日志文件重启依然不行。
3、与spring boot整合
关于spring与Kafka整合,spring官方有专门的文档,有兴趣可以参考spring for apache kafka,而且也有例子,当然也可以自己创建。用idea创建一个spring boot项目,选择相关模块,并添加相关依赖。项目配置文件如下:
server.port=8090
# 数据库配置
spring.datasource.url=jdbc:postgresql://localhost:5432/test_db?useSSL=false&characterEncoding=utf8
spring.datasource.driver-class-name=org.postgresql.Driver
spring.datasource.username=postgres
spring.datasource.password=123456
# mybatis 配置
mybatis.type-aliases-package=com.ypc.kafka.entity
mybatis.mapper-locations=classpath:mapper/*.xml
# kafka配置
spring.kafka.consumer.group-id=custom_consumer
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-offset-reset=earliest
主要是指定Kafka消费者组的ID,已经服务器的地址和端口号,关于consumer、producer的那几个配置我也不是特别清楚,这个有时间需要看看文档。
这个项目主要是把Kafka作为消息队列使用的,所以我就想使用生产者发送消息,然后由监听的消费者处理消息,太复杂我这里也不会。
controller接受前台传过得参数,然后保存到数据库,成功后则调用发送消息的service将这个数据传递给消费者,消费者进行其他处理,自己都感觉会不会太简单了点......,但是复杂的操作自己也还没学。
自己就把消息发送者和消费者的代码贴一下吧
@Service
public class SenderService {
@Autowired
private KafkaTemplate kafkaTemplate;
/**
* 发送消息
* @param topic
* @param message
*/
public void sendMessage(String topic, String message){
kafkaTemplate.send(topic,message);
}
}
这里发送消息使用的是kafkaTemplate,spring对第三方的支持好像都喜欢使用***Template,比如RedisTemplate、RestTemplate等等。kafkaTemplate重写了多个send方法,自己根据情况选择就好了。我还是建议多看看spring的文档。
消费者,消费者的话主要是使用一个注解,即@KafkaListener,这个注解参数很多,这里就不再详细说明了,我是监听一个topic的消息,当然也可以监听多个topic。
@Service
public class ConsumerService {
/**
* 监听消息
* @param record
*/
@KafkaListener(topics = "custom_kafka")
public void listen(ConsumerRecord<?, ?> record) {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("topics=").append(record.topic()).append(", offset=").append(record.offset())
.append(", value=").append(record.value());
String value = (String) record.value()
//
System.out.println(stringBuilder.toString());
}
}
listen方法里面的ConsumerRecord包装了消息记录的很多信息,比如偏移量,消息值,消息头,主题等等。我们可以从中取出我们需要的数据,可以看下具体的代码:
public class ConsumerRecord<K, V> {
public static final long NO_TIMESTAMP = -1L;
public static final int NULL_SIZE = -1;
public static final int NULL_CHECKSUM = -1;
private final String topic;
private final int partition;
private final long offset;
private final long timestamp;
private final TimestampType timestampType;
private final int serializedKeySize;
private final int serializedValueSize;
private final Headers headers;
private final K key;
private final V value;
private volatile Long checksum;
.....省略代码
}
使用idea的自带的rest client调用我的接口,传递的json参数如下:
{ "age":17, "mobile":"13885474512", "username":"kafka" }
最后输出结果如下:
topics=custom_kafka, offset=0, value={"id":6,"username":"kafka","age":17,"mobile":"13885474512"}
也就是说消费者成功接收到了存入数据库成功的数据,接下怎么对这些数据,就看具体的业务需要了。当然我这里只有一条数据,所以offset为0。像电商经常涉及到的秒杀业务,也是将数据写入消息队列,然后再转给具体的服务进行处理。