rabbitmq入门

什么是rabbitmq?

RabbitMQ.png

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端

rabbitmq.png

第一种方式:点对点

点对点.png
  • P:生成者
  • C:消费者
  • 红色方块代表信道

生成者

void producerSendMessing() {
        //生产者代码
        //创建一个rabbitmq的连接工厂
        ConnectionFactory connectionFactory =new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setConnectionTimeout(1000);
        connectionFactory.setPort(5672);
        //设置虚拟主机
        connectionFactory.setVirtualHost("/ems");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        try {
            //开启一个server连接
            Connection connection = connectionFactory.newConnection();
            //server连接创建一个信道
            Channel channel = connection.createChannel();
            //通过信道绑定一个队列,两个重载方法
            /*
            *  @param queue :队列的名称
             * @param durable 是否持久化,
             * @param exclusive 是否独占这个队列
             * @param autoDelete 是否消费之后,删除队列
             * @param arguments 其他参数设置,是一个map
            * */
            channel.queueDeclare("hello",true,false,false,null);
            /**
             * 三个重载方法,
             *
             * @param exchange 交换器
             * @param routingKey 路由密钥
             * @param支持消息的其他属性-路由标头等
             * @param正文消息正文
             */
            channel.basicPublish("","hello",null,"hello world!".getBytes());
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

消费者

void consumerReceivingMessing() {
        //生产者代码
        //创建一个rabbitmq的连接工厂
        ConnectionFactory connectionFactory =new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setConnectionTimeout(1000);
        connectionFactory.setPort(5672);
        //设置虚拟主机
        connectionFactory.setVirtualHost("/ems");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        try {
            //开启一个server连接
            Connection connection = connectionFactory.newConnection();
            //server连接创建一个信道
            Channel channel = connection.createChannel();
            //通过信道绑定一个队列,两个重载方法
            /*
             *  @param queue :队列的名称
             * @param durable 是否持久化,
             * @param exclusive 是否独占这个队列
             * @param autoDelete 是否消费之后,删除队列
             * @param arguments 其他参数设置,是一个map
             * */
            channel.queueDeclare("hello",true,false,false,null);
            /**
             * @param queue队列名称
             * @param autoAck如果服务器应考虑消息,则为true。自动确认机制
             *交付后确认; 如果服务器应该期望,则返回false
             *明确的确认
             * @param回调用户对象的接口Consumer,默认实现类DefaultConsumer,需要传入信道
             */
            String hello = channel.basicConsume("hello", true,new DefaultConsumer(channel){
                    //最后一个参数是消息体
                    @Override
                    public void handleDelivery(String consumerTag,
                                               Envelope envelope,
                                               AMQP.BasicProperties properties,
                                               byte[] body)
                            throws IOException
                    {
                        System.out.println(new String(body));
                    }

            });
            System.out.println(hello);
            //连接关闭
            channel.close();
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

总结

  • 无论是生成者还是消费者都需要连接到rabbitmq的server。通过信道操作消息

work模型

Work queues, 也被称为(Task queues), 任务模型。 当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费, 就会消失,因此任务是不会被重复执行的。


image.png

角色:
●P:生产者:任务的发布者
●C1:消费者,领取任务并且完成任务,假设完成速度较慢
●C2:消费者2:领取任务并完成任务,假设完成速度快

总结

  • 默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为轮询。

为了避免消费者1处理消息的业务慢,消费者2处理消息的业务快。但是因为轮询机制,导致消息被消费者1拿了过去进行堵塞,从而导致系统宕机。消息确认机制相应出现。

消费者需要拿到消息之后回复队列已处理才会拿到下一条消息,从而实现能者多劳

  • 消费者1:假设完成速度较慢
public static void main(String[] args) throws IOException {
        Connection mqConnection = RabbitMqUtils.getMqConnection();
        //创建一个信道
        Channel channel = mqConnection.createChannel();
        //信道每次只传递1个消息
        channel.basicQos(1);
        //绑定到一个work队列
        channel.queueDeclare("work",true,false,false,null);
        //获取消息
        channel.basicConsume("work",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    //模拟消费者处理慢的场景
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者2:--->"+new String(body));
                //参数1:根据标签回复那条消息,参数2:是否回复多条消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
  • 消费者2:领取任务并完成任务,假设完成速度快
public static void main(String[] args) throws IOException {
        Connection mqConnection = RabbitMqUtils.getMqConnection();
        //创建一个信道
        Channel channel = mqConnection.createChannel();
        //信道每次只传递1个消息
        channel.basicQos(1);
        //绑定到一个work队列
        channel.queueDeclare("work",true,false,false,null);
        channel.basicConsume("work",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1:--->"+new String(body));
                //参数1:根据标签回复那条消息,参数2:是否回复多条消息
                channel.basicAck(envelope.getDeliveryTag(),false);

            }
        });
    }

注意

代码中在消费者里面设置了信道里面只消费1条消息,并且处理业务之后会回复消息已被消费

fanout广播模型

fanout.png

在广播模式下,消息发送流程是这样的:
●可以有多个消费者
●每个消费者有自己的queue (队列)
●每个队列都要绑定到Exchange (交换机)
●生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
●交换机把消息发送给绑定过的所有队列
●队列的消费者都能拿到消息。实现一条消息被多个消费者消费
生产者代码

public static void main(String[] args) throws IOException {
        Connection mqConnection = RabbitMqUtils.getMqConnection();
        //创建一个信道
        Channel channel = mqConnection.createChannel();
        //声明一个交换机。参数1:交换机名称,参数2:选择交换机模式。fanout:广播模式
        channel.exchangeDeclare("logs","fanout");
        channel.basicPublish("logs","",null,("fanout条消息").getBytes());
        //关闭连接
        RabbitMqUtils.closeConnectionAndChannel(channel,mqConnection);

    }

消费者代码

public static void main(String[] args) throws IOException {
        Connection mqConnection = RabbitMqUtils.getMqConnection();
        //创建一个信道
        Channel channel = mqConnection.createChannel();
        //绑定一个交换机
        channel.exchangeDeclare("logs","fanout");
        //获取临时队列
        String queue = channel.queueDeclare().getQueue();
        //绑定交换机和队列
        channel.queueBind(queue,"logs","");
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1:--->"+new String(body));
            }
        });
    }

Routing模式:1direct(直连模型)

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
●队列与交换机的绑定,不能是任意绑定了,而是要指定-个RoutingKey (路由key)
●消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey。
●Exchange不再把消息交给每- 个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的Routing key完全一致,才会接收到消息


image.png

生产者代码

public static void main(String[] args) throws IOException {
        Connection mqConnection = RabbitMqUtils.getMqConnection();
        //创建一个信道
        Channel channel = mqConnection.createChannel();
        //创建一个交换器,参数1:交换器名称,参数2:交换器模式:路由
        channel.exchangeDeclare("log_direct","direct");
        //声明一个routingKey
        String routingKey="error";
        //发送消息
        channel.basicPublish("log_direct",routingKey,null,("这是direc模型发送的消息:"+routingKey).getBytes());
        //关闭链接
        RabbitMqUtils.closeConnectionAndChannel(channel,mqConnection);
    }

消费者1代码

public static void main(String[] args) throws IOException {
        //获取链接
        Connection mqConnection = RabbitMqUtils.getMqConnection();
        //创建信道
        Channel channel = mqConnection.createChannel();
        //信道绑定交换器
        channel.exchangeDeclare("log_direct","direct");
        //获取临时队列名称
        String queue = channel.queueDeclare().getQueue();
        String routingKey="info";
        //信道绑定交换器,路由键,队列
        channel.queueBind(queue,"log_direct",routingKey);
        //获取消息
        channel.basicConsume(queue,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1:"+new String(body));
            }
        });
    }

