try mqtt
ABOUT MQTT
- A brief intro on MQTT: MQTT
- Awesome MQTT
- Choosing an MQTT Broker
- Detailed comparison of the EMQ open-source, enterprise, and platform editions
TRY MQTT
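After some deliberation, I chose EMQX as the MQTT Broker for a demo. EMQX has thorough MQTT protocol support, supports clustering, is licensed under Apache License 2.0, is well documented, and has a relatively active open-source community. The one drawback is that it is written in Erlang (which I am too lazy to learn).
EMQX official documentation: https://docs.emqx.io/broker/v3/cn/
EMQX GitHub: https://github.com/emqx/emqx
EMQX Docker Hub: https://hub.docker.com/r/emqx/emqx
The EMQX documentation is very thorough and available in Chinese; reading it in full is recommended.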
Targets:
- Run EMQX as the MQTT Broker
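Container deployment (the run command uses the same v3.2.5 tag that was pulled, rather than the default latest tag):
docker pull emqx/emqx:v3.2.5
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx:v3.2.5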
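1883   MQTT protocol port
8883   MQTT/SSL port
8083   MQTT/WebSocket port
8084   MQTT/WebSocket/SSL port
8080   HTTP API port (not published by the docker run command above)
18083  Dashboard management console port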
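Before wiring up a client, it can help to confirm that the published ports actually accept connections. The following is a minimal sketch and not part of the original setup: the class name BrokerPortCheck, the isReachable helper, the 1-second timeout, and the default host localhost are all assumptions made for illustration. It only tests TCP reachability and does not speak MQTT.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: checks whether the published EMQX ports accept TCP connections. */
class BrokerPortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host could not be resolved.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: the container runs on the local machine unless a host is passed in.
        String host = args.length > 0 ? args[0] : "localhost";

        // The ports published by the docker run command.
        Map<Integer, String> ports = new LinkedHashMap<>();
        ports.put(1883, "MQTT");
        ports.put(8883, "MQTT/SSL");
        ports.put(8083, "MQTT/WebSocket");
        ports.put(8084, "MQTT/WebSocket/SSL");
        ports.put(18083, "Dashboard");

        for (Map.Entry<Integer, String> entry : ports.entrySet()) {
            boolean up = isReachable(host, entry.getKey(), 1000);
            System.out.printf("%-5d %-20s %s%n",
                    entry.getKey(), entry.getValue(), up ? "reachable" : "NOT reachable");
        }
    }
}
```

A port reported as not reachable usually means the container is not running or the corresponding -p mapping is missing.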
- On the client side, spring-integration-mqtt serves as the MQTT client.
Dependency:
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>5.2.1.RELEASE</version>
</dependency>
Java Configuration:
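import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

@Configuration
public class MqttConfiguration {

    // Shared client factory: both the subscriber and the publisher connect to the local broker.
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{"tcp://localhost:1883"});
        factory.setConnectionOptions(options);
        return factory;
    }

    // Channel that carries messages received from the broker.
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    // Inbound adapter: subscribes to "mqtt-test" with client ID "mqttSubscriber".
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("mqttSubscriber", mqttClientFactory(), "mqtt-test");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    // Consumer for received messages: prints the topic and the payload.
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return message -> {
            String payload = message.getPayload().toString();
            String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
            // The output reads "Received message from <topic>: <payload>" in English.
            System.out.println("收到来自" + topic + "的消息:" + payload);
        };
    }

    // Channel that carries messages to be published.
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler outbound() {
        // Configure the handler behind mqttOutboundChannel here.
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("mqttPublisher", mqttClientFactory());
        messageHandler.setAsync(true); // When true, sending a message does not block.
        messageHandler.setDefaultTopic("mqtt-test");
        return messageHandler;
    }
}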
Mqtt Message Gateway:
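import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;

@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttPublisher {

    // Publishes to the handler's default topic.
    void sendToMqtt(String payload);

    // Publishes to the given topic.
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

    // Publishes to the given topic with the given QoS.
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}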
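The three gateway overloads differ only in which headers they set, so the outbound handler has to fall back to defaults when a header is missing. The plain-Java sketch below is a simplified model of that fallback, not Spring Integration's actual code: the class GatewayHeaderModel and its two methods are hypothetical, and the fallback QoS of 1 used in main is an assumption for illustration only. The header names "mqtt_topic" and "mqtt_qos" are the values of MqttHeaders.TOPIC and MqttHeaders.QOS.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified model of how topic and QoS headers fall back to defaults. */
class GatewayHeaderModel {

    static final String TOPIC_HEADER = "mqtt_topic"; // value of MqttHeaders.TOPIC
    static final String QOS_HEADER = "mqtt_qos";     // value of MqttHeaders.QOS

    /** Uses the topic header when present, otherwise the configured default topic. */
    static String resolveTopic(Map<String, Object> headers, String defaultTopic) {
        Object topic = headers.get(TOPIC_HEADER);
        if (topic != null) {
            return topic.toString();
        }
        if (defaultTopic == null) {
            throw new IllegalStateException("No topic header and no default topic configured");
        }
        return defaultTopic;
    }

    /** Uses the QoS header when it is an Integer, otherwise the given default QoS. */
    static int resolveQos(Map<String, Object> headers, int defaultQos) {
        Object qos = headers.get(QOS_HEADER);
        return qos instanceof Integer ? (Integer) qos : defaultQos;
    }

    public static void main(String[] args) {
        // sendToMqtt(payload): no headers, so the default topic is used.
        Map<String, Object> none = new HashMap<>();
        System.out.println(resolveTopic(none, "mqtt-test") + " qos=" + resolveQos(none, 1));

        // sendToMqtt(topic, qos, payload): both headers override the defaults.
        Map<String, Object> both = new HashMap<>();
        both.put(TOPIC_HEADER, "sensors/room1");
        both.put(QOS_HEADER, 2);
        System.out.println(resolveTopic(both, "mqtt-test") + " qos=" + resolveQos(both, 1));
    }
}
```

In the configuration from this post, the default topic is the one passed to setDefaultTopic ("mqtt-test"), which is why sendToMqtt(payload) works without naming a topic.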
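Let's try it out with an LLT:
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class MqttTest {

    @Autowired
    private MqttPublisher mqttGateway;

    @Test
    public void init() {
        try {
            // Publish to the same topic the inbound adapter subscribes to.
            mqttGateway.sendToMqtt("mqtt-test", "this is a mqtt message from test");
            System.out.println("send mqtt message successfully.");
        } catch (Exception e) {
            // The exception is only logged, so the test still passes when sending fails.
            System.out.println("send mqtt message failed.");
        }
    }
}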
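After the test runs successfully, the subscriber MessageHandler defined in the configuration receives the message and prints:
收到来自mqtt-test的消息:this is a mqtt message from test
(In English: "Received message from mqtt-test: this is a mqtt message from test".)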
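Because the outbound handler is configured with setAsync(true), the "send mqtt message successfully." line only shows that the send call returned, not that the subscriber received anything. One common way to assert on receipt in a test is a CountDownLatch that the handler counts down. The sketch below shows the pattern in plain Java under stated assumptions: ReceiptLatchSketch, onMessage, and awaitPayload are hypothetical names, and a plain thread stands in for the broker delivering the message.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch: lets a test wait until a subscriber callback has fired. */
class ReceiptLatchSketch {

    private final CountDownLatch received = new CountDownLatch(1);
    private final AtomicReference<String> lastPayload = new AtomicReference<>();

    /** Would be called from the subscriber MessageHandler when a message arrives. */
    void onMessage(String payload) {
        lastPayload.set(payload);
        received.countDown();
    }

    /** Waits for a message; returns its payload, or null if the timeout elapses first. */
    String awaitPayload(long timeout, TimeUnit unit) throws InterruptedException {
        return received.await(timeout, unit) ? lastPayload.get() : null;
    }

    public static void main(String[] args) throws InterruptedException {
        ReceiptLatchSketch sketch = new ReceiptLatchSketch();

        // Stand-in for the broker delivering the message on another thread.
        new Thread(() -> sketch.onMessage("this is a mqtt message from test")).start();

        String payload = sketch.awaitPayload(5, TimeUnit.SECONDS);
        System.out.println(payload != null ? "received: " + payload : "timed out");
    }
}
```

In a real test, the handler bean would call onMessage and the test method would assert that awaitPayload returns the expected payload within the timeout.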