Java高级技术day83:ActiveMQ

一、ActiveMQ的安装

1.什么是ActiveMQ?

ActiveMQ 是 Apache 出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个 完全支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现,尽管 JMS 规范出台已经是很久 的事情了,但是 JMS 在当今的 J2EE 应用中间仍然扮演着特殊的地位。

1.1什么是消息?

“消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串; 也可以更复杂,可能包含嵌入对象。

1.2什么是队列?

队列的特点是先进先出。

示例
  • 什么是消息队列?

“消息队列”是在消息的传输过程中保存消息的容器。

1.3常用的消息服务应用:

(1)ActiveMQ

ActiveMQ 是 Apache 出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完 全支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现。

(2)RabbitMQ

RabbitMQ 是一个在 AMQP 基础上完成的,可复用的企业消息系统。他遵循 Mozilla Public License 开源协议。开发语言为 Erlang。

(3)RocketMQ

由阿里巴巴定义开发的一套消息队列应用服务。

2.消息服务的应用场景:

消息队列的主要特点是异步处理,主要目的是减少请求响应时间和解耦。所以主要的使 用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。同 时由于使用了消息队列,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系, 也不需要受对方的影响,即解耦和。
参考理解

示例
  • 异步处理:

用户注册流程:
(1)注册处理以及写数据库
(2)发送注册成功的手机短信
(3)发送注册成功的邮件信息

如果用消息中间件:则可以创建两个线程来做这些事情,直接发送消息给消息中间件, 然后让邮件服务和短信服务自己去消息中间件里面去取消息,然后取到消息后再自己做对应 的业务操作。

  • 订单处理(解耦):

生成订单流程:
(1)在购物车中点击结算
(2)完成支付
(3)创建订单
(4)调用库存系统

订单完成后,订单系统并不去直接调用库存系统,而是发送消息到消息中间件,写入一 个订单信息。库存系统自己去消息中间件上去获取,然后做发货处理,并更新库存,这样能 够实现互联网型应用追求的快这一个属性。而库存系统读取订单后库存应用这个操作也是非 常快的,所以有消息中间件对解耦来说也是一个不错的方向。

  • 秒杀功能 ( 流量的削峰 ):

秒杀流程:
(1)用户点击秒杀
(2)发送请求到秒杀应用
(3)在请求秒杀应用之前将请求放入到消息队列
(4)秒杀应用从消息队列中获取请求并处理。
比如,系统举行秒杀活动,热门商品。流量蜂拥而至 100 件商品,10 万人挤进来怎么 办?10 万秒杀的操作,放入消息队列。秒杀应用处理消息队列中的 10 万个请求中的前 100 个,其他的打回,通知失败。流量峰值控制在消息队列处,秒杀应用不会瞬间被怼死。

3.JMS

3.1什么是JMS?

JMS(Java Messag Service)是 Java 平台上有关面向消息中间件的技术规范,它便于 消息系统中的 Java 应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接 口,简化企业应用的开发。

3.2JMS 模型:
  • 点对点模型(Point To Point):

生产者发送一条消息到 queue,只有一个消费者能收到。

示例
  • 发布订阅模型(Publish/Subscribe) :

发布者发送到 topic 的消息,只有订阅了 topic 的订阅者才会收到消息。

示例

4.安装ActiveMQ:

4.1下载资源:

ActiveMQ 官网: http://activemq.apache.org ;

示例
  • 注意:

ActiveMQ5.10.x 以上版本必须使用 JDK1.8 才能正常使用。 ActiveMQ5.9.x 及以下版本使用 JDK1.7 即可正常使用。

4.2上传并解压:

将下载的资源上传到Linux服务器并解压;解压命令“tar -zxf apache-activemq-5.9.0-bin.tar.gz ”;
将文件拷贝到指定目录:“ cp apache-activemq-5.9.0 /usr/local/activemq -r ”;

4.3检查权限:

ls -al apache-activemq-5.9.0/bin 如果权限不足,则无法执行,需要修改文件权限: chmod 755 activemq ;

  • 启动ActiveMQ:

/usr/local/activemq/bin/activemq start ;

  • 检查进程查看是否启动成功:

ps aux | grep activemq ;下面表示启动成功。

