消息队列之ActiveMQ(一)

一、浩言

我们要变的变态,才能吸引更多的变态。

二、背景

最近在看消息队列的东西,准备引入到项目中使用,一直说要自己先学会使用,在使用中学习。所以这算是新年的第一个周末就要加班,就在网上找资料自己尝试配置下。顺便纪录下经历了,都是有坑要去踩的。

三、ActiveMQ 在Windows下的使用

我修改了配置文件activemq.xml,将0.0.0.0改成了127.0.0.1

     <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://127.0.0.1:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600000"/>
            <transportConnector name="amqp" uri="amqp://127.0.0.1:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600000"/>
            <transportConnector name="stomp" uri="stomp://127.0.0.1:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600000"/>
            <transportConnector name="mqtt" uri="mqtt://127.0.0.1:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600000"/>
            <transportConnector name="ws" uri="ws://127.0.0.1:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600000"/>
        </transportConnectors>

注意点:我在测试的时候一直报1G要大于maxFrameSize的100M,所以我在这个后面加了三个0

然后启动启动active


Paste_Image.png
Paste_Image.png

我们在windows下使用这个命令查看端口netstat -ano linux下可以使用netstat -lntp查看


Paste_Image.png

在浏览器中输入:http://127.0.0.1:8161/admin/index.jsp会有如下显示

active-mq.png

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"  
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd  
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd"> 
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://127.0.0.1:61616"></property>
        <!-- <property name="brokerURL" value="http://127.0.0.1:61616"></property> -->
        <property name="useAsyncSend" value="true"></property>
        <property name="alwaysSessionAsync" value="true"></property>
        <property name="useDedicatedTaskRunner" value="true"></property>
    </bean>

    <!-- 发送消息的目的地 -->
    <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
              <!-- 消息队列的名字 -->
        <constructor-arg value="mahone.queue"/>
    </bean>
</beans> 

注意点:我在这里面看到的我有个注释的配置brokerURL,我刚刚开始的时候一直配置成http...,然后怎么测试都不通,最后才发现是自己这里配置错了。

Paste_Image.png

下面附上测试代码

package com.mouse.moon.test.mq;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class TestMq {
    public static void sendWithAuto(ApplicationContext context) {  
        ActiveMQConnectionFactory factory = null;  
        Connection conn = null;  
        Destination destination = null;  
        Session session = null;  
        MessageProducer producer = null;  
        try {  
            destination = (Destination) context.getBean("destination");  
            factory = (ActiveMQConnectionFactory) context.getBean("targetConnectionFactory");  
            conn = factory.createConnection();  
            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);  
            producer = session.createProducer(destination);  
            TextMessage message = session.createTextMessage("...Hello JMS-4!");  
            
            
            producer.send(message);  
        } catch (Exception e) {  
            e.printStackTrace();  
        } finally {  
            try {//关闭生产者  
                producer.close();  
                producer = null;  
            } catch (Exception e) {
                
            }  
                try {//关闭session  
                    session.close();  
                    session = null;  
                }  
                catch (Exception e) {  
                    
                }  
                
                try {  //停止连接
                    conn.stop();  
                } catch (Exception e) {  
                    
                }  
            try {//关闭连接  
                conn.close();  
            } catch (Exception e) {  
            }  
        }  
    }  
    public static void main(String[] args) {  
        System.out.println("-------------start--------------");
        final ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/xml/activemq.xml");  
        sendWithAuto(context);  
        System.out.println("-------------end---------------");
    }  
}
Paste_Image.png
Paste_Image.png

在做测试的时候还遇到如下问题

org.apache.activemq.ConfigurationException: There are 1 consumer options that couldn't be set on the consumer. Check the options are spelled correctly. Unknown parameters=[{perfetchSize=100}]. This consumer cannot be started.

这是在设置名字的时候附带的参数,我把这个去掉了就只能看到生产者的名字是"mahone.queue""
消费端代码:

package com.mouse.moon.test.mq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * 
 * @author Mahone
 * @description:消费端
 *
 */
public class TestConsumer  extends Thread implements MessageListener{
    
    private Connection conn = null;
    
    private Destination destination = null ;
    
    private Session session = null;

    @Override
    public void onMessage(Message message) {
        try{
            TextMessage tm = (TextMessage)message;
            System.out.println("receive message:"+tm.getText());
        }catch(Exception e){
            e.printStackTrace();
        }
        
    }
    
    @Override
    public void run(){
        receive();
    }
    
    public void receive(){
        ConnectionFactory factory = null;
        Connection conn = null ;
        
        try{
            final ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/xml/activemq.xml");
            factory = (ActiveMQConnectionFactory)context.getBean("targetConnectionFactory");
            conn = factory.createConnection();
            conn.start();
            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = (Destination)context.getBean("destination");
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(this);
        }catch(Exception e){
            e.printStackTrace();
        }
    }
    public static void main(String args[]){
        TestConsumer mythread = new TestConsumer();
        mythread.start();
    }
}

Paste_Image.png

上述代码有参考该<a href='http://blog.csdn.net/lifetragedy/article/details/51836557'>博客</a>

四、消息队列的作用

1:解耦操作
例如说其中A更新好了之后,其他地方(B,C,D)也需要更新,那么使用消息队列之后,A更新后,将更新通知放入消息队列中,B,C,D中需要更新只需要订阅这个操作就好,实现解耦。

2、实现异步操作
例如上面说的,A更新后,其他的更新都是异步进行处理,并不需要同步操作。

3、对流量进行控制
有时候访问多了,流量就大了,忽高忽低的,如果放入队列中,按照顺序进行操作,即是先存起来,然后在按照顺序进行发送。

五:浩语

                                 __                                                     
                  __  _  ____ __|  |__ _____    ___
                  \ \/ \/ /  |  \  |  \\__  \  /  _ \   
                   \     /|  |  /   Y  \/ __ \(  <_> )
                    \/\_/ |____/|___|  (____  /\____/ 
                                          \/     \/          
                       任何事情都是要靠努力和用心。                                                   
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,491评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,856评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,745评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,196评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,073评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,112评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,531评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,215评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,485评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,578评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,356评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,215评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,583评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,898评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,497评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,697评论 2 335

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,497评论 18 139
  • 背景介绍 Kafka简介 Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下: 以时间复杂度为O...
    高广超阅读 12,790评论 8 167
  • 愿你有高跟鞋也有跑鞋,喝茶也能喝点小酒。愿你有勇敢的朋友,有牛逼的对手。愿你对过往的一切情深意重,但从不回头。愿你...
    Tina是个妖怪小姐姐阅读 199评论 0 1
  • 我是你的伞 愿为你遮挡炙烈的太阳 清凉的依恋,炙烈的爱 你的笑在阳光下最灿烂 在雨季来临的时候 我缠绵在你的身旁 ...
    淘猴侯孙行阅读 557评论 3 5
  • 熊洲游敦煌
    煜瑶阅读 112评论 0 0