Flask是一个用Python编写的轻量级Web应用程序框架,被称为“微框架”,因为它使用一个简单的核心来扩展其他功能,如ORM,表单验证工具,文件上传,各种开放身份验证技术等。
MQTT是一种基于发布/订阅模式的轻量级物联网(IoT)消息传输协议。它可以为联网设备提供实时可靠的消息服务,代码非常少,带宽更小。广泛应用于物联网、移动互联网、智能硬件、车联网、电力和能源等行业。
本文主要介绍如何在 Flask 项目中使用 MQTT,并实现 MQTT 客户端与 MQTT 代理之间的连接、订阅、消息传递、退订等功能。
我们将使用 Flask-MQTT 客户端库,这是一个 Flask 扩展,可以看作是 paho-mqtt 的装饰器,以简化 Flask 应用程序中的 MQTT 集成。
项目初始化
本项目使用 Python 3.8 进行开发和测试,用户可以使用以下命令来验证 Python 的版本。
$ python3 --version
Python 3.8.2
使用 Pip 安装 Flask-MQTT 库。
pip3 install flask-mqtt
使用 Flask-MQTT
我们将采用基于 MQTT 云服务 – EMQX 云创建的免费公共 MQTT 代理。以下是服务器访问信息:
- 代理:
broker.emqx.io
- TCP 端口:1883
- 网络套接字端口:8083
进口烧瓶-MQTT
导入 Flask 库和 Flask-MQTT 扩展,然后创建 Flask 应用程序。
from flask import Flask, request, jsonify
from flask_mqtt import Mqtt
app = Flask(__name__)
配置 Flask-MQTT 扩展
完整的配置项请参考 Flask-MQTT 配置文档。
写入连接回调函数
我们可以在这个回调函数中处理成功或失败的 MQTT 连接,此示例将在连接成功后订阅 /flask/mqtt
主题。
@mqtt_client.on_connect()
def handle_connect(client, userdata, flags, rc):
if rc == 0:
print('Connected successfully')
mqtt_client.subscribe(topic) # subscribe topic
else:
print('Bad connection. Code:', rc)
写入消息回调函数
此函数将打印主题收到的 /flask/mqtt
消息。
创建消息发布接口
我们创建一个简单的 POST API 来发布 MQTT 消息。
在实际情况下,API 可能需要一些更复杂的业务逻辑处理。
@app.route('/publish', methods=['POST'])
def publish_message():
request_data = request.get_json()
publish_result = mqtt_client.publish(request_data['topic'], request_data['msg'])
return jsonify({'code': publish_result[0]})
运行烧瓶应用程序
当 Flask 应用程序启动时,MQTT 客户端将连接到服务器并订阅主题 /flask/mqtt
。
if __name__ == '__main__':
app.run(host='127.0.0.1', port=5000)
测试
现在,我们使用 MQTT 客户端 – MQTT X 来连接、订阅和发布测试。
接收消息
-
在 MQTT X 中创建连接并连接到 MQTT 服务器。
-
我们将在 Flask 运行窗口中看到 MQTT X 发送的消息。
-
订阅
/flask/mqtt
MQTT X 中的主题。 -
使用邮递员调用
/publish
API:将消息Hello from Flask
发送到/flask/mqtt
主题。
发布消息
完整代码
from flask import Flask, request, jsonify
from flask_mqtt import Mqtt
app = Flask(__name__)
app.config['MQTT_BROKER_URL'] = 'broker.emqx.io'
app.config['MQTT_BROKER_PORT'] = 1883
app.config['MQTT_USERNAME'] = '' # Set this item when you need to verify username and password
app.config['MQTT_PASSWORD'] = '' # Set this item when you need to verify username and password
app.config['MQTT_KEEPALIVE'] = 5 # Set KeepAlive time in seconds
app
on_connect()
def handle_connect(client, userdata, flags, rc):
如果 RC == 0:
打印(“连接成功”)
mqtt_client.订阅(主题) # 订阅主题
还:
打印(’连接不良。代码:’, rc)
@mqtt_client.on_message()
def handle_mqtt_message(client, userdata, message):
数据 = 字典(
topic=message.topic,
payload=message.payload.decode()
)
print(’收到关于主题的消息: {topic} 有效负载: {payload}’.format(**data))
@app.route(’/publish’, methods=[‘POST’])
def publish_message():
request_data = request.get_json()
publish_result = mqtt_client.publish(request_data[‘topic’], request_data[‘msg’])
return jsonify({‘code’: publish_result[0]})
如果__name__ == ‘__main__’:
app.run(host=’127.0.0.1’, port=5000)
局限性
Flask-MQTT 目前不适合与多个工作线程实例一起使用。因此,如果您使用像 gevent 或 gunicorn 这样的WSGI服务器,请确保您只有一个worker实例。
总结
到目前为止,我们已经完成了一个简单的 MQTT 客户端,使用 Flask-MQTT 并且可以在 Flask 应用程序中订阅和发布消息。