Linux setup
The three Kafka nodes are CentOS 6 virtual machines with IPs 192.168.1.128, 192.168.1.129 and 192.168.1.130, and hostnames master, worker1 and worker2 respectively.
1. Cluster
# 192.168.1.128
[root@master local]# cd /home/gilbert/app/rar/
[root@master rar]# tar zxvf kafka_2.10-0.10.2.0.tgz
[root@master rar]# mv kafka_2.10-0.10.2.0 /home/gilbert/app/kafka
Configuration file path: kafka/config/server.properties
Edit the configuration file
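The text does not list which settings were changed. As a rough sketch only, the following prints the per-broker overrides that a three-node cluster typically needs in server.properties. The property names broker.id, listeners and zookeeper.connect are standard Kafka broker settings; the broker ids for worker1 and worker2 and the PLAINTEXT listener on port 9092 are assumptions for illustration (only broker.id=0 on master is confirmed by the startup log), and the class name BrokerConfigSketch is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BrokerConfigSketch {

    // ZooKeeper ensemble, taken from the --zookeeper arguments used in this walkthrough.
    static final String ZOOKEEPER = "master:2181,worker1:2181,worker2:2181";

    // Builds the overrides for one broker's config/server.properties.
    static Map<String, String> overrides(int brokerId, String host) {
        Map<String, String> props = new LinkedHashMap<>();
        // broker.id must be unique for every broker in the cluster.
        props.put("broker.id", Integer.toString(brokerId));
        // Assumed listener: plaintext on the default port 9092.
        props.put("listeners", "PLAINTEXT://" + host + ":9092");
        props.put("zookeeper.connect", ZOOKEEPER);
        return props;
    }

    public static void main(String[] args) {
        // Assumed id assignment: master=0, worker1=1, worker2=2.
        String[] hosts = {"master", "worker1", "worker2"};
        for (int id = 0; id < hosts.length; id++) {
            System.out.println("# " + hosts[id]);
            overrides(id, hosts[id]).forEach((k, v) -> System.out.println(k + "=" + v));
        }
    }
}
```

Each printed group would be placed in the server.properties of the matching host; any other settings stay at the defaults shipped with the distribution.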
Start Kafka
[root@master kafka]# ./bin/kafka-server-start.sh config/server.properties &
[2018-06-25 02:31:21,931] INFO KafkaConfig values:
    advertised.host.name = null
    advertised.listeners = null
    advertised.port = null
    authorizer.class.name =
    auto.create.topics.enable = true
    auto.leader.rebalance.enable = true
    background.threads = 10
    broker.id = 0
    broker.id.generation.enable = true
    broker.rack = null
    compression.type = producer
    connections.max.idle.ms = 600000
    controlled.shutdown.enable = true
    controlled.shutdown.max.retries = 3
    controlled.shutdown.retry.backoff.ms = 5000
    controller.socket.timeout.ms = 30000
Create a topic
# Create a topic named gilbert
[root@master kafka]# ./bin/kafka-topics.sh --create --zookeeper master:2181,worker1:2181,worker2:2181 --replication-factor 3 --partitions 3 --topic gilbert
Created topic "gilbert".
View topics
[root@master kafka]# ./bin/kafka-topics.sh --describe --zookeeper master:2181,worker1:2181,worker2:2181 --topic gilbert
Topic:gilbert    PartitionCount:3    ReplicationFactor:3    Configs:
    Topic: gilbert    Partition: 0    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: gilbert    Partition: 1    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: gilbert    Partition: 2    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
[root@master kafka]# ./bin/kafka-topics.sh --list --zookeeper master:2181,worker1:2181,worker2:2181
gilbert
test
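The replica lists in the describe output follow a ring pattern: each partition starts on the next broker and its remaining replicas wrap around. The sketch below is a simplified round-robin placement that reproduces this particular layout. It is not Kafka's actual assignment code, which also picks a random starting broker, applies an additional shift, and can take racks into account. The class name ReplicaLayoutSketch and the start index 2 are assumptions chosen to match the output shown, and the sketch assumes the replication factor does not exceed the broker count.

```java
import java.util.ArrayList;
import java.util.List;

public class ReplicaLayoutSketch {

    // Simplified round-robin placement: partition p puts its first replica on
    // broker (startIndex + p) % brokers, and the remaining replicas on the
    // following brokers in ring order.
    static List<List<Integer>> assign(int brokers, int partitions,
                                      int replicationFactor, int startIndex) {
        List<List<Integer>> layout = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            for (int r = 0; r < replicationFactor; r++) {
                replicas.add((startIndex + p + r) % brokers);
            }
            layout.add(replicas);
        }
        return layout;
    }

    public static void main(String[] args) {
        // 3 brokers, 3 partitions, replication factor 3, starting at broker 2.
        List<List<Integer>> layout = assign(3, 3, 3, 2);
        for (int p = 0; p < layout.size(); p++) {
            System.out.println("Partition " + p + " -> replicas " + layout.get(p));
        }
        // Prints:
        // Partition 0 -> replicas [2, 0, 1]
        // Partition 1 -> replicas [0, 1, 2]
        // Partition 2 -> replicas [1, 2, 0]
    }
}
```

The first broker in each list is the preferred leader, which is why the Leader column reads 2, 0, 1. Isr matches Replicas here because all three brokers are up and in sync.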
Create a producer
./bin/kafka-console-producer.sh --broker-list master:9092 --topic gilbert
Create consumers: start a console consumer on each of the three servers
# 192.168.1.128 server
[root@master kafka]# ./bin/kafka-console-consumer.sh --zookeeper master:2181,worker1:2181,worker2:2181 --topic gilbert --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
# 192.168.1.129 server
[root@worker1 kafka_2.10-0.10.2.0]# ./bin/kafka-console-consumer.sh --zookeeper master:2181,worker1:2181,worker2:2181 --topic gilbert --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
# 192.168.1.130 server
[root@worker2 kafka_2.10-0.10.2.0]# ./bin/kafka-console-consumer.sh --zookeeper master:2181,worker1:2181,worker2:2181 --topic gilbert --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
To test, type the following into the producer console on the 192.168.1.128 server: hello kafka
The consumers on all three servers receive the message.
Delete a topic
[root@master kafka]# ./bin/kafka-topics.sh --delete --zookeeper master:2181,worker1:2181,worker2:2181 --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
Integrating Kafka with Spring Boot
1. Producer: kafka-producer
a) pom file
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.2</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
b) yml configuration file; this example targets the 3-node Kafka cluster
spring:
  kafka:
    bootstrap-servers: master:9092,worker1:9092,worker2:9092
    producer:
      retries: 0
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
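Spring Boot passes these YAML settings on to the underlying Kafka producer under the client's own property names. The sketch below shows what the equivalent plain client properties would look like, which can help when cross-checking against the Kafka documentation. It only builds a java.util.Properties object and does not connect to a broker. The class name ProducerPropsSketch is hypothetical, and the broker addresses are written as plain host:port pairs.

```java
import java.util.Properties;

public class ProducerPropsSketch {

    // Kafka client property names corresponding to the spring.kafka.producer.* keys.
    static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "master:9092,worker1:9092,worker2:9092");
        props.setProperty("retries", "0");              // do not resend on failure
        props.setProperty("batch.size", "16384");       // bytes per partition batch (16 KiB)
        props.setProperty("buffer.memory", "33554432"); // total send buffer in bytes (32 MiB)
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps();
        for (String name : props.stringPropertyNames()) {
            System.out.println(name + "=" + props.getProperty(name));
        }
    }
}
```

With retries set to 0, a send that fails is not attempted again, so a message can be lost on a transient broker error.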
c) Message entity class
import lombok.Data;

import java.util.Date;

@Data
public class Message {
    private Long id;       // id
    private String msg;    // message body
    private Date sendTime; // timestamp
}
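d) Producer
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.UUID;

@Component
@Slf4j
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private Gson gson = new GsonBuilder().create();

    // Builds a message and sends it as a JSON string
    public void send() {
        Message message = new Message();
        message.setId(System.currentTimeMillis());
        message.setMsg(UUID.randomUUID().toString());
        message.setSendTime(new Date());
        log.info("+++++++++++++++++++++ message = {}", gson.toJson(message));
        // topic-ideal is the topic name
        kafkaTemplate.send("topic-ideal", gson.toJson(message));
    }
}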
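e) Test class; run the kafkaProducer method to send a message
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaProducerApplicationTests {

    @Autowired
    private KafkaProducer kafkaProducer;

    @Test
    public void kafkaProducer() {
        this.kafkaProducer.send();
    }

    @Test
    public void contextLoads() {
    }
}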
2. Consumer: kafka-consumer
a) pom file
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.2</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
b) yml configuration file
server:
  port: 9999
spring:
  kafka:
    bootstrap-servers: master:9092,worker1:9092,worker2:9092
    consumer:
      group-id: ideal-consumer-group
      auto-offset-reset: earliest
      enable-auto-commit: true
      auto-commit-interval: 20000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
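For reference, the consumer settings above correspond to the following Kafka client property names. This is a sketch that only builds a java.util.Properties object and does not start a consumer. The class name ConsumerPropsSketch is hypothetical, and the broker addresses are written as plain host:port pairs.

```java
import java.util.Properties;

public class ConsumerPropsSketch {

    // Kafka client property names corresponding to the spring.kafka.consumer.* keys.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "master:9092,worker1:9092,worker2:9092");
        props.setProperty("group.id", "ideal-consumer-group");
        // Start from the oldest available record when the group has no committed offset.
        props.setProperty("auto.offset.reset", "earliest");
        // Commit offsets automatically, every 20000 ms (20 seconds).
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "20000");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps();
        for (String name : props.stringPropertyNames()) {
            System.out.println(name + "=" + props.getProperty(name));
        }
    }
}
```

Because offsets are committed only every 20 seconds, a consumer that crashes between commits will re-read the records processed since the last commit when it restarts, so the listener should tolerate duplicates.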
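c) Consumer class
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

@Component
@Slf4j
public class KafkaConsumer {

    // Invoked for every record received on topic-ideal
    @KafkaListener(topics = {"topic-ideal"})
    public void consumer(ConsumerRecord<?, ?> record) {
        // The record value can be null, so wrap it before use
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            log.info("----------------- record = {}", record);
            log.info("------------------ message = {}", message);
        }
    }
}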
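Start the kafka-consumer application, then run the kafkaProducer() method of the test class KafkaProducerApplicationTests in the kafka-producer project. The consumer's log output shows that the messages are received.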