在本文中,我想列出从设备到云中的服务器通信的详细步骤,这些服务器运行可扩展的 Kafka基础结构,以及使用Mosquitto 代理和 Kafka 连接从 Kafka 返回设备。
您可能还喜欢:构建自定义 Kafka 连接连接器。
设备到云连接
第一步
下载并安装蚊子的窗口在这里。
第二步
导航到 D:\Program Files\mosquitto\mosquitto.conf
并修改以下内容:
# =================================================================
# Extra listeners
# =================================================================
# Listen on a port/ip address combination. By using this variable
# multiple times, mosquitto can listen on more than one port. If
# this variable is used and neither bind_address nor port given,
# Note that for a websockets listener it is not possible to bind to a host
# name.
# listener port-number [ip address/host name]
listener 1883 0.0.0.0
第三步
运行命令
D:\Program Files\mosquitto>mosquitto_pub -h avengers.eastus.cloudapp.azure.com -t "mqtt-mosquitto-topic" -m "This request is coming from LOCALLAPTOP007 to Avengers"
D:\Program Files\mosquitto>mosquitto_sub -h avengers.eastus.cloudapp.azure.com -t "mqtt-mosquitto-topic"
第四步
在服务器上安装kafka, avengers@12.XX.XXX.XXX
并使用以下命令测试生产者-使用者消息交换。
./kafka-topics.sh --list -zookeeper 12.XX.XXX.XXX:2181
bin/kafka-console-consumer.sh --bootstrap-server 12.XX.XXX.XXX:9092 –topic HelloKafkaTopic
bin/kafka-console-producer.sh --broker-list 12.XX.XXX.XXX:9092 –topic HelloKafkaTopic
第五步
修改 /etc/hosts
并添加如下所示的条目
10.XXX.XX.XX avengers.eastus.cloudapp.azure.com
#Above is the private ip of the cloud machine
12.XX.XXX.XXX avengers.eastus.cloudapp.azure.com
#Above is the public ip of the cloud machine
第六步
在云上安装蚊子(只是为了进行测试),使用此URL上的说明。
yum install mosquitto
# service mosquitto start
# systemctl enable mosquitto
第七步
使用以下命令检查端口 1883 和 9092 是否在云中打开以进行入站流量您还可以在服务器上运行以下命令以打开它们
firewall-cmd --zone=public --add-port=1883/tcp --permanent
firewall-cmd --reload
iptables-save | grep 1883
第八步
导航到 /etc/mosquitto/mosquitto.conf 并修改以下内容:
# =================================================================
# Extra listeners
# =================================================================
# Listen on a port/ip address combination. By using this variable
# multiple times, mosquitto can listen on more than one port. If
# this variable is used and neither bind_address nor port given,
# then the default listener will not be started.
# The port number to listen on must be given. Optionally, an ip
# address or host name may be supplied as a second argument. In
# this case, mosquitto will attempt to bind the listener to that
# address and so restrict access to the associated network and
# interface. By default, mosquitto will listen on all interfaces.
# Note that for a websockets listener it is not possible to bind to a host
# name.
# listener port-number [ip address/host name]
listener 1883 0.0.0.0
第九步
使用以下命令在蚊子代理之间测试、发布和订阅:
mosquitto_sub -h 127.0.0.1 -t dummy
mosquitto_pub -h 127.0.0.1 -t dummy -m "Hello world"
步骤 10
在此处导航并解压缩以下文件的副本:
-
kafka-connect-mqtt-1.0-SNAPSHOT.jar
-
org.eclipse.paho.client.mqttv3-1.0.2.jar
到 /opt/kafka_2.12-2.3.0/libs
。
步骤 11
导航到 /opt/kafka_2.12-2.3.0/config
server.properties
,vi,只需进行以下更改:
导航到 /opt/kafka_2.12-2.3.0/config 和 vi 连接独立属性,只需进行以下更改:
advertised.listeners=PLAINTEXT://12.XX.XXX.XXX:9092
bootstrap.servers=avengers.eastus.cloudapp.azure.com:9092
然后,导航到 /opt/kafka_2.12-2.3.0/config
, vi , mqtt.properties
并确保文件具有以下条目:
name=mqtt
connector.class=com.evokly.kafka.connect.mqtt.MqttSourceConnector
tasks.max=1
kafka.topic=mqtt-mosquitto-topic-kafka
mqtt.client_id=mqtt-kafka-123456789
mqtt.clean_session=true
mqtt.connection_timeout=30
mqtt.keep_alive_interval=60
mqtt.server_uris=tcp://avengers.eastus.cloudapp.azure.com:1883
mqtt.topic=mqtt-mosquitto-topic
步骤 12
cd /opt/kafka_2.12-2.3.0
运行上述命令后,在单独的腻子窗口中运行以下所有命令。
./bin/zookeeper-server-start.sh config/zookeeper.properties &
./bin/kafka-server-start.sh config/server.properties &
./bin/connect-standalone.sh config/connect-standalone.properties config/mqtt.properties
bin/kafka-console-consumer.sh --bootstrap-server avengers.eastus.cloudapp.azure.com:9092 –topic mqtt-mosquitto-topic-kafka
首先从服务器内部运行以下内容,然后从 Windows 本地便携式计算机运行以下内容。
mosquitto_pub -h avengers.eastus.cloudapp.azure.com -t "mqtt-mosquitto-topic" -m "This request is coming from LOCALLAPTOP007 to Avengers"
确保本地笔记本电脑上的蚊子发布者发送的消息以服务器 kafka 使用者的 base64 编码格式到达com/Landoop/流反应器/发布/下载/1.2.2/卡夫卡连接-mqtt-1.2.2-2.1.0-all.tar.gz
tar -xf kafka-连接-mqtt-1.2.2-2.1.0-all.tar.gz
cp 卡夫卡连接-mqtt-1.2.2-2.1.0-all.jar /家庭/插件
第二步
在 connect.properties
配置文件夹下创建新文件,如下所示:
[root@avengers config]# pwd
/opt/kafka_2.12-2.3.0/config
[root@avengers config]# cat connect.properties
# Kafka broker IP addresses to connect to
bootstrap.servers=avengers.eastus.cloudapp.azure.com:9092
# Path to directory containing the connector jar
plugin.path=/root/plugins
# Converters to use to convert keys and values
key.converter=org.apache.kafka.connect.json.JsonConverter
#key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false
# The internal converters Kafka Connect uses for storing offset and configuration data
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
#internal.key.converter=org.apache.kafka.connect.storage.StringConverter
#internal.value.converter=org.apache.kafka.connect.storage.StringConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
[root@avengers config]#
第三步
使用以下说明创建一个名为”mqtt-sink”的新 Kafka 主题:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mqtt-sink
bin/kafka-topics.sh --list -zookeeper localhost:2181
第四步
mqtt-sink.properties
在配置文件夹下创建新文件,如下所示:
[root@avengers config]# pwd
/opt/kafka_2.12-2.3.0/config
[root@avengers config]# cat mqtt-sink.properties
name=mqtt-sink
connector.class=com.datamountaineer.streamreactor.connect.mqtt.sink.MqttSinkConnector
tasks.max=1
topics=mqtt-sink
connect.mqtt.hosts=tcp://avengers.eastus.cloudapp.azure.com:1883
connect.mqtt.clean=true
connect.mqtt.timeout=1000
connect.mqtt.keep.alive=1000
connect.mqtt.service.quality=1
connect.mqtt.kcql=INSERT INTO /avengerskafka/test SELECT * FROM mqtt-sink
WITHCONVERTER=com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter
connect.progress.enabled=true
[root@avengers config]#
第五步
cd /opt/kafka_2.12-2.3.0
运行上述命令后,在单独的腻子窗口中运行以下所有命令
./bin/zookeeper-server-start.sh config/zookeeper.properties &
./bin/kafka-server-start.sh config/server.properties &
./bin/connect-standalone.sh config/connect.properties config/mqtt-sink.properties
bin/kafka-console-producer.sh --broker-list avengers.eastus.cloudapp.azure.com:9092 -topic mqtt-sink
Sent a test json message like
{"id":3,"temp":21.9,"timestamp":1530511201,"Note":"This message going from My Avengers server to the device LOCALLAPTOP007"}
首先从服务器内部运行以下内容,然后从 Windows 便携式计算机运行以下内容。
mosquitto_sub -h avengers.eastus.cloudapp.azure.com -t "/avengerskafka/test" -q 1
Verify if the message sent from server is received by the laptop
引用
相关文章
com/文章/使用网格增益-卡夫卡连接器”rel=”nofollow”_使用网格增益与卡夫卡连接器。