RocketMQ源码阅读(六)-消息生产者

上文介绍了双Master模式的就你部署, 本文分析消息生产者发送消息的过程.

1.例子

在源码包的org.apache.rocketmq.example.quickstart下有一个Producer发送消息的例子, 对它的代码进行微小的变更, 如下:

public static void main(String[] args) throws MQClientException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        //设置name server
        producer.setNamesrvAddr("192.168.1.150:9876");
        //启动producer
        producer.start();

        for (int i = 0; i < 1000; i++) {
            try {
                //创建一个Message
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                //发送
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        //关闭
        producer.shutdown();
    }

运行这段代码, 就会发现消息被发送到broker上, 例子比较简单, 不再贴出运行结果.
此处有两个问题:

  • 1.producer如何获取broker的地址?
  • 2.producer会把消息发送到所有的broker吗, 还是其中特定的broker?

根据网上找到的资料, 关于第一个问题, 是从name server上获取关于broker的信息(这个与自己的猜测也比较符合), 关于第二个问题, producer会从name server获取topic相关的路由信息, 其中包含了topic分布在哪几台broker上, 以及每台broker上对应的writequeue数目, producer会根据writequeue采用一定的负载均衡策略(默认是RoundRobin)分发到各个writequeue.

2.检验

从name server上获取关于broker的信息, 这个应该没啥问题, 根据上一篇文章 Linux环境搭建RocketMQ双Master模式的描述, broker在启动后会去name server注册.
关于第二个问题, broker会去name server上获取关于topic的路由信息. 也就是说创建topic的时候, 需要先在name server上发布信息.
接下来使用RocketMQ的admin tool来试验一下.
首先, 查看name server上当前有的topic.

topic列表

接下来, 使用mqadmin创建一个新的topic TopicTestBlance,

新建topic
TopicTestBlance路由信息

可以看到, 默认情况下, topic会同时分布在两个broker上, 并且writequeue数目的数目都是8. 这时运行第一部分中的程序, 将topic改为TopicTestBlance,

producer发送消息

可以看到producer会均匀的把消息发送到两台broker各自的队列上.
接下来更改TopicTestBlance的路由信息,将其中一台的队列数目改为2, 而另一台改为6.


更新路由信息

再运行程序, 结果如图所示:


producer发送消息

基本可以确定, producer确实时按照队列轮训的顺序向各个队列发送消息的. 下文去源码中验证.

3.源码分析

已上面例子中的程序为楔子, 一点点查看源码.

初始化

首先看DefaultMQProducer的构造函数.

    public DefaultMQProducer(final String producerGroup) {
        this(producerGroup, null);
    }
    public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
        this.producerGroup = producerGroup;
        defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
    }

其中, 两个参数:

  • producerGroup, 一些列producer的集合, 发送普通消息时,意义不大, 发布事务消息时比较重要, 本文不分析事务消息.
  • rpcHook, RocketMQ源码阅读(二)-通信模块中提过RocketMQ的通信模块允许用户在发送请求前或收到响应后执行hook函数.
    DefaultMQProducer内部封装了一个DefaultMQProducerImpl对象, 实现DefaultMQProducer的核心功能.
    使用producer之前先要调用start()函数进行一些初始化操作, DefaultMQProducer的start函数调用的其实是DefaultMQProducerImpl的start函数, 下面是其源代码:
public void start(final boolean startFactory) throws MQClientException {
    //判断producer当前的状态, 初始化完成后是CREATE_JUST状态
    switch (this.serviceState) {
        //初始化完成
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;
            //校验producerGroupName是否合法, 同一个进程内, producerGroupName必须是唯一的.
            this.checkConfig();
            //用进程id作为producerGroup的默认值
            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }
            //mQClientFactory主要负责与name serve和broker的通信,
            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
            //将produerGroup, produer注册到本地的producerTable表中, 一个produerGroup对应一个producer
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }
            //在topicPublishInfoTable表中, 放入一个默认的topic和它的路由信息
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

            if (startFactory) {
                //启动mQClientFactory
                mQClientFactory.start();
            }

            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                this.defaultMQProducer.isSendMessageWithVIPChannel());
            //serviceState状态变更为RUNNING
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "//
                + this.serviceState//
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }
    //发送心跳给所有的broker
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}

