kafka_03_Kafka消息发送

Kafka消息发送

1. 构建ProducerRecord对象

该类如下:

public class ProducerRecord<K, V> {

    //The topic the record will be appended to
    private final String topic;
    //The partition to which the record should be sent
    private final Integer partition;
    //the headers that will be included in the record
    private final Headers headers;
    //The key that will be included in the record
    private final K key;
    //The record contents
    private final V value;
    //The timestamp of the record, in milliseconds since epoch. If null, the producer will assign the timestamp using System.currentTimeMillis().
    private final Long timestamp;
    
    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
        if (topic == null)
            throw new IllegalArgumentException("Topic cannot be null.");
        if (timestamp != null && timestamp < 0)
            throw new IllegalArgumentException(
                    String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));
        if (partition != null && partition < 0)
            throw new IllegalArgumentException(
                    String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
        this.headers = new RecordHeaders(headers);
    }
    
    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
        this(topic, partition, timestamp, key, value, null);
    }
    
    public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
        this(topic, partition, null, key, value, headers);
    }
    
    public ProducerRecord(String topic, Integer partition, K key, V value) {
        this(topic, partition, null, key, value, null);
    }
    
    public ProducerRecord(String topic, K key, V value) {
        this(topic, null, null, key, value, null);
    }
    
    //Create a record with no key
    public ProducerRecord(String topic, V value) {
        this(topic, null, null, null, value, null);
    }
    //省略getters和setters
}

这里有5个构造方法,但是最后用的就是其中的一个。实际应用中,构建ProducerRecord对象是非常频繁的操作。

2. 发送消息

发送消息有三种模式,发后即忘(fire-and-forget), 同步(sync),异步(async)

2.1 fire-and-forget(上一篇博客介绍的就是发后即忘)

 producer.send(record);

特点: 效率高,可靠性差

2.2 同步

代码如下:

package com.ghq.kafka.server;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * 消息的发送
 */
public class ProducerSendMessage {

    public static final String brokerList = "192.168.52.135:9092";
    public static final String topic = "topic-demo";

    public static Properties initProperties(){
        Properties prop = new Properties();
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        prop.put(ProducerConfig.CLIENT_ID_CONFIG,"producer.client.id.demo");
        /**
         *
         * 配置重试次数,这里重试10次,重试10次之后如果消息还是发送不成功,那么还是会抛出异常
         * 那些类可以重试呢?
         * org.apache.kafka.common.errors.RetriableException 及其子类
         *
         */
        prop.put(ProducerConfig.RETRIES_CONFIG,10);
        return prop;
    }

    public static void sync() {

        Properties prop = initProperties();
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "Hello,World");

        //3. 发送消息
        while (true){
            /**
             * send方法本身就是异步的
             */
            Future<RecordMetadata> future = producer.send(record);

            try {
                /**
                 * get方法是阻塞的
                 * 这里返回 RecordMetadata,包含了发送消息的元数据信息
                 */
                RecordMetadata metadata = future.get();
                System.out.println("topic:"+metadata.topic());
                System.out.println("partition:"+metadata.partition());
                System.out.println("offset:"+metadata.offset());
                System.out.println("hasTimestamp:"+metadata.hasTimestamp());
                System.out.println("-----------------------------------------");
                Thread.sleep(1000);
            } catch (ExecutionException | InterruptedException e) {
                e.printStackTrace();
            }

        }
        //4. 关闭资源
        //producer.close();

    }
}

特点:性能差,可靠性高

注:什么异常可以重试?RetriableException

RetriableException.jpg

2.3 异步

public static void async() {
        Properties prop = initProperties();
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "Hello,World");

        Future<RecordMetadata> future = producer.send(record, new Callback() {

            /**
             * metadata 和 exception 互斥
             * 消息发送成功:metadata != null exception == null
             * 消息发送失败:metadata == null exception != null
             */
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {

                if (exception != null) {
                    System.out.println("消息发送失败:"+metadata);
                }else {
                    System.out.println("消息发送成功:"+metadata);
                }
            }
        });
        
        //这里采用lambda表达式
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                System.out.println("消息2发送失败:"+metadata);
            }else {
                System.out.println("消息2发送成功:"+metadata);
            }
        });
        producer.close();
    }

输出结果如下:

消息1发送成功:topic-demo-0@22
消息2发送成功:topic-demo-2@24

特点:性能 :同步 < 异步 < 发后即忘,可靠性:同步 > 异步 > 发后即忘

结束。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,319评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,801评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,567评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,156评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,019评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,090评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,500评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,192评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,474评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,566评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,338评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,212评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,572评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,890评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,169评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,478评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,661评论 2 335

推荐阅读更多精彩内容