try mqtt

ABOUT MQTT

  1. A brief introduction to MQTT
  2. Awesome MQTT
  3. Choosing an MQTT broker
  4. Detailed comparison of the EMQ open-source, enterprise, and platform editions

TRY MQTT

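After some back-and-forth, I chose EMQX as the MQTT broker for a demo. EMQX has thorough MQTT protocol support, supports clustering, is licensed under Apache License 2.0, is well documented, and has a fairly active open-source community. The one drawback is that it is written in Erlang (which I am too lazy to learn).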

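EMQX official documentation: https://docs.emqx.io/broker/v3/cn/

EMQX GitHub: https://github.com/emqx/emqx

EMQX Docker Hub: https://hub.docker.com/r/emqx/emqx

The EMQX documentation is quite thorough and available in Chinese; reading it in full is recommended.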

Targets:

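  1. Run EMQX as the MQTT broker

    Container deployment:

    docker pull emqx/emqx:v3.2.5
    
    docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx:v3.2.5
    
    1883 MQTT port
    8883 MQTT/SSL port
    8083 MQTT/WebSocket port
    8084 MQTT/WebSocket/SSL port
    8080 HTTP API port (not published by the command above; add -p 8080:8080 if you need it)
    18083 Dashboard (management console) port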
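
    Once the container is up, a quick way to confirm that the published ports actually accept connections is a plain TCP probe. The sketch below uses only the JDK; the class name BrokerPortCheck and the 1-second timeout are assumptions for illustration, and a successful connect only shows that something is listening, not that it speaks MQTT.

    ```java
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.Socket;

    // Hypothetical helper, not part of EMQX or Spring Integration.
    class BrokerPortCheck {

        /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
        static boolean isOpen(String host, int port, int timeoutMillis) {
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(host, port), timeoutMillis);
                return true;
            } catch (IOException e) {
                // Connection refused, timed out, or host unreachable.
                return false;
            }
        }

        public static void main(String[] args) {
            // Ports published by the docker run command above.
            int[] ports = {1883, 8083, 8084, 8883, 18083};
            for (int port : ports) {
                String state = isOpen("localhost", port, 1000) ? "open" : "closed";
                System.out.println(port + " -> " + state);
            }
        }
    }
    ```

    If a port shows as closed, checking the container with docker ps and docker logs emqx is usually the next step.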
  2. Use spring-integration-mqtt as the MQTT client

    Dependency:

                <dependency>
                    <groupId>org.springframework.integration</groupId>
                    <artifactId>spring-integration-mqtt</artifactId>
                    <version>5.2.1.RELEASE</version>
                </dependency>
    

    Java Configuration:

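    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.integration.core.MessageProducer;
    import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
    import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
    import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
    import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
    import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
    import org.springframework.integration.mqtt.support.MqttHeaders;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.MessageHandler;

    @Configuration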
    public class MqttConfiguration {
        @Bean
        public MqttPahoClientFactory mqttClientFactory() {
            DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
            MqttConnectOptions options = new MqttConnectOptions();
            options.setServerURIs(new String[]{"tcp://localhost:1883"});
            factory.setConnectionOptions(options);
            return factory;
        }
       
        @Bean
        public MessageChannel mqttInputChannel() {
            return new DirectChannel();
        }
       
        @Bean
        public MessageProducer inbound() {
            MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("mqttSubscriber",
                    mqttClientFactory(), "mqtt-test");
            adapter.setCompletionTimeout(5000);
            adapter.setConverter(new DefaultPahoMessageConverter());
            adapter.setQos(1);
            adapter.setOutputChannel(mqttInputChannel());
            return adapter;
        }
       
        @Bean
        @ServiceActivator(inputChannel = "mqttInputChannel")
        public MessageHandler handler() {
            return message -> {
                String payload = message.getPayload().toString();
                String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
                System.out.println("Received message from " + topic + ": " + payload);
            };
        }
       
        @Bean
        public MessageChannel mqttOutboundChannel() {
            return new DirectChannel();
        }
       
        @Bean
        @ServiceActivator(inputChannel = "mqttOutboundChannel")
        public MessageHandler outbound() {
            // Configure the handler that publishes messages arriving on mqttOutboundChannel.
            MqttPahoMessageHandler messageHandler =
                    new MqttPahoMessageHandler("mqttPublisher", mqttClientFactory());
            messageHandler.setAsync(true); // When set to true, sending a message does not block.
            messageHandler.setDefaultTopic("mqtt-test");
            return messageHandler;
        }
    }
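
    The configuration uses the fixed client IDs "mqttSubscriber" and "mqttPublisher". An MQTT broker allows only one connection per client ID, so a second instance of the application connecting with the same IDs would take over the first one's sessions. One possible way around this is to append a short random suffix, sketched below with the JDK only. The class ClientIds and the 8-character suffix are assumptions for illustration, not something the original setup uses.

    ```java
    import java.util.UUID;

    // Hypothetical helper for building per-instance MQTT client IDs.
    class ClientIds {

        /** Returns prefix followed by 8 random hexadecimal characters. */
        static String unique(String prefix) {
            String suffix = UUID.randomUUID().toString().replace("-", "").substring(0, 8);
            return prefix + suffix;
        }

        public static void main(String[] args) {
            // Each run prints different IDs, e.g. for the inbound and outbound adapters.
            System.out.println(unique("mqttSubscriber"));
            System.out.println(unique("mqttPublisher"));
        }
    }
    ```

    The result could then be passed as the first constructor argument of MqttPahoMessageDrivenChannelAdapter and MqttPahoMessageHandler in place of the string literals. Note that a random ID changes on every restart, which matters if you rely on persistent sessions.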
    

    Mqtt Message Gateway:

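    import org.springframework.integration.annotation.MessagingGateway;
    import org.springframework.integration.mqtt.support.MqttHeaders;
    import org.springframework.messaging.handler.annotation.Header;

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")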
    public interface MqttPublisher {
        void sendToMqtt(String payload);
        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
    }
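
    The third sendToMqtt overload takes the QoS as a bare int. MQTT defines three QoS levels: 0 (at most once), 1 (at least once), and 2 (exactly once). A small enum can make call sites more readable and reject invalid values early. The MqttQos type below is a hypothetical helper written for this post; it is not part of Spring Integration or Paho.

    ```java
    // Hypothetical enum naming the three MQTT QoS levels.
    enum MqttQos {
        AT_MOST_ONCE(0),
        AT_LEAST_ONCE(1),
        EXACTLY_ONCE(2);

        private final int level;

        MqttQos(int level) {
            this.level = level;
        }

        /** The numeric value to pass in the MqttHeaders.QOS header. */
        int level() {
            return level;
        }

        /** Maps 0, 1, or 2 to the matching constant; anything else is rejected. */
        static MqttQos fromLevel(int level) {
            for (MqttQos qos : values()) {
                if (qos.level == level) {
                    return qos;
                }
            }
            throw new IllegalArgumentException("Invalid MQTT QoS level: " + level);
        }

        public static void main(String[] args) {
            for (MqttQos qos : values()) {
                System.out.println(qos.level() + " = " + qos);
            }
        }
    }
    ```

    With this in place, a call would look like sendToMqtt("mqtt-test", MqttQos.AT_LEAST_ONCE.level(), "hello").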
    

    A quick low-level test (LLT) to try it out:

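    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;

    @RunWith(SpringRunner.class)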
    @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
    public class MqttTest {
        @Autowired
        private MqttPublisher mqttGateway;
       
        @Test
        public void init() {
            try {
                mqttGateway.sendToMqtt("mqtt-test", "this is a mqtt message from test");
                System.out.println("send mqtt message successfully.");
            } catch (Exception e) {
                System.out.println("send mqtt message failed.");
            }
        }
    }
    

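    After the test runs successfully, the subscriber MessageHandler defined in the configuration receives the message and prints:

    Received message from mqtt-test: this is a mqtt message from test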
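
    Because the outbound handler is configured with setAsync(true) and delivery to the subscriber happens on another thread, the test method may return before the message is printed, and it does not actually assert that anything was received. One common pattern is to have the subscriber count down a latch that the test waits on. The sketch below shows the idea with the JDK only; ReceivedMessageProbe is a hypothetical class, and the thread in main merely stands in for the broker delivering a message.

    ```java
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicReference;

    // Hypothetical helper that lets a test wait for one asynchronously delivered message.
    class ReceivedMessageProbe {
        private final CountDownLatch latch = new CountDownLatch(1);
        private final AtomicReference<String> payload = new AtomicReference<>();

        /** Called from the subscriber side when a message arrives. */
        void onMessage(String receivedPayload) {
            payload.set(receivedPayload);
            latch.countDown();
        }

        /** Waits up to the given timeout; returns the payload, or null if none arrived. */
        String await(long timeout, TimeUnit unit) throws InterruptedException {
            return latch.await(timeout, unit) ? payload.get() : null;
        }

        public static void main(String[] args) throws InterruptedException {
            ReceivedMessageProbe probe = new ReceivedMessageProbe();
            // Stand-in for the broker delivering the message on another thread.
            new Thread(() -> probe.onMessage("this is a mqtt message from test")).start();
            System.out.println(probe.await(2, TimeUnit.SECONDS));
        }
    }
    ```

    In the Spring setup, the handler() bean could call onMessage(payload) on a shared probe, and the test could assert that await(...) returns the payload that was sent.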