常用命令
- 启动Zookeeper
./zkServer.sh start-foreground
可选参数:
./zkServer.sh {start|start-foreground|stop|restart|status|upgrade|print-cmd}
- 启动ZooInspector,可以查看注册到Zookeeper的Kafka broker和topic情况:
java -jar zookeeper-dev-ZooInspector.jar
- 启动Kafka
./kafka-server-start.sh ../config/server.properties
- 创建topic,指定3个分区,每个分区1个副本
./kafka-topics.sh --create -topic testtopic -partitions 3 -replication-factor 1 -zookeeper localhost:2181
- 列出所有topic
./kafka-topics.sh --list -zookeeper localhost:2181
- 删除topic
./kafka-topics.sh --delete -topic testtopic -zookeeper localhost:2181
- 使用producer命令行工具
./kafka-console-producer.sh -topic testtopic --broker-list localhost:9092
- 使用consumer命令行工具
注意:加上 --from-beginning 参数会从最早的offset位置开始接收消息;不加该参数则从当前(最新)offset位置开始,只接收之后新产生的消息。
./kafka-console-consumer.sh --bootstrap-server localhost:9092 -topic testtopic --from-beginning
Java API使用
- Producer API
/**
 * Minimal producer example: sends a single {@code "hello world"} record to the
 * {@code testtopic} topic on {@code localhost:9092} and prints the metadata
 * returned for it.
 */
public class SampleProducer {
    private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducer.class);

    /**
     * Builds the producer configuration, sends one record, blocks until the
     * send result is available, and prints the resulting metadata
     * ({@code null} is printed when the send did not complete successfully).
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("client.id", "DemoProducer");
        // Serializers for record keys and values
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        RecordMetadata recordMetadata = null;
        // try-with-resources closes the producer on every path, so its
        // resources are released even when the send fails.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            ProducerRecord<String, String> record = new ProducerRecord<>("testtopic", "hello world");
            Future<RecordMetadata> future = producer.send(record);
            // Block until the asynchronous send has finished.
            recordMetadata = future.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe it.
            Thread.currentThread().interrupt();
            LOGGER.error("Interrupted while waiting for the send result", e);
        } catch (ExecutionException e) {
            LOGGER.error("Failed to send record to testtopic", e);
        }
        System.out.println(recordMetadata);
    }
}
- Consumer API
/**
 * Minimal consumer example: joins the {@code testgroup} consumer group,
 * subscribes to {@code testtopic}, performs a single poll, prints the value of
 * every record received, and commits the consumed offsets manually.
 */
public class SampleConsumer {

    /**
     * Builds the consumer configuration, polls once for up to two seconds,
     * prints each record value, and commits the offsets before closing.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Consumer group this consumer belongs to
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "testgroup");
        // Disable automatic offset commits; offsets are committed manually below
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        // Deserializers for record keys and values
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // try-with-resources closes the consumer on every path so it leaves
        // the group cleanly instead of lingering until the session times out.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Subscribe to the topic
            consumer.subscribe(Arrays.asList("testtopic"));
            // NOTE(review): only one poll is issued; it may return no records
            // if partition assignment has not completed within the timeout.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
            records.forEach(record -> System.out.println(record.value()));
            // Commit offsets manually. A synchronous commit is used because this
            // is the last action before shutdown: an asynchronous commit could
            // still be in flight when the program exits.
            consumer.commitSync();
        }
    }
}