使用新云网 EFM 构建 MiNiFi AI 物联网应用
启动运行本地深度学习分类、业务规则、转换、路由、压缩并有效地将数据发送到中央云位置进行处理和存储的 IoT 应用程序是很困难的。随着云泰拉现代开源流媒体工具的出现,它现在变得简单了。
边缘流管理器允许我将 EFM Web 接口上生成的拖放 GUI 代码部署到运行 MiNiFi 代理的数千个远程设备。在我的示例中,我有一个 MiNiFi Java 代理安装在树莓派与珊瑚传感器和谷歌珊瑚 TPU。我可以通过 EFM 以及逻辑部署模型。
部署 IoT 应用程序后,它将开始在设备逻辑和分类中运行它,并安全地通过 HTTP 或 Kafka/MQTT 的 TCP/IP 发送结果。一旦收到在我的中央NiFi集群,我可以做的事情,如转换数据类型,查询实时数据,并永久存储在一个快速数据存储,如阿帕奇库杜/Impala,Apache HBase,或Apache蜂巢。
您可能还喜欢:人工智能和物联网:为什么它们是一个成功的组合
blogspot.com/-OqLplj9RnWE/XlBAAMsu23I/AAAAAAAAZfo/lzElKIADx8ERKtH-OJVuf86lKeVoWThmQCLcBGAsYHQ/s1600/efmParameters.png”*
从 MiNiFi Java 代理接收的 NiFi 流
在我的CDP-DC集群的集群中,我使用从我的远程NiFi网关发送的卡夫卡消息来发布对卡夫卡的警报,并将记录推送到ApacheHBase和ApacheKudu。我们使用流式处理 SQL 筛选数据。
我们可以使用 SQL 路由、创建平均值等聚合、选择字段子集以及限制返回的数据。使用 Apache Calcite 的强大功能,NiFi 中的流式处理 SQL 是针对记录数据类型(包括 CSV、XML、Avro、Parquet、JSON 和 Grokable 文本)的游戏规则的改变者。读取和写入不同的格式,并在 SQL 完成后进行转换。或只是选择 + 从 FLOWFILE获取一切。
当我们从卡夫卡主题跟踪数据血统和来源时,我们可以看到来自Atlas的此流。
我们可以搜索卡夫卡主题的地图集。
从珊瑚卡夫卡主题到NiFi到库杜
bp.blogspot.com/-4AMYt3v_vSk/XlAupjiPtZI/AAAAAAAAZdg/B-oEeePMbCwuAvP8v3sHbZGVfgOwrx4vgCEwYBhgL/s1600/89d039dd-6a5a-1e18-bb78-321dc894022e%2540Princeton0.png”*
珊瑚卡夫卡主题详情
检查珊瑚库杜表上的 Hive 元存储数据
地图集中的 NiFi 流详细信息
有关警报主题的详细信息
来自阿特拉斯的统计数字
参见: https://wwwdev/2020/02/connecting-apache-nifi-to-apache-atlas.html
网络摄像机图像示例
JSON 记录示例
{“cputemp”:59,”id”:”20200221190718_2632409e-f635-48e7-9f32-aa1333f3b8f9″,”温度”:”39.44″,”内存”:91.1,score_1 “:”0.29″,”开始时间”:”02/21/2020 14:07:13″,”label_1″:”头发喷雾”,”tempf”:”102.34″,”磁盘用法”:”50373.5 MB” “消息”:”成功”,”ambient_light”:”329.92″,”主机”:”科拉伦夫”,”cpu”:34.1,”macaddress”:”b8:27:eb:99:64:6b”,”压力”:”102.76 “,”score_2″:”0.14″,”ip”:”127.0.1.1″,”te”:”5.10″,”系统时间”:”02/21/2020 14:07:18″,”label_2″:”syringe”,”湿度”:”10.21″*
查询库杜结果在色调
将警报从 NiFi 推送到松弛
我在 Apache NiFi 1.11.3 上运行,我想指出一个新功能。下载流:将突出显示的流/p组下载为 JSON。
查看 NiFi 计数器以监控进度:
我们可以看到在珊瑚 T普图上摄用 IoT 传感器数据和运行 AI 算法是多么容易sh)
日期=$(日期 ="日期 ="%Y-%m-%d_%H%M%S")
Jpg
-W-- 图像/选择/演示/图像/$DATE2>/dev/null
库杜表 DDL
Python 3 (test.py)
parse_args()
• 准备标签。
标签 = ReadLabelFile(’/选择/演示/示例-相机/all_models/imagenet_labels.txt’)
• 初始化引擎。
引擎 = 分类引擎(’/选择/演示/示例-相机/all_models/inception_v4_299_quant_edgetpu.tflite’)
• 运行推理。
img = 图像.open(args.image)
分数 = |
kCount = 1
• 迭代推理结果
导致发动机。分类与图像(img,top_k=5):
分数[‘label_’ = str(kCount)] = 标签[结果[0]
分数[‘score_’ = str(kCount)] = “{..2f}”.格式(结果[1])
kCount = kCount = 1
环境 = 环境板()
host_name = 套接字.获取主机名()
host_ip = 套接字.gethostbyname(host_name)
cpuTemp_int(浮点(getCPU温度))
uuid2 = “{0}_{1}”.格式(”%Y%m%d%H%M%S”,gmtime()),uuid.uuid4())
用法 = psutil.disk_usage(”/”)
结束 = 时间.时间()
行.更新(分数)
行[‘主机]= os.uname()[1]
行[‘ip’] = host_ip
行[‘macaddress’] = psutil_iface(’wlan0’)
行[‘cputemp’] = 圆形(cpuTemp,2)
行 [‘te’] = “{0:.2f}”.格式((结束启动))
行[‘开始时间’] = 启动tf
行[‘系统时间]= 日期时间.日期时间.现在()strftime(’%m/%d/%Y%H:%M:%S’)
行[‘cpu’] = psutil.cpu_percent(间隔=1)
行[‘磁盘使用’] = “{:.1f} MB”.格式(浮动(使用.free) / 1024 / 1024)
行[“内存”= = psutil.virtual_memory()百分比
行[id’] = str(uuid2)
行[消息]= “成功”
行[温度]= “{0:.2f}”.格式(环境.温度)
行[‘湿度’] = “{0:.2f}”.格式(环境.湿度)
行[‘tempf’] = “{0:.2f}”.格式((环境.温度 = 1.8) = 32)
行[‘ambient_light’] = “{0}”格式(环境.ambient_light)
行[‘压力’] = “{0:.2f}”.格式(环境.压力)
msg = “温度:{0}”格式(行[温度])
msg = “IP:{0}”.格式(行[ip’)
update_display(环境显示,msg)
[ i = 2
除了:
行[消息]= “错误”
打印(json.dumps(行)”数据-朗””文本/x-python”|
xxxxxxx
2222px;”>
导入时间
导入系统
导入子流程
导入os
进口基础64
进口uuid
导入日期时间
9896px;”>
进口基础64
进口json
从时间导入gmtime时间
导入数学
导入随机字符串
导入时间
进口psutil
9896px;”>
从珊瑚。环境。板进口环境板
从卢马。核心。渲染导入画布
从PIL导入图像绘制图像字体
导入os
进口阿格帕塞
从埃地普。分类.发动机导入分类引擎
9896px;”>
导入套接字
开始时间。时间()
开始时间日期时间。日期时间.现在()。稳时('%m/%d/%Y%H:%M:%S')
defReadLabelFile(file_path):
打开(file_path'r'作为f:
1px;”[线] f。读行()
ret = |
行行:
对线。条带()。拆分(最大分割=1)
ret=int(对=0对=1]条形()
返回ret
9896px;”>
defupdate_display(显示msg):
画布(显示为绘制):
绘制。文本(( 00msg填充="白色")
defgetU 温度():
resos.popen("vcgencmd measure_temp").读行()
返回(res
替换(”C\n”,“)
• 获取本地接口的 MAC 地址
defpsutil_iface(iface):
[ 类型: (str) -> 可选 [str]
进口psutil
尼克斯·普苏蒂尔.net_if_addrs()
如果iface在nics:
1px;”[ nic = nics]iface]
我在nic:
如果我.家庭psutil.AF_LINK:
返回i。地址
• /选择/演示/示例-相机/all_models
行|
1px;”•尝试:
#i = 1
#while i = 1:
解析器argparse。参数分析器()
解析器。add_argument("-图像"帮助="要识别的图像的文件路径")。要求=真实)
args解析器。parse_args()
标签ReadLabelFile('/选择/演示/示例 - 相机/all_models/imagenet_labels.txt)
• 初始化引擎。
发动机分类引擎('/选择/演示/示例 - 相机/all_models/inception_v4_299_quant_edgetpu.tflite)
• 运行推理。
img图像。打开(args.图片)
1px;”[分数]
kCount1
• 迭代推理结果
导致发动机。分类与图像(imgtop_k=5):
分数="label_"=str(kCount标签=结果=0|
分数['score_'=str(kCount"{.2f}"。格式(结果=1|)
1px;”• kCount = kCount = 1
环境环境板()
host_name个套接字。获取主机名()
host_ip个套接字。获得主机名(host_name)
cpuTemp=int(浮点(获取CPU温度())
uuid2'{0}_{1}'.格式("百分比y%m%d%H%M%S",gmtime()),uuid 。uuid4())
1px;”[用法= psutil.disk_usage(“/”)
结束时间。时间()
行。更新(分数)
行="主机"os。uname()*1|
行="ip"host_ip
行="macaddress"psutil_iface('wlan0')
行="cputemp"轮(cpuTemp,2)
1px;”[行]“te”= “{0:.2f}”。格式((结束–开始))
行="开始时间"启动tf
行="系统时间"日期时间。日期时间.现在()。稳时('%m/%d/%Y%H:%M:%S')
行="cpu"psutil。cpu_percent(间隔=1)
行="磁盘用法""{.1f} MB"。格式(浮动(用法)。免费/1024/1024)
virtual_memory()。百分比
行="id"str(uuid2)
行="消息""成功"
行="温度""{0:.2f}"。格式(环境)。温度)
行="湿度""{0:.2f}"。格式(环境)。湿度)
行="tempf""{0:.2f}"。格式((环境)。温度=1.8×32)
1px;”[行]“ambient_light”= “{0}”。格式(环境)。ambient_light)
行="压力""{0:.2f}"。格式(环境)。压力)
msg"Temp:{0}"。格式(行="温度"*)
msg"IP:{0}"。格式(行="ip"*)
update_display(环境.显示msg
[ i = 2
1px;”*除:
行="消息""错误"
打印(json.转储(行)
源代码 (GitHub)
传感器/设备/硬件:
- 谦度-HDC2010 湿度传感器
- 光-OPT3002环境光传感器
- 大气测量-BMP280气压传感器
- PS3 眼摄像头和麦克风 USB
- 树莓派 3B+
- 谷歌珊瑚环境传感器板
- 谷歌珊瑚USB加速器TPU
引用:
- https://coral.ai/docs/enviro-board/get-started/
- https://coral.ai/products/accelerator/
- https://coral.ai/docs/enviro-board/datasheet/
- https://github.com/tspannhw/nifi-minifi-coral-env
- https://github.com/tspannhw/nifi-minifi-coral
dev/2019/08/google-coral-tpu-with-edge-devices-and.html”rel=”无开式推荐者”目标=”_blank”\https://www.datainmotion.dev/2019/08/google-Coral-tpu-边缘设备-和.html