Flask是一个用Python编写的轻量级Web应用程序框架,被称为“微框架”,因为它使用一个简单的核心来扩展其他功能,如ORM,表单验证工具,文件上传,各种开放身份验证技术等。

MQTT是一种基于发布/订阅模式的轻量级物联网(IoT)消息传输协议。它可以为联网设备提供实时可靠的消息服务,代码非常少,带宽更小。广泛应用于物联网、移动互联网、智能硬件、车联网、电力和能源等行业。

本文主要介绍如何在 Flask 项目中使用 MQTT,并实现 MQTT 客户端与 MQTT 代理之间的连接、订阅、消息传递、退订等功能。

我们将使用 Flask-MQTT 客户端库,这是一个 Flask 扩展,可以看作是 paho-mqtt 的装饰器,以简化 Flask 应用程序中的 MQTT 集成。

项目初始化

本项目使用 Python 3.8 进行开发和测试,用户可以使用以下命令来验证 Python 的版本。

 

$ python3 --version
Python 3.8.2

使用 Pip 安装 Flask-MQTT 库。

 
pip3 install flask-mqtt

使用 Flask-MQTT

我们将采用基于 MQTT 云服务 – EMQX 云创建的免费公共 MQTT 代理。以下是服务器访问信息:

  • 代理: broker.emqx.io
  • TCP 端口:1883
  • 网络套接字端口:8083

进口烧瓶-MQTT

导入 Flask 库和 Flask-MQTT 扩展,然后创建 Flask 应用程序。

 

from flask import Flask, request, jsonify
from flask_mqtt import Mqtt

app = Flask(__name__)

配置 Flask-MQTT 扩展

 
app.config['MQTT_BROKER_URL'] = 'broker.emqx.io' app.config['MQTT_BROKER_PORT'] = 1883 app.config['MQTT_USERNAME'] = '' # Set this item when you need to verify username and password app.config['MQTT_PASSWORD'] = '' # Set this item when you need to verify username and password app.config['MQTT_KEEPALIVE'] = 5 # Set KeepAlive time in seconds app.config['MQTT_TLS_ENABLED'] = False # If your server supports TLS, set it True topic = '/flask/mqtt' mqtt_client = Mqtt(app)

完整的配置项请参考 Flask-MQTT 配置文档。

写入连接回调函数

我们可以在这个回调函数中处理成功或失败的 MQTT 连接,此示例将在连接成功后订阅 /flask/mqtt 主题。

 

@mqtt_client.on_connect()
def handle_connect(client, userdata, flags, rc):
   if rc == 0:
       print('Connected successfully')
       mqtt_client.subscribe(topic) # subscribe topic
   else:
       print('Bad connection. Code:', rc)

写入消息回调函数

此函数将打印主题收到的 /flask/mqtt 消息。

 
@mqtt_client.on_message() def handle_mqtt_message(client, userdata, message): data = dict( topic=message.topic, payload=message.payload.decode() )    print('Received message on topic: {topic} with payload: {payload}'.format(**data))

创建消息发布接口

我们创建一个简单的 POST API 来发布 MQTT 消息。

在实际情况下,API 可能需要一些更复杂的业务逻辑处理。

 

@app.route('/publish', methods=['POST'])
def publish_message():
   request_data = request.get_json()
   publish_result = mqtt_client.publish(request_data['topic'], request_data['msg'])
   return jsonify({'code': publish_result[0]})

运行烧瓶应用程序

当 Flask 应用程序启动时,MQTT 客户端将连接到服务器并订阅主题 /flask/mqtt

 

if __name__ == '__main__':
   app.run(host='127.0.0.1', port=5000)

测试

现在,我们使用 MQTT 客户端 – MQTT X 来连接、订阅和发布测试。

接收消息

  1. 在 MQTT X 中创建连接并连接到 MQTT 服务器。

    Create a connection in MQTT X and connect to the MQTT server</li>
</ol>
<p>png“ data-new=”false“ data-size=”286478“ data-sizeformatted=”286.5 kB“ data-src=”https://dz2cdn1.dzone.com/storage/temp/16545247-1669737530796.png“ data-src=”temp“ data-url=”https://dz2cdn1.dzone.com/storage/temp/16545247-1669737530796.png“ src=”http://www.cheeli.com.cn/wp-content/uploads/2022/11/16545247-1669737530796.png“ style=”width: 571px;”/></p>
<li>
<p>发布 <code>Hello from MQTT X</code> 到 <code>/flask/mqtt</code> MQTT X 中的主题。</p>
<p><img class=

  2. 我们将在 Flask 运行窗口中看到 MQTT X 发送的消息。 

  3. We will see the message sent by MQTT X in the Flask running window.

    发布消息

    1. 订阅 /flask/mqtt MQTT X 中的主题。

      Subscribe to the /flask/mqtt topic in MQTT X.

    2. 使用邮递员调用 /publish API:将消息 Hello from Flask 发送到 /flask/mqtt 主题。

      Use Postman to call the /publish API: Send the message Hello from Flask to the /flask/mqtt topic.

    3. 完整代码

       

      from flask import Flask, request, jsonify
      from flask_mqtt import Mqtt
      
      app = Flask(__name__)
      
      app.config['MQTT_BROKER_URL'] = 'broker.emqx.io'
      app.config['MQTT_BROKER_PORT'] = 1883
      app.config['MQTT_USERNAME'] = ''  # Set this item when you need to verify username and password
      app.config['MQTT_PASSWORD'] = ''  # Set this item when you need to verify username and password
      app.config['MQTT_KEEPALIVE'] = 5  # Set KeepAlive time in seconds
      app

      on_connect()
      def handle_connect(client, userdata, flags, rc):
      如果 RC == 0:
      打印(“连接成功”)
      mqtt_client.订阅(主题) # 订阅主题
      还:
      打印(’连接不良。代码:’, rc)

      @mqtt_client.on_message()
      def handle_mqtt_message(client, userdata, message):
      数据 = 字典(
      topic=message.topic,
      payload=message.payload.decode()
      )
      print(’收到关于主题的消息: {topic} 有效负载: {payload}’.format(**data))

      @app.route(’/publish’, methods=[‘POST’])
      def publish_message():
      request_data = request.get_json()
      publish_result = mqtt_client.publish(request_data[‘topic’], request_data[‘msg’])
      return jsonify({‘code’: publish_result[0]})

      如果__name__ == ‘__main__’:
         app.run(host=’127.0.0.1’, port=5000)


      局限性

      Flask-MQTT 目前不适合与多个工作线程实例一起使用。因此,如果您使用像 geventgunicorn 这样的WSGI服务器,请确保您只有一个worker实例。

      总结

      到目前为止,我们已经完成了一个简单的 MQTT 客户端,使用 Flask-MQTT 并且可以在 Flask 应用程序中订阅和发布消息。

Comments are closed.