Introduction to MQTT:
Overview:
MQTT (Message Queuing Telemetry Transport) is a lightweight messaging protocol designed for communication between devices over low-bandwidth, high-latency networks. It is based on the publish/subscribe model, was originally developed by IBM, has since become an open standard, and is widely used in the Internet of Things (IoT).
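To make the publish/subscribe idea concrete, here is a minimal in-memory sketch. It is not MQTT and does no networking; ToyBroker, PubSubSketch and the topic names are made up for illustration, and only exact topic matches are handled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Toy in-memory broker: routes each payload to the handlers registered for that exact topic. */
class ToyBroker {
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    /** Registers a handler for one topic (exact match only, no wildcards). */
    void subscribe(String topic, Consumer<String> handler) {
        subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(handler);
    }

    /** Delivers the payload to every handler of the topic and returns how many received it. */
    int publish(String topic, String payload) {
        List<Consumer<String>> handlers = subscribers.getOrDefault(topic, List.of());
        for (Consumer<String> handler : handlers) {
            handler.accept(payload);
        }
        return handlers.size();
    }
}

class PubSubSketch {
    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        // Two independent subscribers; the publisher knows neither of them.
        broker.subscribe("sensors/temperature", p -> System.out.println("dashboard got " + p));
        broker.subscribe("sensors/temperature", p -> System.out.println("logger got " + p));
        broker.publish("sensors/temperature", "21.5");
        broker.publish("sensors/humidity", "40"); // no subscribers, nothing is delivered
    }
}
```

The publisher only names a topic and never holds a reference to a subscriber, which is the decoupling the protocol relies on.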
MQTT features include:
1. Lightweight: The MQTT protocol has a simple design and a small message header (the fixed header can be as small as 2 bytes). It is suitable for devices in constrained environments such as sensors and embedded devices.
2. Simple and easy to use: MQTT uses the publish/subscribe pattern, which decouples the sender (publisher) of a message from its receiver (subscriber), so the communication process is simple and easy to understand.
3. Tolerant of low bandwidth and high latency: The protocol is designed for networks with limited bandwidth and high latency, and can keep delivering messages in a poor network environment.
4. Reliability: MQTT supports message persistence and acknowledgement mechanisms to ensure reliable delivery. It also defines three QoS (Quality of Service) levels (0: at most once, 1: at least once, 2: exactly once), which can be chosen according to actual needs.
5. Flexibility: MQTT does not interpret the payload, so it can carry text, binary data, or any other message format. It can also run over SSL/TLS to secure the communication.
6. Suitable for a variety of scenarios: Because it is lightweight and flexible, MQTT is widely used in the Internet of Things, sensor networks, remote monitoring, message notification, and similar scenarios, and has become one of the important communication protocols for connecting devices.
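As a small illustration of the lightweight header mentioned in point 1: in MQTT 3.1.1 the fixed header is one control byte followed by a "remaining length" field of one to four bytes, where each byte carries 7 bits of the value and the top bit signals that another byte follows. The sketch below encodes that field; the class and method names are made up for this example.

```java
import java.io.ByteArrayOutputStream;

class FixedHeaderSketch {
    /** Encodes the "remaining length" field of an MQTT fixed header as a variable-length integer. */
    static byte[] encodeRemainingLength(int length) {
        if (length < 0 || length > 268_435_455) {
            throw new IllegalArgumentException("remaining length out of range: " + length);
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        do {
            int digit = length % 128; // low 7 bits carry data
            length /= 128;
            if (length > 0) {
                digit |= 0x80;        // top bit set: more bytes follow
            }
            out.write(digit);
        } while (length > 0);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        for (int n : new int[] {0, 127, 128, 321, 16_384}) {
            System.out.println("remaining length " + n + " -> "
                    + encodeRemainingLength(n).length + " byte(s)");
        }
    }
}
```

Any packet whose remaining length is at most 127 therefore needs only a 2-byte fixed header.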
Without further ado, let's look at how the connection is made in code.
Due to project requirements, when the project starts we read the MQTT connection information that has been configured in the database, connect to each server dynamically based on that information, and, once a message is received, persist it and run the related business logic.
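The "read rows, then build connection settings" step can be sketched without a database or an MQTT library. Everything here is hypothetical: CollectorRow stands in for a database row, ConnectionSettings mirrors the five fields of the MqttBean class shown later, and keeping the first row per topic is only an assumption about what grouping by topic is meant to achieve.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for one database row; the field names are assumptions. */
class CollectorRow {
    final String ip;
    final int port;
    final String userName;
    final String password;
    final String topic;
    final String clientId;

    CollectorRow(String ip, int port, String userName, String password,
                 String topic, String clientId) {
        this.ip = ip;
        this.port = port;
        this.userName = userName;
        this.password = password;
        this.topic = topic;
        this.clientId = clientId;
    }
}

/** Hypothetical connection settings with the same five fields as MqttBean. */
class ConnectionSettings {
    final String url;
    final String userName;
    final String password;
    final String topic;
    final String clientId;

    ConnectionSettings(String url, String userName, String password,
                       String topic, String clientId) {
        this.url = url;
        this.userName = userName;
        this.password = password;
        this.topic = topic;
        this.clientId = clientId;
    }
}

class SettingsAssembler {
    /** Skips rows without an IP, keeps the first row seen per topic, and builds a tcp:// URL. */
    static List<ConnectionSettings> assemble(List<CollectorRow> rows) {
        Map<String, ConnectionSettings> byTopic = new LinkedHashMap<>();
        for (CollectorRow row : rows) {
            if (row.ip == null || row.ip.isEmpty()) {
                continue; // nothing to connect to
            }
            String url = "tcp://" + row.ip + ":" + row.port;
            byTopic.putIfAbsent(row.topic,
                    new ConnectionSettings(url, row.userName, row.password, row.topic, row.clientId));
        }
        return new ArrayList<>(byTopic.values());
    }

    public static void main(String[] args) {
        List<CollectorRow> rows = List.of(
                new CollectorRow("10.0.0.5", 1883, "user", "secret", "plant/a", "client-1"),
                new CollectorRow("", 1883, "user", "secret", "plant/b", "client-2"),
                new CollectorRow("10.0.0.6", 1883, "user", "secret", "plant/a", "client-3"));
        for (ConnectionSettings s : assemble(rows)) {
            System.out.println(s.clientId + " -> " + s.url + " (" + s.topic + ")");
        }
    }
}
```

Keeping this step free of MQTT calls makes it easy to test the filtering and URL building separately from the actual connection.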
1. Add the dependency first
<!-- mqtt -->
<dependency>
    <groupId></groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
2. The connection has to be made when the project starts, but only after initialization has finished and the mapper we want to use is available. For that reason this class implements the ApplicationRunner interface. There are other ways to achieve this, but this is the one I use.
package ;

import ;
import ;
import ;
import ;
import ;
import .mqttv3.*;
import .;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;

/**
 * @author 1097022316
 * Establish an MQTT connection after startup and persist data and related logic processing
 */
@Component
public class StartInit implements ApplicationRunner {

    @Override
    public void run(ApplicationArguments args) throws Exception {
        MemoryPersistence persistence = new MemoryPersistence();
        // Here is to query all mqtt connection related information from the database, such as ip, topic, etc.
        List<DeviceCollector> collectorList = (new QueryWrapper<DeviceCollector>().isNotNull("collector_ip").ne("collector_ip", "").eq("is_del", "0").groupBy("collector_topic"));
        ("collectorList = " + collectorList);
        //If there is no connection, end directly
        if ((collectorList)) {
            return;
        }
        List<MqttBean> mqttBeans = new ArrayList<>();
        //Assemble the information queryed from the database into the objects required for mqttclient connection
        // Generally, it is ip, port, username, password, topic, clientid. Here is a simple usage. If there are advanced usages, you can explore it yourself (by the way, please give me some comments below)
        (mqtt -> {
            MqttBean bean = new MqttBean("tcp://" + () + ":" + (), (), (), (), ());
            (bean);
        });
        //Distribute the traversal and loop connection of the mqtt connection object information we assembled
        (bean -> {
            try {
                MqttClient mqttClient = new MqttClient((), (), persistence);
                MqttConnectOptions connOpts = new MqttConnectOptions();
                //Set relevant connection parameters, some are necessary and some are not necessary. You can click in to view the source code by yourself.
                (false);
                (true);
                (());
                (().toCharArray());
                (connOpts);
                //Add the connection object globally
                ((), mqttClient);
                (new MqttCallbackExtended() {
                    @Override
                    public void connectComplete(boolean reconnect, String serverURI) {
                        if (reconnect) {
                            ("Reconnected successfully. url = " + serverURI);
                        } else {
                            ("Connected successfully for the first time.");
                        }
                    }

                    /**
                     * Set up the reconnect mechanism
                     */
                    @Override
                    public void connectionLost(Throwable cause) {
                        (() + "Connection lost" + ());
                        if (!()) {
                            try {
                                (1000 * 60 * 5);
                                //Try to connect
                                ("bean = " + bean);
                                boolean flag = (new MqttBean((), (), (), (), ()));
                                if (flag) {
                                    (() + "Reconnect, resubscribe!");
                                }
                            } catch (InterruptedException e) {
                                (() + "The MQTT connection is abnormal" + (), "CLCW");
                                throw new RuntimeException(e);
                            }
                        }
                    }

                    @Override
                    public void messageArrived(String topic, MqttMessage message) {
                        //This is what we do when the message is pushed
                        ("dosomethings...");
                    }

                    @Override
                    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                        ("Message sent in full");
                    }
                });
                ((), 2);
            } catch (Exception e) {
                ();
                (() + "The MQTT connection is abnormal");
                (() + "The MQTT connection is abnormal"+(),"LJCW");
                try {
                    (1000 * 60 * 30);
                    //Try to connect
                    ("bean = " + bean);
                    boolean flag = (new MqttBean((), (), (), (), ()));
                    if (flag) {
                        (() + "Reconnect, resubscribe!");
                    }
                } catch (InterruptedException ex) {
                    //You can save the error message locally here
                    throw new RuntimeException(ex);
                }
            }
        });
    }
}
import ;
import ;
import ;

/**
 * @author 1097022316
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MqttBean {
    private String url;
    private String userName;
    private String password;
    private String topics;
    private String clientId;
}
The key points are the constructor arguments of MqttClient, the connection options in MqttConnectOptions, and the overridden callback methods; see the source code for details. The usage here is fairly rough and only implements connecting and reconnecting. More advanced features such as heartbeat (keep-alive) settings or will messages are not used; if you want to study them, you can look into them yourself.
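The code above waits a fixed time (5 or 30 minutes) and then tries to reconnect once. As a possible alternative, and not what the code above does, the sketch below retries a bounded number of times with exponential backoff. It uses no MQTT library: the connect attempt is passed in as a BooleanSupplier, and ReconnectSketch, retryWithBackoff and all parameter names are made up for this example.

```java
import java.util.function.BooleanSupplier;

class ReconnectSketch {
    /**
     * Calls connectAttempt until it returns true or maxAttempts is reached,
     * doubling the wait between attempts up to maxDelayMillis.
     * Returns the number of attempts used, or -1 if all failed or the thread was interrupted.
     */
    static int retryWithBackoff(BooleanSupplier connectAttempt, int maxAttempts,
                                long initialDelayMillis, long maxDelayMillis) {
        long delay = initialDelayMillis;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (connectAttempt.getAsBoolean()) {
                return attempt;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
                    return -1;
                }
                delay = Math.min(delay * 2, maxDelayMillis);
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Stand-in for a real connect call: fails twice, then succeeds.
        int used = retryWithBackoff(() -> ++calls[0] >= 3, 5, 10, 1000);
        System.out.println("connected after " + used + " attempt(s)");
    }
}
```

In a real project the supplier would wrap the actual connect-and-subscribe call and return false when it throws. The Paho client also offers built-in automatic reconnection through MqttConnectOptions.setAutomaticReconnect, which may make a hand-written loop unnecessary.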
The above is the detailed content of SpringBoot integrating MQTT to dynamically read database connection information and connect to an MQTT server. For more information about SpringBoot and MQTT reading from a database, please see my other related articles!