Kafka集群安装步骤:
1、解压
2、修改config/server.properties(每台节点的broker.id必须互不相同)
broker.id=1
zookeeper.connect=weekend05:2181,weekend06:2181,weekend07:2181
3、将zookeeper集群启动
4、在每一台节点上启动broker
bin/kafka-server-start.sh config/server.properties
5、在kafka集群中创建一个topic
bin/kafka-topics.sh --create --zookeeper zk_hostname:2181 --replication-factor 3 --partitions 1 --topic topic_name
6、用一个producer向某一个topic中写入消息
bin/kafka-console-producer.sh --broker-list broker_hostname:9092 --topic topic_name
7、用一个consumer从某一个topic中读取消息
bin/kafka-console-consumer.sh --zookeeper zk_hostname:2181 --from-beginning --topic topic_name
8、查看一个topic的分区及副本状态信息
bin/kafka-topics.sh --describe --zookeeper zk_hostname:2181 --topic topic_name
Java API
Producer端:
/**
 * Minimal example of sending one string message with the Kafka 0.8 producer API.
 *
 * <p>The host names, topic and payload below are placeholders; replace them with
 * real values before running.
 */
public class ProducerDemo {

    /** Placeholder name of the topic the message is sent to. */
    private static final String TOPIC_NAME = "topic_name";

    /** Placeholder payload of the message. */
    private static final String CONTENT = "message content";

    /**
     * Builds the producer configuration, sends a single message and closes the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Brokers used to bootstrap topic metadata. The 0.8 producer discovers the
        // cluster through this list, so no ZooKeeper address is configured here.
        properties.put("metadata.broker.list", "broker_hostname:9092,.....");
        // Serialize message payloads as strings.
        properties.put("serializer.class", "kafka.serializer.StringEncoder");

        ProducerConfig config = new ProducerConfig(properties);
        Producer<String, String> producer = new Producer<String, String>(config);
        try {
            producer.send(new KeyedMessage<String, String>(TOPIC_NAME, CONTENT));
        } finally {
            // Release the producer's broker connections even if send() throws.
            producer.close();
        }
    }
}
Consumer端:
public class ConsumerDemo{
public static void main(String[] args){
//创建Properties实例,设置属性
Properties properties = new Properties();
properties.put("zookeeper.connect","zk_hostname:2181,....");
//如果生产者和消费者在同一个组中,则不能访问同一组Topic内的数据
properties.put("group.id","id_name");
//声明ConsumerConnector
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
MaptopicCountMap = new HashMap();
//一次从一个主题中读取数据
topicCountMap.put(topic, 1);
topicCountMap.put(“topic_name", 1);
topicCountMap.put("topic_name1", 1);
Map<String, List<KafkaStream<byte[],byte[]>>> consumerMap =
consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[],byte[]>> streams = consumerMap.get("topic_name");
for(final KafkaStreamkafkaStream : streams){
new Thread(new Runnable() {
@Override
public void run() {
for(MessageAndMetadatamm : kafkaStream){
String msg = new String(mm.message());
System.out.println(msg);
}
}
}).start();
}
}
}