当今的应用用户需要最好的用户体验。他们习惯于从所有设备(计算机、手机、平板电脑等)访问他们的应用程序。随着平台不断过渡到软件即服务 (SaaS),开发人员不断使用功能强大的工具进行协作,这些工具可承受每秒处理数千个请求的范围。这就是Apache Kafka,一个以处理高度紧张的环境而闻名的坚固工具。
在这篇文章中,我们将向您介绍Apache Kafka的基础知识,并着手使用Java和Kafka构建一个安全、可扩展的消息应用程序。
先决条件:Java8+,互联网连接,和免费的Okta开发者帐户。
阿帕奇·卡夫卡简介
Apache Kafka 是一个分布式流式处理平台,利用发布/订阅消息模式与应用程序进行交互;它旨在创建持久消息。
让我们更详细地细分这些概念。
分布式流式处理平台
当您想要运行 Kafka 时,您需要启动其代理:Kafka 在计算机上运行的简单实例,就像任何其他服务器一样。代理负责将消息发送、接收和存储到磁盘中。
单个代理不足以确保 Kafka 能够处理高吞吐量的消息。这一目标是通过许多经纪人同时合作、相互沟通和协调来实现的。
卡夫卡集群将一个或多个经纪人组合在一起。应用程序连接到群集,用于管理所有分布式详细信息,而不是连接到单个节点。
您可能还喜欢:
卡夫卡教程为大家,不管你在发展阶段。
使用持久消息发布/订阅消息系统
发布/订阅是分布式系统中的常见模式。下图说明了 Kafka 中此模式的基本结构:
该图像包括两个到目前为止未提及的组件:生产者和消费者。
生产者是向群集发送消息的应用程序。在此示例中,生产者 1、2 和 3 正在发送消息。然后,群集选择应存储它们的经纪人并将其发送到选定的代理使用者是连接到群集并接收来自生产者发布的消息的应用程序。任何对使用生产者发送的消息感兴趣的应用程序都必须连接到 Kafka 使用者。
由于 Kafka 存储消息的时间很长(默认值为 7 天),因此,即使许多使用者在发送消息时不在那里,也可以让许多使用者接收相同的消息!
卡夫卡主题
当您向Kafka 代理发送消息时,您需要通过指定主题来指定消息的发送位置。主题是使用者可以订阅的消息类别。此机制可确保使用者只接收与其相关的消息,而不是接收发布到群集的每个消息。
现在您已经了解了 Kafka 的基本体系结构,让我们下载并安装它。
安装并运行卡夫卡
要下载卡夫卡,请访问卡夫卡网站。将此压缩文件的内容提取到您首选项的文件夹中。
在 Kafka 目录中,转到 bin
该文件夹。在这里,你会发现许多bash脚本,将可用于运行Kafka应用程序。如果使用 Windows,则文件夹中也具有相同的脚本 windows
。本教程使用 Linux 命令,但如果您运行的是 Microsoft 操作系统,则只需使用等效的 Windows 版本。
启动动物园管理员来管理您的卡夫卡集群
Apache Kafka 始终作为分布式应用程序运行。这意味着群集必须在此过程中处理一些分布式挑战,例如同步配置或选择领导者来处理群集。
卡夫卡使用动物园管理员来跟踪这些细节。不过,不要担心下载它。卡夫卡已经与动物园管理员一起发货,让你快速起床和跑步。
让我们开始一个动物园管理员的例子!在 bin
Kafka 目录中的文件夹中,运行以下命令:
1px;”*./zookeeper-server-start.sh ./配置/管理员.属性
默认情况下,此命令在端口 2181 上启动 Zoo 管理员服务器。动物园管理员负责协调集群内的卡夫卡经纪人。本教程将使用 Kafka 项目中的默认配置,但始终可以根据需要更改这些值。
运行卡夫卡经纪人
下一步是运行代理本身。从另一个终端运行以下命令: bin
xxxxxxxxx
./kafka-server-start.sh . . ./配置/服务器.属性
您可能已经猜到,此命令运行 Kafka 服务器时默认端口 9092 上具有默认配置。
创建卡夫卡主题
现在,您已经运行了代理和 Zoo 管理员,您可以指定一个主题以开始从创建者发送消息。您将在文件夹中运行一个命令 bin
,就像在前面的步骤中所做的那样:
sh – 创建 — 主题我的主题 – zoo 管理员 |
本地主机:2181 — 复制因子 1 — 分区 1″ 数据朗 =”文本/x-sh”*
xxxxxxxxx
--创建--主题- 管理员|
--复制因子1-- 分区1
此命令创建一个名为指向 myTopic
您从第一个命令开始的 Zoostos 实例的主题。还必须指定两个不同的参数: replication-factor
和 partitions
。现在不要担心他们;它们用于控制与卡夫卡分布式系统相关的特定方面。在运行简单设置时,可以为这两个参数指定”1″。
现在,您已经启动并运行了所有内容,您可以开始将 Kafka 与 Java 应用程序集成!
创建 Java 和 Kafka 应用程序
让我们从项目结构开始,使用弹簧初始化程序创建应用程序。
转到https://start.spring.io并填写以下信息:
- 项目:马文项目
com.okta.javakafka
kafka-java
- 春季网。
- 阿帕奇·卡夫卡的春天
您还可以使用命令行生成项目。在终端中粘贴以下命令,它将下载具有上述相同配置的项目:
xxxxxxxxx
卷曲-d语言[java ]
-d依赖项 =Web,卡夫卡 |
-d包名称@com.okta.javakaka |
9961px;”>
-d类型=maven 项目 |
-o卡夫卡-贾瓦.zip
本教程使用 Maven,但如果您愿意,您可以轻松地使用 Gradle 遵循它。
就是这样!现在,您的 Java 项目结构已创建,您可以开始开发应用。
在 Java 应用程序中将消息推送到 Kafka 主题
创建可以推送消息的创建器的第一步是在 Java 应用程序中配置创建者。让我们创建一个配置类来做到这一点。
创建一个 src/main/java/com/okta/javakafka/configuration
文件夹,并在其中创建一个 ProducerConfiguration
类:
xxxxxxxxx
5px;”>
导入组织.阿帕奇.卡夫卡.客户端。生产者。制作人康菲;
导入组织.阿帕奇.卡夫卡.常见。序列化.字符串序列器;
导入组织.弹簧框架。上下文。注释。豆豆;
导入组织.弹簧框架。上下文。注释。配置;
导入组织.弹簧框架。卡夫卡.核心.默认卡夫卡生产者工厂;
导入组织.弹簧框架。卡夫卡.核心.卡夫卡模板;
导入组织.弹簧框架。卡夫卡
生产商工厂;
导入java。乌蒂尔.哈希地图;
导入java。乌蒂尔.地图;
公共类生产者配置|
私人静态最终字符串KAFKA_BROKER="本地主机:9092";
9961px;”>
返回新的DefaultKafkaProducerFactory<>(生产者配置));
}
公共映射<字符串对象>生产者配置() |
映射<字符串对象>配置=新的哈希Map<> (;
配置。put(生产者 Config.BOOTSTRAP_SERVERS_CONFIGKAFKA_BROKER);
配置
KEY_SERIALIZER_CLASS_CONFIG,字符串序列器.类);
配置。put(生产者 Config.VALUE_SERIALIZER_CLASS_CONFIG字符串序列器。类);
返回配置;
}
公共KafkaTemplate<字符串>kafkaTemplate() |
返回新的KafkaTemplate<>(生产商工厂());
}
9961px;”>
}
此类创建 一 ProducerFactory
个 ,它知道如何根据您提供的配置创建生产者。您还指定连接到本地 Kafka 代理,并使用 序列化密钥和值 String
。
您还宣布了一个 KafkaTemplate
Bean,用于对您的生产者执行高级操作。换句话说,模板能够执行诸如向主题发送消息和有效地隐藏隐藏隐藏细节等操作。
下一步是创建终结点以将消息发送到生产者。在 src/main/java/com/okta/javakafka/controller
包中,创建以下类:
导入组织
卡夫卡.核心.卡夫卡模板;
导入组织.弹簧框架。网络.绑定。注释。获取映射;
导入组织.弹簧框架。网络.绑定。注释。请求Param;
导入组织.弹簧框架。网络.绑定。注释。休息控制器;
导入java。乌蒂尔.列表;
公共类卡夫卡控制器|
1px;”•公共卡夫卡控制器(卡夫卡模板<字符串,字符串> 模板) ]
这个.模板=模板;
}
"/卡夫卡/生产") (
公共无效产生(字符串消息) |
模板。发送("myTopic"消息);
}
注意:由于您要发送要处理的数据, produce()
因此该方法确实应该是 POST
正如您所看到的,此终结点非常简单。它较早地注入配置的 , KafkaTemplate
并在向 myTopic
发出请求时向 发送消息 GET
/kafka/produce
。
让我们测试一下是否一切按预期工作。main
在类中运行方法 JavaKafkaApplication
。要从命令行运行,请执行以下命令:
xxxxxxxxx
./mvnw 弹簧启动:运行
您的服务器应在端口 8080 上运行,并且您已经可以针对它发出 API 请求!
转到您的 Web 浏览器并访问http://localhost:8080/kafka/produce?message=This是我的消息。
使用上述命令进行调用时,应用程序将执行 /kafka/produce
终结点,该终结点向 myTopic
Kafka 内的主题发送消息。
但是,您如何知道该命令已成功向主题发送消息?现在,您不会使用应用内的消息,这意味着您无法确定!
幸运的是,有一个简单的方法来创建一个消费者来直接测试。在 bin
Kafka 目录的文件夹中,运行以下命令:
SH-Bootstrap-服务器本地主机:9092-主题我的主题”数据朗[“文本/x-sh”*
xxxxxxxxxx
--引导服务器--主题我的专题
访问http://localhost:8080/kafka/produce?message=This是我在运行 Kafka 使用者的终端中再次看到以下消息的消息:
xxxxxxxxxx
5px;”>
这是我的信息
干得好!您可以立即停止此命令。
让我们添加一些 Java 代码来使用应用内的消息,而不是从终端执行。
在 Java 应用程序中使用来自 Kafka 主题的消息
与生产者一样,您需要添加配置以使使用者能够找到 Kafka Broker。
在 src/main/java/com/okta/javakafka/configuration
中创建以下类:
xxxxxxxxx
5px;”>
导入组织.阿帕奇.卡夫卡.客户端。消费者。消费者协会;
导入组织.阿帕奇.卡夫卡.常见。序列化.字符串反序列化器;
导入组织.弹簧框架。上下文。注释。豆豆;
导入组织.弹簧框架。上下文。注释。配置;
导入组织.弹簧框架。卡夫卡.config.并发卡夫卡接收容器工厂;
导入组织.弹簧框架。卡夫卡.核心.消费工厂;
导入组织.弹簧框架。卡夫卡
默认卡夫卡消费者工厂;
导入java。乌蒂尔.哈希地图;
导入java。乌蒂尔.地图;
公共类消费者配置|
私人静态最终字符串KAFKA_BROKER="本地主机:9092";
私人静态最终字符串GROUP_ID="卡夫卡-沙盒";
1px;”•
公共消费工厂<字符串>消费工厂() |
返回新的DefaultKafka 消费者工厂<>(消费者配置));
}
公共映射<字符串对象>使用者配置() |
映射<字符串对象>配置=新的哈希Map<> (;
put(消费者Config. BOOTSTRAP_SERVERS_CONFIG,KAFKA_BROKER”
配置。put(消费者Config.GROUP_ID_CONFIGGROUP_ID);
配置。put(消费者Config.KEY_DESERIALIZER_CLASS_CONFIG字符串反序列化器。类);
配置。put(消费者Config.VALUE_DESERIALIZER_CLASS_CONFIG字符串反序列化器。类);
返回配置;
}
9961px;”>
并发 Kafka Listener 容器工厂<字符串>工厂=新并发 Kafka Listener 容器工厂<> (;
工厂。集消费者工厂(消费者工厂());
返回工厂;
}
}
上面的代码创建一个工厂,知道如何连接到本地代理。它还会将使用者配置为对 String
键和值进行反序列化,与生产者配置匹配。
组 ID 是强制性的,Kafka 使用组 ID 来允许并行数据消耗。ConcurrentKafkaListenerContainerFactory
Bean 允许你的应用在多个线程中使用消息。
现在,您的 Java 应用已配置为在 Kafka 代理中查找使用者,让我们开始收听发送到主题的消息。
创建一个 src/main/java/com/okta/javakafka/consumer
目录,并在其中创建以下类:
数组列表;
导入 java.util.list;
@Component
公共类 MyTopic消费者 |
私有最终列表\lt;String> 消息 = 新的 ArrayList_lt;>();
@KafkaListener(主题 = “我的主题”,组 Id = “kafka-沙盒”)
公共无效侦听(字符串消息) |
同步(消息) |
消息.add(消息);
}
}
公共列表\lt;String_gt; 获取消息() |
返回消息;
}
[‘ 数据朗]”文本/x-java”]
xxxxxxxxx
导入组织.弹簧框架。卡夫卡.注释。卡夫卡·安弗特;
导入组织.弹簧框架。定型。组件;
导入java。乌蒂尔.数组列表;
1px;”*导入 java。乌蒂尔.列表;
公共类MyTopic消费者|
私有最终列表<字符串>消息=新的数组列表< > ();
主题="myTopic"组 Id="卡夫卡-沙盒") (
公共无效侦听(字符串消息) |
同步(消息) |
1px;”*消息。添加(消息);
}
}
公共列表<字符串>获取消息() |
返回消息;
}
}
此类负责侦听主题中的更改 myTopic
。它通过使用注释来这样做 KafkaListener
。每次从创建者向主题发送新消息时,你的应用都会在此类内收到一条消息。它将消息添加到收到的消息列表中,使其通过 方法可供其他类使用 getMessages()
。
接下来,让我们创建一个终结点,显示已使用的消息列表。返回 KafkaController
以 MyTopicConsumer
依赖项和方法方式添加 getMessages()
奥克塔.爪哇卡夫卡,消费者.MyTopic消费者;
进口组织.春框架.卡夫卡核心.卡夫卡模板;
导入组织弹簧框架. Web. 绑定. 注释. 获取映射;
导入组织.弹簧框架.Web.Bind.注释.请求参数;
导入组织弹簧框架. Web. 绑定. 注释. 休息控制器;
导入 java.
@RestController
公共类卡夫卡控制器 |
私人卡夫卡模板 < 字符串, 字符串 > 模板;
私人我的主题消费者我的主题消费者;
公共 Kafka 控制器 (KafkaTemplate < 字符串, 字符串 > 模板, MyTopic消费者 myTopic消费者) |
这个模板 = 模板;
这. 我的主题消费者 + 我的主题消费者;
}
@GetMapping(”/卡夫卡/生产”)
公共无效生成 (@RequestParam字符串消息) |
模板. 发送 (”我的主题”, 消息);
}
@GetMapping(”/卡夫卡/消息”)
公共列表 < 字符串 > 获取消息 () |
返回我的表位消费者.获取消息 ();
}
[ ‘ 数据朗 ] “文本/X-java” |
xxxxxxxxxx
进口网站.奥克塔.贾瓦卡夫卡.消费者。我的特人消费者;
导入组织.弹簧框架。卡夫卡.核心。卡夫卡模板;
弹簧框架。网络.绑定。注释。获取映射;
导入组织.弹簧框架。网络.绑定。注释。请求Param;
导入组织.弹簧框架。网络.绑定。注释。休息控制器;
导入java。乌蒂尔.列表;
公共类卡夫卡控制器|
私人卡夫卡模板<字符串>模板;
1px;”*私人我的主题消费者我的主题消费者;
公共Kafka 控制器(KafkaTemplate<字符串>模板MyTopic消费者myTopic消费者) |
这个.模板=模板;
这个.我的主题消费者=我的主题消费者;
}
"/卡夫卡/生产") (
公共无效产生(字符串消息) |
发送(”myTopic”,消息);
}
"/卡夫卡/消息") (
公共列表<字符串>获取消息() |
返回我的表位消费者。获取消息();
}
}
此类现在具有一个新的终结点来显示存储在使用者中的消息。调用此终结点时,它会从 Kafka 主题发送已处理的当前消息。
您的 Java 应用现在同时具有 Kafka 生产者和消费者,因此让我们一起测试一下!重新启动应用程序,然后转到http://localhost:8080/kafka/messages。
现在,没有返回任何信息让我们通过访问您的 Web 浏览器和访问由我的应用程序发送http://localhost:8080/kafka/produce?message=Message来解决此问题! .
当 Kafka 收到消息时,它会让您的消费者立刻知道。继续,在浏览器中转到http://localhost:8080/kafka/messages。现在您将看到您的消息已成功接收!
干得好!您有一个Java应用程序能够生成和使用来自Kafka的消息!在我们称它为一天之前,还有最后一步,这是一个非常重要的步骤。
保护您的 Java Kafka 应用程序
你的应用现在不是很安全。尽管您准备在分布式环境中处理许多消息,但可以找到指向终结点的链接的任何人都可以使用这些消息。这是一个关键漏洞,因此,让我们确保以正确的方式解决了它。
您将使用 OAuth 2.0 来确保只有经过身份验证的用户才能看到您的终结点。最好的部分?使用 Okta 对用户进行身份验证,只需 5 分钟即可在应用中添加此功能!
创建 Okta 帐户
如果您还没有 Okta 帐户,请继续创建一个。完成注册后,完成以下步骤:
- 登录到您的帐户。
- 转到应用程序>添加应用程序。您将被重定向到以下页面:
- 选择“Web”,然后单击”下一步“。
- 在表单中填写以下选项:
- 名字:
Bootiful Kafka
- 基本 URI:
http://localhost:8080
- 登录重定向 URL:
http://localhost:8080/login/oauth2/code/okta
- 名字:
- 单击“完成“。
现在,您已经拥有了 Okta 应用程序,您可以使用它来验证 Java 和 Kafka 应用程序中的用户在 pom.xml
标记内打开并添加以下依赖项 <dependencies>
:
xxxxxxxxx
<依赖项>
<组 Id>com.okta.spring</组 Id>
<工件 Id>okta-弹簧启动启动器</工件 Id>
3.0</版本>
</依赖项>
此库将与您刚刚创建的 Okta 应用集成。它还将将 Spring 安全添加到当前应用程序。在 中使用以下变量对其进行配置 src/main/resources/application.properties
:
xxxxxxxxxx
Okta. Oauth2. 发行人: https://[您的 OktaDomain]/oauth2/默认
Okta. oauth2. 客户端 ID: [你的客户]
1px;”•okta.oauth2.客户端秘密: [您的客户机密]
重要提示:此文件只能本地使用。不要将客户端的机密提交到 Git 或任何其他版本控制系统。
为了避免意外公开这些凭据,还可以将 Okta 应用程序的值指定为环境变量。
okta.env
使用以下环境变量在应用的根目录中创建文件。然后在source okta.env
启动应用之前运行。壳
xxxxxxxxx
11导出OKTA_OAUTH2_ISSUER=https://_yourOktaDomain}/oauth2/默认
2
9961px;”>
导出OKTA_OAUTH2_CLIENT_SECRET=[您的客户端机密]
您可以在 {yourClientID}
{yourClientSecret}
Okta UI 的应用程序页中找到 和 。要访问它,请按照以下步骤操作:
- 在 Okta 菜单中,转到“应用程序“。
- 选择“布蒂富尔·卡夫卡“应用程序。
- 单击”常规“选项卡。
您应该在客户端凭据区域中看到这两个值。
的值 {yourOktaDomain}
将在 Okta 仪表板中显示,只需单击菜单上的仪表板即可。您将在右上角看到组织 URL。
就是这样!
重新启动弹簧启动应用程序,然后转到http://localhost:8080/kafka/messages。您的应用现在将重定向到登录页面:
注:如果未提示您登录,那是因为您已登录。在隐身窗口中打开应用,您将看到上面显示的登录屏幕。
输入您的用户名和密码。如果您的登录尝试成功,您将再次被重定向回您的应用程序。
祝贺!现在,您有一个安全的 Java 应用程序,可以生成和使用来自 Kafka 的消息。
想要了解有关 Java、安全性和 OAuth 2.0 的更多详细信息?以下是您可能感兴趣的几个链接:
有关更多类似这样的文章,请关注Twitter上的@oktadev。我们还定期向我们的 YouTube 频道发布截屏视频!