image.png
  • 管理界面:

使用浏览器访问 ActiveMQ 管理应用, 地址如下: http://ip:8161/admin/ ;用户名:admin;密码admin。

示例
4.4修改访问端口:

修改 ActiveMQ 配置文件: /usr/local/activemq/conf/jetty.xml ;
配置文件修改完毕,保存并重新启动 ActiveMQ 服务。

示例
  • 修改用户名和密码:

修改 conf/users.properties 配置文件.内容为: 用户名=密码;
保存并重启 ActiveMQ 服务即可。

  • 重启和关闭ActiveMQ:

/usr/local/activemq/bin/activemq restart ;
/usr/local/activemq/bin/activemq stop ;

  • 配置文件activemq.xml :

配置文件中,配置的是 ActiveMQ 的核心配置信息. 是提供服务时使用的配置. 可以修改 启动的访问端口. 即 java 编程中访问 ActiveMQ 的访问端口. 默认端口为 61616;
使用协议是: tcp 协议;
修改端口后, 保存并重启 ActiveMQ 服务即可。

4.5ActiveMQ目录介绍:

(1)bin 存放的是脚本文件;
(2)conf 存放的是基本配置文件;
(3)data 存放的是日志文件;
(4)docs 存放的是说明文档 ;
(5)examples 存放的是简单的实例;
(6)lib 存放的是 activemq 所需 jar 包;
(7)webapps 用于存放项目的目录 ;

示例

5.ActiveMQ 术语 :

(1)Destination :
目的地,JMS Provider(消息中间件)负责维护,用于对 Message 进行管理的对象。 MessageProducer 需要指定 Destination 才能发送消息,MessageReceiver 需要指定 Destination 才能接收消息。
(2)Producer :
消息生成者,负责发送 Message 到目的地。
(3)Consumer | Receiver :
消息消费者,负责从目的地中消费【处理|监听|订阅】Message。
(4)Message:
消息,消息封装一次通信的内容。

二、ActiveMQ应用:

1.ActiveMQ 常用 API 简介 :

API 都是接口类型,由定义在 javax.jms 包中;
是 JMS 标准接口定义。

(1)ConnectionFactory:

链接工厂, 用于创建链接的工厂类型;

(2)Connection :

链接. 用于建立访问 ActiveMQ 连接的类型, 由链接工厂创建。

(3)Session

会话, 一次持久有效有状态的访问. 由链接创建。

(4)Destination & Queue

目的地, 用于描述本次访问 ActiveMQ 的消息访问目的地. 即 ActiveMQ 服务中的具体队 列. 由会话创建. interface Queue extends Destination

(5)MessageProducer

消息生成者, 在一次有效会话中, 用于发送消息给 ActiveMQ 服务的工具. 由会话创建。

(6)MessageConsumer

消息消费者【消息订阅者,消息处理者】, 在一次有效会话中, 用于从 ActiveMQ 服务中 获取消息的工具. 由会话创建。

(7)Message

消息, 通过消息生成者向 ActiveMQ 服务发送消息时使用的数据载体对象或消息消费者 从 ActiveMQ 服务中获取消息时使用的数据载体对象. 是所有消息【文本消息,对象消息等】 具体类型的顶级接口. 可以通过会话创建或通过会话从 ActiveMQ 服务中获取。

2.使用ActiveMQ处理文本消息:

2.1创建消息生产者:
创建项目.png
  • 修改POM文件添加ActiveMQ坐标:
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.9.0</version>
        </dependency>
    </dependencies>
2.2编写消息的生产者:
/**
 * 消息的生产者
 * 
 * @author zhang
 *
 */
public class HelloProducer {

