- 消息协议是指用于实现消息队列功能时候所涉及的协议。消息协议可分为开放协议和私有协议,常见的开放协议有AMQP、MQTT、STOMP?XMPP等。但是并不是MQ框架一定需要实现以上的协议,有的特殊框架比如Kafka、Redis、ZeroMQ)自己基于TCP封装了一套协议,实现了MQ的功能。
- JMS:JMS是java消息服务应用程序接口,是java平台面向消息中间件的一套规范的API接口。用于在两个应用程序之间或者两个分布式系统之间进行发送消息,进行异步通信。目前JMS的版本是02年发布的1.1版本和13年发布的2.0版本。(JMS是与平台无关的,类似JDBC,绝大多数消息中间件厂商都支持JMS接口规范,也就是说可以通过JMS API来连接支持各种消息协议的中间件产品)
JMS提供了操作各类消息中间件的API接口,而不需要考虑没有一个消息中间件的具体实现。类似JDBC。
- 2.0版本相对1.1版本主要变化在于API的简化和消息使用场景增加了一些新的特性,核心模型并没有变化。
1、JMS体系架构
(1)点对点模型
在点对点模型中,应用程序由队列、发送者和接受者组成,每一条消息都被发送到特定的队列中,接受者从队列中获取相应的消息。
特点:
1、每一条消息只有一个消费者,消息一旦被消费了就不在保留在消息队列中。
2、发送者和消费者之间在时间上没有依赖。
3、消息存在者先后顺序。(除非使用了消息优先级)
4、当接受者接受到消息时候,会发送确认收到通知
(2)发布订阅模型
在发布订阅模型中,应用程序由主题、发布者和订阅者组成。发布者发布每一条消息,该消息通过主题传递给所有的订阅者。
特点:
1、每一条消息可以有多个订阅者
2、发布者和订阅者之间有时间上的依赖关系
3.、JMS允许创建一个可持久化的订阅,这样即使订阅者没有运行也能接收到所订阅的消息。
4、每条消息都会传给该主题下面的所欲偶订阅者
5、通常发布者不会知道也意识不到哪一个订阅者在接受消息
2、JMS基本概念
(1)JMS应用包含的部分
1、JMS客户端(发送和接受消息的java程序)
2、非JMS客户端
3、消息
4、JMS提供商(实现JMA API的实际消息系统)
5、授管对象
(2)JMS应用程序包含部分
1、生产者:创建并发送消息的JMS客户端
2、消费者:接收消息的JMS客户端
3、客户端:生产或者消费基于java的应用程序或者对象
4、队列:点对点模型中的队列
5、主题:发布/订阅模型中的主题
6、消息:在JMS客户端之间传递的数据对象(包含消息头、属性和消息体三部分)
3、接口编程
(1)ConnectionFactory接口(连接工厂)
创建Connection的工厂,根据不同的消息类型用户可以选择使用队列连接工厂或者使用主题连接工厂分别对应QueueConnectionFactory和TopicConnectionFactory
(2)Destination接口
Destination是一个包装了消息目的地标识符的授管对象,消息目的地是指:消息发布和消息接收的地点,要么是队列要么是主题。主要有两种类型的对象Queue和Topic
(3)Connection接口
Connection表示客户端和JMS系统之间建立了连接(实际上就是对应TCP/socket)的包装,Connection可以对应一个或者多个Session,Connection和连接工厂一样也有两种类型:QueueConnection和TopicConnection
(4)Session接口
Session是实际操作消息的接口,表示单线程的上下文,用于发送和接受消息。Session也分为QueueSession和TopicSession。
(5)MessageProducer接口
消息生产者又Session创建并用于将消息发送到Destination。消费者可以是同步或者是异步接收队列和主题类型的消息。生产者有两种类型,QueueSender和TopicPublisher
(6)MessageConsumer接口
消息消费者由Session创建,用于接收发送到Destination的消息。消费者有两种类型:QueueReceive和TopicSubscriber
(7)Message接口
消息在消费者和生产者之间传递的对象
(8)MessageListener消息监听器
如果注册了消息监听器,那么消息达到了会自动的调用监听器的onMessage方法
5、使用案例
(1)引入activeMq包
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.3</version>
</dependency>
(2)消息生产者
public class QueueProducer {
/**
* 默认用户名
*/
public final static String USERNAME= ActiveMQConnection.DEFAULT_USER;
/**
* 默认密码
*/
public final static String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默认地址
*/
public final static String BROKER_URL=ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) {
//1、创建连工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(USERNAME,PASSWORD ,BROKER_URL );
try {
//2、创建连接
Connection connection = connectionFactory.createConnection();
//3、启动连接
connection.start();
//4、创建会话
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//5、创建队列
Queue queueName = session.createQueue("hello queue");
//6、创建生产消费者
MessageProducer producer = session.createProducer(queueName);
//发送消息
Message message=session.createTextMessage("first message!");
producer.send(message);
//提交事物
session.commit();
//关闭资源
session.close();
connection.close();
}catch (Exception e){
e.printStackTrace();
}
}
}
(3)消息消费者
public class QueueConsumer {
/**
* 默认用户名
*/
public final static String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默认密码
*/
public final static String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默认地址
*/
public final static String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) {
//1、创建连工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
try {
//2、创建连接
Connection connection = connectionFactory.createConnection();
//3、启动连接
connection.start();
//4、创建会话
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//5、创建队列
Queue queueName = session.createQueue("hello queue");
//6、创建消费者
MessageConsumer consumer = session.createConsumer(queueName);
//实现message监听器
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
System.out.println(textMessage.getText());
session.commit();
} catch (Exception e) {
e.printStackTrace();
}
}
});
Thread.sleep(1000*100);
session.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}