ActiveMQ—java编码实现ActiveMQ通讯(queue队列)

一、JMS总体架构

JMS总体架构.png

二、JMS开发基本步骤:

  1. 创建一个connection factory

  2. 通过connection factory来创建 JMS connection

  3. 启动JMS connection (注意勿忘:一定要启动,否则无法收到消息。)

  4. 通过connection 创建JMS session

  5. 创建JMS destination (queue 或者 Topic)

  6. 创建JMS producer 或者创建JMS message并设置destination

  7. 创建JMS consumer或者注册一个JMS message listener

  8. 发送或者接收JMS message(s)

  9. 关闭所有JMS资源(connection、session、producer、consumer等)

三、创建maven工程引入jar包

<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-all</artifactId>
  <version>5.10.0</version>
</dependency>

四、编写消息生产者(producer)

package com.apesbook.activemq.queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsProducer {

    public static final String BROKER_URL = "tcp://192.168.5.159:61616";
    public static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws JMSException {
        // 1.创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
        // 2.通过连接工厂,获得connection并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        // 3.创建会话session
        // 两个参数,第一个叫事务/第二个叫签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4.创建目的地
        Queue queue = session.createQueue(QUEUE_NAME);
        // 5.创建消息生产者
        MessageProducer messageProducer = session.createProducer(queue);
        // 6.通过使用messageProducer生产3条消息发送到MQ队列里面
        for (int i = 1; i <= 6; i++) {
            // 7.创建消息
            TextMessage textMessage = session.createTextMessage("msg----"+i);
            // 通过messageProducer发送消息
            messageProducer.send(textMessage);
        }

        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("消息发送到MQ完成");
    }
}

五、编写消息消费者(consumer)

  1. 同步阻塞方式(receive())
package com.apesbook.activemq.queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.io.IOException;

/**
 * Description:
 * Author:XCK
 * Date:2019/8/30
 */
public class JmsConsumer_receive {

    public static final String BROKER_URL = "tcp://192.168.5.159:61616";
    public static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws JMSException, IOException, InterruptedException {
        System.out.println("我是1号消费者");
        // 1. 创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
        // 2.通过工厂创建connection,并启动
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        // 3. 通过工厂创建会话session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4.通过session创建目的地
        Queue queue = session.createQueue(QUEUE_NAME);
        // 5.创建消息消费者
        MessageConsumer messageConsumer = session.createConsumer(queue);

        /**
         * 同步阻塞方式(receive())
         * 订阅者或接收者调用MessageConsumer的receive()方法来接收消息,receive()方法在能够接收到消息之前(或超时之前)将一直阻塞。
         */
        while (true) {
            // 1.等待接收消息,可以设置等待超时时间(过期不候)messageConsumer.receive(4000L)
            TextMessage textMessage = (TextMessage) messageConsumer.receive();

            if (textMessage != null) {
                if (textMessage.getText().equals("msg----3")){
                    Thread.sleep(5000);
                }
                System.out.println("****消费者接收到消息:" + textMessage.getText());
            } else {
              break;
            }
        }
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

  1. 异步非阻塞方式(监听器 onMessage()):
package com.apesbook.activemq.queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.io.IOException;

/**
 * Description: http://192.168.5.159:8161/admin/queues.jsp
 * Author:XCK
 * Date:2019/8/31
 */
public class JmsConsumer_messageListener {

    public static final String BROKER_URL = "tcp://192.168.5.159:61616";
    public static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws JMSException, IOException {
        System.out.println("我是2号消费者");
        // 1.创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
        // 2.通过连接工厂创建connection,并启动、并启动、并启动
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();// 启动(这一步非常关键,千万别忘记)

        // 3.通过connection创建session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4.创建目的地
        Queue queue = session.createQueue(QUEUE_NAME);
        // 5.创建消息消费者
        MessageConsumer messageConsumer = session.createConsumer(queue);

        /**
         * 通过监听的方式消费消息,
         * 异步非阻塞方式(监听器 onMessage())
         * 订阅者或接收者调用 MessageConsumer的 setMessageListener(MessageListener messageListener) 注册一个消息监听器,
         * 当消息到达之后,系统自动调用监听器 MessageListener 的 onMessage(Message message) 方法
         */
        messageConsumer.setMessageListener(new MessageListener()
        {
            @Override
            public void onMessage(Message message) {
                if (message != null && message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("***消费者监听收到消息:" + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        // 让程序一直保持运行
        System.in.read();
        // 关闭资源
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

六、案例分析

案例分析:
1.先生产消息,再启动1号消费者。
问题:1号消费者能接收到消息吗?
答案:可以。

2.先生产消息,先启动1号消费者,再启动2号消费者。
问题:2号消费者还能消费到消息吗?
答案:1号可以消费到消息
2号不可以消费到消息

3.先启动两个消费者,再生产6条消息,
问题:请问 消费情况如何?
答案:一人一半

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,636评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,890评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,680评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,766评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,665评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,045评论 1 276
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,515评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,182评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,334评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,274评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,319评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,002评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,599评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,675评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,917评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,309评论 2 345
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,885评论 2 341

推荐阅读更多精彩内容