1、RocketMQ
[官网地址] (http://rocketmq.apache.org)
function : 应用解耦、流量消峰、消息分发、保证最终一致性、方便动态扩容等
2、linux 单机RocketMQ
可以参考官网quick-start
1、准备RocketMQ
-
从官网下载编译好的二进制文件,或者下载源码自己编译。
RocketMQ 当前的最新版本是4.2.0
系统要求: 64bit 的Linux 、Unix 或Mac 。
Java 版本大于等于JDKl.8 。
如果需要从GitHub 上下载源码和编译的话需要安装Maven 3.2.x 和Git 。
[root@aliyun rocketmq-all-4.2.0-bin]#> unzip rocketmq-all-4.2.0-bin-release.zip -d ./rocketmq-all-4.2.0-bin
[root@aliyun rocketmq-all-4.2.0-bin]# cd rocketmq-all-4.2.0-bin
[root@aliyun rocketmq-all-4.2.0-bin]# ls
里面含有以下内容: LICENSE NOTICE README.md benchmark/ bin/ conf/ lib/
+ LICENSE 、NOTICE 和README.md 包括一些版权声明和功能说明信息;
+ benchmark 里包括运行benchmark 程序的shell 脚本;
+ bin 文件夹里含有各种使用RocketMQ的shell脚本和cmd 脚本,比如启动NameServer的mqnamesrv启动Broker的mqbroker,集群管理脚本mqadmin 等;
+ conf 文件夹里有一些示例配置文件,包括三种方式的broker 配置文件、logback 日志配置文件等,用户在写配置文件的时候,一般基于这些示例配置文件,加上自己特殊的需求即可;
+ lib 文件夹里包括RocketMQ各个模块编译成的jar 包,以及RocketMQ 依赖的一些jar包,比如Netty、commons-lang、FastJSON 等。
2、启动RocketMQ服务
启动单机的消息队列服务比较简单,不需要写配置文件,只需要依次启动本机的NameServer 和Broker 即可。
启动NameServer:
[root@aliyun rocketmq-all-4.2.0-bin]#> nohup sh bin/mqnamesrv &
[root@aliyun rocketmq-all-4.2.0-bin]# tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success . ..
启动B roker :
[root@aliyun rocketmq-all-4.2.0-bin]# nohup sh bin/mqbroker -n localhost:9876 &
[root@aliyun rocketmq-all-4.2.0-bin]# tail -f ~/logs/rocketmqlogs/broker.log
The broker[%s, 192.168.0.233 : 10911] boot success .. .
!!!!!! 1->内存不足
如果Java运行时环境的内存不足,修改jdk参数配置
rocketmq-all-4.2.0-bin/bin/runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn125m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
rocketmq-all-4.2.0-bin/bin/runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn125m"
!!!!!! 2-> org.apache.rocketmq.client.exception.MQClientException: No route info of this topic
nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true &
3、用命令行发送和接收消息
实际上就是运行写好的demo 程序,后续我们可以参考这些demo 来写自己的发送和接收程序。
运行示例程序,发送和接收消息:
[root@aliyun rocketmq-all-4.2.0-bin]# export NAMESRV_ADDR=localhost:9876
[root@aliyun rocketmq-all-4.2.0-bin]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND OK, msgid= ...
[root@aliyun rocketmq-all-4.2.0-bin]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread 主d Receive New Messages : [MessageExt . ..
4、关闭消息队列
[root@aliyun rocketmq-all-4.2.0-bin]# sh bin/mqshutdown broker
Send shutdown request to mqbroker (36695 ) OK
[root@aliyun rocketmq-all-4.2.0-bin]# sh bin/mqshutdown namesrv
Send shutdown request t o mqnamesrv (36664) OK
5、java client发送、消费消息 demo
- java client
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.0.0-incubating</version>
</dependency>
- Producer
创建DefaultMQProducer对象,设置好GroupName和NameServer后启动,把待发送的消息拼装成Message对象,用Producer发送。
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("producer1");
// 设置NameServer地址 , 多个地址之间用;分隔 ,但是也可以通过环境变量的方式设置 。
producer.setNamesrvAddr("47.104.209.137:9876");
producer.start();
for (int i = 0; i < 10; i++) {
try {
Message msg = new Message(
"TopicTest",
"TagA",
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
//这里调用的是同步的方式,所以会有返回结果
SendResult sendResult = producer.send(msg);
//打印返回结果,可以看到消息发送的状态以及一些相关信息
System.out.println(sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
}
- Consumer
设置GroupName 、NameServer 地址以及端口号。然后指明要操作的Topic 名称,最后进入发送和接收逻辑。
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");
consumer.setNamesrvAddr("47.104.209.137:9876");
//这里设置的是一个consumer的消费策略
//CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息
//CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
//CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//设置consumer所订阅的Topic和Tag,*代表全部的Tag
consumer.subscribe("TopicTest", "*");
//设置一个Listener,主要进行消息的逻辑处理
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
//返回消费状态
//CONSUME_SUCCESS 消费成功
//RECONSUME_LATER 消费失败,需要稍后重新消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//调用start()方法启动consumer
consumer.start();
System.out.println("Consumer Started.");
}
}