ActiveMQ

ActiveMQ 使用版本: 5.13.5
下载地址:
链接:https://pan.baidu.com/s/1jIvN0gu 密码:am2r

jdk 版本1.8
Mac版下载地址:
链接:https://pan.baidu.com/s/1gfnLWAZ 密码:tj6t

启动mq步骤:

  1. 首先进到mq 的bin 文件夹下
    cd + 文件路径/bin
  2. 启动 activemq
    ./activemq start
启动mq.png
  1. 在启动时可能会报错, 应该是没有执行权限
    使用 ls -l 命令查看权限
    修改权限命令自行百度
查看权限.png
  1. 验证是否启动成功:
    http://127.0.0.1:8161/admin/queues.jsp
    账号和密码一般都是admin
    image.png

编写mq代码

创建maven项目

pom文件 添加依赖:

    <dependencies>
        <!-- spring核心配置-->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${org.springframework.version}</version>
        </dependency>
        <!--spring test 结合 junit 进行测试-->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>3.2.4.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.4.2</version>
        </dependency>
    </dependencies>

点对点消息模式

如果有两个消费者同时开启, 两个消费者都会收到部分数据, 加起来的数据为一份. 因此要保证消息目的地唯一.

  1. 创建生产者:
    // 获取默认的用户名, 密码, 地址
    private static final String UserName = ActiveMQConnection.DEFAULT_USER;
    private static final String password = ActiveMQConnection.DEFAULT_PASSWORD;
    private static final String url = ActiveMQConnection.DEFAULT_BROKER_URL;
        // 创建连接工厂
        ConnectionFactory factory = new ActiveMQConnectionFactory(UserName, password, url);
        try {
            // 创建连接
            Connection connection = factory.createConnection();
            // 启动连接
            connection.start();

            // 是否支持事务,如果为true,则第二个参数被设置为SESSION_TRANSACTED
            //Session.AUTO_ACKNOWLEDGE为自动确认,不管成功失败
            //Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端调用acknowledge方法时服务器删除消息
            //DUPS_OK_ACKNOWLEDGE允许重复确认模式。
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // 消息的目的地
            Destination destination = session.createQueue("FirstQueue");

            // 消费生产者
            MessageProducer producer = session.createProducer(destination);

            for (int i = 0; i < 5; i++) {
                // 创建文本消息
                TextMessage message = session.createTextMessage("message" + i);
                producer.send(message);
            }
            // 如果创建session时为true, 则放开下面语句
//            session.commit();
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
  1. 创建消费者
ConnectionFactory factory = new ActiveMQConnectionFactory(username, password, url);
        try {
            Connection connection = factory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 点对点模式
            Destination destination = session.createQueue("test1");
            // 创建消费者
            MessageConsumer consumer = session.createConsumer(destination);
            while (true){
                TextMessage message = (TextMessage) consumer.receive();
                if (message!=null){
                    System.out.println("收到的信息为: "+ message.getText());
                }else {
                    break;
                }
            }
            session.close();
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

订阅模式

与点对点模式代码雷同, 只写不同部分
只是生产者和消费者中的消息目的地创建的方式不同

    // 创建消息目的地时将createQueue("name") 换成createTopic("name")
    Destination destination = session.createTopic("test1");

持久化订阅

生产者模块代码不变, 消费者添加身份识别
消费者代码修改如下:

 ConnectionFactory factory = new ActiveMQConnectionFactory(username, password, url);
        try {
            Connection connection = factory.createConnection();

            // 持久化订阅时添加
            connection.setClientID("bbb");
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // 持久化订阅时添加
            Topic topic = session.createTopic("test1");
            MessageConsumer consumer = session.createDurableSubscriber(topic, "bbb");

            while (true){
                TextMessage message = (TextMessage) consumer.receive();
                if (message!=null){
                    System.out.println("收到的信息为: "+ message.getText());
                }else {
                    break;
                }
            }
            session.close();
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }

注:该模式下一定要先注册消费者, 然后再发送消息

消息过滤

  1. 生产者
// 设置超时时间
producer.setTimeToLive(10 * 1000);
MapMessage message1 = session.createMapMessage();
message1.setString("name", "laowei");
message1.setIntProperty("age", 19);

MapMessage message2 = session.createMapMessage();
message2.setString("name", "xiaowang");
message2.setIntProperty("age", 10);

/**
 *message : 发送的消息
 * DeliveryMode: 是否持久化
 * priority优先级
 * timeToLive 消息过期时间
 */
producer.send(message1, DeliveryMode.NON_PERSISTENT, 4,1000*60*10);
producer.send(message2, DeliveryMode.NON_PERSISTENT, 4,1000*60*10);
  1. 消费者:
String condition = "age>=20";
Destination destination = session.createTopic("test1");
MessageConsumer consumer = session.createConsumer(destination, condition);

activeMQ 与spring 相结合

配置:

  1. activemq.properties
## ActiveMQ Config
activemq.brokerURL=tcp\://127.0.0.1\:61616
activemq.userName=admin
activemq.password=admin
activemq.pool.maxConnections=10
#queueName
activemq.queueName=myspringqueue
  1. spring-context.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd"
       default-autowire="byName">
       <!-- 读入配置属性文件 -->
       <context:property-placeholder location="classpath:activemq.properties" />
       <!-- 注释配置 -->
       <context:annotation-config />
       <!-- 扫描包起始位置 -->
       <context:component-scan base-package="com.laowei.springmq" />
       <import resource="classpath:spring-activemq.xml" />
</beans>
  1. sping-activemq.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
       <!-- 第三方MQ工厂: ConnectionFactory -->
       <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
              <!-- ActiveMQ Address -->
              <property name="brokerURL" value="${activemq.brokerURL}" />
              <property name="userName" value="${activemq.userName}"></property>
              <property name="password" value="${activemq.password}"></property>
       </bean>

       <!--
           ActiveMQ为我们提供了一个PooledConnectionFactory,通过往里面注入一个ActiveMQConnectionFactory
           可以用来将Connection、Session和MessageProducer池化,这样可以大大的减少我们的资源消耗,要依赖于 activemq-pool包
        -->
       <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
              <property name="connectionFactory" ref="targetConnectionFactory" />
              <property name="maxConnections" value="${activemq.pool.maxConnections}" />
       </bean>
       <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
       <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
              <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
              <property name="targetConnectionFactory" ref="pooledConnectionFactory" />
       </bean>

       <!--这个是目的地-->
       <bean id="msgQueue" class="org.apache.activemq.command.ActiveMQQueue">
              <constructor-arg>
                     <value>${activemq.queueName}</value>
              </constructor-arg>
       </bean>
       <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
       <!-- 队列模板 -->
       <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
              <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
              <property name="connectionFactory" ref="connectionFactory"/>
              <property name="defaultDestinationName" value="${activemq.queueName}"></property>
       </bean>
       <!-- 配置自定义监听:MessageListener -->
       <bean id="msgQueueMessageListener" class="com.laowei.springmq.Consumer"></bean>

       <!-- 将连接工厂、目标对了、自定义监听注入jms模板 -->
       <bean id="sessionAwareListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
              <property name="connectionFactory" ref="connectionFactory" />
              <property name="destination" ref="msgQueue" />
              <property name="messageListener" ref="msgQueueMessageListener" />
       </bean>
</beans>
  1. 生产者代码:
@Service("activeMQProducer")
public class Product {
    private JmsTemplate jmsTemplate;

    public JmsTemplate getJmsTemplate() {
        return jmsTemplate;
    }

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    public void sendMessage(final String info){
        jmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(info);
            }
        });
    }
}
  1. 消费者
public class Consumer implements SessionAwareMessageListener<Message> {
    public void onMessage(Message message, Session session) throws JMSException {
        if (message instanceof TextMessage){
            System.out.println("*******" +((TextMessage) message).getText());
        }
    }
}
  1. 测试代码:
public class TestProduct extends BaseJunit4Test{
    @Autowired
    private Product product;
    @Test
    public void sendmessage() throws InterruptedException {
        while (true){
            Thread.sleep(3000);
            product.sendMessage("hahaha");
        }
    }
}

代码地址:
https://github.com/weijun8687/ActiveMQ.git

参考文章:
http://blog.csdn.net/fulai0_0/article/details/52127320
http://www.mytju.com/classcode/news_readNews.asp?newsID=486

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,911评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,014评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 142,129评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,283评论 1 264
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,159评论 4 357
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,161评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,565评论 3 382
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,251评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,531评论 1 292
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,619评论 2 310
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,383评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,255评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,624评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,916评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,199评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,553评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,756评论 2 335

推荐阅读更多精彩内容