消费者2代码

public static void main(String[] args) throws IOException {
        //创建一个连接
        Connection mqConnection = RabbitMqUtils.getMqConnection();
        //创建一个信道
        Channel channel = mqConnection.createChannel();
        //绑定一个交换机
        channel.exchangeDeclare("log_direct","direct");
        //获取信道的一个临时队列
        String queue = channel.queueDeclare().getQueue();
        //定义三个路由键
        String routingKey="error";
        String routingKeyWarring="warring";
        String routingKeyInfo="info";
        //信道绑定队列,交换机和路由键
        channel.queueBind(queue,"log_direct",routingKey);
        channel.queueBind(queue,"log_direct",routingKeyWarring);
        channel.queueBind(queue,"log_direct",routingKeyInfo);
        //获取消息
        channel.basicConsume(queue,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2:消费了"+new String(body));
            }
        });
    }

Routing模式:topic(订阅模型)

Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用通配
符!这种模型Routingkey 一般都是由一个或多个单词组成,多个单词之间以"分割,例如: item. insert


topic.png
  • 通配符:
    * 代表只匹配一个单词,比如user.*的路由键可以接受user.add,user.delete,user.update
    #代表只匹配多个单词,比如user.#的路由键可以接受user.add.all,user.delete.all

生产者代码

public static void main(String[] args) throws IOException {
        //创建连接
        Connection mqConnection = RabbitMqUtils.getMqConnection();
        //创建信道
        Channel channel = mqConnection.createChannel();
        //绑定交换机
        channel.exchangeDeclare("topic","topic");
        //路由
        String routingKey="user.save";
        //发送消息
        channel.basicPublish("topic",routingKey,null,("生产了"+routingKey+"消息").getBytes());
        //关闭连接
        RabbitMqUtils.closeConnectionAndChannel(channel,mqConnection);
    }

消费者1代码是 * 的通配符

public static void main(String[] args) throws IOException {
        Connection mqConnection = RabbitMqUtils.getMqConnection();
        Channel channel = mqConnection.createChannel();
        channel.exchangeDeclare("topic","topic");
        String queue = channel.queueDeclare().getQueue();
        //路由键匹配一个单词比如user.save
        channel.queueBind(queue,"topic","user.*");
        channel.basicConsume(queue,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1:"+new String(body));
            }
        });
    }

消费者2代码是#号的通配符

public static void main(String[] args) throws IOException {
        Connection mqConnection = RabbitMqUtils.getMqConnection();
        Channel channel = mqConnection.createChannel();
        channel.exchangeDeclare("topic","topic");
        String queue = channel.queueDeclare().getQueue();
        //路由键匹配多个单词比如user.id.delete
        channel.queueBind(queue,"topic","user.#");
        //消费消息
        channel.basicConsume(queue,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1:"+new String(body));
            }
        });
    }
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,311评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,339评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,671评论 0 342
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,252评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,253评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,031评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,340评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,973评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,466评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,937评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,039评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,701评论 4 323
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,254评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,259评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,485评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,497评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,786评论 2 345

推荐阅读更多精彩内容