本文首次发表在 MQTT 上。酷博客。
目前市场上现有的大多数 MQTT 代理为 WebSockets 提供本机支持,从而使任何 MQTT JavaScript 库能够将 MQTT 消息封装到 WebSocket 帧中来建立通信:这称为通过 WebSocket 的 MQTT
这种方法最大的好处是允许所有现代浏览器,包括那些运行在智能手机上,发送和接收MQTT消息。这具有将协议扩展到 Web 的效果,使其对 M2H(机器到人)方案也越来越有吸引力。
然而,互联网在数据包丢失和可用带宽方面的不可预见性,特别是在移动网络上,使得这种”哑管”方法相当不可靠。例如,由于网络、浏览器和用户过载的风险,将 IoT 传感器生成的所有大型实时数据发送到基于浏览器的应用程序和移动应用程序可能既不可行又毫无用处。在这些情况下,动态限制是关键。应限制通过 WebSocket 流的数据,以便通过重新采样来适应可用带宽。这样,不同客户端在不同的网络上订阅相同的高频 MQTT 主题,即使具有微小的带宽连接,也会看到最新的数据。
Mqtt。酷超越网络套接字
MQTT 是位于 Web 客户端和经纪商之间的优化网关。Cool 能够使实时数据流更加有效。此外,如果您的基础结构不支持 WebSocket(例如,由于透明代理或阻止它们的严格公司防火墙),MQTT。Cool 会自动为每个客户端选择最佳传输,可能会回落到 HTTP 流式处理和 HTTP 长轮询。使用 MQTT。酷,WebSocket 不再是从 Web 连接到 MQTT 代理的唯一方法。
通过特殊技术(如排队、重采样和合并)应用数据限制,MQTT。无论使用何种传输,都可动态优化消息流。
可以通过两种不同的方式限制数据:
-
自适应限制:通过 MQTT。冷却自动重新采样数据动态,同时应用合并以适应任何网络拥塞。
-
客户端控制限制:通过此限制,每个客户端可以为其下游通道显式配置最大带宽,以及为每个扇出订阅的最大更新频率
mqtt.cool/服务器/指南/MQTT。Cool_入门_指南.html_shared_connection”rel=”不跟随”目标=”_blank”_入门指南,但为了完整起见:
-
共享连接是从 MQTT 的单个代理连接。冷却到MQTT经纪人,在上面不同的MQTT。冷客户端是隧道和多路复用的。
-
同样,扇出订阅是由 MQTT 完成的单个 MQTT 订阅。冷却代理以管理发布于该主题并由多个远程 MQTT 订阅的所有消息。酷客户端。
共享连接和扇出订阅是两个主要机制,使任何 MQTT 代理都能够在网络上实现大量扇出。
Mqtt。Cool 提供了极大的灵活性和高水平的优化,因为它基于Lightstreamer,这是一个高性能服务器,由全球大量顶级客户使用,可高效可靠地通过网络提供实时数据。
MQTT 限制的展示
为了更好地说明 MQTT 如何管理限制。酷,我们创建了MQTT限制演示,它侧重于客户端控制的限制。演示是在与我们的朋友从Gambit通信,这有助于我们与他们的强大的MIMIC MQTT模拟器,稍后将描述。
该演示演示了构建管理实时遥测的 Web 客户端是多么容易,更重要的是,如何进一步操作带宽和频率方面的消息传入流。
客户端可视化来自 10 个不同 IoT 传感器的实时更新,这些传感器持续检测并发布到 MQTT 代理自身和移动对象之间的距离。
为了明确限制数据和非节流数据之间的差异,为每个传感器显示两个图形:限制数据的红色点和非节流数据的橙色线。
此外,客户端允许您与 UI 交互,以准确控制全局和单独限制数据流的影响,如下图所示。
控制频率
对于每个帧,您可以处理一个滑块,以动态更改与传感器相关的传入消息可以到达的最大更新速率。这会影响红色点在帧上显示的频率,从以下动画中可以看到,其中频率选择器从”无限制”移动到”1 更新/秒”,然后移动到”3 更新/秒”cheeli.com.cn/wp-content/uploads/2020/03/screen-capture-mqtt-throttling-post-1.gif” 宽度=”668″/*
控制带宽
同样,在页面顶部,选择器允许您更改所有订阅使用的带宽,从而影响所有帧中呈现的所有红点的总体频率。在下面的动画中,带宽选择器从”有限 kpbs”更改为”5 kpbs”,然后更改为”15 kbps”:
与 UI 交互可帮助您了解带宽和频率约束如何设置由 MQTT 动态管理的上限。冷却,在不同的级别:带宽约束全局应用于连接,而频率约束分别应用于每个 MQTT 订阅。
请注意,实时更新既不缓冲也不延迟,而是重新采样和平整。换句话说,当订阅有机会更新(基于循环算法)时,它将接收最新的可用消息,而不是旧消息。
演示的体系结构
下图显示了用于构建演示的整体体系结构:
订阅者
通过使用Web 客户端 API,JavaScript客户端为每个传感器/主题提交 MQTT 订阅以接收实时数据并在相关图表上显示它们。
使用 Web 客户端 API 是必要的,因为 MQTT。Cool 不会与 Web 客户端”说话”纯 MQTT:它透明地将 MQTT 协议重新映射到光流器协议上,以便从 Lightstreamer 通过 Web 移动大量数据的容量中获益。这意味着您有本机 MQTT 客户端从一侧直接连接到 MQTT 代理,Web 客户端连接到 MQTT。冷却 Web 客户端 API 来自另一边。
发布者
正如已经预料到的,我们利用了MIMIC MQTT模拟器提供的灵活性和功率,通过它非常容易地生成无限范围的模拟场景,因为它能够生成任意、可自定义、可扩展和可预测的遥测数据。
Gambit 家伙在其MQTT 实验室上部署了模拟 IoT 传感器,以发布以子环波生成的距离变化,每个传感器具有不同的频率以显示不同的流量模式mqtt.cool:1883。说到这一点,作为经纪人公开访问,随时使用它的任何测试目的!
网关
作为订阅者和经纪商 MQTT 之间的中介。酷服务器扮演真正的 Web 网关的角色。演示的实时版本连接到https://cloud.mqtt.cool,这是我们的在线MQTT的地址。酷实例。如进一步详细介绍,您可以在演示的本地副本中将其替换为您自己的实例。
挖掘代码
该演示基于 jQuery,并且,如前所述,已使用MQTT 开发。酷 Web 客户端 API,支持另外两个小库:
-
用于图形渲染的 Flotr2
-
用于滑块可视化和操作的范围滑块.js
让我们首先检查从js/app.js文件获取的一些有趣的代码片段,其中定义了所有应用程序代码。
主机地址
在文件的顶部,您可以找到 MQTT 的地址。酷服务器和 MQTT 代理:
$(function() {
// Define urls for MQTT.Cool and the external MQTT broker.
const MQTT_COOL_URL = 'http://localhost:8080';
const BROKER_URL = 'tcp://broker.mqtt.cool:1883';
...
})
如果要在笔记本电脑上克隆项目并针对其他 MQTT。酷服务器,请 MQTT_COOL_URL
根据您的环境替换常量(请注意,这一点不适用于 MQTT 代理,因为 MIMIC 模拟器当前正在将数据发布到我们的云代理)。
传感器
定义其他变量后,模拟远程 IoT 传感器由传感器构造函数抽象,该函数由传感器 ID和帧 ID标识:
-
传感器 ID 用于形成由 MIMIC 模拟器发布遥测数据的主题,用于该模拟传感器。本主题用于提交 MQTT 订阅。
-
框架 ID 用于查找 HTML 页面上用于呈现动画图形的区域。
function Sensor (sensorId, frameId) {
this.sensorId = sensorId;
this.frameId= frameId;
this.topic = '/gambit/' + this.sensorId + '/telemetry';
...
以下阵列列出了 Gambitt 提供的所有传感器:
const SENSORS = [
new Sensor('20:19:AB:F4:0D:0D', 'sensor1'),
new Sensor('20:19:AB:F4:0D:0E', 'sensor2'),
new Sensor('20:19:AB:F4:0D:0F', 'sensor3'),
new Sensor('20:19:AB:F4:0D:10', 'sensor4'),
new Sensor('20:19:AB:F4:0D:11', 'sensor5'),
new Sensor('20:19:AB:F4:0D:12', 'sensor6'),
new Sensor('20:19:AB:F4:0D:13', 'sensor7'),
new Sensor('20:19:AB:F4:0D:14', 'sensor8'),
new Sensor('20:19:AB:F4:0D:15', 'sensor9'),
new Sensor('20:19:AB:F4:0D:16', 'sensor10')
];
消息处理
A
实例化后,此构造函数将返回在接收消息实例(参数)时将调用的回调 要识别源传感器已发出消息,我们只需遍接所有 SENSORS 项,直到唯一可能的主题(如先前定义)与Message不匹配。 由于 MIMIC 模拟器已配置为将有效负载配置为 JSON 字符串,因此我们分析Message.payloadString字段以返回 JSON 对象: 最后,触发传感器对象以更新由 以下是 现在,让我们逐步完成连接和订阅。 首先,我们必须打开一个针对网关的新会话: 以下是 最后一个参数是
连接到 MQTT 后,将触发强制的”连接成功“回调。很酷的成功。这是通常发生 MQTT 连接的地方:提供的mqttCoolSession 参数实际上提供了创建MqttClient实例的入口点,该实例配置为在指定地址连接到代理: 然后,客户端与 最后,我们可以连接到代理,连接后,订阅每个传感器的主题: 以下是连接成功的完整版本: 有关与 MQTT 建立联系的深入讨论。酷,看看“MQTT”部分。冷却连接模式”入门指南。 因为我们还希望操作全局带宽,我们必须利用 此对象管理客户端和 MQTT 之间的所有通信酷会话,存在相应的 通过这个对象,我们可以在方便的时候显式更改带宽;但我们首先必须缓存以后使用它的引用。这是LsClient来帮助我们的地方: 现在,让我们来研究一下应用程序如何允许您修改给定 IoT 传感器的最大更新频率。 我们以前看到的传感器构造函数还定义了一个内部函数(initThetheselector),它设置关联选择器的初始值。更重要的是,函数接受每次移动滑块以选择新值时调用回调: 回调只需将订阅重新提交到同一主题,但提供不同的SubscribeOptions对象( 重新订阅后,消息开始以指定的速率流动。 更新带宽的逻辑类似,但这次您必须按照 initBandwidhtSelector函数(在外部作用域中定义)初始化全局带宽选择器的状态,并且需要在处理滑块时触发回调: 所有订阅可用的最大允许带宽会立即更改 频率和带宽的操作仅对呈现为红点的传入消息产生影响,因此与从受限制的客户端请求的 MQTT 订阅相关,如前所见。 但是,直观地将受限制的数据与原始数据(即非限制数据)进行比较有助于我们更好地了解带宽和频率如何影响消息流。 这就是为什么我们在演示中使用另一种类型的 MQTT 的原因。酷连接,专用连接,其MQTT订阅接收数据,因为它们来自MQTT代理,无需任何进一步操作,这可能由客户端或MQTT触发。酷服务器。流经此连接的消息在帧中显示为连续橙色线。 以下片段显示为原始客户端所做的连接和后续订阅: 采用的模式是通常的模式。事实上,代码段与节流情况类似,但两个显著差异已突出显示: 第 4-5 行:客户端标识符(尽管不是在所有可能的客户端上假装是唯一的)会触发会话以返回将在服务器端触发专用代理连接的 MQTT 客户端。 第 7 行: 该演示展示了可用于改进 MQTT 流管理的基本技术,并很好地控制了消息流的频率和带宽 在不久的将来,新的 API 扩展(不破坏类似 Paho 的设计)将通过以下改进使客户端控制的限制变得更加容易: 订阅选项的临时添加,用于控制频率,使重新订阅不再需要。 通过MQTTCoolSession公开的显式操作直接控制带宽。 使用 MQTT 享受 MQTT 消息传送。酷,敬请期待!const MessageHandler = function (sensorType) {
return function (message) {
...
};
}
message
。const sourceSensor = SENSORS.find(function (s) {
return s.topic == message.destinationName
});
const payload = JSON.parse(message.payloadString);
sensorType
(”节流”或”原始”根据关联的客户端 MessageHandler
)向上查看的图表,传递有效负载携带的值:sourceSensor.update(sensorType, payload);
MessageHanlder
整体外观:const MessageHandler = function (sensorType) {
return function (message) {
const sourceSensor = SENSORS.find(function (s) {
return s.topic == message.destinationName;
});
const payload = JSON.parse(message.payloadString);
sourceSensor.update(sensorType, payload);
};
}
连接和订阅
mqttcool.openSession(MQTT_COOL_URL, 'demouser', '', {
onConnectionSuccess: function (mqttCoolSession) {
...
},
onLsClient: function (lsClient) {
...
}
}
demouser
我们的在线服务器对所有部署的实时演示所需的用户名(在这种情况下,必须提供空密码)。 throttledClient = mqttCoolSession.createClient(BROKER_URL);
MessageHandler
用于调度可限制的消息的实例相关联。为此,我们设置了MqttClient.onMessage 到达回调:throttledClient.onMessageArrived = new MessageHandler('throttled');
throttledClient.connect({
onSuccess: function () {
for (var i = 0; i < SENSORS.length; i++) {
throttledClient.subscribe(SENSORS[i].topic);
}
}
});
onConnectionSuccess: function (mqttCoolSession) {
throttledClient = mqttCoolSession.createClient(BROKER_URL);
throttledClient.onMessageArrived = new MessageHandler('throttled');
throttledClient.connect({
onSuccess: function () {
for (var i = 0; i < SENSORS.length; i++) {
throttledClient.subscribe(SENSORS[i].topic);
}
}
}
基础 API
LighststreamerClient
对象,即光流器 Web 客户端 API的入口点,在其中的 MQTT 上。构建了酷 Web 客户端 API。LightstreamerClient
对象。onLsClient: function(lsClient) {
lightstreamerClientReference = lsClient
}
频率管理
function Sensor(sensorId, frameId) {
...
// Initialize the Frequency Selector of this IoT Sensor
function initFrequencySelector(callback) {
...
}
// Trigger the Frequency Selector initialization passing the callback
initFrequencySelector(function(subOptions) {
throttledClient.subscribe(self.topic, subOptions);
});
}
subOpts
参数),该对象包含 maxFrequency
选择器提供的更新值。onSlideEnd: function (position, value) {
...
const subOptions = {};
if (value !== 'Unlimited') {
subOptions['maxFrequency'] = value;
}
onSlideEndCallback(subOptions);
}
带宽管理
LightstramerClient
上述预期使用缓存的对象。initBandwidthSelector(function (value) {
lightstreamerClientRef.connectionOptions.setMaxBandwidth(value);
});
通过调用入门指南的附录“访问光流器客户端 API”深入探讨了两个客户端 API 之间的关系,并演示如何从中获益。非节流数据
mqttcool.openSession(MQTT_COOL_URL, 'demouser', '', {
onConnectionSuccess: function (mqttCoolSession) {
const clientId = 'client-' + new Date().getTime().toString();
rawClient = mqttCoolSession.createClient(BROKER_URL, clientId);
rawClient.onMessageArrived = new MessageHandler('raw');
rawClient.connect({
onSuccess: function () {
for (var i = 0; i < SENSORS.length; i++) {
rawClient.subscribe(SENSORS[i].topic);
}
}
});
})
MessageHanlder
现在以原始传感器类型实例化,从而寻址相应的图表类型,即橙色线。结论