RabbitMQ/安装/队列消息

当在很短的HTTP请求间需要执行复杂的任务时,队列的主要任务是:避免立刻执行资源密集型任务,这样的概念在web应用中极其有用。使用任务队列的另一个好处是能够很容易的并行工作。如果我们积压了很多工作,我们仅仅通过增加更多的工作者就可以解决问题,使系统的伸缩性更加容易。

安装:
1.安装Erlang
配置环境变量 ERLANG_HOME C:\Program Files \erl8.0
添加到PATH %ERLANG_HOME%\bin;
2.安装RabbitMQ
默认安装的Rabbit MQ 监听端口是5672。使用Rabbit MQ 管理插件,可以更好的可视化方式查看Rabbit MQ 服务器,已管理员身份在RabbitMQ的sbin目录下执行rabbitmq-plugins.bat" enable rabbitmq_management然后重启服务net stop RabbitMQ && net start RabbitMQ。访问http://localhost:15672 ,默认的登陆账号:guest,密码:guest。查看运行状态rabbitmqctl status
3.添加用户

  • 列出RabbitMQ的用户,rabbitmqctl.bat list_users
  • 添加用户,rabbitmqctl.bat add_user geffzhang zsy@2014
  • 设置标签,RabbitMQ支持一些有权限意义的标签,如 administrator,monitoring,policymaker,management。rabbitmqctl.bat set_user_tags admin administrator
  • 赋予权限rabbitmqctl.bat set_permissions -p / geffzhang ".*" ".*" ".*"

发送队列消息:
添加依赖:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>${rabbitmq.version}</version>
</dependency>

发送队列消息:

package testrabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.concurrent.TimeoutException;

/**
 * Created by zzhblh on 2016/8/28.
 */
public class Sender {
    //队列名称
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws java.io.IOException, TimeoutException {
        /**
         * 创建连接连接到MabbitMQ
         */
        ConnectionFactory factory = new ConnectionFactory();
        //设置MabbitMQ所在主机ip或者主机名
        factory.setHost("localhost");
        //创建一个连接
        Connection connection = factory.newConnection();
        //创建一个频道
        Channel channel = connection.createChannel();
        //指定一个队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //发送的消息
        String message = "hello world!";
        //往队列中发出一条消息
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        //关闭频道和连接
        channel.close();
        connection.close();
    }
}

接收队列消息:

package testrabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.util.concurrent.TimeoutException;

/**
 * Created by zzhblh on 2016/8/28.
 */
public class Recevicer {
    //队列名称
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws java.io.IOException,
            java.lang.InterruptedException, TimeoutException {
        //打开连接和创建频道,与发送端一样
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        //创建队列消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //指定消费队列
        channel.basicConsume(QUEUE_NAME, true, consumer);
        while (true)
        {
            //nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
        }

    }
}

消息应答:
为了保证消息永远不会丢失,RabbitMQ也支持消息应答(message acknowledgments)。消费者发送应答给RabbitMQ,告诉它信息已经被接收和处理,然后RabbitMQ进行信息删除。如果消费者被杀死而没有发送应答,RabbitMQ会重新转发给别的消费者。这种机制并没有超时时间这么一说,RabbitMQ只有在消费者连接断开是重新转发此信息。如果消费者处理一个信息需要耗费特别特别长的时间是允许的。

import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
import com.rabbitmq.client.QueueingConsumer;  
  
public class Work  
{  
    //队列名称  
    private final static String QUEUE_NAME = "workqueue";  
  
    public static void main(String[] argv) throws java.io.IOException,  
            java.lang.InterruptedException  
    {  
        //区分不同工作进程的输出  
        int hashCode = Work.class.hashCode();  
        //创建连接和频道  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("localhost");  
        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  
        //声明队列  
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);  
        System.out.println(hashCode  
                + " [*] Waiting for messages. To exit press CTRL+C");  
        QueueingConsumer consumer = new QueueingConsumer(channel);  
        // 指定消费队列  
        //关闭自动应答(分发后立即自动应答,不关心后续任务成功与否),打开应答机制
        boolean ack = false ; 
        channel.basicConsume(QUEUE_NAME, ack, consumer);  
        while (true)  
        {  
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
            String message = new String(delivery.getBody());  
            System.out.println(hashCode + " [x] Received '" + message + "'");  
            //发送应答  
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 
            System.out.println(hashCode + " [x] Done"); 
  
        }  
  
    }  
}  

要确认消息, 或者拒绝消息, 使用对应的 basic_ack 或者 baskc_reject 方法。区别是 reject 方法 它只能处理一条消息,但是 Consuming 可以是一次性提取多条信息的,所以 RabbitMQ 为此做了扩展, 提供了 basic_nack 方法,支持一次性拒绝多条消息。在 reject 和 nack 中还有一个 requeue 参数, 表示被拒绝的消息是否可以被重新分配. 默认是 True . 如果消息被 reject 之后, 不希望再被其它的 Consuming 得到, 可以把 requeue 参数设置成 False。

消息持久化:
当RabbitMQ退出或者异常退出,将会丢失所有的队列和信息。我们需要做两件事来确保信息不会被丢失:我们需要给所有的队列和消息设置持久化的标志。

  • 声明队列为持久化的
    在Producer端设置:
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
  • 声明信息为持久化的。
    在Producer端通过设置MessageProperties的值为PERSISTENT_TEXT_PLAIN:
channel.basicPublish("", "task_queue",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

分发机制:

  • Round-robin分发:
    默认的分发机制,RabbitMQ会一个一个的发送信息给下一个消费者,而不考虑每个任务的时长等等,且是一次性分配,并非一个一个分配。平均的每个消费者将会获得相等数量的消息。这样分发消息的方式叫做round-robin。如果有10个任务,3个消费者,则消费者A获得第1,4,7个任务,消费者A获得第2,5,8个任务,消费者A获得第3,6,8个任务。
  • 公平分发(Fair dispatch):
    有些情况下,默认分发机制(Round-robin)并非是我们想要的。我们可以使用basicQos方法,传递参数为prefetchCount = 1。这样告诉RabbitMQ不要在同一时间给一个消费者超过一条消息。换句话说,只有在消费者空闲的时候会发送下一条信息。在Comsumer端设置:channel.basicQos(prefetchCount)
    且这种模式下支持动态增加消费者,因为消息并没有发送出去,动态增加了消费者马上投入工作。而默认的转发机制会造成,即使动态增加了消费者,此时的消息已经分配完毕,无法立即加入工作,即使有很多未完成的任务。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,802评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,109评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,683评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,458评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,452评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,505评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,901评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,550评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,763评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,556评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,629评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,330评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,898评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,897评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,140评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,807评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,339评论 2 342

推荐阅读更多精彩内容

  • 关于消息队列,从前年开始断断续续看了些资料,想写很久了,但一直没腾出空,近来分别碰到几个朋友聊这块的技术选型,是时...
    预流阅读 584,360评论 51 785
  • rabbitMQ是一款基于AMQP协议的消息中间件,它能够在应用之间提供可靠的消息传输。在易用性,扩展性,高可用性...
    点融黑帮阅读 2,979评论 3 41
  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,341评论 2 34
  • 1 RabbitMQ安装部署 这里是ErLang环境的下载地址http://www.erlang.org/down...
    Bobby0322阅读 2,219评论 0 11
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,579评论 18 139