ZooKeeper Environment Setup
Download:
http://zookeeper.apache.org/releases.html
Unpack:
tar -zxvf zookeeper-3.4.6.tar.gz -C /usr/local
cd /usr/local
Rename:
mv zookeeper-3.4.6 zookeeper
cd zookeeper/conf
cp zoo_sample.cfg zoo.cfg
Configuration
- Append the following to zoo.cfg:
#server.n=ip:portA:portB
#n is the server ID (1~255)
#ip is the server's IP address
#portA is the port used to exchange data with the leader
#portB is the port used for leader election when the leader goes down
server.1=200.31.157.116:20881:30881
server.2=200.31.157.116:20882:30882
server.3=200.31.157.117:20881:30881
Note - configuration parameters explained:
tickTime: the basic time unit, in milliseconds; other time settings such as heartbeats and timeouts are integer multiples of it.
initLimit: a multiple of tickTime; the time allowed for followers to sync with the leader after leader election finishes. Increase it when the leader holds a lot of data or there are many followers.
syncLimit: a multiple of tickTime; the maximum wait time for followers and observers when interacting with the leader, i.e. the timeout for normal request forwarding and ping exchanges after the initial sync has completed.
clientPort: the port that listens for client connections; if several ZooKeeper servers run on one machine, each needs a different port.
dataDir: where in-memory database snapshots are stored, and also the transaction logs unless dataLogDir specifies otherwise.
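Putting these parameters together, a complete zoo.cfg for the three-node ensemble in this guide might look like the sketch below. The tickTime/initLimit/syncLimit values are the stock defaults from zoo_sample.cfg, not values prescribed by this guide; adjust them to your cluster.

```
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/tmp/zookeeper
clientPort=2181
server.1=200.31.157.116:20881:30881
server.2=200.31.157.116:20882:30882
server.3=200.31.157.117:20881:30881
```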
- Create a file named myid under $dataDir and write the server ID into it
(here dataDir is /tmp/zookeeper)
cd /tmp/zookeeper/
sudo vim myid
Put this server's ID (the n in its server.n line) into myid.
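As a non-interactive alternative to editing myid by hand, the file can be written with a one-liner. ZK_DATA_DIR below is a placeholder defaulting to this guide's dataDir (/tmp/zookeeper); the ID written must match the server.n entry for the host you run it on.

```shell
# Write this host's server ID into $dataDir/myid.
# ZK_DATA_DIR defaults to the dataDir used in this guide; override per host.
ZK_DATA_DIR="${ZK_DATA_DIR:-/tmp/zookeeper}"
mkdir -p "$ZK_DATA_DIR"
echo "1" > "$ZK_DATA_DIR/myid"   # server.1 writes 1; server.2 writes 2, and so on
cat "$ZK_DATA_DIR/myid"
```

Repeat on every node with that node's own ID; a mismatch between myid and the server.n lines prevents the ensemble from forming.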
Commands (run from zookeeper/bin)
Start:  sh zkServer.sh start
Stop:   sh zkServer.sh stop
Status: sh zkServer.sh status
Kafka Environment Setup
Download:
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1/kafka_2.9.2-0.8.1.1.tgz
Unpack and rename:
tar -xzf kafka_2.9.2-0.8.1.1.tgz
mv kafka_2.9.2-0.8.1.1 kafka
cd kafka/config
Configuration
Edit the following entries in server.properties:
#broker.id uniquely identifies this broker within the cluster
broker.id=0
#the listening port; defaults to 9092
port=9092
#host.name is the IP of the machine Kafka runs on
host.name=192.168.101.28
#the ZooKeeper connection string; multiple servers can be listed, comma-separated
zookeeper.connect=192.168.101.28:2181
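To grow this into a multi-broker cluster, each broker gets its own server.properties with a unique broker.id (and a distinct port if brokers share a machine). A hypothetical second broker on the same host might differ only in these lines:

```
broker.id=1
port=9093
host.name=192.168.101.28
zookeeper.connect=192.168.101.28:2181
```

All brokers point at the same zookeeper.connect string; Kafka uses ZooKeeper to assemble them into one cluster.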
Commands (run from kafka/bin)
Start the broker:
sh kafka-server-start.sh ../config/server.properties
Verify it is listening:
netstat -lnp | awk '$4 ~ /:9092$/'
Stop the broker:
sh kafka-server-stop.sh
Create a topic:
sh kafka-topics.sh --create --zookeeper 192.168.101.28:2181 --replication-factor 1 --partitions 1 --topic TOPIC_TASK_DETAIL
List topics:
sh kafka-topics.sh --list --zookeeper 192.168.101.28:2181
Produce messages:
sh kafka-console-producer.sh --broker-list 192.168.101.28:9092 --topic Test
Consume messages:
sh kafka-console-consumer.sh --zookeeper 192.168.101.28:2181 --topic Test --from-beginning
Spring Configuration
pom.xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.1.1.RELEASE</version>
</dependency>
kafka.properties
bootstrap.servers=192.168.101.29:9092
group.id=0
retries=10
batch.size=16384
linger.ms=1
buffer.memory=33554432
key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
enable.auto.commit=true
auto.commit.interval.ms=1000
session.timeout.ms=15000
kafka-producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd">

    <context:property-placeholder location="classpath:kafka.properties" />

    <!-- Producer configuration parameters -->
    <bean id="producerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${bootstrap.servers}"/>
                <entry key="group.id" value="${group.id}"/>
                <entry key="retries" value="${retries}"/>
                <entry key="batch.size" value="${batch.size}"/>
                <entry key="linger.ms" value="${linger.ms}"/>
                <entry key="buffer.memory" value="${buffer.memory}"/>
                <entry key="key.serializer" value="${key.serializer}"/>
                <entry key="value.serializer" value="${value.serializer}"/>
            </map>
        </constructor-arg>
    </bean>

    <!-- ProducerFactory needed to create the KafkaTemplate -->
    <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg>
            <ref bean="producerProperties"/>
        </constructor-arg>
    </bean>

    <!-- KafkaTemplate bean; inject it and call its send methods to publish messages -->
    <bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg ref="producerFactory"/>
        <constructor-arg name="autoFlush" value="true"/>
        <property name="defaultTopic" value="mhb-test"/>
    </bean>
</beans>
kafka-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd">

    <context:property-placeholder location="classpath:kafka.properties" />

    <!-- Consumer configuration parameters -->
    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${bootstrap.servers}"/>
                <entry key="group.id" value="${group.id}"/>
                <entry key="enable.auto.commit" value="${enable.auto.commit}"/>
                <entry key="auto.commit.interval.ms" value="${auto.commit.interval.ms}"/>
                <entry key="session.timeout.ms" value="${session.timeout.ms}"/>
                <entry key="key.deserializer" value="${key.deserializer}"/>
                <entry key="value.deserializer" value="${value.deserializer}"/>
            </map>
        </constructor-arg>
    </bean>

    <!-- ConsumerFactory bean -->
    <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <ref bean="consumerProperties"/>
        </constructor-arg>
    </bean>

    <!-- The class that actually consumes messages -->
    <bean id="messageListenerConsumerService" class="com.kafka.KafkaConsumer"/>

    <!-- Consumer container configuration -->
    <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg name="topics">
            <list>
                <value>TEST</value>
                <value>TEST_DETAIL</value>
            </list>
        </constructor-arg>
        <property name="messageListener" ref="messageListenerConsumerService"/>
    </bean>

    <!-- Listener container; init-method="doStart" begins consuming when the context starts -->
    <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart">
        <constructor-arg ref="consumerFactory"/>
        <constructor-arg ref="containerProperties"/>
    </bean>
</beans>
KafkaConsumer
package com.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.MessageListener;

/**
 * @author york
 * @date 2017-01-22 17:38
 * @version v1.0
 */
public class KafkaConsumer implements MessageListener<Integer, String> {

    @Override
    public void onMessage(ConsumerRecord<Integer, String> record) {
        System.out.println(record);
    }
}
KafkaProducer
package com.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Repository;

/**
 * @author york
 * Date: 2017-01-22 17:22
 */
@Repository("KafkaProducer")
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<Integer, String> kafkaTemplate;

    /**
     * Send a message; the first argument of KafkaTemplate#send(String, V)
     * is the topic name, not the record key.
     */
    public void send(String topic, String msg) {
        kafkaTemplate.send(topic, msg);
    }
}
KafkaClientTest
import com.kafka.KafkaProducer;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * Created by lewis on 2016/6/29.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:application.xml")
public class KafkaClientTest {

    @Autowired
    private KafkaProducer kafkaProducer;

    @Test
    public void test() throws Exception {
        kafkaProducer.send("TEST", "hello kafka");
    }
}
That's all; the full source code will be published later via Git.