    public static void testProducer(String msg) {
        // 定义工厂
        ConnectionFactory factory = null;
        // 定义连接对象
        Connection conn = null;
        // 定义会话
        Session session = null;
        // 目的地
        Destination des = null;
        // 定义消息发送者
        MessageProducer producer = null;
        // 定义消息
        Message message = null;

        try {
            factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.226.130:61616");
            // 创建连接对象
            conn = factory.createConnection();
            // 启动连接
            conn.start();
            /*
             * transacted:是否使用事务 可选值为: true|false true:使用事务 当设置次变量 值。
             * Session.SESSION_TRANSACTED false:不适用事务,设置次变量 则 acknowledgeMode 参数必须设置
             * acknowledgeMode: Session.AUTO_ACKNOWLEDGE:自动消息确认机制
             * Session.CLIENT_ACKNOWLEDGE:客户端确认 机制 Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息机制
             */
            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 创建目的地
            des = session.createQueue("helloword-destination");
            // 创建消息生产者
            producer = session.createProducer(des);
            // 创建消息对象
            message = session.createTextMessage(msg);
            // 发送消息
            producer.send(message);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // 关闭资源
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (conn != null) {
                try {
                    conn.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
2.3创建消息消费者
创建工程
  • 修改 POM 文件添加 ActiveMQ 坐标:
        <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.9.0</version>
        </dependency>
2.4编写消息的消费者:
/**
 * 消息的消费者
 * @author zhang
 *
 */
public class HelloConsumer {
    public static void testConsumer() {
        // 定义工厂
        ConnectionFactory factory = null;
        // 定义连接对象
        Connection conn = null;
        // 定义会话
        Session session = null;
        // 目的地
        Destination des = null;
        // 定义消息消费者
        MessageConsumer consumer = null;
        // 定义消息
        Message message = null;

        try {
            factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.226.130:61616");
            // 创建连接对象
            conn = factory.createConnection();
            // 启动连接
            conn.start();
            /*
             * transacted:是否使用事务 可选值为: true|false true:使用事务 当设置次变量 值。
             * Session.SESSION_TRANSACTED false:不适用事务,设置次变量 则 acknowledgeMode 参数必须设置
             * acknowledgeMode: Session.AUTO_ACKNOWLEDGE:自动消息确认机制
             * Session.CLIENT_ACKNOWLEDGE:客户端确认 机制 Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息机制
             */
            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 创建目的地
            des = session.createQueue("helloword-destination");
            // 创建消息生产者
            consumer = session.createConsumer(des);
            // 创建消息对象
            message = consumer.receive();
            //处理消息
            String text = ((TextMessage)message).getText();
            System.out.println("从ActiveMQ服务中获取的文本信息"+text);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // 关闭资源
            if (consumer != null) {
                try {
                    consumer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (conn != null) {
                try {
                    conn.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

3.处理对象消息:

3.1定义消费对象(生成getter和setter):
    private int userid;
    private String username;
    private int userage;
3.2创建消息的生产者:
/**
 * 消息的生产者
 * @author zhang
 *
 */
public class HelloProducer2 {

    public static void testProducer(User user) {
        // 定义工厂
        ConnectionFactory factory = null;
        // 定义连接对象
        Connection conn = null;
        // 定义会话
        Session session = null;
        // 目的地
        Destination des = null;
        // 定义消息发送者
        MessageProducer producer = null;
        // 定义消息
        Message message = null;

        try {
            factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.226.130:61616");
            // 创建连接对象
            conn = factory.createConnection();
            // 启动连接
            conn.start();
            /*
             * transacted:是否使用事务 可选值为: true|false true:使用事务 当设置次变量 值。
             * Session.SESSION_TRANSACTED false:不适用事务,设置次变量 则 acknowledgeMode 参数必须设置
             * acknowledgeMode: Session.AUTO_ACKNOWLEDGE:自动消息确认机制
             * Session.CLIENT_ACKNOWLEDGE:客户端确认 机制 Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息机制
             */
            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 创建目的地
            des = session.createQueue("helloword-destination");
            // 创建消息生产者
            producer = session.createProducer(des);
            // 创建消息对象
            message = session.createObjectMessage(user);
            // 发送消息
            producer.send(message);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // 关闭资源
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (conn != null) {
                try {
                    conn.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

3.2创建消息的消费者:
/**
 * 消息的消费者
 * @author zhang
 *
 */
public class HelloConsumer2 {
    public static void testConsumer() {
        // 定义工厂
        ConnectionFactory factory = null;
        // 定义连接对象
        Connection conn = null;
        // 定义会话
        Session session = null;
        // 目的地
        Destination des = null;
        // 定义消息消费者
        MessageConsumer consumer = null;
        // 定义消息
        Message message = null;

        try {
            factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.226.130:61616");
            // 创建连接对象
            conn = factory.createConnection();
            // 启动连接
            conn.start();
            /*
             * transacted:是否使用事务 可选值为: true|false true:使用事务 当设置次变量 值。
             * Session.SESSION_TRANSACTED false:不适用事务,设置次变量 则 acknowledgeMode 参数必须设置
             * acknowledgeMode: Session.AUTO_ACKNOWLEDGE:自动消息确认机制
             * Session.CLIENT_ACKNOWLEDGE:客户端确认 机制 Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息机制
             */
            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 创建目的地
            des = session.createQueue("helloword-destination");
            // 创建消息生产者
            consumer = session.createConsumer(des);
            // 获取消息对象
            message = consumer.receive();
            //处理消息
            ObjectMessage objectMessage = (ObjectMessage)message;
            User user = (User)objectMessage.getObject();
            System.out.println("从ActiveMQ服务中获取的文本信息"+user);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // 关闭资源
            if (consumer != null) {
                try {
                    consumer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (conn != null) {
                try {
                    conn.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
  • 实现:


    示例

4.实现队列服务监听:

4.1创建消息生产者:
public class HelloProducer3 {

    public static void testProducer(String msg) {
        // 定义工厂
        ConnectionFactory factory = null;
        // 定义连接对象
        Connection conn = null;
        // 定义会话
        Session session = null;
        // 目的地
        Destination des = null;
        // 定义消息发送者
        MessageProducer producer = null;
        // 定义消息
        Message message = null;

        try {
            factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.226.130:61616");
            // 创建连接对象
            conn = factory.createConnection();
            // 启动连接
            conn.start();
            /*
             * transacted:是否使用事务 可选值为: true|false true:使用事务 当设置次变量 值。
             * Session.SESSION_TRANSACTED false:不适用事务,设置次变量 则 acknowledgeMode 参数必须设置
             * acknowledgeMode: Session.AUTO_ACKNOWLEDGE:自动消息确认机制
             * Session.CLIENT_ACKNOWLEDGE:客户端确认 机制 Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息机制
             */
            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 创建目的地
            des = session.createQueue("02-message");
            // 创建消息生产者
            producer = session.createProducer(des);
            // 创建消息对象
            message = session.createTextMessage(msg);
            // 发送消息
            producer.send(message);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // 关闭资源
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (conn != null) {
                try {
                    conn.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
4.2创建消息的消费者:
public class HelloConsumer3 {
    public static void testConsumer() {
        // 定义工厂
        ConnectionFactory factory = null;
        // 定义连接对象
        Connection conn = null;
        // 定义会话
        Session session = null;
        // 目的地
        Destination des = null;
        // 定义消息消费者
        MessageConsumer consumer = null;
        // 定义消息
        Message message = null;

        try {
            factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.226.130:61616");
            // 创建连接对象
            conn = factory.createConnection();
            // 启动连接
            conn.start();
            /*
             * transacted:是否使用事务 可选值为: true|false true:使用事务 当设置次变量 值。
             * Session.SESSION_TRANSACTED false:不适用事务,设置次变量 则 acknowledgeMode 参数必须设置
             * acknowledgeMode: Session.AUTO_ACKNOWLEDGE:自动消息确认机制
             * Session.CLIENT_ACKNOWLEDGE:客户端确认 机制 Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息机制
             */
            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 创建目的地
            des = session.createQueue("02-message");
            // 创建消息消费者
            consumer = session.createConsumer(des);
            // 创建消息消费者对象
            consumer.setMessageListener(new MessageListener() {

                @Override
                public void onMessage(Message message) {
                    System.out.println(message);
                    // 处理消息
                    String text = null;
                    try {
                        text = ((TextMessage) message).getText();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                    System.out.println("从ActiveMQ服务中获取的文本信息:" + text);
                }
            });

        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
  • 实现:


    示例

5.Topic 模型 :

Publish/Subscribe 处理模式(Topic)
消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消 息。 和点对点方式不同,发布到 topic 的消息会被所有订阅者消费。 当生产者发布消息,不管是否有消费者。都不会保存消息 一定要先有消息的消费者,后有消息的生产者。

示例
5.1创建生产者:
public class HelloProducerTopic {

    public static void testProducer(String msg) {
        // 定义工厂
        ConnectionFactory factory = null;
        // 定义连接对象
        Connection conn = null;
        // 定义会话
        Session session = null;
        // 目的地
        Destination des = null;
        // 定义消息发送者
        MessageProducer producer = null;
        // 定义消息
        Message message = null;

        try {
            factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.226.130:61616");
            // 创建连接对象
            conn = factory.createConnection();
            // 启动连接
            conn.start();
            /*
             * transacted:是否使用事务 可选值为: true|false true:使用事务 当设置次变量 值。
             * Session.SESSION_TRANSACTED false:不适用事务,设置次变量 则 acknowledgeMode 参数必须设置
             * acknowledgeMode: Session.AUTO_ACKNOWLEDGE:自动消息确认机制
             * Session.CLIENT_ACKNOWLEDGE:客户端确认 机制 Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息机制
             */
            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 创建目的地
            des = session.createTopic("topic");
            // 创建消息生产者
            producer = session.createProducer(des);
            // 创建消息对象
            message = session.createTextMessage(msg);
            // 发送消息
            producer.send(message);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // 关闭资源
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (conn != null) {
                try {
                    conn.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
  • 创建消费者:

创建多个消费者;代码相同;

public class HelloConsumerTopic implements Runnable{
    
    public void testConsumer() {
        // 定义工厂
        ConnectionFactory factory = null;
        // 定义连接对象
        Connection conn = null;
        // 定义会话
        Session session = null;
        // 目的地
        Destination des = null;
        // 定义消息消费者
        MessageConsumer consumer = null;
        // 定义消息
        Message message = null;

        try {
            factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.226.130:61616");
            // 创建连接对象
            conn = factory.createConnection();
            // 启动连接
            conn.start();
            /*
             * transacted:是否使用事务 可选值为: true|false true:使用事务 当设置次变量 值。
             * Session.SESSION_TRANSACTED false:不适用事务,设置次变量 则 acknowledgeMode 参数必须设置
             * acknowledgeMode: Session.AUTO_ACKNOWLEDGE:自动消息确认机制
             * Session.CLIENT_ACKNOWLEDGE:客户端确认 机制 Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息机制
             */
            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 创建目的地
            des = session.createTopic("topic");
            // 创建消息消费者
            consumer = session.createConsumer(des);
            // 创建消息消费者对象
            consumer.setMessageListener(new MessageListener() {

                @Override
                public void onMessage(Message message) {
                    System.out.println(message);
                    // 处理消息
                    String text = null;
                    try {
                        text = ((TextMessage) message).getText();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                    System.out.println("从ActiveMQ服务中获取的文本信息Topic1:" + text);
                }
            });

        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        testConsumer();
    }
}

6.测试:

6.1生产者测试:
public class Test {
    public static void main(String[] args) {
//      HelloProducer.testProducer("Hello");
//      User user = new User();
//      user.setUserid(01);
//      user.setUsername("张三");
//      user.setUserage(20);
//      HelloProducer2.testProducer(user);
//      HelloProducer3.testProducer("Dave");
        HelloProducerTopic.testProducer("Dave");
    }
}
  • 实现效果:


    先启动消费者
6.2消费者测试:
public class Test {
    public static void main(String[] args) {
//      HelloConsumer.testConsumer();
//      HelloConsumer2.testConsumer();
//      HelloConsumer3.testConsumer();
        HelloConsumerTopic topic1 = new HelloConsumerTopic();
        Thread thread = new Thread(topic1);
        thread.start();
        HelloConsumerTopic2 topic2 = new HelloConsumerTopic2();
        Thread thread2 = new Thread(topic2);
        thread2.start();
        HelloConsumerTopic3 topic3 = new HelloConsumerTopic3();
        Thread thread3 = new Thread(topic3);
        thread3.start();
    }
}

三、spring整合ActiveMQ

1.创建生产者项目:

  • 修改POM文件:
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.zlw</groupId>
        <artifactId>11-parent</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <groupId>com.zlw</groupId>
    <artifactId>11spring-actviemq-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>war</packaging>
    <dependencies>
        <!-- ActiveMQ客户端完整jar包依赖 -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
        </dependency>
        <!-- ActiveMQ和Spring整合配置文件标签处理jar包依赖 -->
        <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring</artifactId>
        </dependency>
        <!-- Spring-JMS插件相关jar包依赖 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-jms-pool</artifactId>
        </dependency>
        <!-- 单元测试 -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
        </dependency>
        <!-- 日志处理 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </dependency>
        <!-- spring -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
        </dependency>

        <!-- JSP相关 -->
        <dependency>
            <groupId>jstl</groupId>
            <artifactId>jstl</artifactId>
        </dependency>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>jsp-api</artifactId>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- 配置Tomcat插件 -->
            <plugin>
                <groupId>org.apache.tomcat.maven</groupId>
                <artifactId>tomcat7-maven-plugin</artifactId>
                <configuration>
                    <path>/</path>
                    <port>8080</port>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
1.1整合项目:
  • applicationContext-service.xml:
<!-- 扫描bean对象 -->
        <context:component-scan base-package="com.zlw.service"/>
  • springmvc:
            <!-- 扫描@Controller -->
            <context:component-scan base-package="com.zlw.web.controller"></context:component-scan>
            
            <!-- 注册两个新对象 主要是为了来处理springmvc 中的其他 anntation 如:@requestmapping  -->
            <mvc:annotation-driven></mvc:annotation-driven>
            <!-- 视图解析器 -->
            <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">
                <property name="prefix" value="/WEB-INF/jsp/"></property>
                <property name="suffix" value=".jsp"></property>
            </bean>
            
            <!-- 配置静态资源映射 -->
            <mvc:resources location="/WEB-INF/css/" mapping="/css/**"/>
            <mvc:resources location="/WEB-INF/js/" mapping="/js/**"/>
  • web.xml:
    <!-- 上下文参数,告诉spring配置文件路径 -->
    <context-param>
        <param-name>contextConfigLocation</param-name>
        <param-value>classpath:applicationContext-*.xml</param-value>
    </context-param>

    <listener>
        <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
    </listener>

    <!-- 配置springmvc -->
    <servlet>
        <servlet-name>springmvc</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>

        <init-param>
            <param-name>contextConfigLocation</param-name>
            <param-value>classpath:springmvc.xml</param-value>
        </init-param>

        <load-on-startup>1</load-on-startup>
    </servlet>
    <servlet-mapping>
        <servlet-name>springmvc</servlet-name>
        <url-pattern>/</url-pattern>
    </servlet-mapping>

    <filter>
        <filter-name>encoding</filter-name>
        <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>

        <init-param>
            <param-name>encoding</param-name>
            <param-value>utf-8</param-value>
        </init-param>
    </filter>
    <filter-mapping>
        <filter-name>encoding</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>
  • 整合ActiveMQ(applicationContext-jms):
    <!-- 需要创建一个连接工厂,连接 ActiveMQ. ActiveMQConnectionFactory. 需要依赖 ActiveMQ 提供的 amq 标签 -->
    <!-- amq:connectionFactory 是 bean 标签的子标签, 会在 spring 容器中创建一个 bean 对象. 可以为对象命名. 
         类似: <bean id="" class="ActiveMQConnectionFactory"></bean> -->
    <amq:connectionFactory
        brokerURL="tcp://192.168.226.130:61616" userName="admin"
        password="admin" id="amqConnectionFactory" />
    <!-- spring 管理 JMS 相关代码的时候,必须依赖 jms 标 签库. spring-jms 提供的标签库 -->
    <!-- 定义 Spring-JMS 中的连接工厂对象 CachingConnectionFactory - spring 框架提供的 连接工厂对象. 
        不能真正的访问 MOM 容器. 类似一个工厂的代理对象. 需要提供一个真实工 厂,实现 MOM 容器的连接访问. -->
    <bean id="pooledConnectionFactory"
        class="org.apache.activemq.pool.PooledConnectionFactoryBean">
        <property name="connectionFactory" ref="amqConnectionFactory"></property>
        <property name="maxConnections" value="10"></property>
    </bean>

    <!-- 配置有缓存的 ConnectionFactory,session 的 缓存大小可定制。 -->
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="pooledConnectionFactory"></property>
        <property name="sessionCacheSize" value="3"></property>
    </bean>

    <!-- JmsTemplate 配置 -->
    <bean id="template" class="org.springframework.jms.core.JmsTemplate">
        <!-- 给定连接工厂, 必须是 spring 创建的连接工 厂. -->
        <property name="connectionFactory" ref="connectionFactory"></property>
        <!-- 可选 - 默认目的地命名 -->
        <property name="defaultDestinationName" value="test-spring"></property>
    </bean>
1.2定义对象:
    private int userid;
    private String username;
    private String usermail;
    private int userage;
1.3创建发送消息的service:
@Service
public class UserServiceImpl implements UserService {

    @Autowired
    private JmsTemplate jmsTemplate;

    @Override
    public void addUser(Users user) {
        // 发送消息
        jmsTemplate.send(new MessageCreator() {
            
            @Override
            public Message createMessage(Session session) throws JMSException {
                Message message = session.createObjectMessage(user);
                return message;
            }
        });
    }
}
1.4创建Controller:
@Controller
@RequestMapping("/user")
public class UserController {

    @Autowired
    private UserService userService;

    @RequestMapping("addUser")
    public String userAdd(Users users) {
        userService.addUser(users);
        return "ok";
    }
}
  • 添加JSP页面:
<body>
    <form action="/user/addUser" method="post">
        <p>
            ID:<input type="text" name="userid" />
        </p>
        <p>
            姓名:<input type="text" name="username" />
        </p>
        <p>
            年龄:<input type="text" name="userage" />
        </p>
        <p>
            邮箱:<input type="text" name="usermail" />
        </p>
        <p>
            <input type="submit" value="提交" />
        </p>
    </form>
</body>

2.创建消费者项目:

2.1修改POM文件:
    <dependencies>
        <!-- activemq 客户端 -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
        </dependency>
        <!-- spring 框架对 JMS 标准的支持 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
        </dependency>
        <!-- ActiveMQ 和 spring 整合的插件 -->
        <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
        </dependency>
    </dependencies>
2.2创建接受消息的service:
@Service
public class UserServiceImpl implements UserService {

    @Override
    public void showUser(Users user) {
        System.out.println(user);
    }
}
2.3创建Listener处理消息:
@Component(value = "myListener")
public class MyMessageListener implements MessageListener{
    @Autowired
    private UserService userService;

    @Override
    public void onMessage(Message message) {
       //处理消息
        ObjectMessage objectMessage = (ObjectMessage)message;
        Users user = null;
        try {
            user = (Users) objectMessage.getObject();
        } catch (JMSException e) {
            e.printStackTrace();
        }
        this.userService.showUser(user);
    }
}
2.4测试:
public class Test {
    public static void main(String[] args) throws IOException {
        ClassPathXmlApplicationContext cpa = new ClassPathXmlApplicationContext(new String[] {"classpath:applicationContext-jms.xml","classpath:applicationContext-service.xml"});
        cpa.start();
        System.err.println("spring容器启动!!!");
        System.in.read();
    }
}

示例
示例

4.

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,236评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,867评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,715评论 0 340
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,899评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,895评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,733评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,085评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,722评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,025评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,696评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,816评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,447评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,057评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,009评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,254评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,204评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,561评论 2 343

推荐阅读更多精彩内容

  • 个人专题目录[https://www.jianshu.com/p/140e2a59db2c] 一、JMS简介 全称...
    Java及SpringBoot阅读 2,075评论 0 10
  • ActiveMQ 即时通讯服务 浅析http://www.cnblogs.com/hoojo/p/active_m...
    bboymonk阅读 1,479评论 0 11
  • 一、ActiveMQ简介 1.什么是ActiveMQ ActiveMQ是Apache出品,最流行的,能力强劲的开源...
    青年心路阅读 1,561评论 0 0
  • 简介 ActiveMQ 特点 ActiveMQ 是由 Apache 出品的一款开源消息中间件,旨在为应用程序提供高...
    预流阅读 5,904评论 4 21
  • 出图书馆 太阳很大 风也很大 我站在台阶上 打了两个喷嚏 世界明亮又干燥 恍惚里 还以为 现在才是三月份初春
    见微知阅读 112评论 0 0