What is the MQTT protocol?
MQTT is a lightweight publish/subscribe messaging protocol designed for low-bandwidth, high-latency networks, which makes it well suited to communication between IoT devices. Its main features include:
- Publish/subscribe model: Supports many-to-many messaging through a broker.
- Lightweight design: Small packet headers keep network overhead low.
- QoS levels: Three delivery guarantees are available: QoS 0 (at most once), QoS 1 (at least once), and QoS 2 (exactly once).
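To make the publish/subscribe model concrete, here is a minimal in-memory sketch. It is not how a real broker is implemented: the `InMemoryBroker` class and the `topic_matches` helper are hypothetical names used only for illustration, and the matcher covers just the standard `+` (single-level) and `#` (multi-level) wildcards without the special rules for topics that start with `$`.

```python
from collections import defaultdict


def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT-style topic filter matches a concrete topic."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches the parent level and everything below it.
            return True
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    # Without a trailing "#", both sides must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


class InMemoryBroker:
    """Toy broker: routes each published message to every matching subscriber."""

    def __init__(self):
        self.subscriptions = defaultdict(list)  # topic filter -> list of callbacks

    def subscribe(self, topic_filter, callback):
        self.subscriptions[topic_filter].append(callback)

    def publish(self, topic, payload):
        delivered = 0
        for topic_filter, callbacks in self.subscriptions.items():
            if topic_matches(topic_filter, topic):
                for callback in callbacks:
                    callback(topic, payload)
                    delivered += 1
        return delivered


if __name__ == "__main__":
    broker = InMemoryBroker()
    broker.subscribe("uhome/+", lambda t, p: print("dashboard got", t, p))
    broker.subscribe("uhome/esp32", lambda t, p: print("logger got", t, p))
    broker.publish("uhome/esp32", "23.5")  # both subscribers receive the message
```

Publishers and subscribers never reference each other here; they only share topic names, which is what allows many-to-many messaging.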
Project background
The sample code in this article implements a Python-based MQTT client with the following features:
- Secure connection to the MQTT broker over SSL/TLS.
- Dynamic subscription to multiple topics.
- Asynchronous message processing, so that handling a message does not block other work.
- Custom message handling through a user-supplied callback.
Core code analysis
The main class and methods in the code are analyzed below:
MQTT client class
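    class MQTTClient:
        def __init__(self, broker, port, username, password, ca_cert, topics):
            # (attribute assignments omitted in this excerpt; see the full listing under "Sample code")
            self.client = mqtt.Client()
            self.client.username_pw_set(self.username, self.password)
            self.client.tls_set(ca_certs=self.ca_cert)
            self.client.on_connect = self.on_connect
            self.client.on_message = self.on_message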
- tls_set: Enables SSL/TLS and verifies the broker certificate against the given CA file, securing the connection.
- Subscription: The client automatically subscribes to the specified topics once the connection succeeds.
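For readers who want more control over the TLS settings than `tls_set(ca_certs=...)` offers, one option is to build an `ssl.SSLContext` with the standard library. The sketch below is an assumption about how one might do this, not part of the original project; `build_tls_context` is a hypothetical helper name.

```python
import ssl


def build_tls_context(ca_cert_path=None):
    """Build a client-side TLS context that verifies the server certificate.

    With ca_cert_path=None the system's default CA store is used.
    """
    context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=ca_cert_path)
    # Refuse protocol versions older than TLS 1.2.
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    return context


if __name__ == "__main__":
    ctx = build_tls_context()
    print(ctx.verify_mode, ctx.check_hostname, ctx.minimum_version)
```

paho-mqtt accepts a context like this through `client.tls_set_context(context)`, which can be used in place of `tls_set`.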
Custom message processing
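    def set_message_handler(self, handler):
        self.custom_message_handler = handler

Through this method, the user can pass in a custom callback that processes messages according to business logic. Because on_message schedules the callback with asyncio.run_coroutine_threadsafe, it must be a coroutine function (async def) that accepts the topic and the decoded payload.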
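As an illustration of what such business logic could look like, the sketch below routes messages by topic. The topic names come from this article's configuration, but the JSON payload format, the `ROUTES` table, and all handler names are assumptions made for the example.

```python
import asyncio
import json

events = []  # collected results, kept only so the example has something to inspect


async def on_disconnect(payload):
    events.append(("disconnect", payload))


async def on_sensor(payload):
    # Assumption: the device publishes JSON; fall back to the raw text otherwise.
    try:
        data = json.loads(payload)
    except json.JSONDecodeError:
        data = {"raw": payload}
    events.append(("sensor", data))


# Hypothetical routing table: topic -> coroutine function
ROUTES = {
    "clients/disconnect": on_disconnect,
    "uhome/esp32": on_sensor,
}


async def route_message(topic, payload):
    """Custom handler with the (topic, payload) signature the client expects."""
    handler = ROUTES.get(topic)
    if handler is None:
        events.append(("unhandled", topic))
        return
    await handler(payload)


if __name__ == "__main__":
    asyncio.run(route_message("uhome/esp32", '{"temp": 23.5}'))
    print(events)
```

A handler like this would be registered with `mqtt_client.set_message_handler(route_message)`.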
Start the client asynchronously
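    async def start_async(self):
        self.connect()
        await asyncio.get_event_loop().run_in_executor(None, self.client.loop_forever)

Because loop_forever blocks, start_async hands it to the default thread-pool executor with run_in_executor. The MQTT network loop then runs in a worker thread while the asyncio event loop stays free to run other coroutines.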
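The effect of `run_in_executor` can be seen without a broker. In the sketch below, `blocking_loop` is a hypothetical stand-in for a blocking call such as `loop_forever`, and `ticker` is an ordinary coroutine that keeps making progress while the blocking call is running.

```python
import asyncio
import time


def blocking_loop(duration):
    """Stand-in for a blocking call such as loop_forever()."""
    time.sleep(duration)
    return "loop stopped"


async def ticker(ticks, count, interval):
    """Coroutine that records its progress while the blocking call runs."""
    for i in range(count):
        ticks.append(i)
        await asyncio.sleep(interval)


async def demo_executor():
    loop = asyncio.get_running_loop()
    ticks = []
    # The blocking function runs in a worker thread; the ticker runs on the event loop.
    result, _ = await asyncio.gather(
        loop.run_in_executor(None, blocking_loop, 0.2),
        ticker(ticks, 3, 0.05),
    )
    return result, ticks


if __name__ == "__main__":
    print(asyncio.run(demo_executor()))
```

If `blocking_loop` were called directly inside a coroutine instead, the ticker could not run until it returned.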
Sample code integration
The main file defines the following process:
- Initialize the MQTT client and pass in the necessary parameters.
- Register a custom message handling function.
- Use asyncio to run message processing concurrently with other tasks.
    # Inside async def main():
    async def on_mqtt_message(topic, payload):
        print(f"Custom handler: {topic} -> {payload}")

    mqtt_client.set_message_handler(on_mqtt_message)
    await mqtt_client.start_async()
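The handler is a coroutine, but the MQTT callbacks fire in the worker thread that runs the network loop, not on the asyncio event loop. The sketch below shows, under simplified assumptions, how `asyncio.run_coroutine_threadsafe` bridges the two. `fake_network_thread` is a hypothetical stand-in for the paho-mqtt thread, and no broker is involved.

```python
import asyncio
import threading


async def on_mqtt_message(topic, payload):
    return f"{topic} -> {payload}"


def fake_network_thread(loop, handler, results):
    """Stand-in for the thread in which paho-mqtt would call on_message."""
    # Schedule the coroutine on the event loop owned by another thread.
    future = asyncio.run_coroutine_threadsafe(handler("uhome/esp32", "on"), loop)
    # Waiting on the result blocks only this worker thread.
    results.append(future.result(timeout=2))


async def demo_threadsafe():
    loop = asyncio.get_running_loop()
    results = []
    worker = threading.Thread(
        target=fake_network_thread, args=(loop, on_mqtt_message, results)
    )
    worker.start()
    # Wait for the thread in the executor so the event loop stays free
    # to run the coroutine that the thread scheduled.
    await loop.run_in_executor(None, worker.join)
    return results


if __name__ == "__main__":
    print(asyncio.run(demo_threadsafe()))
```

This is why the client needs a reference to the running event loop before it starts receiving messages.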
User Guide
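Install dependencies
Make sure the paho-mqtt library is installed:
pip install paho-mqtt
The callbacks in this article use the paho-mqtt 1.x signatures, for example on_connect(client, userdata, flags, rc). With paho-mqtt 2.x, pass mqtt.CallbackAPIVersion.VERSION1 as the first argument to mqtt.Client(), or adapt the callbacks to the version 2 API.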
Configure the MQTT broker
Update the broker address, port, username, password, and CA certificate path in the code.
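If you prefer not to hard-code credentials, one possible approach is to read them from environment variables. This is only a sketch: the variable names (`MQTT_BROKER`, `MQTT_PORT`, `MQTT_USERNAME`, `MQTT_PASSWORD`, `MQTT_CA_CERT`) and the `MQTTSettings` class are hypothetical and not part of the original project.

```python
import os
from dataclasses import dataclass


@dataclass
class MQTTSettings:
    broker: str
    port: int
    username: str
    password: str
    ca_cert: str


def load_settings(env=None):
    """Read connection settings from a mapping (defaults to os.environ).

    Raises KeyError if a required variable is missing.
    """
    env = os.environ if env is None else env
    return MQTTSettings(
        broker=env["MQTT_BROKER"],
        port=int(env.get("MQTT_PORT", "8883")),  # 8883 is the usual MQTT-over-TLS port
        username=env["MQTT_USERNAME"],
        password=env["MQTT_PASSWORD"],
        ca_cert=env["MQTT_CA_CERT"],
    )
```

The resulting fields can then be passed to the `MQTTClient` constructor in place of the module-level constants.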
Run the program
Run the program with the following command:
python
Summary
With the code above, you can quickly build a real-time communication system based on the MQTT protocol. This architecture is suitable not only for IoT scenarios but also for other applications that need real-time data push, such as chat applications and real-time monitoring systems.
Sample code
    # MQTT client module (imported as "mqtt" by the main file)
    import paho.mqtt.client as mqtt
    from datetime import datetime
    import asyncio


    class MQTTClient:
        def __init__(self, broker, port, username, password, ca_cert, topics):
            """Initialize the MQTT client."""
            self.broker = broker
            self.port = port
            self.username = username
            self.password = password
            self.ca_cert = ca_cert
            self.topics = topics
            self.client = mqtt.Client()

            # Configure the MQTT client
            self.client.username_pw_set(self.username, self.password)
            self.client.tls_set(ca_certs=self.ca_cert)
            self.client.on_connect = self.on_connect
            self.client.on_message = self.on_message

            self.custom_message_handler = None  # Custom message handler
            self.event_loop = None  # asyncio loop, set by the caller before start_async()

        def set_message_handler(self, handler):
            """Set the custom message-handling callback."""
            self.custom_message_handler = handler

        def on_connect(self, client, userdata, flags, rc):
            """Callback invoked when the broker answers the connection request."""
            if rc == 0:
                print("SSL connection is successful")
                for topic in self.topics:
                    self.client.subscribe(topic)
                    print(f"Subscribed Topic: {topic}")
            else:
                print(f"Connection failed, return code: {rc}")

        def on_message(self, client, userdata, msg):
            """Callback invoked when a message is received."""
            current_time = datetime.now()
            payload = msg.payload.decode()
            print(f"Received a message: {msg.topic} -> {payload} time: {current_time}")
            if self.custom_message_handler and self.event_loop:
                # This callback runs in the executor thread, so the coroutine
                # is handed to the asyncio loop in a thread-safe way.
                asyncio.run_coroutine_threadsafe(
                    self.custom_message_handler(msg.topic, payload), self.event_loop
                )

        def connect(self):
            """Connect to the MQTT server."""
            self.client.connect(self.broker, self.port, keepalive=60)

        async def start_async(self):
            """Run the MQTT client asynchronously."""
            self.connect()  # Make sure we are connected to the MQTT server
            print("Starting MQTT client loop...")
            # Run the blocking MQTT network loop in the default executor
            loop = asyncio.get_event_loop()
            await loop.run_in_executor(None, self.client.loop_forever)

    # Main file
    import asyncio
    from mqtt import MQTTClient

    # MQTT configuration
    MQTT_BROKER = "Your server address"
    MQTT_PORT = 8883  # SSL/TLS port
    MQTT_USERNAME = "username"
    MQTT_PASSWORD = "password"
    CA_CERT = "./"  # CA certificate path
    TOPICS = ["clients/disconnect", "uhome/esp32"]  # Topics to subscribe to


    async def main():
        loop = asyncio.get_running_loop()
        mqtt_client = MQTTClient(
            broker=MQTT_BROKER,
            port=MQTT_PORT,
            username=MQTT_USERNAME,
            password=MQTT_PASSWORD,
            ca_cert=CA_CERT,
            topics=TOPICS,
        )

        async def on_mqtt_message(topic, payload):
            print(f"Custom handler: {topic} -> {payload}")

        mqtt_client.set_message_handler(on_mqtt_message)
        mqtt_client.event_loop = loop  # Pass the event loop to the MQTT client
        # start_async() only returns when the MQTT loop stops. The websocket_task and
        # periodic_task of the original project are not defined in this article; to run
        # such coroutines concurrently, await asyncio.gather(mqtt_client.start_async(), ...).
        await mqtt_client.start_async()


    if __name__ == "__main__":
        asyncio.run(main())