RocketMQ实战

PS:我这里使用的是自定义的RoketMQ进行消息的发送和消费的,原理都差不多,万变不离其宗。


创建配置文件类

  • 首先创建RocketMqConfig、RocketMqProducerConfig、RocketMqConsumerConfig类,里面包含RocketMQ所需的所有配置,等到创建Consumer和Producer的时候可以一键配置,文中所有代码块均省略getter\setter方法

    public class RocketMqConfig {
      private String namesrvAddr = "127.0.0.1:9876";
      private RocketMqProducerConfig producerConfig = new RocketMqProducerConfig();
      private RocketMqConsumerConfig consumerConfig = new RocketMqConsumerConfig();
    }
    
    public class RocketMqProducerConfig {
      private String groupName = "producer";
      private String instanceName = "producer_instance";
      private String topic = "topic";
    }
    
    public class RocketMqConsumerConfig {
      //组名
      private String groupName = "consumer";
      //实例名
      private String instanceName = "consumer_instance";
      // 订阅主题和标签Map
      private Map<String, String> subscriptions = new HashMap<>();
      //设置批量消费,以提升消费吞吐量,默认是1
      private int consumeMessageBatchMaxSize = 1;
    }
    

创建Consumer和Producer

  • 创建RocketMqProducer、RocketMqConsumer类,里面包括构造方法(有参、无参)、start、stop等方法

    public class RocketMqProducer implements MqProducer {
    
      private DefaultMQProducer producer;
      private RocketMqConfig config;
      private boolean isStarted = false;
    
      public RocketMqProducer(RocketMqConfig config) {
          this.config = config;
          producer = new DefaultMQProducer(config.getProducerConfig().getGroupName());
          producer.setInstanceName(config.getProducerConfig().getInstanceName());
          producer.setVipChannelEnabled(false);
          producer.setNamesrvAddr(config.getNamesrvAddr());
      }
        public boolean isStarted() {
          return isStarted;
      }
        public void start() {
          producer.start();
          isStarted = true;
      }
        public void stop() {
          producer.shutdown();
          isStarted = false;
      }
        public MqSendResult send(String tag, AbstractMessage t) {
            t.setTopic(config.getProducerConfig().getTopic());
            t.setTag(tag);
            Message msg = new Message(t.getTopic(), 
                                      t.getTag(), 
                                      t.getKey(),
                                      t.getMessageType().getType(),
                                      t.getBody(),
                                      true);
            SendResult sendResult = producer.send(msg);
            t.setMessageId(sendResult.getMsgId());
    
            // 返回结果
            MqSendResult mqSendResult = new MqSendResult();
            mqSendResult.setSuccess(true);
            mqSendResult.setCode(sendResult.getSendStatus().toString());
            mqSendResult.setMsgId(sendResult.getMsgId());
            return mqSendResult;
        }
    }
    
    public class RocketMqConsumer implements MqConsumer {
      private MqMessageHandler handler = null;
      private DefaultMQPushConsumer consumer;
      private RocketMqConfig config;
      private boolean isStarted = false;
      public RocketMqConsumer(RocketMqConfig config) {
          this.config = config;
          consumer = new DefaultMQPushConsumer(config.getConsumerConfig().getGroupName());
          consumer.setInstanceName(config.getConsumerConfig().getInstanceName());
          consumer.setVipChannelEnabled(false);
          consumer.setConsumeMessageBatchMaxSize(config.getConsumerConfig().getConsumeMessageBatchMaxSize());
          consumer.setNamesrvAddr(config.getNamesrvAddr());
          // 从队列头开始消费
          consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
      }
    
      @Override
      public boolean isStarted() {
          return isStarted;
      }
    
      @Override
      public void start() {
          consumer.registerMessageListener(new MessageListenerConcurrently() {
              @Override
              public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext Context) {
                    for (MessageExt msg : msgs) {
                        System.out.println(msg.getbody());
                    }
                    if (handler != null) {
                        handler.handle(t);
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
              }
          });
    
          try {
              for(Entry<String, String> sub : config.getConsumerConfig().getSubscriptions().entrySet()) {
                  consumer.subscribe(sub.getKey(), sub.getValue());
              }
              consumer.start();
              isStarted = true;
          } catch (Exception ex) {
              isStarted = false;
              throw new Exception(MqLibConstants.LIB_MQ_ROCKETMQ, MqLibExceptionEnum.MQ_CLIENT_EXCEPTION, ex);
          }
      }
    
      @Override
      public void stop() {
          for(Entry<String, String> sub : config.getConsumerConfig().getSubscriptions().entrySet()) {
              consumer.unsubscribe(sub.getKey());
          }
          consumer.shutdown();
          isStarted = false;
      }
    
      @Override
      public void setHandler(MqMessageHandler handler){
          this.handler = handler;
      }
    }
    

消息发送与消费

  • 创建Producer类,在其中创建RocketMqConfig、RocketMqProducer对象;

    public class Producer {
        public static void main(String[] args) {
            RocketMqConfig config = new RocketMqConfig();
            MqProducerClient producer = new MqProducerClient(config);
            producer.start();//启动生产者
            JsonMessage jsonMessage = new JsonMessage();//自定义消息类
            jsonMessage.setData("JsonMessage!");
            producer.send("json", jsonMessage);
        }
    }
    
  • 创建Consumer类,在其中创建RocketMqConfig、RocketMqConsumer对象;

    public class Consumer {
        public static void main(String[] args) {
            RocketMqConfig config = new RocketMqConfig();
            Map<String,String> map = new HashMap();
            map.put(config.getProducerConfig().getTopic(),"json");
            config.getConsumerConfig().setSubscriptions(map);
            MqConsumerClient consumer = new MqConsumerClient(config);
            consumer.start();
        }
    }
    

启动两个类,同时别忘了启动RocketMQ的服务。可以在RocketMQ可视化界面看到生产者、消费者以及消息等信息

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,098评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,213评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,960评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,519评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,512评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,533评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,914评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,804评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,563评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,644评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,350评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,933评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,908评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,146评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,847评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,361评论 2 342

推荐阅读更多精彩内容

  • 1. Apache Kafka是一个分布式流平台 1.1 流平台有三个关键功能: 发布和订阅流记录,类似于一个消息...
    程序员日常填坑阅读 209评论 0 0
  • RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点: 能够保证严格的消息顺序 提供丰富的消息拉取模式...
    AI乔治阅读 2,051评论 2 5
  • RocketMQ4.X JMS Java消息服务(Java Message Service),Java平台中关于面...
    方穹轩阅读 695评论 0 1
  • Kafka入门经典教程-Kafka-about云开发 http://www.aboutyun.com/threa...
    葡萄喃喃呓语阅读 10,798评论 4 54
  • 表情是什么,我认为表情就是表现出来的情绪。表情可以传达很多信息。高兴了当然就笑了,难过就哭了。两者是相互影响密不可...
    Persistenc_6aea阅读 123,945评论 2 7