springboot集成mqtt

1开发环境

采用maven仓库,初始化一个springboot的初始工程,pom文件额外导入spring integration和mqtt的依赖包。
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

2自定义mqtt配置类

@Configuration
public class ConfigMqtt {
    //日志
    private static final Logger LOGGER = LoggerFactory.getLogger(ConfigMqtt.class);
    ......
}

3配置信息

导入mqtt消息中间件的连接信息,本地开启apollo作为mqtt服务器,采用默认端口,因此配置如下:
/*
    可以通过在主配置文件里面定义,然后引用配置文件里面的信息,也可以在@Value后面直接指定数据。    
    */

    @Value("${username:admin}")//apollo默认用户名是admin
    private String username;

    @Value("${password:password}")//apollo默认密码是password
    private String password;

    @Value("${url:tcp://127.0.0.1:61613}")//本机的访问路径,采用tcp协议访问
    private String url;

    @Value("${producerClientId:mqttProducer}")//生产者的客户端ID
    private String producerClientId;

    @Value("${producerDefaultTopic:topic1}")//生产者发送的主题
    private String producerDefaultTopic;

    @Value("${consumerClientId:mqttConsumer}")//消费者的客户端ID
    private String consumerClientId;

    @Value("${consumerDefaultTopic:topic1}")//消费者订阅的主题,这里可以是多个
    private String consumerDefaultTopic;

4mqtt客户端和连接器

网上的代码大部分是先定义连接器选项,然后再通过DefaultMqttPahoClientFactory中的setConnectionOptions()方法配置连接信息,连接器信息是如下设置
    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
        // 这里设置为true表示每次连接到服务器都以新的身份连接
        options.setCleanSession(true);
        // 设置连接的用户名
        options.setUserName(username);
        // 设置连接的密码
        options.setPassword(password.toCharArray());
        options.setServerURIs(StringUtils.split(url, ","));
        // 设置超时时间 单位为秒
        options.setConnectionTimeout(10);
        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制
        options.setKeepAliveInterval(20);
        // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
        options.setWill("willTopic", WILL_DATA, 2, false);
        return options;
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
  }
但是这里我并没有发现DefaultMqttPahoClientFactory有setConnectionOptions()的方法提示,通过翻看源码,DefaultMqttPahoClientFactory实现了MqttPahoClientFactory接口,MqttPahoClientFactory接口内部没有setConnectionOptions()方法,DefaultMqttPahoClientFactory内部也没有定义setConnectionOptions()的方法,此时我们可以修改DefaultMqttPahoClientFactory的底层源码加入setConnectionOptions()方法和ConnectionOptions属性,也可以不通过连接器直接用该类的set方法手动配置,第二种代码如下:
/**
     * MQTT客户端
     *
     * @return {@link org.springframework.integration.mqtt.core.MqttPahoClientFactory}
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setPassword("password");
        factory.setUserName("admin");
        factory.setCleanSession(true);
        factory.setServerURIs(url);
        factory.setConnectionTimeout(10);
        factory.setKeepAliveInterval(20);
        factory.setWill(new DefaultMqttPahoClientFactory.Will("willTopic",WILL_DATA,2,false));
        return factory;
    }
此时当项目启动mqtt的客户端会自动注入到spring容器中。

5mqtt生产者

/**
     * MQTT信息通道(生产者)
     *
     * @return {@link org.springframework.messaging.MessageChannel}
     */
    @Bean(name = CHANNEL_NAME_OUT)
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * MQTT消息处理器(生产者)
     *
     * @return {@link org.springframework.messaging.MessageHandler}
     */
    @Bean
    @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
                producerClientId,
                mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(producerDefaultTopic);
        return messageHandler;
    }

6mqtt消费者

/**
     * MQTT消息订阅绑定(消费者)
     *
     * @return {@link org.springframework.integration.core.MessageProducer}
     */
    @Bean
    public MessageProducer inbound() {
        // 可以同时消费(订阅)多个Topic,MqttPahoMessageDrivenChannelAdapter内部根据参数的不同重载了多个构造方法,因此可以灵活运用,可以订阅多个主题,经过实验往后面加就行了。
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                consumerClientId, mqttClientFactory(), consumerDefaultTopic);
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        // 设置订阅通道
        adapter.setOutputChannel(mqttInboundChannel());
        return adapter;
    }

    /**
     * MQTT信息通道(消费者)
     *
     * @return {@link org.springframework.messaging.MessageChannel}
     */
    @Bean//(name = CHANNEL_NAME_IN)
    public MessageChannel mqttInboundChannel() {
        return new DirectChannel();
    }

    /**
     * MQTT消息处理器(消费者)
     *
     * @return {@link org.springframework.messaging.MessageHandler}
     */
/*    @Bean
    @ServiceActivator(inputChannel = CHANNEL_NAME_IN)
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                //LOGGER.error("===================={}============", message.getPayload());
                System.out.println(message.getPayload().toString());
                System.out.println(message.getHeaders().toString());
                String payload = message.getPayload().toString();
                String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
                // 根据topic分别进行消息处理。
                if (topic.equals("topic1")) {
                    System.out.println("topic1: 处理消息 " + payload);
                } else if (topic.equals("topic2")) {
                    System.out.println("topic2: 处理消息 " + payload);
                } else {
                    System.out.println(topic + ": 丢弃消息 " + payload);
                }
            }
        };
    }*/

    @Bean
    //ServiceActivator注解表明当前方法用于处理MQTT消息,inputChannel参数指定了用于接收消息信息的channel。
    @ServiceActivator(inputChannel = "mqttInboundChannel")
    public MessageHandler handler() {
        return message -> {
            System.out.println(message.getPayload().toString());
            System.out.println(message.getHeaders().toString());
            String payload = message.getPayload().toString();
            String topic = message.getHeaders().get("mqtt_topic").toString();
            // 根据topic分别进行消息处理。
            if (topic.equals("topic1")) {
                System.out.println("topic1: 处理消息 " + payload);
            } else if (topic.equals("topic2")) {
                System.out.println("topic2: 处理消息 " + payload);
            } else {
                System.out.println(topic + ": 丢弃消息 " + payload);
            }
        };
    }
消息处理器有两种,网上大部分采用的是lambda表达式的一种,采用这种写法idea可以会报错,原因是lambda表达式是jdk8以后的写法,如果idea的编译环境不是jdk8就会报错,此时在pom文件里面加maven插件
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <configuration>
        <compilerArgument>-Xlint:deprecation</compilerArgument>
        <source>1.8</source>
        <target>1.8</target>
    </configuration>
</plugin>

7发送接口

自定义接口,需要注意的是接口上用@MessagingGateway注解,默认请求通道为生产者的信息通道。

8控制器Controller

比较简单,把发送接口注入进来调用自己写的方法即可。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,386评论 6 479
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,939评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,851评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,953评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,971评论 5 369
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,784评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,126评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,765评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,148评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,744评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,858评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,479评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,080评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,053评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,278评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,245评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,590评论 2 343

推荐阅读更多精彩内容