Apache NiFi 现已在 1.10 中提供!
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12316020&version=12344993
您现在可以使用 JDK 8 或 JDK 11!我在JDK 11中运行,它似乎有点快。一个巨大的功能是增加参数!您可以使用它们将参数传递给 Apache NiFi 无状态!
一些较小的处理器已经从主下载中移动。有关迁移提示,请参阅以下链接:
https://cwiki.apache.org/confluence/display/NIFI/Migration+Guidance
发行说明: https://cwiki.apache.org/confluence/display/NIFI/Release+Notes#ReleaseNotes-Version1.10.0
示例源代码:https://github.com/tspannhw/stateless-examples
更多新功能:
- 帕奎特阅读器/作家(参见:https://www.datainmotion.dev/2019/10/migrating-apache-flume-flows-to-apache_7.html。
- 普罗米乌斯报告任务。期待更多的普罗米有斯功能即将推出。
- 实验加密内容存储库。人们以前要我这个
- 参数!!替换变量/变量注册表的时间。参数在各方面都更好。
- 用于生成和构建 NiFi 的斯瓦格 API 库的工具包模块。
- 后Slack处理器。
- 发布卡夫卡分区支持。
- 地理丰富记录处理器。
- 进程组中的远程输入端口。
- 命令行诊断。
- 罗克斯DB流文件存储库。
- 放置大查询流式处理处理器。
- nifi.analytics.预测.启用 – 打开背压预测。
- ETL/ELT 的更多查找服务:数据库记录查找服务。
- 库杜查找服务。
- HBase_2_ListLookupService。
无 国籍
首先,我们将直接从NiFi注册表在命令行中运行。然后,我们将运行从YARN!是的,您现在可以在巨大的 Cloudera CDH/HDP/CDP YARN 群集上运行 Apache NiFi 流!让我们利用你的数百个Hadoop节点。
无状态示例
让我们构建无状态流
首先要记住的是,我们希望任何可能更改的都是我们可以通过 JSON 文件传递的参数。即使对于下拉列表,设置参数也非常简单!您甚至会提示您从选择列表中选取参数。在参数可用之前,您需要将它们添加到参数列表中,并将该参数上下文分配给处理组。
处理器配置中的参数显示为[代理]
blogspot.com/-WIFwvSVBems/XcHJa4NLLVI/AAAAAAAAYw8/0xg8o3STFrIWl404rbHTnBmu2RtJJqPOwCLcBGAsYHQ/s640/kafkaProducerParameters.png”标题=”配置处理器”宽度=”640″/*
连接到进程组、控制器服务、…
应用这些参数
参数(eter)现在是属性的选项
用于使用参数的弹出提示
在参数上下文中编辑参数
我们也可以在控制器服务中配置参数bp.blogspot.com/-h52kcDu6of8/XcHJYxpRBZI/AAAAAAAAYwo/lnRPGegY2SM29F3YFtYFkqOUK-LHU5oFQCLcBGAsYHQ/s1600/addingParametersToDropDowns.png”*
如此容易选择现有的。
将它们用于任何可以更改或您不想硬编码的内容。
阿帕奇卡夫卡消费者下沉
这是一个简单的两步Apache NiFi流,从Kafka读取输出,并将输出发送到一个接收器,例如文件。
让我们确保我们使用该参数上下文
要构建 JSON 配置文件,您需要来自 Apache NiFi 注册表的存储桶 ID 和流 ID您可以在类似于http://tspann-mbp15-hw14277:18080的 URL 上浏览该注册表。
我的命令行运行程序
/用户/tspann/文档/nifi-1.10.0-SNAPSHOT/bin/nifi.sh无状态运行从注册表连续— 文件 /用户/tspann/文档/nifi-1.10.0-SNAPSHOT/logs/kafkauser.json
从注册表运行 [一次]连续* — 文件 <文件名称 >
这是使用文件从命令行运行的基本用例。流必须存在于引用 Apache NiFi 注册表中。
JSON 配置文件(卡夫卡消费者.json)
{
"registryUrl": "http://tspann-mbp15-hw14277:18080",
"bucketId": "140b30f0-5a47-4747-9021-19d4fde7f993",
"flowId": "0540e1fd-c7ca-46fb-9296-e37632021945",
"ssl": {
"keystoreFile": "",
"keystorePass": "",
"keyPass": "",
"keystoreType": "",
"truststoreFile": "/Library/Java/JavaVirtualMachines/amazon-corretto-11.jdk/Contents/Home/lib/security/cacerts",
"truststorePass": "changeit",
"truststoreType": "JKS"
},
"parameters": {
"broker" : "4.317.852.100:9092",
"topic" : "iot",
"group_id" : "nifi-stateless-kafka-consumer",
"DestinationDirectory" : "/tmp/nifistateless/output2/",
"output_dir": "/Users/tspann/Documents/nifi-1.10.0-SNAPSHOT/logs/output"
}
}
示例运行
12:25:38.725 [main] DEBUG org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0 - ConsumeKafka_2_0[id=e405df7f-87ca-305a-95a9-d25e3c5dbb56] Running ConsumeKafka_2_0.onTrigger with 0 FlowFiles
12:25:38.728 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Node 8 sent an incremental fetch response for session 1943199939 with 0 response partition(s), 10 implied partition(s)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-8 at offset 15 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-9 at offset 16 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-6 at offset 17 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-7 at offset 17 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-4 at offset 18 to node ip-10-0-1-244
内部:9092 (id: 8 机架: 空)
12:25:38.728 [main] DEBUG org.apache.kafka.client.client.内部 – [消费者客户端 Id_消费者-1,groupId_nifi-无状态-卡夫卡-消费者] 添加了READ_UNCOMMITTED在偏移 16 到节点时获取分区 iot-5 的请求ip-10-0-1-244.ec2.内部:9092 (id: 8 机架: 空)
12:25:38.728 [main] DEBUG org.apache.kafka.client.client.内部 – [消费者客户端 Id_消费者-1,groupId_nifi-无状态-卡夫卡-消费者] 添加了READ_UNCOMMITTED在偏移 17 到节点处获取分区 iot-2 的请求ip-10-0-1-244.ec2.内部:9092 (id: 8 机架: 空)
12:25:38.728 [main] DEBUG org.apache.kafka.client.client.内部 – [使用者客户端 Id_消费者-1,groupId_nifi-无状态-卡夫卡-消费者] 添加了READ_UNCOMMITTED在偏移 19 到节点处获取分区 iot-3 请求ip-10-0-1-244.ec2.内部:9092 (id: 8 机架: 空)
12:25:38.728 [main] DEBUG org.apache.kafka.client.client.内部 – [使用者客户端 Id_消费者-1,groupId_nifi-无状态-卡夫卡-消费者] 添加了READ_UNCOMMITTED在偏移 16 到节点时获取分区 iot-0 请求ip-10-0-1-244.ec2.内部:9092 (id: 8 机架: 空)
12:25:38.728 [main] DEBUG org.apache.kafka.client.client.内部 – [消费者客户端 Id_消费者-1,groupId_nifi-无状态-卡夫卡-消费者] 添加了READ_UNCOMMITTED在偏移 20 到节点时获取分区 iot-1 的请求ip-10-0-1-244.ec2.内部:9092 (id: 8 机架: 空)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler – [使用者客户端 Id=使用者-1,groupId_nifi-无状态-卡夫卡-使用者] 为节点 8 构建了增量提取(会话 Id=1943199939,EPOCH=5)添加了 0 个分区,更改了 0 个分区,从 10 个分区中删除了 0 个分区
12:25:38.729 [主] DEBUG org.apache.kafka.client.client.内部 – [消费者客户端 Id=使用者-1,groupId_nifi-无状态-卡夫卡-消费者] 发送READ_UNCOMMITTED增量请求(toSend_(toForget_(),暗示=(it-8, iot-9, iot-6, iot-7, iot-4, iot-5, iot-2, iot-3, iot-0, iot-1)) 代理 ip-10-0-1-244.ec2.内部:9092 (ID: 8 机架: 空)
12:25:38.737 [主] DEBUG org.apache.nifi.nfi.处理器.kafka.pubsub.ConsumeKafka_2_0 – ConsumeKafka_2_0[id]e405df7f-87ca-305a-95a9-d25e3c5dbb56_ 运行 ConsumeKafka_2_0.onTrigger 与 0 FlowFiles
输出示例
cat output/247361879273711.statelessFlowFile
{"id":"20191105113853_350b493f-9308-4eb2-b71f-6bcdbaf5d6c1_Timer-Driven Process Thread-13","te":"0.5343","diskusage":"0.2647115097153814.3 MB","memory":57,"cpu":132.87,"host":"192.168.1.249/tspann-MBP15-HW14277","temperature":"72","macaddress":"dd73eadf-1ac1-4f76-aecb-14be86ce46ce","end":"48400221819907","systemtime":"11/05/2019 11:38:53"}
我们还可以运行一次在此示例中,以发送一个卡夫卡消息。
发电机阿帕奇卡夫卡生产者
10.0-SNAPSHOT/bin/nifi.sh 无状态
运行从注册表一次 — 文件 /用户/tspann/文档/nifi-1.10.0-SNAPSHOT/日志/
卡夫卡.json
JSON 配置文件(卡夫卡.json)
{
"registryUrl": "http://tspann-mbp15-hw14277:18080",
"bucketId": "140b30f0-5a47-4747-9021-19d4fde7f993",
"flowId": "402814a2-fb7a-4b19-a641-9f4bb191ed67",
"flowVersion": "1",
"ssl": {
"keystoreFile": "",
"keystorePass": "",
"keyPass": "",
"keystoreType": "",
"truststoreFile": "/Library/Java/JavaVirtualMachines/amazon-corretto-11.jdk/Contents/Home/lib/security/cacerts",
"truststorePass": "changeit",
"truststoreType": "JKS"
},
"parameters": {
"broker" : "3.218.152.236:9092"
}
}
输出示例
12:32:37.717 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Created socket with SO_RCVBUF = 33304, SO_SNDBUF = 131768, SO_TIMEOUT = 0 to node 8 12:32:37.717 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Completed connection to node 8. Fetching API versions. 12:32:37.717 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating API versions fetch from node 8. 12:32:37.732 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Recorded API versions for node 8: (Produce(0): 0 to 7 [usable: 6], Fetch(1): 0 to 10 [usable: 8], ListOffsets(2): 0 to 5 [usable: 3], Metadata(3): 0 to 7 [usable: 6], LeaderAndIsr(4): 0 to 2 [usable: 1], StopReplica(5): 0 to 1 [usable: 0], UpdateMetadata(6): 0 to 5 [usable: 4], ControlledShutdown(7): 0 to 2 [usable: 1], OffsetCommit(8): 0 to 6 [usable: 4], OffsetFetch(9): 0 to 5 [usable: 4], FindCoordinator(10): 0 to 2 [usable: 2], JoinGroup(11): 0 to 4 [usable: 3], Heartbeat(12): 0 to 2 [usable: 2], LeaveGroup(13): 0 to 2 [usable: 2], SyncGroup(14): 0 to 2 [usable: 2], DescribeGroups(15): 0 to 2 [usable: 2], ListGroups(16): 0 to 2 [usable: 2], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 2 [usable: 2], CreateTopics(19): 0 to 3 [usable: 3], DeleteTopics(20): 0 to 3 [usable: 2], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 1 [usable: 1], OffsetForLeaderEpoch(23): 0 to 2 [usable: 1], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 2 [usable: 1], DescribeAcls(29): 0 to 1 [usable: 1], CreateAcls(30): 0 to 1 [usable: 1], DeleteAcls(31): 0 to 1 [usable: 1], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 1 [usable: 0], CreatePartitions(37): 0 to 1 [usable: 1], CreateDelegationToken(38): 0 to 1 [usable: 1], RenewDelegationToken(39): 0 to 1 [usable: 1], ExpireDelegationToken(40): 0 to 1 [usable: 1], DescribeDelegationToken(41): 0 to 1 [usable: 1], DeleteGroups(42): 0 to 1 [usable: 1], UNKNOWN(43): 0) 12:32:37.739 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.iot.records-per-batch 12:32:37.739 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.iot.bytes 12:32:37.739 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.iot.compression-rate 12:32:37.739 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.iot.record-retries 12:32:37.740 [kafka-producer-network-thread | producer-1] DEBUG org
kafka.common.metrics.metrics – 添加了带有名称主题.iot.record-错误的传感器 12:32:37.745 [main] DEBUG org.apache.nifi.参数.expression语言感知参数参数分析器 – 对于输入 iot 找到 0 参数引用: * 12:32:37.745 [主 DEBUG]org.apache.nifi.parameter.ExpressionLanguageAwareParameterParser – For input iot found 0 Parameter references: [] Flow Succeeded 12:32:37.717 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector – [生产者客户端 Id_product-1_ 创建具有SO_RCVBUF = 33304、SO_SNDBUF = 131768、SO_TIMEOUT = 0 到节点 8 的套接字
12:32:37.717 [卡夫卡-生产者-网络线程 ] product org.apache.kafka.client.NetworkClient – [生产者客户端 Id_生产者-1] 完成与节点 8 的连接。正在获取 API 版本。
12:32:37.717 [kafka-生产者-网络线程 ] product org.apache.kafka.client.networkClient – [生产者客户端 Id_生产者-1] 启动 API 版本从节点 8 提取。
12:32:37.732 [kafka-生产者-网络线程] PRODUCT.apache.kafka.client.NetworkClient – [生产者客户端 Id_product-1] 节点 8 录制的 API 版本:(生产(0): 0 到 7 [可用: 6], 提取 (1): 0 到 10 [可用: 8], 列表偏移(2): 0 到5 [可用: 3] , 元数据 (3): 0 到 7 [可用: 6], 引线和Isr(4): 0 到 2 [可用: 1], 停止副本 (5): 0 到 1 [可用: 0], 更新元数据 (6): 0 到 5 [可用: 4], 受控关机 (7): 0 到 2 [可用: 1], 偏移提交(8): 0 到 6 [可用: 4]: 0 到 5 [可用: 4], 查找协调器 (10): 0 到 2 [可用: 2] , JoinGroup (11): 0 到 4 [可用: 3], 心跳 (12): 0 到 2 [可用: 2], 离开组 (13): 0 到 2 [可用: 2], 同步组 (14): 0 到 2 [可用: 2] ,描述组 (15): 0 到 2 [可用]: 2*, 列表组(16):0到2[可用:2],SaslHandshake(17):0到1[可用:1],ApiVersions(18):0到2[可用:2],创建主题(19):0到3[可用:3],删除主题(20):0到3[可用:2],删除记录(21):0到1[可用:1],InitProducerId(22)到: 1], 偏移为引线(23): 0 到 2 [可用: 1], AddpartitionsToTxn(24):0到1[可用:1],AddOffsetsToTxn(25):0到1[可用:1],EndTxn(26):0到1[可用:1],写入TxnMarkers(27):0[可用:0],TxnOffset提交(28):0到2[可用:1],描述Acls(29)): 0 到 1 [可用: 1], 创建 Acls (30): 0 到 1 [可用: 1], 删除 Acls (31): 0 到 1 [可用: 1], 描述 Configs (32): 0 到 2 [可用: 2], AlterConfigs (33): 0 到 1 [可用: 1], Alter 复制日志Dirs (34): 0 到 1 [可用: 1]: 1], Sasl身份验证 (36): 0 到 1 [可用: 0], 创建分区 (37): 0 到 1 [可用: 1], 创建委派令牌 (38): 0 到 1 [可用: 1], 续订委派令牌 (39): 0 到 1 [可用: 1], 过期委派令牌 (40): 0 到 1 [可用: 1],描述委派令牌(41): 0 到 1 [可用: 1], 删除组 (42): 0 到 1 [可用: 1], 未知 (43): 0)
12:32:37.739 [卡夫卡-生产者-网络线程 ] PRODUCT org.apache.kafka.common.metrics. – 添加了带有名称主题.iot.记录的传感器。”
12:32:37.739 [卡夫卡-生产者-网络线程 ] PRODUCT org.apache.kafka.common.metrics. – 添加了带有名称主题.iot.bytes 的传感器
12:32:37.739 [卡夫卡-生产者-网络线程 ] PRODUCT org.apache.kafka.common.metrics. – 添加了带有名称主题.iot.压缩率的传感器
12:32:37.739 [卡夫卡-生产者-网络线程 ] PRODUCT org.apache.kafka.common.metrics. – 添加了带有名称主题.iot.record-retries 的传感器
12:32:37.740 [卡夫卡-生产者-网络线程 ] PRODUCT org.apache.kafka.common.metrics. – 添加了带有名称主题.iot.记录错误的传感器
12:32:37.745 [main] DEBUG org.apache.nifi.参数.表达式语言感知参数参数 – 对于输入 iot 找到 0 参数引用: |
12:32:37.745 [main] DEBUG org.apache.nifi.参数.表达式语言感知参数参数 – 对于输入 iot 找到 0 参数引用: |
流成功
其他运行时选项:
RunYARNServiceFromRegistry <YARN RM URL> <Docker Image Name> <Service Name> <# of Containers> --file <File Name>
RunOpenwhiskActionServer <Port>
引用:
- 关于 NiFi 1 的真棒文章
com/@abdelkrim.hadjidj/apache-nifi-1-10 系列简化错误处理-7de86f130acd”\https://medium.com/@abdelkrim.hadjidj/apache-nifi-1-10 系列简化错误处理-7de86f130acd
在进程组内添加 S2S 端口
帕奎特读者
帕奎特记录集