SoFunction
Updated on 2025-03-04

Implementing Asynchronous Communication with Python and MQTT

What is the MQTT protocol?

MQTT is a lightweight publish/subscribe messaging protocol designed for low bandwidth and high latency network environments, ideal for communication between IoT devices. Its main features include:

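  • Publish/subscribe model: Supports many-to-many messaging through a broker, so publishers and subscribers never address each other directly.
  • Lightweight design: Small packet headers keep network overhead low.
  • QoS levels: Three delivery guarantees are available: at most once (QoS 0), at least once (QoS 1), and exactly once (QoS 2).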
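
To make the publish/subscribe model concrete, here is a minimal in-memory sketch. It is not MQTT and involves no network: the `TinyBroker` class and its methods are hypothetical names, used only to illustrate how one published message can reach several subscribers that the publisher knows nothing about.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List

Callback = Callable[[str, str], None]


class TinyBroker:
    """Hypothetical in-memory broker that routes messages by exact topic name."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Callback]] = defaultdict(list)

    def subscribe(self, topic: str, callback: Callback) -> None:
        """Register a callback for one topic."""
        self._subscribers[topic].append(callback)

    def publish(self, topic: str, payload: str) -> int:
        """Deliver payload to every subscriber of topic; return the delivery count."""
        callbacks = self._subscribers.get(topic, [])
        for callback in callbacks:
            callback(topic, payload)
        return len(callbacks)


broker = TinyBroker()
received = []

# Two independent subscribers on the same topic.
broker.subscribe("uhome/esp32", lambda t, p: received.append(("display", t, p)))
broker.subscribe("uhome/esp32", lambda t, p: received.append(("logger", t, p)))

delivered = broker.publish("uhome/esp32", "temp=21.5")
```

A real MQTT broker adds networking, topic wildcards, and the QoS guarantees listed above; the routing idea is the same.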

Project background

The sample code in this article implements a Python-based MQTT client. The following features are covered in the code:

  • Connects securely to the MQTT broker over SSL/TLS.
  • Subscribes to multiple topics, taken from a list passed to the client.
  • Processes messages asynchronously, improving performance and scalability.
  • Supports custom message handlers.

Core code analysis

The main classes and methods in the code are analyzed below.

MQTT client class

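class MQTTClient:
    def __init__(self, broker, port, username, password, ca_cert, topics):
        # Attribute assignments (self.username, self.ca_cert, ...) are omitted
        # here; see the full sample code.
        self.client = mqtt.Client()
        self.client.username_pw_set(self.username, self.password)
        self.client.tls_set(ca_certs=self.ca_cert)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message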

  • tls_set: Enables SSL/TLS and uses the given CA certificate to verify the broker.
  • Subscription: The on_connect callback subscribes to every configured topic once the connection succeeds.
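
When finer control over TLS is needed than `tls_set(ca_certs=...)` offers, one option is to build an `ssl.SSLContext` with the standard library. The sketch below is only an illustration: `build_tls_context` is a hypothetical helper name, and requiring TLS 1.2 or newer is an assumption, not something the article's code does.

```python
import ssl
from typing import Optional


def build_tls_context(ca_cert: Optional[str] = None) -> ssl.SSLContext:
    """Return a client-side TLS context that verifies the server certificate.

    ca_cert is the path to a CA bundle; None falls back to the system trust store.
    """
    context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=ca_cert)
    # Assumption for this sketch: refuse protocol versions older than TLS 1.2.
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    return context


context = build_tls_context()
# With paho-mqtt, the context would be applied before connecting:
#     client.tls_set_context(context)
```

The default context already requires a valid certificate and checks the host name, so a broker with a mismatched certificate is rejected rather than silently accepted.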

Custom message processing

def set_message_handler(self, handler):
    self.custom_message_handler = handler

The user can pass in a custom callback function through this method to process messages according to business logic.
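
As an illustration of what such a callback might look like, the sketch below decodes each payload as JSON and dispatches on the topic. The JSON payload format, the `ROUTES` table, and the `handle_*` functions are assumptions made for this example; only the `(topic, payload)` signature and the topic names come from the article's code.

```python
import asyncio
import json

handled = []


async def handle_device(topic: str, data: dict) -> None:
    """Hypothetical handler for device telemetry."""
    handled.append(("device", topic, data))


async def handle_disconnect(topic: str, data: dict) -> None:
    """Hypothetical handler for client disconnect notices."""
    handled.append(("disconnect", topic, data))


# Hypothetical routing table: topic name -> coroutine function.
ROUTES = {
    "uhome/esp32": handle_device,
    "clients/disconnect": handle_disconnect,
}


async def on_mqtt_message(topic: str, payload: str) -> None:
    """Custom handler: decode the JSON payload and dispatch by topic."""
    try:
        data = json.loads(payload)
    except json.JSONDecodeError:
        handled.append(("invalid", topic, payload))
        return
    route = ROUTES.get(topic)
    if route is not None:
        await route(topic, data)


# Exercise the handler directly, without a broker.
asyncio.run(on_mqtt_message("uhome/esp32", '{"temp": 21.5}'))
asyncio.run(on_mqtt_message("uhome/esp32", "not json"))
```

The handler is written with `async def` because the client schedules it on the event loop with `asyncio.run_coroutine_threadsafe`, which expects a coroutine.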

Start the client asynchronously

async def start_async(self):
    self.connect()
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, self.client.loop_forever)

loop_forever blocks until the client disconnects, so start_async runs it in the default thread-pool executor. This keeps the asyncio event loop free to process messages and run other tasks.
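
The pattern can be tried without a broker. In the sketch below, `blocking_network_loop` is a hypothetical stand-in for a blocking call such as `loop_forever`: it runs in a worker thread and hands each message back to the event loop with `asyncio.run_coroutine_threadsafe`, which is how the article's `on_message` callback reaches the custom handler.

```python
import asyncio
import time

received = []


async def handler(message: str) -> None:
    """Runs on the asyncio event loop, not in the worker thread."""
    received.append(message)


def blocking_network_loop(loop: asyncio.AbstractEventLoop) -> None:
    """Stand-in for a blocking client loop; executes in a worker thread."""
    for i in range(3):
        time.sleep(0.01)  # pretend to wait for network traffic
        future = asyncio.run_coroutine_threadsafe(handler(f"msg-{i}"), loop)
        future.result(timeout=1)  # wait until the loop has run the handler


async def main() -> None:
    loop = asyncio.get_running_loop()
    # The blocking function runs in the default thread pool, so the event
    # loop stays free to execute the handler coroutines it receives.
    await loop.run_in_executor(None, blocking_network_loop, loop)


asyncio.run(main())
```

Calling the handler directly from the worker thread would not work, because coroutines must run on the loop's own thread; `run_coroutine_threadsafe` is the thread-safe way to submit them.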

Sample code integration

The main file defines the following process:

  • Initialize the MQTT client and pass in the necessary parameters.
  • Register a custom message handling function.
  • Use asyncio to run message processing concurrently with other tasks.

async def on_mqtt_message(topic, payload):
    print(f"Custom handler: {topic} -> {payload}")

mqtt_client.set_message_handler(on_mqtt_message)
await mqtt_client.start_async()
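
Running the client next to other work can be sketched with `asyncio.gather`. Everything here is a stand-in chosen for illustration: `fake_client` plays the role of `mqtt_client.start_async()`, and `periodic_task` is a hypothetical background job that stops the program after a few ticks.

```python
import asyncio

log = []


async def fake_client(stop: asyncio.Event) -> None:
    """Stand-in for mqtt_client.start_async(): runs until asked to stop."""
    log.append("client started")
    await stop.wait()
    log.append("client stopped")


async def periodic_task(stop: asyncio.Event, ticks: int, interval: float) -> None:
    """Hypothetical background job: tick a few times, then request shutdown."""
    for i in range(ticks):
        await asyncio.sleep(interval)
        log.append(f"tick {i}")
    stop.set()


async def main() -> None:
    stop = asyncio.Event()
    # gather() starts both coroutines together and waits for both to finish.
    await asyncio.gather(
        fake_client(stop),
        periodic_task(stop, ticks=2, interval=0.01),
    )


asyncio.run(main())
```

Because the real `start_async()` does not return until the client loop exits, awaiting it on its own line before starting other tasks would keep those tasks from ever running; passing everything to `gather` avoids that.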

User Guide

Installation dependencies

Make sure the paho-mqtt library is installed:

pip install paho-mqtt

Configure the MQTT broker

Update the broker address, port, username, password, and certificate path in the code.
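
Instead of editing constants in the source file, the same five settings could be read from the environment, which keeps credentials out of version control. This is a sketch under assumptions: the `MQTTSettings` class, the `load_settings` function, and the environment variable names are all hypothetical, and `broker.example.com` is a placeholder.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class MQTTSettings:
    broker: str
    port: int
    username: str
    password: str
    ca_cert: str


def load_settings(env: Mapping[str, str] = os.environ) -> MQTTSettings:
    """Build settings from a mapping of (hypothetical) variable names."""
    return MQTTSettings(
        broker=env["MQTT_BROKER"],
        port=int(env.get("MQTT_PORT", "8883")),  # default to the SSL/TLS port
        username=env["MQTT_USERNAME"],
        password=env["MQTT_PASSWORD"],
        ca_cert=env["MQTT_CA_CERT"],
    )


# A plain dict stands in for os.environ here, with placeholder values.
settings = load_settings({
    "MQTT_BROKER": "broker.example.com",
    "MQTT_USERNAME": "username",
    "MQTT_PASSWORD": "password",
    "MQTT_CA_CERT": "/path/to/ca.crt",
})
```

A missing required variable raises `KeyError` at startup, which surfaces configuration mistakes before any connection attempt.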

Run the program

Run the program with the following command:

python 

Summary

With the code above, you can quickly build a real-time communication system based on the MQTT protocol. This architecture suits IoT scenarios as well as other applications that need real-time data push, such as chat applications and real-time monitoring systems.

Sample code

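# mqtt.py (the main script imports this module with "from mqtt import MQTTClient")
import asyncio
from datetime import datetime

import paho.mqtt.client as mqtt


class MQTTClient:
    def __init__(self, broker, port, username, password, ca_cert, topics):
        """
        Initialize the MQTT client
        """
        self.broker = broker
        self.port = port
        self.username = username
        self.password = password
        self.ca_cert = ca_cert
        self.topics = topics
        # Note: paho-mqtt 2.x expects a callback API version here; passing
        # mqtt.CallbackAPIVersion.VERSION1 keeps the callback signatures used below.
        self.client = mqtt.Client()

        # Configure the MQTT client
        self.client.username_pw_set(self.username, self.password)
        self.client.tls_set(ca_certs=self.ca_cert)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message

        self.custom_message_handler = None  # Custom message handler (a coroutine function)
        self.event_loop = None  # asyncio loop that runs the handler; set by the caller

    def set_message_handler(self, handler):
        """
        Set the custom message handling callback
        """
        self.custom_message_handler = handler

    def on_connect(self, client, userdata, flags, rc):
        """
        Callback invoked when the broker responds to the connection request
        """
        if rc == 0:
            print("SSL connection is successful")
            for topic in self.topics:
                self.client.subscribe(topic)
                print(f"Subscribed Topic: {topic}")
        else:
            print(f"Connection failed, return code: {rc}")

    def on_message(self, client, userdata, msg):
        """
        Callback invoked when a message is received
        """
        current_time = datetime.now()
        payload = msg.payload.decode()
        print(f"Received a message: {msg.topic} -> {payload} time: {current_time}")

        # This callback runs in the MQTT network thread, so the coroutine is
        # handed to the asyncio event loop in a thread-safe way.
        if self.custom_message_handler and self.event_loop:
            asyncio.run_coroutine_threadsafe(
                self.custom_message_handler(msg.topic, payload),
                self.event_loop
            )

    def connect(self):
        """
        Connect to the MQTT server
        """
        self.client.connect(self.broker, self.port, keepalive=60)

    async def start_async(self):
        """
        Run the MQTT client asynchronously
        """
        self.connect()  # Make sure to connect to the MQTT server
        print("Starting MQTT client loop...")

        # Run the blocking MQTT network loop in a worker thread
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self.client.loop_forever)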

# Main script
import asyncio
from mqtt import MQTTClient

# MQTT configuration
MQTT_BROKER = "Your server address"
MQTT_PORT = 8883  # SSL/TLS port
MQTT_USERNAME = "username"
MQTT_PASSWORD = "password"
CA_CERT = "./"  # CA certificate path
TOPICS = ["clients/disconnect", "uhome/esp32"]  # Topics to subscribe to


async def main():
    loop = asyncio.get_running_loop()

    mqtt_client = MQTTClient(
        broker=MQTT_BROKER,
        port=MQTT_PORT,
        username=MQTT_USERNAME,
        password=MQTT_PASSWORD,
        ca_cert=CA_CERT,
        topics=TOPICS
    )

    async def on_mqtt_message(topic, payload):
        print(f"Custom handler: {topic} -> {payload}")

    mqtt_client.set_message_handler(on_mqtt_message)
    mqtt_client.event_loop = loop  # Pass the event loop to the MQTT client

    # start_async() runs until the client loop exits. websocket_task and
    # periodic_task are not defined in this article; coroutines like them can
    # run alongside the client via
    # asyncio.gather(mqtt_client.start_async(), websocket_task, periodic_task).
    await mqtt_client.start_async()


if __name__ == "__main__":
    asyncio.run(main())
