1开发环境
采用maven仓库,初始化一个springboot的初始工程,pom文件额外导入spring integration和mqtt的依赖包。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
2自定义mqtt配置类
@Configuration
public class ConfigMqtt {
//日志
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigMqtt.class);
......
}
3配置信息
导入mqtt消息中间件的连接信息,本地开启apollo作为mqtt服务器,采用默认端口,因此配置如下:
/*
可以通过在主配置文件里面定义,然后引用配置文件里面的信息,也可以在@Value后面直接指定数据。
*/
@Value("${username:admin}")//apollo默认用户名是admin
private String username;
@Value("${password:password}")//apollo默认密码是password
private String password;
@Value("${url:tcp://127.0.0.1:61613}")//本机的访问路径,采用tcp协议访问
private String url;
@Value("${producerClientId:mqttProducer}")//生产者的客户端ID
private String producerClientId;
@Value("${producerDefaultTopic:topic1}")//生产者发送的主题
private String producerDefaultTopic;
@Value("${consumerClientId:mqttConsumer}")//消费者的客户端ID
private String consumerClientId;
@Value("${consumerDefaultTopic:topic1}")//消费者订阅的主题,这里可以是多个
private String consumerDefaultTopic;
4mqtt客户端和连接器
网上的代码大部分是先定义连接器选项,然后再通过DefaultMqttPahoClientFactory中的setConnectionOptions()方法配置连接信息,连接器信息是如下设置
@Bean
public MqttConnectOptions getMqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
// 这里设置为true表示每次连接到服务器都以新的身份连接
options.setCleanSession(true);
// 设置连接的用户名
options.setUserName(username);
// 设置连接的密码
options.setPassword(password.toCharArray());
options.setServerURIs(StringUtils.split(url, ","));
// 设置超时时间 单位为秒
options.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制
options.setKeepAliveInterval(20);
// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
options.setWill("willTopic", WILL_DATA, 2, false);
return options;
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getMqttConnectOptions());
return factory;
}
但是这里我并没有发现DefaultMqttPahoClientFactory有setConnectionOptions()的方法提示,通过翻看源码,DefaultMqttPahoClientFactory实现了MqttPahoClientFactory接口,MqttPahoClientFactory接口内部没有setConnectionOptions()方法,DefaultMqttPahoClientFactory内部也没有定义setConnectionOptions()的方法,此时我们可以修改DefaultMqttPahoClientFactory的底层源码加入setConnectionOptions()方法和ConnectionOptions属性,也可以不通过连接器直接用该类的set方法手动配置,第二种代码如下:
/**
* MQTT客户端
*
* @return {@link org.springframework.integration.mqtt.core.MqttPahoClientFactory}
*/
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setPassword("password");
factory.setUserName("admin");
factory.setCleanSession(true);
factory.setServerURIs(url);
factory.setConnectionTimeout(10);
factory.setKeepAliveInterval(20);
factory.setWill(new DefaultMqttPahoClientFactory.Will("willTopic",WILL_DATA,2,false));
return factory;
}
此时当项目启动mqtt的客户端会自动注入到spring容器中。
5mqtt生产者
/**
* MQTT信息通道(生产者)
*
* @return {@link org.springframework.messaging.MessageChannel}
*/
@Bean(name = CHANNEL_NAME_OUT)
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
/**
* MQTT消息处理器(生产者)
*
* @return {@link org.springframework.messaging.MessageHandler}
*/
@Bean
@ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
producerClientId,
mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(producerDefaultTopic);
return messageHandler;
}
6mqtt消费者
/**
* MQTT消息订阅绑定(消费者)
*
* @return {@link org.springframework.integration.core.MessageProducer}
*/
@Bean
public MessageProducer inbound() {
// 可以同时消费(订阅)多个Topic,MqttPahoMessageDrivenChannelAdapter内部根据参数的不同重载了多个构造方法,因此可以灵活运用,可以订阅多个主题,经过实验往后面加就行了。
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
consumerClientId, mqttClientFactory(), consumerDefaultTopic);
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
// 设置订阅通道
adapter.setOutputChannel(mqttInboundChannel());
return adapter;
}
/**
* MQTT信息通道(消费者)
*
* @return {@link org.springframework.messaging.MessageChannel}
*/
@Bean//(name = CHANNEL_NAME_IN)
public MessageChannel mqttInboundChannel() {
return new DirectChannel();
}
/**
* MQTT消息处理器(消费者)
*
* @return {@link org.springframework.messaging.MessageHandler}
*/
/* @Bean
@ServiceActivator(inputChannel = CHANNEL_NAME_IN)
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
//LOGGER.error("===================={}============", message.getPayload());
System.out.println(message.getPayload().toString());
System.out.println(message.getHeaders().toString());
String payload = message.getPayload().toString();
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
// 根据topic分别进行消息处理。
if (topic.equals("topic1")) {
System.out.println("topic1: 处理消息 " + payload);
} else if (topic.equals("topic2")) {
System.out.println("topic2: 处理消息 " + payload);
} else {
System.out.println(topic + ": 丢弃消息 " + payload);
}
}
};
}*/
@Bean
//ServiceActivator注解表明当前方法用于处理MQTT消息,inputChannel参数指定了用于接收消息信息的channel。
@ServiceActivator(inputChannel = "mqttInboundChannel")
public MessageHandler handler() {
return message -> {
System.out.println(message.getPayload().toString());
System.out.println(message.getHeaders().toString());
String payload = message.getPayload().toString();
String topic = message.getHeaders().get("mqtt_topic").toString();
// 根据topic分别进行消息处理。
if (topic.equals("topic1")) {
System.out.println("topic1: 处理消息 " + payload);
} else if (topic.equals("topic2")) {
System.out.println("topic2: 处理消息 " + payload);
} else {
System.out.println(topic + ": 丢弃消息 " + payload);
}
};
}
消息处理器有两种,网上大部分采用的是lambda表达式的一种,采用这种写法idea可以会报错,原因是lambda表达式是jdk8以后的写法,如果idea的编译环境不是jdk8就会报错,此时在pom文件里面加maven插件
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<compilerArgument>-Xlint:deprecation</compilerArgument>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
7发送接口
自定义接口,需要注意的是接口上用@MessagingGateway注解,默认请求通道为生产者的信息通道。
8控制器Controller
比较简单,把发送接口注入进来调用自己写的方法即可。