想要了解如何在 Kubernetes 上自动扩展 Kinesis Data Streams 消费者应用程序,从而节省成本并提高资源效率?该博客提供了有关如何执行此操作的分步指南。
通过利用 Kubernetes 自动扩展 Kinesis 消费者应用程序,您可以从其内置功能(如水平 Pod 自动缩放器)中受益。
什么是 Amazon Kinesis 和 Kinesis Data Streams?
Amazon Kinesis 是一个用于实时数据处理、摄取和分析的平台。Kinesis Data Streams 是一种无服务器流数据服务(Kinesis 流数据平台的一部分,以及 Kinesis Data Firehose、Kinesis Video Streams 和 Kinesis Data Analytics)。
Kinesis 数据流可以弹性扩展,并持续适应数据摄取率和流消耗率的变化。它可用于构建实时数据分析应用程序、实时仪表板和实时数据管道。
让我们首先概述一下 Kinesis Data Streams 的一些关键概念。
Kinesis 数据流:高级架构
- Kinesis 数据流是一组 分片。每个分片都有一系列数据记录。
- 生产者不断将数据推送到 Kinesis Data Streams,消费者实时处理数据。
- 分区键 用于按流中的分片对数据进行分组
- 这些使用者也称为 Amazon Kinesis Data Streams Application。
- 开发可处理来自 KDS 数据流的数据的自定义使用者应用程序的方法之一是使用 Kinesis 客户端库 (
KCL
)。
Kinesis 消费类应用程序如何水平扩展?
Kinesis 客户端库确保每个分片都有一个记录处理器运行并处理来自该分片的数据。 KCL
通过处理与分布式计算和可扩展性相关的许多复杂任务,帮助您使用和处理来自 Kinesis 数据流的数据。它连接到数据流,枚举数据流中的分片,并使用租约来协调与其使用者应用程序的分片关联。
记录处理器为其管理的每个分片实例化。 KCL
从数据流中提取数据记录,将记录推送到相应的记录处理器,并对记录进行检查点处理。更重要的是,当工作线程实例计数更改或数据流重新分片(分片拆分或合并)时,它会平衡分片-工作线程关联(租约)。这意味着您只需添加更多实例即可扩展 Kinesis Data Streams 应用程序,因为 KCL
将自动平衡实例之间的分片。
但是,您仍然需要一种方法在负载增加时扩展应用程序。当然,您可以手动执行此操作或构建自定义解决方案来完成此操作。
这就是 Kubernetes 事件驱动的自动缩放 (KEDA) 可以提供帮助的地方。 KEDA
是一个基于 Kubernetes 的事件驱动的自动缩放组件,可以监控 Kinesis 等事件源,并根据需要处理的事件数量扩展底层 Deployment
s(和 Pod
s)。
要见证自动缩放的实际应用,您将使用使用 Kinesis 客户端库 (KCL
) 2 的 Java 应用程序。 它将部署到 Amazon EKS 上的 Kubernetes 集群,并将使用 KEDA
.该应用程序包括一个 的 ShardRecordProcessor
实现,用于处理来自 Kinesis 流的数据并将其保存到 DynamoDB 表中。我们将使用 AWS CLI 向 Kinesis 流生成数据,并观察应用程序的扩展。
在之前,我们深入研究,这里是 的 KEDA
快速概述。
什么是KEDA?
KEDA
是一个开源的 CNCF 项目,它建立在原生 Kubernetes 原语(如 Horizontal Pod Autoscaler)之上,可以添加到任何 Kubernetes 集群中。以下是其关键组件的高级概述(您可以参考 KEDA 文档 进行深入探讨):
- 中的组件充当 Kubernetes 指标服务器,用于公开水平 Pod 自动缩放器的指标
KEDA
keda-operator-metrics-apiserver
- KEDA Scaler 与外部系统(如 Redis)集成,以获取这些指标(例如,列表的长度),以根据需要处理的事件数量驱动 Kubernetes 中任何容器的自动扩展。
- 组件的作用
keda-operator
是 激活 和 停用Deployment
;即,缩放到零和从零开始
sh/docs/2.10/scalers/aws-kinesis/“ href=”https://keda.sh/docs/2.10/scalers/aws-kinesis/“ rel=”noopener noreferrer“ target=”_blank“ title=”https://keda.sh/docs/2.10/scalers/aws-kinesis/“>Kinesis Stream KEDA 缩放器的实际应用,可根据 AWS Kinesis Stream 的分片计数进行扩展。
现在让我们继续这篇文章的实际部分。
先决条件
除了 AWS 账户之外,您还需要安装 AWS CLI、 kubectl、 Docker、Java 11 和 Maven 。
设置 EKS 集群,创建 DynamoDB 表和 Kinesis 数据流
您可以通过多种方式创建 Amazon EKS 集群。我更喜欢使用 eksctl CLI,因为它提供了便利。使用 创建 eksctl
EKS 集群非常简单:
eksctl create cluster --name <cluster name> --region <region e.g. us-east-1>
有关详细信息,请参阅 Amazon EKS 入门 – eksctl 文档。
创建 DynamoDB 表以保存应用程序数据。您可以使用 AWS CLI 通过以下命令创建表:
aws dynamodb create-table \
--table-name users \
--attribute-definitions AttributeName=email,AttributeType=S \
--key-schema AttributeName=email,KeyType=HASH \
--provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
使用 AWS CLI 创建包含 两个 分片的 Kinesis 流:
aws kinesis create-stream --stream-name kinesis-keda-demo --shard-count 2
克隆此 GitHub 存储库并将其更改为正确的目录:
git clone https://github
sh/docs/2.8/deploy/#yaml“ href=”https://keda.sh/docs/2.8/deploy/#yaml“ rel=”noopener noreferrer“ target=”_blank“ title=”https://keda.sh/docs/2.8/deploy/#yaml“>YAML 文件进行部署KEDA
。但您也可以使用 Helm 图表。
安装 KEDA
:
# update version 2.8.2 if required
kubectl apply -f https://github.com/kedacore/keda/releases/download/v2.8.2/keda-2.8.2.yaml
验证安装:
# check Custom Resource Definitions
kubectl get crd
# check KEDA Deployments
kubectl get deployment -n keda
# check KEDA operator logs
kubectl logs -f $(kubectl get pod -l=app=keda-operator -o jsonpath='{.items[0].metadata.name}' -n keda) -n keda
配置 IAM 角色
KEDA 运算符以及 Kinesis 消费者应用程序需要调用 AWS API。由于两者都将在 EKS 中以 s 形式 Deployment
运行,因此我们将使用 服务账户的 IAM 角色 (IRSA) 来提供必要的权限。
在此特定方案中:
KEDA
操作员需要能够获取 Kinesis 流的分片计数:它通过使用DescribeStreamSummary
API 来实现。- 应用程序(具体为 KCL 库)需要与 Kinesis 和 DynamoDB 交互:它需要 一堆 IAM 权限 才能执行此操作。
为 KEDA 运营商配置 IRSA
将您的 AWS 账户 ID 和 OIDC 身份提供商设置为环境变量:
ACCOUNT_ID=$(aws sts get-caller-identity --query "Account" --output text)
#update the cluster name and region as required
export EKS_CLUSTER_NAME=demo-eks-cluster
export AWS_REGION=us-east-1
OIDC_PROVIDER=$(aws eks describe-cluster --name $EKS_CLUSTER_NAME --query "cluster
oidc.issuer“ –输出文本 |sed -e “s/^https:\/\///”)
JSON
为角色创建包含受信任实体的文件:
read -r -d '' TRUST_RELATIONSHIP <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Federated": "arn:aws:iam::${ACCOUNT_ID}:oidc-provider/${OIDC_PROVIDER}"
},
"Action": "sts:AssumeRoleWithWebIdentity",
"Condition": {
"StringEquals": {
"${OIDC_PROVIDER}:aud": "sts.amazonaws.com",
"${OIDC_PROVIDER}:sub": "system:serviceaccount:keda:keda-operator"
}
}
}
]
}
EOF
echo "${TRUST_RELATIONSHIP}" > trust_keda.json
现在,创建 IAM 角色并附加策略(有关详细信息,请查看 policy_kinesis_keda.json
文件):
export ROLE_NAME=keda-operator-kinesis-role
aws iam create-role --role-name $ROLE_NAME --assume-role-policy-document file://trust_keda.json --description "IRSA for kinesis KEDA scaler on EKS"
aws iam create-policy --policy-name keda-kinesis-policy --policy-document file://policy_kinesis_keda.json
aws iam attach-role-policy --role-name $ROLE_NAME --policy-arn=arn:aws:iam::${ACCOUNT_ID}:policy/keda-kinesis-policy
关联 IAM 角色和服务账户:
kubectl annotate serviceaccount -n keda keda-operator eks.amazonaws.com/role-arn=arn:aws:iam::${ACCOUNT_ID}:role/${ROLE_NAME}
# verify the annotation
kubectl describe serviceaccount/keda-operator -n keda
您需要重新启动 KEDA
运算符 Deployment
才能使其生效:
kubectl rollout restart deployment.apps/keda-operator -n keda
# to verify, confirm that the KEDA operator has the right environment variables
kubectl describe pod -n keda $(kubectl get po -l=app=keda-operator -n keda --output=jsonpath={.items..metadata.name}) | grep "^\s*AWS_"
# expected output
AWS_STS_REGIONAL_ENDPOINTS: regional
AWS_DEFAULT_REGION: us-east-1
AWS_REGION: us-east-1
AWS_ROLE_ARN: arn:aws:iam::<AWS_ACCOUNT_ID>:role/keda-operator-kinesis-role
AWS_WEB_IDENTITY_TOKEN_FILE: /var/run/secrets/eks.amazonaws.com/serviceaccount/token
为 KCL 消费者应用程序配置 IRSA,以应对
首先创建一个 Kubernetes 服务帐户:
kubectl apply -f - <<EOF
apiVersion: v1
kind: ServiceAccount
metadata:
name: kcl-consumer-app-sa
EOF
JSON
为角色创建包含受信任实体的文件:
read -r -d '' TRUST_RELATIONSHIP <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Federated": "arn:aws:iam::${ACCOUNT_ID}:oidc-provider/${OIDC_PROVIDER}"
},
"Action": "sts:AssumeRoleWithWebIdentity",
"Condition": {
"StringEquals": {
"${OIDC_PROVIDER}:aud": "sts
com“,
“${OIDC_PROVIDER}:sub”: “system:serviceaccount:default:kcl-consumer-app-sa”
}
}
}
]
}
EOF学位
echo “${TRUST_RELATIONSHIP}” > trust.json
现在,创建 IAM 角色并附加策略(有关详细信息,请查看 policy.json
文件):
export ROLE_NAME=kcl-consumer-app-role
aws iam create-role --role-name $ROLE_NAME --assume-role-policy-document file://trust.json --description "IRSA for KCL consumer app on EKS"
aws iam create-policy --policy-name kcl-consumer-app-policy --policy-document file://policy.json
aws iam attach-role-policy --role-name $ROLE_NAME --policy-arn=arn:aws:iam::${ACCOUNT_ID}:policy/kcl-consumer-app-policy
关联 IAM 角色和服务账户:
kubectl annotate serviceaccount -n default kcl-consumer-app-sa eks.amazonaws.com/role-arn=arn:aws:iam::${ACCOUNT_ID}:role/${ROLE_NAME}
# verify the annotation
kubectl describe serviceaccount/kcl-consumer-app-sa
核心基础结构现已准备就绪。让我们准备和部署使用者应用程序。
将 KCL 消费者应用程序部署到 EKS
您首先需要构建 Docker 映像并将其推送到 Amazon Elastic Container Registry (ECR)(有关详细信息,请参阅 )。Dockerfile
构建 Docker 镜像并将其推送到 ECR
# create runnable JAR file
mvn clean compile assembly\:single
# build docker image
docker build -t kcl-consumer-app .
AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query "Account" --output text)
# create a private ECR repo
aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin $AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com
aws ecr create-repository --repository-name kcl-consumer-app --region us-east-1
# tag and push the image
docker tag kcl-consumer-app:latest $AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com/kcl-consumer-app:latest
docker push $AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com/kcl-consumer-app:latest
部署使用者应用程序
更新 以 consumer.yaml
包含刚刚推送到 ECR 的 Docker 映像。清单的其余部分保持不变:
apiVersion: apps/v1
kind: Deployment
metadata:
name: kcl-consumer
spec:
replicas: 1
selector:
matchLabels:
app: kcl-consumer
template:
metadata:
labels:
app: kcl-consumer
spec:
serviceAccountName: kcl-consumer-app-sa
containers:
- name: kcl-consumer
image: AWS_ACCOUNT_ID
ecr.us-east-1.amazonaws.com/kcl-consumer-app:latest
图像拉策略:始终
环境:
– 名称: STREAM_NAME
值:运动-凯达-演示
– 名称: TABLE_NAME
值:用户
– 名称:APPLICATION_NAME
值:运动-凯达-演示
– 名称:AWS_REGION
值:美国东部-1
– 名称: INSTANCE_NAME
值来自:
字段参考:
字段路径:metadata.name
Deployment
创建 :
kubectl apply -f consumer.yaml
# verify Pod transition to Running state
kubectl get pods -w
KCL 应用自动缩放与 KEDA 的运行
现在,你已经部署了使用者应用程序,库 KCL
应该可以开始操作了。它要做的第一件事是在 DynamoDB 中创建一个“控制表”,这应该与 KCL 应用程序的名称相同(在本例 kinesis-keda-demo
中为 )。
进行初始协调和创建表可能需要几分钟时间。您可以检查使用者应用程序的日志以跟踪进度。
kubectl logs -f $(kubectl get po -l=app=kcl-consumer --output=jsonpath={.items..metadata.name})
租约分配完成后,检查表并记下 leaseOwner
属性:
aws dynamodb describe-table --table-name kinesis-keda-demo
aws dynamodb scan --table-name kinesis-keda-demo
现在,让我们使用 AWS CLI 将一些数据发送到 Kinesis 流。
export KINESIS_STREAM=kinesis-keda-demo
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user1@foo.com --data $(echo -n '{"name":"user1", "city":"new york"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user2@foo.com --data $(echo -n '{"name":"user2", "city":"tel aviv"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user3@foo
com –data $(echo -n ‘{“name”:“user4”, “city”:“seattle”}’ | base64)
KCL 应用程序将每条记录保存到目标 DynamoDB
表(在本例中为该表命名 users
)。您可以查看该表以验证记录。
aws dynamodb scan --table-name users
请注意,属性的值 processed_by
?它与 KCL 消费者 Pod
相同。这将使我们能够更轻松地验证端到端自动缩放过程。
创建用于 Kinesis 的 KEDA 缩放器
这是 ScaledObject
定义。请注意,它的目标是 kcl-consumer
Deployment
(我们刚刚创建的那个), shardCount
并且设置为 1
:
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: aws-kinesis-stream-scaledobject
spec:
scaleTargetRef:
name: kcl-consumer
triggers:
- type: aws-kinesis-stream
metadata:
# Required
streamName: kinesis-keda-demo
# Required
awsRegion: "us-east-1"
shardCount: "1"
identityOwner: "operator"
KEDA
创建 Kinesis 缩放器:
kubectl apply -f keda-kinesis-scaler.yaml
验证 KCL 应用程序自动扩展
我们从我们的一个 Pod
KCL 应用程序开始。但是,多亏了 KEDA
,我们现在应该看到第二个 Pod
出现。
kubectl get pods -l=app=kcl-consumer -w
# check logs of the new pod
kubectl logs -f <enter Pod name>
我们的应用程序能够自动缩放到两个Pods
,因为我们在定义中ScaledObject
指定shardCount: "1"
了
检查 kinesis-keda-demo
中的 DynamoDB
控制表:您应该会看到 的 leaseOwner
更新。
让我们向 Kinesis 流发送更多数据。
export KINESIS_STREAM=kinesis-keda-demo
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user5@foo.com --data $(echo -n '{"name":"user5", "city":"new york"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user6@foo.com --data $(echo -n '{"name":"user6", "city":"tel aviv"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user7@foo.com --data $(echo -n '{"name":"user7", "city":"new delhi"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user8@foo.com --data $(echo -n '{"name":"user8", "city":"seattle"}' | base64)
验证属性的值 processed_by
。由于我们已经扩展到两个 Pod
s,因此每条记录的值应该不同,因为每 Pod
条记录都将处理来自 Kinesis 流的记录子集。
增加 Kinesis 流容量
让我们将分片数量从两个扩展到三个,并继续监视 KCL
应用程序的自动扩展。
aws kinesis update-shard-count --stream-name kinesis-keda-demo --target-shard-count 3 --scaling-type UNIFORM_SCALING
一旦 Kinesis 重新分片完成,缩放器将立即采取行动, KEDA
并将 KCL 应用程序扩展到三个 Pod
s。
kubectl get pods -l=app=kcl-consumer -w
与之前一样,确认 Kinesis 分片租约已在 的控制DynamoDB
表中更新kinesis-keda-demo
。leaseOwner
检查属性。
继续向 Kinesis 流发送更多数据。正如预期的那样,Pod
s 将共享记录处理,这将反映在表中的属性中processed_by
users
com –data $(echo -n ‘{“name”:“user9”, “city”:“new york”}’ | base64)
AWS Kinesis put-record –stream-name $KINESIS_STREAM –partition-key user10@foo.com –data $(echo -n ‘{“name”:“user10”, “city”:“tel aviv”}’ | base64)
AWS Kinesis put-record –stream-name $KINESIS_STREAM –partition-key user11@foo.com –data $(echo -n ‘{“name”:“user11”, “city”:“new delhi”}’ | base64)
AWS Kinesis put-record –stream-name $KINESIS_STREAM –partition-key user12@foo.com –data $(echo -n ‘{“name”:“user12”, “city”:“seattle”}’ | base64)
AWS Kinesis put-record –stream-name $KINESIS_STREAM –partition-key user14@foo.com –data $(echo -n ‘{“name”:“user14”, “city”:“tel aviv”}’ | base64)
AWS Kinesis put-record –stream-name $KINESIS_STREAM –分区键 user15@foo.com –data $(echo -n ‘{“name”:“user15”, “city”:“New Delhi”}’ | base64)
AWS Kinesis put-record –stream-name $KINESIS_STREAM –partition-key user16@foo.com –data $(echo -n ‘{“name”:“user16”, “city”:“seattle”}’ | base64)
缩小规模
到目前为止,我们只在一个方向上扩展。当我们减少 Kinesis 流的分片容量时会发生什么?亲自尝试一下:将分片计数从三个减少到两个,看看 KCL 应用程序会发生什么。
验证端到端解决方案后,应清理资源以避免产生任何额外费用。
删除资源
删除 EKS 集群、Kinesis 流和 DynamoDB 表。
eksctl delete cluster --name keda-kinesis-demo
aws kinesis delete-stream --stream-name kinesis-keda-demo
aws dynamodb delete-table --table-name users
结论
在这篇文章中,您学习了如何使用 KEDA
自动扩展 KCL
使用 Kinesis 流中的数据的应用程序。
您可以根据应用程序要求配置 KEDA 缩放器。例如,您可以将 to 设置为 并在 shardCount
3
Kinesis 流中为每三个分片设置一个 Pod
分片。但是,如果要维护一对一映射,则可以设置 shardCount
to 1
和 将负责分布式协调和 KCL
租约分配,从而确保每个 Pod
实例都有一个记录处理器实例。这是一种有效的方法,允许您横向扩展 Kinesis 流处理管道以满足应用程序的需求。