Required Python library
The publish and subscribe capabilities of MQTT can be tested with small Python programs. First, install the paho-mqtt client library:
    pip install paho-mqtt
Testing publish (pub)
My MQTT broker is deployed on an Alibaba Cloud server, so I wrote a Python program on my local machine to test it.
Then open a new terminal and subscribe to the topic "chat":
    mosquitto_sub -t chat
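Testing the publish function of the remote MQTT broker from the local machine means acting as the sender: when you publish a message, every client that has subscribed to that topic receives it.
    # encoding: utf-8
    import paho.mqtt.client as mqtt

    HOST = ""    # address of the MQTT broker; fill in your own
    PORT = 1883  # default unencrypted MQTT port

    def test():
        # On paho-mqtt 2.x, pass mqtt.CallbackAPIVersion.VERSION1 as the first argument
        client = mqtt.Client()
        client.connect(HOST, PORT, 60)  # 60 = keepalive in seconds
        # Publish a message with the topic 'chat' and the content 'hello liefyuan' at QoS 2
        client.publish("chat", "hello liefyuan", 2)
        client.loop_forever()  # run the network loop so the message is actually sent

    if __name__ == '__main__':
        test()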
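The fan-out behaviour described above (one publisher, every subscriber of the topic gets a copy) can be illustrated without a broker. The sketch below is only a toy model: `TinyBroker` and the inbox lists are hypothetical names invented for this illustration, they are not part of paho-mqtt or mosquitto, and the matching is by exact topic name with no wildcards.

```python
from collections import defaultdict


class TinyBroker:
    """Hypothetical in-memory stand-in for a broker; not part of paho-mqtt."""

    def __init__(self):
        # topic name -> list of callbacks registered for that topic
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, callback):
        self._subscribers[topic].append(callback)

    def publish(self, topic, payload):
        # Deliver to every subscriber of exactly this topic (no wildcards).
        # .get() avoids creating an entry for a topic nobody subscribed to.
        for callback in self._subscribers.get(topic, []):
            callback(topic, payload)


broker = TinyBroker()
inbox_a, inbox_b, inbox_other = [], [], []

# Two clients subscribe to "chat", one to an unrelated topic.
broker.subscribe("chat", lambda topic, payload: inbox_a.append(payload))
broker.subscribe("chat", lambda topic, payload: inbox_b.append(payload))
broker.subscribe("news", lambda topic, payload: inbox_other.append(payload))

broker.publish("chat", "hello liefyuan")

print(inbox_a, inbox_b, inbox_other)
```

Both "chat" inboxes receive the message and the "news" inbox stays empty, which is the same thing you should observe with several `mosquitto_sub -t chat` terminals open against the real broker.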
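Testing publish and subscribe together
    # -*- coding: utf-8 -*-
    import time

    import paho.mqtt.client as mqtt

    MQTTHOST = ""    # address of the MQTT broker; fill in your own
    MQTTPORT = 1883
    # On paho-mqtt 2.x, pass mqtt.CallbackAPIVersion.VERSION1 as the first argument
    mqttClient = mqtt.Client()

    # Connect to the MQTT server and start the background network loop
    def on_mqtt_connect():
        mqttClient.connect(MQTTHOST, MQTTPORT, 60)
        mqttClient.loop_start()

    # Publish a message
    def on_publish(topic, payload, qos):
        mqttClient.publish(topic, payload, qos)

    # Message handler, called for every message that arrives
    def on_message_come(client, userdata, msg):
        print(msg.topic + " " + ":" + str(msg.payload))

    # Subscribe and register the message handler
    def on_subscribe():
        mqttClient.subscribe("/server", 1)
        mqttClient.on_message = on_message_come

    def main():
        on_mqtt_connect()
        # Published to "/test/server" but subscribed to "/server",
        # so this program does not receive its own message.
        on_publish("/test/server", "Hello Python!", 1)
        on_subscribe()
        while True:
            time.sleep(1)  # keep the main thread alive without spinning the CPU

    if __name__ == '__main__':
        main()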
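One detail of the message handler is worth a small illustration: paho-mqtt hands the payload to the callback as `bytes`, so under Python 3 `str(msg.payload)` prints the bytes representation (with the `b'...'` wrapper) rather than the text. The sketch below uses a plain bytes literal in place of a real message, and the helper name `payload_to_text` is made up for this example. It assumes the sender used UTF-8.

```python
def payload_to_text(payload, encoding="utf-8"):
    """Hypothetical helper: turn a bytes payload into text.

    errors="replace" keeps the handler from raising on non-text payloads.
    """
    return payload.decode(encoding, errors="replace")


payload = b"Hello Python!"  # stands in for msg.payload

as_repr = str(payload)              # what the handler above prints on Python 3
as_text = payload_to_text(payload)  # the decoded text

print(as_repr)
print(as_text)
```

Inside the handler this would amount to printing `payload_to_text(msg.payload)` instead of `str(msg.payload)`.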
Signatures of the client methods used:
    connect(self, host, port, keepalive, bind_address)
    publish(self, topic, payload, qos, retain)
    subscribe(self, topic, qos)
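The `qos` parameter that appears in these signatures takes one of the three MQTT quality-of-service levels, and `keepalive` is a time in seconds (the `60` passed in this post's examples). As a rough reference, the levels can be summarised as below. `QOS_LEVELS` and `describe_qos` are names invented for this sketch and are not part of paho-mqtt.

```python
# MQTT quality-of-service levels, as defined by the protocol.
QOS_LEVELS = {
    0: "at most once (no acknowledgement, message may be lost)",
    1: "at least once (acknowledged, duplicates possible)",
    2: "exactly once (four-step handshake, slowest)",
}


def describe_qos(qos):
    """Hypothetical helper: validate a qos value and describe it."""
    if qos not in QOS_LEVELS:
        raise ValueError("qos must be 0, 1 or 2, got %r" % (qos,))
    return "QoS %d: %s" % (qos, QOS_LEVELS[qos])


for level in (0, 1, 2):
    print(describe_qos(level))
```

In this post's examples, the "chat" message is published at QoS 2, while the combined test publishes and subscribes at QoS 1.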
Testing subscribe (sub)
Testing the subscribe function from the local machine means acting as the receiver: the program blocks and waits until a message is published (pub) on the topic it has subscribed to.
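    # encoding: utf-8
    import paho.mqtt.client as mqtt

    # Called when the broker answers the connection request
    def on_connect(client, userdata, flags, rc):
        print("Connected with result code " + str(rc))
        # Subscribing here means the subscription is renewed after a reconnect
        client.subscribe("chat")

    # Called for every message received on a subscribed topic
    def on_message(client, userdata, msg):
        print(msg.topic + " " + ":" + str(msg.payload))

    # On paho-mqtt 2.x, pass mqtt.CallbackAPIVersion.VERSION1 as the first argument
    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect("", 1883, 60)  # first argument: address of the MQTT broker; fill in your own
    client.loop_forever()         # block and wait for messages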
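The "result code" printed by `on_connect` is easier to read with a lookup table. The sketch below lists the connect return codes defined by MQTT 3.1.1. It assumes `rc` arrives as a plain integer, as in the callback signature used above. `CONNACK_CODES` and `describe_rc` are names chosen for this illustration only.

```python
# Connect return codes from the MQTT 3.1.1 CONNACK packet.
CONNACK_CODES = {
    0: "connection accepted",
    1: "refused: unacceptable protocol version",
    2: "refused: identifier rejected",
    3: "refused: server unavailable",
    4: "refused: bad user name or password",
    5: "refused: not authorized",
}


def describe_rc(rc):
    """Hypothetical helper: map an integer result code to a readable message."""
    return CONNACK_CODES.get(rc, "unknown result code %d" % rc)


print("Connected with result code 0 -> " + describe_rc(0))
print("Connected with result code 5 -> " + describe_rc(5))
```

A result code of 0 means the subscription in `on_connect` can proceed. Any other value means the broker refused the connection, so no messages will arrive.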
