Updated on 2025-03-04

Use Python and MQTT to implement asynchronous communication function

What is the MQTT protocol?

MQTT is a lightweight publish/subscribe messaging protocol designed for low bandwidth and high latency network environments, ideal for communication between IoT devices. Its main features include:

  • Publish/Subscribe Model: Supports many-to-many messaging.
  • Lightweight design: Lower network overhead.
  • Support QoS level: Provides different messaging reliability.

Project background

The sample code in this article implements a Python-based MQTT client. The following features are covered in the code:

  • Secure connection to the MQTT proxy via SSL.
  • Support dynamic subscription to multiple topics.
  • Process messages asynchronously, improving performance and scalability.
  • Provides custom message processing capabilities.

Core code analysis

The following are the main functions and module analysis in the code:

MQTT client class

class MQTTClient:
    def __init__(self, broker, port, username, password, ca_cert, topics):
         = ()
        .username_pw_set(, )
        .on_connect = self.on_connect
        .on_message = self.on_message

  • tls_set: Enable SSL/TLS to ensure communication security.

  • Subscription: Automatically subscribe to the specified topic when the connection is successful.

Custom message processing

def set_message_handler(self, handler):
    self.custom_message_handler = handler

The user can pass in a custom callback function through this method to process messages according to business logic.

Start the client asynchronously

async def start_async(self):
    await asyncio.get_event_loop().run_in_executor(None, .loop_forever)

Ensure efficient processing of messages through asynchronous event loops while avoiding blocking the main thread.

Sample code integration

In the main fileIn the following process is defined:

  • Initialize the MQTT client and pass in the necessary parameters.
  • Register a custom message handling function.
  • useasyncioImplement concurrent execution of message processing and other tasks.
async def on_mqtt_message(topic, payload):
    print(f"Custom handler: {topic} -> {payload}")

await mqtt_client.start_async()

User Guide

Installation dependencies

Make sure to be installedpaho-mqttLibrary:

pip install paho-mqtt

Configure MQTT proxy

Update the proxy address, port, username, password, and certificate path in the code.

Run the program

Run the program with the following command:



Quickly build a real-time communication system based on the MQTT protocol. This architecture is not only suitable for IoT scenarios, but also plays a role in a variety of applications that require real-time data push, such as chat applications and real-time monitoring systems.

Sample code

import  as mqtt
from datetime import datetime
import asyncio

class MQTTClient:
    def __init__(self, broker, port, username, password, ca_cert, topics):
         Initialize the MQTT client
         = broker
         = port
         = username
         = password
        self.ca_cert = ca_cert
         = topics
         = ()

        # Configure the MQTT client        .username_pw_set(, )
        .on_connect = self.on_connect
        .on_message = self.on_message

        self.custom_message_handler = None  # Custom message processor
    def set_message_handler(self, handler):
         Set custom message processing callback function
        self.custom_message_handler = handler

    def on_connect(self, client, userdata, flags, rc):
         Callback when the connection is successful
        if rc == 0:
            print("SSL connection is successful")
            for topic in :
                print(f"Subscribed Topic: {topic}")
            print(f"Connection failed,Return code: {rc}")

    def on_message(self, client, userdata, msg):
         Callback when receiving the message
        current_time = ()
        payload = ()
        print(f"Received a message: {} -> {payload} time: {current_time}")

        if self.custom_message_handler and self.event_loop:
                self.custom_message_handler(, payload),

    def connect(self):
         Connect to the MQTT server
        (, , keepalive=60)

    async def start_async(self):
         Run MQTT client asynchronously
        ()  # Make sure to connect to the MQTT server        print("Starting MQTT client loop...")

        # Asynchronously running the event loop of the MQTT client        loop = asyncio.get_event_loop()
        await loop.run_in_executor(None, .loop_forever)

import asyncio
from mqtt import MQTTClient

# MQTT ConfigurationMQTT_BROKER = "Your server address"
MQTT_PORT = 8883  # Use SSL portsMQTT_USERNAME = "username"
MQTT_PASSWORD = "password"
CA_CERT = "./"  # CA certificate pathTOPICS = ["clients/disconnect", "uhome/esp32"]  # Subscribe to the topic list

async def main():
    loop = asyncio.get_running_loop()

    mqtt_client = MQTTClient(

    async def on_mqtt_message(topic, payload):
        print(f"Custom handler: {topic} -> {payload}")

    mqtt_client.event_loop = loop  # Pass the event loop to the MQTT client    await mqtt_client.start_async()

    await (websocket_task, periodic_task)

if __name__ == "__main__":

