中间件
非底层操作系统软件,非业务应用软件,不能直接给最终用户使用和带来价值的软件。
消息中间件
关注于数据的发送和接收,利用高可靠的异步消息传递机制集成分布式系统。
AMQP
AMQP(Advanced Message Queuing Protocol)是一个提供统一消息服务的应用层标准协议,基于此协议的客户端和消息中间件可传递消息。,并不受客户端/中间件不同产品,不同开发语言等条件的限制。
JMS
Java消息服务(Java Message Service)是一个Java平台中面向消息中间件的API,用于在两个应用程序间或分布式系统中发送消息,进行异步通信。
JMS相关概念
- 提供者:实现JMS规范的消息中间件服务器
- 客户端:发送或接收消息的应用程序
- 生产者/发布者:创建并发送消息的客户端
- 消费者/订阅者:接收并处理消息的客户端
- 消息:应用程序之间传递的数据内容
- 消息模式:在客户端之间传递消息的方式,JMS中定义了主题和队列两种模式
JMS消息模式
队列模型
- 客户端包括生产者和消费者
- 队列中的每个消息只能被一个消费者消费
- 消费者可以随时消费队列中的消息
主题模型
- 客户端包括发布者和订阅者
- 主题中的消息被所有订阅者消费
- 消费者不能消费订阅之前就发送到主题中的信息
JMS编码接口
- ConnectionFactory 用于创建连接到消息中间件的连接工厂
- Connection 代表了应用程序和消息服务器之间的通信链路
- Destination 指消息发布和接收的地点,包括队列和主题
- Session 表示一个单线程的上下文,用于发送和接收消息
- MessageCosumer 由会话创建,用于接收发送到目标的消息
- MessageProducer 由会话创建,用于发送消息到目标
- Message 是在消费者和生产者之间传送的对象,消息头,一组消息属性,一个消息体
生产者实现
/**
* 生产者
*/
public class QueueProducer {
// ActiveMQ地址
private static final String URL = "tcp://127.0.0.1:61616";
// 队列名称
private static final String QUEUE_NAME = "queue.test";
private static final Logger log = LoggerFactory.getLogger(QueueProducer.class);
public static void main(String[] args) throws JMSException {
// 创建ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
// 创建Connection
Connection connection = connectionFactory.createConnection();
// 启动连接
connection.start();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建Destination
Destination destination = session.createQueue(QUEUE_NAME);
// 创建生产者
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 100; i++) {
// 创建消息
TextMessage textMessage = session.createTextMessage("test: " + i);
// 发布消息
producer.send(textMessage);
log.info("send message: " + textMessage.getText());
}
// 关闭连接
connection.close();
}
}
消费者实现
/**
* 消费者
*/
public class QueueConsumer {
// ActiveMQ地址
private static final String URL = "tcp://127.0.0.1:61616";
// 队列名称
private static final String QUEUE_NAME = "queue.test";
private static final Logger log = LoggerFactory.getLogger(QueueConsumer.class);
public static void main(String[] args) throws JMSException {
// 创建ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
// 创建Connection
Connection connection = connectionFactory.createConnection();
// 启动连接
connection.start();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目标
Destination destination = session.createQueue(QUEUE_NAME);
// 创建消费者
MessageConsumer consumer = session.createConsumer(destination);
// 创建监听器,异步监听消息并处理(消费)
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
log.info("receive message: " + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
ActiveMQ
安装
- Windows:解压安装包后通过bin目录下的activemq.bat启动服务,或使用InstallService.bat安装服务并启动
- Linux 解压安装包后进入bin目录,通过./activemq start和./activemq stop启动和停止服务
控制台
访问ActiveMQ控制台:http://127.0.0.1:8161/admin/(默认登录账号/密码:admin/admin)。
在导航栏可以访问Queues(队列消息)和Topics(主题消息)。
通过控制台可以看到队列名、待消费消息的数量、当前消费者数量、入队消息数量、出队消息数量和操作等。
集群
- 实现高可用,以排除单点故障引起的服务中断
- 实现负载均衡,以提升效率为更多客户提供服务
集群方式
- 客户端集群:让多个消费者消费同一个队列
- Broker clusters:多个Broker之间同步消息
- Master/Slave:实现高可用
Spring对JMS的支持
JmsTemplate
用于发送消息
- 在Spring容器中注册JmsTemplate即可操作JMS
- JmsTemplate是线程安全的,可以在整个应用内操作
DefaultMessageListenerContainer
消息监听容器
- 配置JMS连接
- 配置消费队列
- 配置消费监听器
- 配置消费者数量
依赖
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
spring-jms配置文件
配置生产者、Destination和消费者
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<!-- 配置扫描jms包 -->
<context:component-scan base-package="com.wch.ssm.jms"/>
<!-- ActiveMQ提供的ConnectionFactory,符合JMS规范 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61616"/>
</bean>
<!-- Spring jms提供的连接池 -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="targetConnectionFactory"/>
</bean>
<!-- 队列Destination -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="queue.test"/>
</bean>
<!-- 主题Destination -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="topic.test"/>
</bean>
<!-- 注册JmsTemplate,用于发送消息 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
<!-- 配置消息监听器 -->
<bean id="consumerTextMessageListener" class="com.wch.ssm.jms.consumer.ConsumerTextMessageListener"/>
<!-- queueDestination消息监听容器 -->
<bean id="jmsContainer1" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="queueDestination"/>
<property name="messageListener" ref="consumerTextMessageListener"/>
<!-- 配置多个消费者 -->
<property name="concurrency" value="3-5"/>
</bean>
<!-- topicDestination消息监听容器 -->
<bean id="jmsContainer2" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="topicDestination"/>
<property name="messageListener" ref="consumerTextMessageListener"/>
</bean>
</beans>
配置生产者及其实现类
public interface ProducerService {
/**
* 发送TextMessage
*
* @param message TextMessage
*/
void sendTextMessage(String message);
}
@Service
public class ProducerServiceImpl implements ProducerService {
@Resource
private JmsTemplate jmsTemplate;
@Resource(name = "queueDestination")
private Destination queueDestination;
@Resource(name = "topicDestination")
private Destination topicDestination;
private static final Logger log = LoggerFactory.getLogger(ProducerServiceImpl.class);
@Override
public void sendTextMessage(String message) {
// 创建MessageCreator
MessageCreator messageCreator = new MessageCreator() {
/**
* 创建TextMessage
* @param session session
* @return TextMessage
* @throws JMSException JMSException
*/
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage textMessage = session.createTextMessage(message);
log.info("{} produce message: {}", Thread.currentThread().getName(), textMessage.getText());
return textMessage;
}
};
for (int i = 0; i < 1000; i++) {
// 发送队列消息
jmsTemplate.send(queueDestination, messageCreator);
// 发送主题消息
jmsTemplate.send(topicDestination, messageCreator);
}
}
}
配置消费者
要求实现MessageListener接口,对监听到的消息进行处理(如何消费)
public class ConsumerTextMessageListener implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(ConsumerTextMessageListener.class);
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
log.info("{} consume message: {}", Thread.currentThread().getName(), textMessage.getText());
} catch (JMSException e) {
log.error("consume message failed");
}
}
}
测试
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:spring/spring-jms.xml")
public class JmsTest {
@Resource
private ProducerService producerService;
@Test
public void jmsTest() {
producerService.sendTextMessage("test");
}
}
控制台打印
SpringBoot集成JMS
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
配置
SpringBoot不能同时支持queue和topic两种消息模型,默认支持queue,通过配置特定的ListenerContainer使得同时支持两种消息模型。
@Configuration
public class JmsConfig {
@Value("${jms-name.queue-test}")
private String queueName;
@Value("${jms-name.topic-test}")
private String topicName;
/**
* 配置Queue
* @return Queue
*/
@Bean
public Queue queue() {
return new ActiveMQQueue(queueName);
}
/**
* 配置Topic
* @return Topic
*/
@Bean
public Topic topic() {
return new ActiveMQTopic(topicName);
}
/**
* 配置JmsMessagingTemplate,对JmsTemplate的封装
* @param jmsTemplate jmsTemplate
* @return JmsMessagingTemplate
*/
@Bean
public JmsMessagingTemplate jmsMessagingTemplate(JmsTemplate jmsTemplate) {
return new JmsMessagingTemplate(jmsTemplate);
}
/**
* 为Topic配置MessageListenerContainer,默认为Queue Container
* @param connectionFactory connectionFactory
* @return JmsListenerContainerFactory
*/
@Bean
public JmsListenerContainerFactory<?> topicListenerContainerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
// 设置topic消息类型
factory.setPubSubDomain(true);
factory.setConnectionFactory(connectionFactory);
return factory;
}
}
配置生产者
@Service
@EnableScheduling
public class Producer {
@Resource
private JmsMessagingTemplate jmsMessagingTemplate;
@Resource
private Queue queue;
@Resource
private Topic topic;
private static int count = 0;
private static final Logger log = LoggerFactory.getLogger(Producer.class);
/**
* 每1000ms分别生产一条消息给两个消息模型
*/
@Scheduled(fixedRate = 1000)
public void sendTextMessage() {
String queueMessage = "to queue " + count;
String topicMessage = "to topic " + count++;
// 发送消息给队列
log.info("{} produce message: {}", Thread.currentThread().getName(), queueMessage);
jmsMessagingTemplate.convertAndSend(this.queue, queueMessage);
// 发送消息给主题
log.info("{} produce message: {}", Thread.currentThread().getName(), topicMessage);
jmsMessagingTemplate.convertAndSend(this.topic, topicMessage);
}
}
配置消费者
@Service
public class Consumer {
private static final Logger log = LoggerFactory.getLogger(Consumer.class);
// 设置监听多个消息模型
@JmsListeners(value = {
// 监听queue,设置1-3个消费者
@JmsListener(destination = "${jms-name.queue-test}", concurrency = "1-3"),
// 监听topic,配置特定的监听容器
@JmsListener(destination = "${jms-name.topic-test}", containerFactory = "topicListenerContainerFactory")
})
public void consumeMessage(Message message) throws JMSException {
TextMessage textMessage = (TextMessage) message;
log.info("{} consume message: {}", Thread.currentThread().getName(), textMessage.getText());
}
}