目录
概述
1 ThingsCloud平台上创建项目
1.1 创建项目
1.2 配置App UI
2 认识paho.mqtt.client
3 实现MQTT Client
3.1 实现的接口介绍
3.2 paho.mqtt.client库函数介绍
3.3 MQTT Client类实现
3.3.1 创建项目
3.3.2 编写MQTT Client类代码
3.3.3 Log工具源码
4 实现User Client功能
4.1 功能介绍
4.1.1 上报数据
4.1.2 订阅数据
4.2 功能代码实现
4.2.1 编写代码
4.2.2 配置ThingsCloud参数
5 系统功能测试
5.1 运行Client连接ThingsCloud
5.2 App查看数据
5.3 注意问题
使用paho.mqtt.client库实现一个MQTT Client,并使其连接到ThingsCloud物联网平台。
概述
本文主要介绍使用paho.mqtt.client库实现一个MQTT Client,并使其连接到ThingsCloud物联网平台。其中包括在ThingsCloud创建项目的方法,配置参数的步骤,设计App UI的详细过程。还是用该MQTT Client发布数据至ThingsCloud,并在ThingsCloud上通过可视面板呈现出来。MQTT Client也能订阅ThingsCloud发布的Topic数据。
1 ThingsCloud平台上创建项目
1.1 创建项目
登录ThingsCloud,并在该平台上创建项目,
https://www.thingscloud.xyz/
创建项目的方法和步骤,参看文档:
https://www.thingscloud.xyz/docs/
创建完成项目后,进入到项目数据参数主页面
1.2 配置App UI
配置AppUI 需要三个步骤,具体实现过程如下:
Step - 1: 配置参数类型
Step - 2: 在设备类型面板上配置App,app上显示的参数会和客户端传递到平台上的数据绑定起来
Step -3: 打开编辑器就可以设计App UI,并将UI item和数据绑定起来
Step - 4: 创建用户,这样App上才能使用该用户名进行登录
2 认识paho.mqtt.client
paho.mqtt.client是一个Python MQTT客户端库,它提供了与MQTT代理进行通信的功能。MQTT是一种轻量级的消息传递协议,通常用于物联网应用中的设备间通信。
使用paho.mqtt.client,可以创建一个MQTT客户端,并使用其提供的方法来连接到MQTT代理,发布和订阅主题,接收和处理消息。
登录网站可以了解paho.mqtt.client的相关内容。登录地址如下:
https://mqtt.org/software/
3 实现MQTT Client
3.1 实现的接口介绍
使用paho.mqtt.client编写一个 MQTT客户端,可以实现,消息订阅,发布功能。主要功能如下:
1)和Broker相关的参数通过配置文件来实现,不能写死在代码里
2)实现publish topic和subscriber topic功能
3)实时打印publish log和subscriber log
4)使用专用的logging库来处理log数据
5)使用class的方式来实现软件功能
3.2 paho.mqtt.client库函数介绍
在使用paho.mqtt.client库函数编写代码之前,必须对它所提供的重要接口函数的用法有一个清晰的认识,这对后面的如何使用这些接口非常重要。paho.mqtt.client提供了需要参数可供使用,这里只介绍一些,编程中必须用到的接口。其他函数的功能和用法可参考官方文档。
1)connect ()
连接MQTT Broker函数,函数原型如下:
参数介绍:
参数名称 | 功能介绍 |
---|---|
host | MQTT Broker地址 |
port | 连接Broker的端口号 |
keeplive | 心跳包时间间隔 |
2)username_pw_set()
设置Client的用户名和对应的密码,函数原型如下:
参数介绍:
参数名称 | 功能介绍 |
---|---|
username | Client的用户名 |
password | Client的用户名对应的密码 |
3)loop_start()
调用该接口,启动MQTT Client工作线程,这时就可以进行publish或者subscrib 消息,函数原型如下:
4)loop_stop()
调用该接口,销毁MQTT Client工作线程,函数原型如下:
5)subscribe()
订阅消息接口,函数原型如下:
参数介绍:
参数名称 | 功能介绍 |
---|---|
topic | 订阅的主题,例如:subscribe("my/topic", 2) |
qos | 消息的服务质量等级 |
options and properties | 这两个参数在MQTT v5.0的版本中使用,当前版本不使用这两个参数 |
6)unsubscribe()
取消订阅消息接口,函数原型如下:
参数介绍:
参数名称 | 功能介绍 |
---|---|
topic | 取消订阅的主题,例如:unsubscribe("my/topic") |
properties | 这个参数在MQTT v5.0的版本中使用,当前版本不使用这个参数 |
7)publish()
发布消息接口,函数原型如下:
参数介绍:
参数名称 | 功能介绍 |
---|---|
topic | 发布消息的主题 |
payload | 消息内容 |
qos | 消息服务等级 |
retain | Broker是否保留消息 |
properties | 这个参数在MQTT v5.0的版本中使用,当前版本不使用这个参数 |
3.3 MQTT Client类实现
3.3.1 创建项目
本项目使用PyCharm 作为开发工具,运行代码前必须安装 paho.mqtt.client库。详细安装方法和步骤见官网文档。创建项目之后编写如下代码
3.3.2 编写MQTT Client类代码
编写一个MQTTClient的user类,实现MQTT Client的基本功能,函数列表和介绍如下:
函数名 | 描述 |
---|---|
start | 注册回调函数和连接MQTT Client |
stop | 断开连接和销毁MQTT Client |
usr_subscribe | 订阅函数 |
usr_publish | 发布消息函数 |
usr_unsubscribe | 取消订阅函数 |
usr_on_message | 订阅消息回调函数 |
usr_log_callback | log监控函数 |
receive_msg | 接收消息函数 |
详细代码如下:
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# @describe : mqtt handler
# @Time : 2022/03/18 16:21
# @Author : ming fei.tang
import logging
from queue import Queue
import paho.mqtt.client as mqtt
__all__ = ["MQTTClient"]
class MQTTClient:
def __init__(self, host, port, qos, heartbeat, client_id, username, password):self.host = hostself.port = portself.qos = qosself.queue = Queue()self.mqtt_client_id = client_idself.heartbeat = heartbeatself.username = usernameself.password = passwordself.mqtt_client = None
def usr_on_message(self, user, data, msg):payload = msg.payload.decode('utf-8')payload = payload.replace('\n', '').replace('\r', '').replace(' ', '')logging.debug('subscribe: %s , payload: %s, QoS = %s' % (msg.topic, payload, msg.qos))
self.queue.put(msg)
def usr_subscribe(self, topic):self.mqtt_client.subscribe(topic, self.qos)logging.info('subscribe the topic: %s' % topic)
def usr_unsubscribe(self, topic):self.mqtt_client.unsubscribe(topic)logging.info('unsubscribe %s' % topic)
def receive_msg(self, timeout=None):logging.info('waiting for message.')if timeout is None:timeout = self.heartbeatreturn self.queue.get(timeout=timeout)
def usr_publish(self, topic, payload, qos, retain=False):self.mqtt_client.publish(topic, payload, qos, retain)logging.debug('public topic = %s, payload = %s , qos = %s, retain = %s' % (topic, payload, qos, retain))
def usr_log_callback(self, client, userdata, level, msg):# logging.info('public topic: %s ' % msg)pass
def start(self):if self.mqtt_client is None:self.mqtt_client = mqtt.Client(client_id=self.mqtt_client_id)self.mqtt_client.on_log = self.usr_log_callbackself.mqtt_client.on_message = self.usr_on_messageself.mqtt_client.username_pw_set(self.username, self.password)self.mqtt_client.connect(self.host, self.port, self.heartbeat)self.mqtt_client.loop_start()logging.info("client('%s') is connected" % self.mqtt_client_id)else:logging.error("mqtt_client object is None")
def stop(self):if self.mqtt_client is not None:self.mqtt_client.loop_stop()logging.info("client('%s') is disconnected" % self.mqtt_client_id)self.mqtt_client.disconnect()self.mqtt_client = None
3.3.3 Log工具源码
创建logging_tool.py,编写如下代码:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @Time : 2020/4/9 13:05
# @Author : ming fei.tang
# @File : log tools
# ---------------------
import os
import logging
import datetime
import sys
import coloredlogs
__all__ = ["LogTool"]
class LogTool:def __init__(self):self.log_folder = 'clog' + os.sep + datetime.datetime.now().strftime('%Y%m%d')
def setup_logging(self, level=logging.DEBUG, filename=None):if os.path.exists(self.log_folder) is False:os.makedirs(self.log_folder)
log_filename = Noneif filename is not None:log_filename = self.log_folder + os.sep + filename
log_format = '%(asctime)5s - %(levelname)5s - %(lineno)4s - %(filename)18s - %(message)s'if log_filename is not None:console = logging.StreamHandler(stream=sys.stdout)console.setLevel(logging.getLogger().level)console.setFormatter(logging.Formatter(log_format))logging.getLogger().addHandler(console)
logging.basicConfig(filename=log_filename, level=level, format=log_format)coloredlogs.install(level=level, fmt=log_format, milliseconds=True)
def remove_log_folder(self):if os.path.exists(self.log_folder) is False:os.remove(self.log_folder)
4 实现User Client功能
4.1 功能介绍
4.1.1 上报数据
上报的数据结构
payload = {"temperature": 0,"humidity": 62.5, "luminosity": 2000, "co2": 62.5, "switch": True}
数据类型说明:
参数 | 数据类型 | 描述 |
---|---|---|
temperature | Number (数值) | 温度值 |
humidity | Number (数值) | 湿度值 |
luminosity | Number (数值) | 光照值 |
co2 | Number (数值) | CO2浓度值 |
switch | Switch (开关量) | 开关量状态值 |
上报数据的Topic:
attributes
4.1.2 订阅数据
订阅数据的Topic:
attributes/push
ThingsCloud发布的控制量为:
4.2 功能代码实现
4.2.1 编写代码
使用Pycharm 创建工程,创建user_mqttClient.py,实现如下代码
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @Time : 2024/3/9 21:05
# @Author : ming fei.tang
# @File : mqtt client test
# ---------------------
import json
import logging
import random
import time
from lib.MQTT_Client import MQTTClient
from lib.logging_tool import LogTool
class UserMqttClient(MQTTClient):def __init__(self, broker_address, mqtt_port, qos, heartbeat, client_ID, user_name, user_password):# prepare log filesuper().__init__(broker_address, mqtt_port, qos, heartbeat, client_ID, user_name, user_password)self.debug = LogTool()self.debug.setup_logging()self.debug.remove_log_folder()try:self.start()except Exception as e:logging.exception(e)assert False, e
def load_para(self):pass
def user_publish(self, topic):switch = False# = {"temperature": 12.5, "light": 1500, "switch": True,"humidity": 62.5}payload = {"temperature": 0,"humidity": 62.5, "luminosity": 2000, "co2": 62.5, "switch": True}while True:payload["temperature"] = 12.5 + random.randint(1, 10) * 0.1# lightpayload["luminosity"] = 2550 + random.randint(10, 55)# humiditypayload["humidity"] = 62.5 + random.randint(10, 15) * 0.1
# co2payload["co2"] = 1757 + random.randint(1, 10)
payload["switch"] = switch
pub_payload = json.dumps(payload)self.usr_publish(topic, pub_payload, 1, True)time.sleep(30)if switch:switch = Falseelse:switch = True
if __name__ == '__main__':host = "sh-3-mqtt.iot-api.com"port = 1883client_id = ""username = ""password = ""
user_client = UserMqttClient(host, port, 0, 60, client_id, username, password)
user_client.usr_subscribe("switch")user_client.usr_subscribe("attributes/push")
user_client.user_publish(topic="attributes")
4.2.2 配置ThingsCloud参数
User MQTT Client要连接到ThingsCloud必须配置一些参数,这些参数是ThingsCloud分配给每个Client的。在ThingsCloud平台的设备,面板中找到这些参数,并把它们配置到Client中。
在Client的代码需要将上面的参数填在如下位置:
5 系统功能测试
5.1 运行Client连接ThingsCloud
在Ubuntu上运行Client代码会看见:
此时,ThingsCloud平台已经显示该Client上线
如果配置过可视化面板,可以在这个面板,通过UI可以实时显示当前的数据
5.2 App查看数据
进入到用户应用面板,点击微信小程序。
点击微信小程序,扫描其提供的二维码,就能进入到App
进入到App后,可以实时查看当前的数据:
5.3 注意问题
上报至ThingsCloud数据的频率不能太快,否则会出现如下情况