背景
为了能平滑、不改写代码、无侵入地迁移应用使用pulsar,KoP以插件形式支持Kafka协议。
Github地址:https://github.com/streamnative/kop
一、Kafka协议适配器KoP
KoP(Kafka on Pulsar)通过在 Pulsar 代理上引入 Kafka 协议处理程序,为 Apache Pulsar 带来了原生的Apache Kafka协议支持。通过将 KoP 协议处理程序添加到您现有的 Pulsar 集群,您可以将现有的 Kafka 应用程序和服务迁移到Pulsar,而无需修改代码。这使 Kafka 应用程序能够利用 Pulsar 的强大功能。
KoP 作为 Pulsar协议处理程序插件实现,协议名称为“kafka”,在 Pulsar broker 启动时加载。它通过在Apache Pulsar上提供原生 Kafka 协议支持,这样可以大大降低学习Pulsar的成本。基于KOP方案, 整合两个流行的事件流生态系统软件。使用ApachePulsar 构建真正统一的事件流平台,以加速实时应用程序和服务的开发。
1.1 下载Kop版本
- 下载与pulsar版本一致的kop
https://github.com/streamnative/kop/releases
下载的文件为:pulsar-protocol-handler-kafka-2.8.0.7.nar
1.2 安装部署Kop
- 1.2.1 将KOP NAR包上传到Pulsar的protocols目录中,如果没有此目录,直接创建即可
$cd /workspace/apache-pulsar-2.8.0.7/
$mkdir protocols
$cp /home/admin/pulsar-protocol-handler-kafka-2.8.0.7.nar protocols/ #将Kop包放入到protocols目录下
- 1.2.2 修改broker配置文件,设置Kop的相关配置信息
$cd /workspace/apache-pulsar-2.8.0.7/
$vim conf/broker.conf
allowAutoTopicCreationType=partitioned
messagingProtocols=kafka
protocolHandlerDirectory=/workspace/apache-pulsar-2.8.0.7/protocols
kafkaListeners=PLAINTEXT://10.9.0.54:9092
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
默认情况下,allowAutoTopicCreationType设置为non-partitioned。您需要设置allowAutoTopicCreationType为,partitioned因为 KoP 仅支持分区主题。否则,KoP 自动创建的主题仍然是分区主题,而 Pulsar broker 自动创建的主题是非分区主题。
- 1.2.3 设置 Kafka 侦听器
kafkaListeners=PLAINTEXT://10.69.2.25:9092
kafkaListeners 是一个以逗号分隔的侦听器列表,以及 Kafka 绑定到的主机/IP 和端口以进行侦听。
- 1.2.4 设置偏移管理如下,因为 KoP 的偏移管理依赖于 Pulsar “Broker Entry Metadata”。KoP 2.8.0 或更高版本需要它。
brokerEntryMetadataInterceptors =org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
- 1.2.5 禁用删除非活动主题。它不是必需的,但在 KoP 中非常重要。目前,Pulsar 会删除已分区主题的非活动分区,而不会删除已分区主题的元数据。在这种情况下,KoP 无法 创建丢失的分区。
brokerDeleteInactiveTopicsEnabled =false
说明
KOP的nar包各个broker节点都要上传操作
配置文件每个Broker节点也是都要修改的, 其中 kafkaListeners各个节点要更改为自己的IP或者主机名
1.2.6 通过重启 Pulsar broker 加载 KoP
$cd /workspace/apache-pulsar-2.8.0.7/
$bin/pulsar-daemon stop broker
$bin/pulsar-daemon start broker
1.3 生产者—基于KAFKA的Java API方式
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
* @Author: huangyibo
* @Date: 2022/6/6 22:12
* @Description:
*/
public class KafkaKopProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++)
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close();
}
}
1.4 消费者—基于KAFKA的Java API方式
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
/**
* @Author: huangyibo
* @Date: 2022/6/6 22:15
* @Description:
*/
public class KafkaKopConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
二、AMQP 协议适配器AoP
AoP 是基于 Pulsar 特性实现的。但是,使用 Pulsar 和使用 AMQP 的方法是不同的。以下是 AoP 的一些限制。
- 目前,AoP 协议处理程序支持 AMQP0-9-1 协议,仅支持持久交换和持久队列。
- 一个 Vhost 由一个只能有一个包的命名空间支持。您需要提前为 Vhost 创建一个命名空间。
- Pulsar 2.6.1 或更高版本支持 AoP。
目前基于AMQP协议主要是 RabbitMQ为代表的消息队列,可以说 Pulsar的AOP方案主要是完成RabbitMQ到Pulsar之间的迁移工作,开发者可以基于AOP更方便的将原有在RabbitMQ部署业务迁移到Pulsar上。
2.1 AoP 框架概览
AoP 是一个可插拔的协议处理插件,可以通过使用 Pulsar 的 topics, cursors 等特性在 Pulsar 上支持原生 AMQP 协议。
下图展示了AoP 协议处理插件与 Pulsar 集群的结合。AMQP Proxy 服务和 AMQP 协议处理插件都与 Pulsar broker 一起运行。目前,AoP 是基于 AMQP 0.9.1 协议进行开发。
AMQP 0.9.1 引入了一些基础概念,例如 Exchagne, Queue 和 Router。这些与 Pulsar 的模型有着较大的区别。所以我们需要找到一种方法,支持利用 Pulsar 现有的一些特征,并将它们联系在一起。下图展示了消息在 AoP 中的流转,并讨论了关于消息持久化,消息路由,消息投递的细节。
1、当 Producer 发送消息到 AmqpExchange,AmqpExchange 将消息持久化到 Pulsar Topic (我们称之为存储原始消息的 Topic)。
2、AmqpExchange 的 replicator 会将消息传递给 Router。
3、Router 判断是否需要将消息路由给 AmqpQueue。如果是,会将原始消息的 ID 存入AmqpQueue 的 Topic 中 (我们称之为存储索引消息的 Topic)。
4、AmqpQueue 将消息传递给 consumer。
AmqpExchange
AmqpExchange 包含一个原始消息 Topic,用来保存 AMQP producer 发送的消息。AmqpExchange 的 replicator 会将消息处理到 AMQP 队列中。Replicator 是基于 Pulsar 的持久化游标,可以确保成功将消息发送到队列,而不会丢失消息。
AmqpMessageRouter
AmqpMessageRouter 用于维护消息路由类型以及将消息从 AmqpExchange 路由到 AmqpQueue 的路由规则。路由类型和路由规则这些原数据都持久化在 Pulsar 的存储中。所以就算 broker 重启,我们也可以恢复 AmqpMessageRouter。
AmqpQueue
AmqpQueue 提供一个索引消息 Topic,用来存储路由到这个队列的 IndexMessage。IndexMessage 由原始消息的 ID 和存储消息的 Exchange 的名称组成。当 AmqpQueue 向 consumer 发送消息时,AmqpQueue 会根据 IndexMessage 读取原始消息数据,然后将其发送给 consumer。
Vhost 分配
在 AoP 中,一个 AMQP Vhost 只能由一个 Pulsar broker 提供服务,而一个 Pulsar broker 可以为多个 Vhost 提供服务。所以增加 Vhost 和 broker 的数量可以达到横向扩容的效果。通过使用更多的 Vhost 可以使用户构建更大的 AoP 集群。
在 AoP 中,一个 Vhost 基于一个 Pulsar namespace,并且这个 namespace 只能有一个 bundle。如果一台 broker 崩溃,其他的 broker 可以接管这台崩溃的 broker 维护的 Vhost。Vhost 也可以利用 broker 的负载均衡机制。broker 可以将 Vhost 从一台高负载的机器转移到一台空闲的机器。下图展示了 Vhost 在 broker 上的分配情况。
Proxy
AoP Proxy 用于在客户端与 AMQP 服务连接时,帮助查找负责处理 Vhost 数据的 owner broker,并在客户端与 Vhost 的 owner broker 之间传输数据。
如上一节所述,Vhost 由集群中的一个 broker 提供服务,这可以通过 Pulsar 的 Topic 发现机制来实现。这也是为什么一个 Vhost 只能由带有单个 bundle 的 namespace 支持。如果一个 namespace 有多个 bundle,用户很难通过 Vhost 名称来确定 owner broker。
下图说明了 AoP Proxy 的服务流程。
1、AMQP 客户端建立与 AoP Proxy 的连接。
2、AoP Proxy 向 Pulsar cluster 发送查找请求,以便确定 Vhost 的 owner broker 的 URL 地址。
3、Pulsar 集群 将 owner broker 的 URL 地址返回给 AoP Proxy。
4、AoP Proxy 建立与 Vhose 的 owner broker 的连接并开始在 AMQP 客户端和 Vhost 的 owner broker 之间传输数据。
目前,AoP Proxy 与 Pulsar broker 共同工作。用户可以通过配置 amqpProxyEnable 来选择是否开启 AoP Proxy 服务。https://github.com/streamnative/aop/wiki
2.2 下载AoP NAR包
https://github.com/streamnative/aop/releases
2.3 修改Pulsar的broker.conf配置文件
$cd /workspace/apache-pulsar-2.8.0.7/
$vim conf/broker.conf
messagingProtocols=amqp
protocolHandlerDirectory=./protocols
amqpListeners=amqp://10.9.0.54:5672
2.4 说明
- 1、AOP的nar包各个broker节点都要上传操作
- 2、配置文件每个Broker节点也是都要修改的,其中 kafkaListeners各个节点要更改为自己的IP或者主机名
2.5 重启各个Broker节点
参考:
https://blog.51cto.com/u_536410/3516515