Preface
This article takes a look at how Spring for Apache Kafka wraps and integrates the native Kafka client producer.
Producer factory
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/core/DefaultKafkaProducerFactory.java
public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>, Lifecycle, DisposableBean {

    @Override
    public void destroy() throws Exception { //NOSONAR
        CloseSafeProducer<K, V> producer = this.producer;
        this.producer = null;
        if (producer != null) {
            producer.delegate.close(this.physicalCloseTimeout, TimeUnit.SECONDS);
        }
    }

    @Override
    public void start() {
        this.running = true;
    }

    @Override
    public void stop() {
        try {
            destroy();
        }
        catch (Exception e) {
            logger.error("Exception while stopping producer", e);
        }
    }

    @Override
    public boolean isRunning() {
        return this.running;
    }

    @Override
    public Producer<K, V> createProducer() {
        if (this.producer == null) {
            synchronized (this) {
                if (this.producer == null) {
                    this.producer = new CloseSafeProducer<K, V>(createKafkaProducer());
                }
            }
        }
        return this.producer;
    }

}
The first step of the Spring integration is handing the factory over to the Spring container, so that it starts and is destroyed along with the container's lifecycle (hence Lifecycle and DisposableBean). createProducer() lazily creates a single CloseSafeProducer, which delegates all actual operations to the underlying Kafka producer.
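For reference, here is a minimal sketch of wiring this factory into a Spring @Configuration class; the broker address, serializers and String generic types are placeholder assumptions, not taken from the article:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // registered as a bean, so destroy()/stop() run when the container shuts down
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}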
KafkaTemplate
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/core/KafkaTemplate.java
KafkaTemplate implements the following KafkaOperations interface:
public interface KafkaOperations<K, V> {

    /**
     * Send the data to the default topic with no key or partition.
     * @param data The data.
     * @return a Future for the {@link SendResult}.
     */
    ListenableFuture<SendResult<K, V>> sendDefault(V data);

    /**
     * Send the data to the default topic with the provided key and no partition.
     * @param key the key.
     * @param data The data.
     * @return a Future for the {@link SendResult}.
     */
    ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);

    /**
     * Send the data to the default topic with the provided key and partition.
     * @param partition the partition.
     * @param key the key.
     * @param data the data.
     * @return a Future for the {@link SendResult}.
     */
    ListenableFuture<SendResult<K, V>> sendDefault(int partition, K key, V data);

    /**
     * Send the data to the provided topic with no key or partition.
     * @param topic the topic.
     * @param data The data.
     * @return a Future for the {@link SendResult}.
     */
    ListenableFuture<SendResult<K, V>> send(String topic, V data);

    /**
     * Send the data to the provided topic with the provided key and no partition.
     * @param topic the topic.
     * @param key the key.
     * @param data The data.
     * @return a Future for the {@link SendResult}.
     */
    ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);

    /**
     * Send the data to the provided topic with the provided partition and no key.
     * @param topic the topic.
     * @param partition the partition.
     * @param data The data.
     * @return a Future for the {@link SendResult}.
     */
    ListenableFuture<SendResult<K, V>> send(String topic, int partition, V data);

    /**
     * Send the data to the provided topic with the provided key and partition.
     * @param topic the topic.
     * @param partition the partition.
     * @param key the key.
     * @param data the data.
     * @return a Future for the {@link SendResult}.
     */
    ListenableFuture<SendResult<K, V>> send(String topic, int partition, K key, V data);

    /**
     * Send a message with routing information in message headers. The message payload
     * may be converted before sending.
     * @param message the message to send.
     * @return a Future for the {@link SendResult}.
     * @see org.springframework.kafka.support.KafkaHeaders#TOPIC
     * @see org.springframework.kafka.support.KafkaHeaders#PARTITION_ID
     * @see org.springframework.kafka.support.KafkaHeaders#MESSAGE_KEY
     */
    ListenableFuture<SendResult<K, V>> send(Message<?> message);

    /**
     * See {@link Producer#partitionsFor(String)}.
     * @param topic the topic.
     * @return the partition info.
     * @since 1.1
     */
    List<PartitionInfo> partitionsFor(String topic);

    /**
     * See {@link Producer#metrics()}.
     * @return the metrics.
     * @since 1.1
     */
    Map<MetricName, ? extends Metric> metrics();

    /**
     * Execute some arbitrary operation(s) on the producer and return the result.
     * @param callback the callback.
     * @param <T> the result type.
     * @return the result.
     * @since 1.1
     */
    <T> T execute(ProducerCallback<K, V, T> callback);

    /**
     * Flush the producer.
     */
    void flush();

    /**
     * A callback for executing arbitrary operations on the {@link Producer}.
     * @param <K> the key type.
     * @param <V> the value type.
     * @param <T> the return type.
     * @since 1.1
     */
    interface ProducerCallback<K, V, T> {

        T doInKafka(Producer<K, V> producer);

    }

}
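Before looking at the implementation, here is a quick usage sketch of these methods through KafkaTemplate; the topic name and payloads are made-up examples, assuming the KafkaTemplate<String, String> bean configured above:

// send(topic, key, data) returns a ListenableFuture<SendResult<K, V>>
ListenableFuture<SendResult<String, String>> future =
        kafkaTemplate.send("demo-topic", "some-key", "hello kafka");

// the sendDefault(...) variants require the template's default topic to be set first
kafkaTemplate.setDefaultTopic("demo-topic");
kafkaTemplate.sendDefault("hello default topic");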
The core sending logic is in doSend, and this is where Spring's main wrapping of the producer happens:
/**
 * Send the producer record.
 * @param producerRecord the producer record.
 * @return a Future for the {@link RecordMetadata}.
 */
protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
    final Producer<K, V> producer = getTheProducer();
    if (this.logger.isTraceEnabled()) {
        this.logger.trace("Sending: " + producerRecord);
    }
    final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
    producer.send(producerRecord, new Callback() {

        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            try {
                if (exception == null) {
                    future.set(new SendResult<>(producerRecord, metadata));
                    if (KafkaTemplate.this.producerListener != null
                            && KafkaTemplate.this.producerListener.isInterestedInSuccess()) {
                        KafkaTemplate.this.producerListener.onSuccess(producerRecord.topic(),
                                producerRecord.partition(), producerRecord.key(), producerRecord.value(), metadata);
                    }
                }
                else {
                    future.setException(new KafkaProducerException(producerRecord, "Failed to send", exception));
                    if (KafkaTemplate.this.producerListener != null) {
                        KafkaTemplate.this.producerListener.onError(producerRecord.topic(),
                                producerRecord.partition(),
                                producerRecord.key(),
                                producerRecord.value(),
                                exception);
                    }
                }
            }
            finally {
                producer.close();
            }
        }

    });
    if (this.autoFlush) {
        flush();
    }
    if (this.logger.isTraceEnabled()) {
        this.logger.trace("Sent: " + producerRecord);
    }
    return future;
}
Do not be misled by the producer.close() call in the finally block: CloseSafeProducer's close() is an empty method, so the shared producer is not actually closed here.
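For context, a simplified sketch of what the inner CloseSafeProducer roughly looks like; only the delegation pattern and the no-op close are shown, the real class implements every Producer method:

// simplified sketch, not the complete source
protected static class CloseSafeProducer<K, V> implements Producer<K, V> {

    private final Producer<K, V> delegate;

    CloseSafeProducer(Producer<K, V> delegate) {
        this.delegate = delegate;
    }

    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // real work is handed to the wrapped KafkaProducer
        return this.delegate.send(record, callback);
    }

    @Override
    public void close() {
        // intentionally empty: the shared producer is only really closed
        // via delegate.close(...) in the factory's destroy()
    }

    // remaining Producer methods delegate to this.delegate in the same way

}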
- Spring wraps the send() call and returns a SettableListenableFuture that completes with a SendResult.
- Send failures are wrapped in a KafkaProducerException, a subclass of Spring's KafkaException.
- A ProducerListener is supported and is invoked synchronously inside the producer callback (see the sketch after this list).
- A MessagingMessageConverter is built in for the send(Message<?>) variant.
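To make the last points concrete, here is a hedged sketch of consuming the returned future and registering a ProducerListener; the topic, messages and log output are illustrative only:

// react to the SettableListenableFuture returned by send()
kafkaTemplate.send("demo-topic", "some-key", "hello kafka")
        .addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onSuccess(SendResult<String, String> result) {
                // the SendResult carries both the ProducerRecord and the RecordMetadata
                System.out.println("sent, offset=" + result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                // ex is the KafkaProducerException set in doSend()
                System.err.println("send failed: " + ex.getMessage());
            }
        });

// a ProducerListener is called synchronously from the producer callback in doSend()
kafkaTemplate.setProducerListener(new ProducerListenerAdapter<String, String>() {

    @Override
    public void onSuccess(String topic, Integer partition, String key, String value, RecordMetadata metadata) {
        System.out.println("listener: " + key + " sent to " + topic);
    }

    @Override
    public boolean isInterestedInSuccess() {
        return true; // otherwise only onError(...) would be invoked
    }

});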