软硬件环境
- ubuntu 18.04 64bit
- anaconda3 & python3.6.2
- paho-mqtt
预备知识
参考之前写的一篇博文 https://xugaoxiang.com/2019/12/08/mqtt/,博文测试时mqtt broker
采用的是mosquitto
,同时在测试发送和接收时采用mosquitto_sub
和mosquitto_pub
命令行工具。
安装paho-mqtt
conda install paho-mqtt
代码实践
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
'''
:param client:
:param userdata:
:param flags:
:param rc:
:return:
'''
print('connect with rc: {}'.format(rc))
if rc != 0:
print('pub connect failed.')
client.disconnect()
def on_disconnect(client, userdata, rc=0):
'''
:param client:
:param userdata:
:param rc:
:return:
'''
print('disconnect with rc: {}'.format(rc))
client.loop_stop()
def on_publish(client, userdata, mid):
'''
:param client:
:param userdata:
:param mid:
:return:
'''
print('publish success.')
def do_publish(topic, message):
'''
:param topic:
:param message:
:return: publish message via mqtt
'''
client = mqtt.Client()
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.on_publish = on_publish
client.connect(host='127.0.0.1', port=1883, keepalive=60)
client.loop_start()
try:
client.publish('{}'.format(topic), '{}'.format(message))
except:
print('publish {} exception.'.format(message))
finally:
client.disconnect()
上述代码完成的是信息发布,分别实现了客户端连接、发布、断开的回调,在connect
之后调用loop_start()
,其作用是开启新的后台线程,防止主线程被阻塞,在client
断开后要loop_stop
杀掉线程。
信息的订阅寄接收的过程跟发布是一样的,publish
换成subscribe
,接收在on_message
回调里进行处理。