RocketMQ学习

RocketMQ 学习

安装部署**

  1. 下载rocketmq解压:下载地址

  2. 启动nameserver,进入bin目录下执行

    # -n 参数指定nameserver的访问地址与端口号,&后台启动
    sh mqnamesrv -n host:port &
    #备注:在启动过程中可能会因为无法分配内存启动失败,打开bin目录下的runserver.sh修改启动参数
    
  3. 启动broker,进入bin目录下执行

    # -n 指定nameserver的访问地址, -c 指定broker的配置文件
    sh mqbroker -n host:port -c ../conf/broker.conf autoCreateTopicEnable=true &
    #备注:修改broker.conf,添加 brokerIP1 = host 否则外网访问可能会报错
    
  4. 开放访问端口号

    开放nameservr访问端口 9876
    开放broker访问端口 10909 10911 10912
    备注:当时因为少开放了一个10912端口导致程序一直报错,错误提示忘了是啥啦
    

DefaultMQProducer属性配置

public class DefaultMQProducer extends ClientConfig implements MQProducer {

    /**
     * 默认生产者,
     */
    protected final transient DefaultMQProducerImpl defaultMQProducerImpl;

    /**
     * 生产着组别,同组别的所有生产着实例作用相同
     */
    private String producerGroup;

    /**
     * 不指定topic时的默认topic
     */
    private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;

    /**
     * 默认topic队列的大小
     */
    private volatile int defaultTopicQueueNums = 4;

    /**
     * 发送消息超时时间
     */
    private int sendMsgTimeout = 3000;

    /**
     * 对消息进行压缩的阀值
     */
    private int compressMsgBodyOverHowmuch = 1024 * 4;

    /**
     * 同步模式下发送消息失败的重试次数
     */
    private int retryTimesWhenSendFailed = 2;

    /**
     * 异步模式下发送消息失败的重试次数
     */
    private int retryTimesWhenSendAsyncFailed = 2;

    /**
     * 当消息发送失败时用其他的broker进行重试
     */
    private boolean retryAnotherBrokerWhenNotStoreOK = false;

    /**
     * 最大允许的消息大小
     */
    private int maxMessageSize = 1024 * 1024 * 4; // 4M
}

DefaultMQPushConsumer属性配置

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {

    /**
     * 默认消费者
     */
    protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;

    /**
     * 消费者组别
     */
    private String consumerGroup;

    /**
     * 消费模式,默认集群消费,rocketmq支持集群消费和广播消费
     */
    private MessageModel messageModel = MessageModel.CLUSTERING;

    /**
     * 消费者开始消费的位置,默认从消费者停止之前的最后一个offset处开始消费
     */
    private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;

    /**
     * 回溯消费时间,默认回溯消费半小时前的数据
     */
    private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));

    /**
     * 消费者消费消息时的消息队列分配策略
     */
    private AllocateMessageQueueStrategy allocateMessageQueueStrategy;

    /**
     * Subscription relationship
     */
    private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();

    /**
     * 消息监听器
     */
    private MessageListener messageListener;

    /**
     * Offset Storage
     */
    private OffsetStore offsetStore;

    /**
     * 最小消费者线程数量
     */
    private int consumeThreadMin = 20;

    /**
     * 最大消费者线程数量
     */
    private int consumeThreadMax = 64;

    /**
     * 动态调整消费者线程的消息阀值
     */
    private long adjustThreadPoolNumsThreshold = 100000;

    /**
     * Concurrently max span offset.it has no effect on sequential consumption
     */
    private int consumeConcurrentlyMaxSpan = 2000;

    /**
     * 消息队列流量控制阀值
     */
    private int pullThresholdForQueue = 1000;

    private int pullThresholdSizeForQueue = 100;

    private int pullThresholdForTopic = -1;

    private int pullThresholdSizeForTopic = -1;

    /**
     * Message pull Interval
     */
    private long pullInterval = 0;

    /**
     * 批量消费的消息数量
     */
    private int consumeMessageBatchMaxSize = 1;

    /**
     * 批量拉取的消息数量
     */
    private int pullBatchSize = 32;

    private boolean postSubscriptionWhenPull = false;

    private boolean unitMode = false;

    /**
     * 消费者消费消息失败后的最大重试测试,-1表示重试16次
     */
    private int maxReconsumeTimes = -1;

    private long suspendCurrentQueueTimeMillis = 1000;

    /**
     * 消息消费的超时时间
     */
    private long consumeTimeout = 15;

使用案例

// TODO: 2019-06-05  
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,839评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,543评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,116评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,371评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,384评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,111评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,416评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,053评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,558评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,007评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,117评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,756评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,324评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,315评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,539评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,578评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,877评论 2 345