这里注意最后一步sendHeartbeatToAllBrokerWithLock, 说明此时producer已经得到了所有broker的信息, 但是producer是在哪一步获取该信息的呢? 回溯代码发现答案就在mQClientFactory.start()中, mQClientFactory.start()方法会启动一些列task, 其中一个task会去定时拉取broker信息和topic路由信息.
代码片段如下:

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            MQClientInstance.this.updateTopicRouteInfoFromNameServer();
        } catch (Exception e) {
            log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
        }
    }
}, 10, this.clientConfig.getPollNameServerInteval(), TimeUnit.MILLISECONDS);

发送

先贴一张时序图:


发送消息

消息发送的核心代码在DefaultMQProducerImpl中:

private SendResult sendDefaultImpl(//
    Message msg, //
    final CommunicationMode communicationMode, //
    final SendCallback sendCallback, //
    final long timeout//
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    this.makeSureStateOK();
    Validators.checkMessage(msg, this.defaultMQProducer);
    //调用Id, 好像没啥实际作用, 打印在了日志中
    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    //先去topicPublishInfoTable中查找路由信息, 
    //如果没有则调用updateTopicRouteInfoFromNameServer从name server获取
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        //重试次数
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        for (; times < timesTotal; times++) {
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            //选取MessageQueue, 默认按照RoundRobin的方式, 注意如果发送失败需要重试, producer会优先选择同一个broker下的下一个队列
            //一直到该broker下的队列全部失败, 才会尝试其他broker上的队列
            MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (tmpmq != null) {
                mq = tmpmq;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    //发送消息
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
                    endTimestamp = System.currentTimeMillis();
                    //延迟故障容错, 维护每个Broker的发送消息的延迟
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                // 同步发送成功但存储有问题时 && 配置存储异常时重新发送开关 时, 进行重试
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }

                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) { // 打印异常, 更新Broker可用性信息, 继续循环
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQClientException e) { // 打印异常, 更新Broker可用性信息, 继续循环
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQBrokerException e) { // 打印异常, 更新Broker可用性信息, 继续循环, 某些特殊情况, 直接返回
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    switch (e.getResponseCode()) {
                        case ResponseCode.TOPIC_NOT_EXIST:
                        case ResponseCode.SERVICE_NOT_AVAILABLE:
                        case ResponseCode.SYSTEM_ERROR:
                        case ResponseCode.NO_PERMISSION:
                        case ResponseCode.NO_BUYER_ID:
                        case ResponseCode.NOT_IN_CURRENT_UNIT:
                            continue;
                        default:
                            if (sendResult != null) {
                                return sendResult;
                            }

                            throw e;
                    }
                } catch (InterruptedException e) { 
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());

                    log.warn("sendKernelImpl exception", e);
                    log.warn(msg.toString());
                    throw e;
                }
            } else {
                break;
            }
        }

        // 返回发送结果
        if (sendResult != null) {
            return sendResult;
        }

        String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
            times,
            System.currentTimeMillis() - beginTimestampFirst,
            msg.getTopic(),
            Arrays.toString(brokersSent));

        info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
        // 根据不同情况, 抛出不同的异常
        MQClientException mqClientException = new MQClientException(info, exception);
        if (exception instanceof MQBrokerException) {
            mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
        } else if (exception instanceof RemotingConnectException) {
            mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
        } else if (exception instanceof RemotingTimeoutException) {
            mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
        } else if (exception instanceof MQClientException) {
            mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
        }

        throw mqClientException;
    }
    // Namesrv找不到异常
    List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
    if (null == nsList || nsList.isEmpty()) {
        throw new MQClientException(
            "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
    }
    // 消息路由找不到异常
    throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
        null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

其中sendKernelImpl函数会把消息分装成RocketMQ规定的格式, 然后利用底层通信模块封装的函数将消息发送.出去.

4.总结

本文大致分析了一下producer的工作原理, 建立在对RocketMQ和通信模块的基础上分析, producer的工作原理并不复杂. 后文分析broker接收消息的工作原理.

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,772评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,458评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,610评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,640评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,657评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,590评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,962评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,631评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,870评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,611评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,704评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,386评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,969评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,944评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,179评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,742评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,440评论 2 342

推荐阅读更多精彩内容