想要了解如何在 Kubernetes 上自动扩展 Kinesis Data Streams 消费者应用程序,从而节省成本并提高资源效率?该博客提供了有关如何执行此操作的分步指南。

通过利用 Kubernetes 自动扩展 Kinesis 消费者应用程序,您可以从其内置功能(如水平 Pod 自动缩放器)中受益。

什么是 Amazon Kinesis 和 Kinesis Data Streams?

Amazon Kinesis 是一个用于实时数据处理、摄取和分析的平台。Kinesis Data Streams 是一种无服务器流数据服务(Kinesis 流数据平台的一部分,以及 Kinesis Data Firehose、Kinesis Video StreamsKinesis Data Analytics)。

Kinesis 数据流可以弹性扩展,并持续适应数据摄取率和流消耗率的变化。它可用于构建实时数据分析应用程序、实时仪表板和实时数据管道。

让我们首先概述一下 Kinesis Data Streams 的一些关键概念。

Kinesis 数据流:高级架构

  • Kinesis 数据流是一组 分片。每个分片都有一系列数据记录。
  • 生产者不断将数据推送到 Kinesis Data Streams,消费者实时处理数据。
  •  分区键 用于按流中的分片对数据进行分组
  • 它使用与每个数据记录关联的分区键来确定给定数据记录属于哪个分片。
  • 使用者从 Amazon Kinesis Data Streams 获取记录,对其进行处理,并将结果存储在 Amazon DynamoDB、Amazon Redshift、Amazon S3 等中。
    • 这些使用者也称为 Amazon Kinesis Data Streams Application。
    • 开发可处理来自 KDS 数据流的数据的自定义使用者应用程序的方法之一是使用 Kinesis 客户端库KCL)。
  • Kinesis 消费类应用程序如何水平扩展?

    Kinesis 客户端库确保每个分片都有一个记录处理器运行并处理来自该分片的数据。 KCL 通过处理与分布式计算和可扩展性相关的许多复杂任务,帮助您使用和处理来自 Kinesis 数据流的数据。它连接到数据流,枚举数据流中的分片,并使用租约来协调与其使用者应用程序的分片关联。

    记录处理器为其管理的每个分片实例化。 KCL 从数据流中提取数据记录,将记录推送到相应的记录处理器,并对记录进行检查点处理。更重要的是,当工作线程实例计数更改或数据流重新分片(分片拆分或合并)时,它会平衡分片-工作线程关联(租约)。这意味着您只需添加更多实例即可扩展 Kinesis Data Streams 应用程序,因为 KCL 将自动平衡实例之间的分片。

    但是,您仍然需要一种方法在负载增加时扩展应用程序。当然,您可以手动执行此操作或构建自定义解决方案来完成此操作。

    这就是 Kubernetes 事件驱动的自动缩放 (KEDA) 可以提供帮助的地方。 KEDA 是一个基于 Kubernetes 的事件驱动的自动缩放组件,可以监控 Kinesis 等事件源,并根据需要处理的事件数量扩展底层 Deployments(和 Pods)。

    要见证自动缩放的实际应用,您将使用使用 Kinesis 客户端库 (KCL) 2 的 Java 应用程序。 它将部署到 Amazon EKS 上的 Kubernetes 集群,并将使用 KEDA.该应用程序包括一个 的 ShardRecordProcessor 实现,用于处理来自 Kinesis 流的数据并将其保存到 DynamoDB 表中。我们将使用 AWS CLI 向 Kinesis 流生成数据,并观察应用程序的扩展。

    Use the AWS CLI to produce data to the Kinesis stream and observe the scaling of the application

    在之前,我们深入研究,这里是 的 KEDA快速概述。

    什么是KEDA?

    KEDA 是一个开源的 CNCF 项目,它建立在原生 Kubernetes 原语(如 Horizontal Pod Autoscaler)之上,可以添加到任何 Kubernetes 集群中。以下是其关键组件的高级概述(您可以参考 KEDA 文档 进行深入探讨):

    1. 中的组件充当 Kubernetes 指标服务器,用于公开水平 Pod 自动缩放器的指标KEDA keda-operator-metrics-apiserver
    2. KEDA Scaler 与外部系统(如 Redis)集成,以获取这些指标(例如,列表的长度),以根据需要处理的事件数量驱动 Kubernetes 中任何容器的自动扩展。
    3. 组件的作用 keda-operator激活停用Deployment;即,缩放到零和从零开始

    sh/docs/2.10/scalers/aws-kinesis/“ href=”https://keda.sh/docs/2.10/scalers/aws-kinesis/“ rel=”noopener noreferrer“ target=”_blank“ title=”https://keda.sh/docs/2.10/scalers/aws-kinesis/“>Kinesis Stream KEDA 缩放器的实际应用,可根据 AWS Kinesis Stream 的分片计数进行扩展。 

    现在让我们继续这篇文章的实际部分。

    先决条件

    除了 AWS 账户之外,您还需要安装 AWS CLIkubectlDocker、Java 11 和 Maven

    设置 EKS 集群,创建 DynamoDB 表和 Kinesis 数据流

    您可以通过多种方式创建 Amazon EKS 集群。我更喜欢使用 eksctl CLI,因为它提供了便利。使用 创建 eksctl EKS 集群非常简单:

    eksctl create cluster --name <cluster name> --region <region e.g. us-east-1>
    

    有关详细信息,请参阅 Amazon EKS 入门 – eksctl 文档。

    创建 DynamoDB 表以保存应用程序数据。您可以使用 AWS CLI 通过以下命令创建表:

    aws dynamodb create-table \
        --table-name users \
        --attribute-definitions AttributeName=email,AttributeType=S \
        --key-schema AttributeName=email,KeyType=HASH \
        --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
    

    使用 AWS CLI 创建包含 两个 分片的 Kinesis 流:

    aws kinesis create-stream --stream-name kinesis-keda-demo --shard-count 2
    

    克隆此 GitHub 存储库并将其更改为正确的目录:

    git clone https://github

    sh/docs/2.8/deploy/#yaml“ href=”https://keda.sh/docs/2.8/deploy/#yaml“ rel=”noopener noreferrer“ target=”_blank“ title=”https://keda.sh/docs/2.8/deploy/#yaml“>YAML 文件进行部署KEDA。但您也可以使用 Helm 图表

    安装 KEDA

    # update version 2.8.2 if required
    
    kubectl apply -f https://github.com/kedacore/keda/releases/download/v2.8.2/keda-2.8.2.yaml
    

    验证安装:

    # check Custom Resource Definitions
    kubectl get crd
    
    # check KEDA Deployments
    kubectl get deployment -n keda
    
    # check KEDA operator logs
    kubectl logs -f $(kubectl get pod -l=app=keda-operator -o jsonpath='{.items[0].metadata.name}' -n keda) -n keda
    

    配置 IAM 角色

    KEDA 运算符以及 Kinesis 消费者应用程序需要调用 AWS API。由于两者都将在 EKS 中以 s 形式 Deployment运行,因此我们将使用 服务账户的 IAM 角色 (IRSA) 来提供必要的权限。

    在此特定方案中:

    • KEDA 操作员需要能够获取 Kinesis 流的分片计数:它通过使用 DescribeStreamSummary API 来实现。
    • 应用程序(具体为 KCL 库)需要与 Kinesis 和 DynamoDB 交互:它需要 一堆 IAM 权限 才能执行此操作。

    为 KEDA 运营商配置 IRSA

    将您的 AWS 账户 ID 和 OIDC 身份提供商设置为环境变量:

    ACCOUNT_ID=$(aws sts get-caller-identity --query "Account" --output text)
    
    #update the cluster name and region as required
    export EKS_CLUSTER_NAME=demo-eks-cluster
    export AWS_REGION=us-east-1
    
    OIDC_PROVIDER=$(aws eks describe-cluster --name $EKS_CLUSTER_NAME --query "cluster

    oidc.issuer“ –输出文本 |sed -e “s/^https:\/\///”)

    JSON为角色创建包含受信任实体的文件:

    read -r -d '' TRUST_RELATIONSHIP <<EOF
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Federated": "arn:aws:iam::${ACCOUNT_ID}:oidc-provider/${OIDC_PROVIDER}"
          },
          "Action": "sts:AssumeRoleWithWebIdentity",
          "Condition": {
            "StringEquals": {
              "${OIDC_PROVIDER}:aud": "sts.amazonaws.com",
              "${OIDC_PROVIDER}:sub": "system:serviceaccount:keda:keda-operator"
            }
          }
        }
      ]
    }
    EOF
    echo "${TRUST_RELATIONSHIP}" > trust_keda.json
    

    现在,创建 IAM 角色并附加策略(有关详细信息,请查看 policy_kinesis_keda.json 文件):

    export ROLE_NAME=keda-operator-kinesis-role
    
    aws iam create-role --role-name $ROLE_NAME --assume-role-policy-document file://trust_keda.json --description "IRSA for kinesis KEDA scaler on EKS"
    
    aws iam create-policy --policy-name keda-kinesis-policy --policy-document file://policy_kinesis_keda.json
    aws iam attach-role-policy --role-name $ROLE_NAME --policy-arn=arn:aws:iam::${ACCOUNT_ID}:policy/keda-kinesis-policy
    

    关联 IAM 角色和服务账户:

    kubectl annotate serviceaccount -n keda keda-operator eks.amazonaws.com/role-arn=arn:aws:iam::${ACCOUNT_ID}:role/${ROLE_NAME}
    
    # verify the annotation 
    kubectl describe serviceaccount/keda-operator -n keda
    

    您需要重新启动 KEDA 运算符 Deployment 才能使其生效:

    kubectl rollout restart deployment.apps/keda-operator -n keda
    
    # to verify, confirm that the KEDA operator has the right environment variables
    kubectl describe pod -n keda $(kubectl get po -l=app=keda-operator -n keda --output=jsonpath={.items..metadata.name}) | grep "^\s*AWS_"
    
    # expected output
    
    AWS_STS_REGIONAL_ENDPOINTS:   regional
    AWS_DEFAULT_REGION:           us-east-1
    AWS_REGION:                   us-east-1
    AWS_ROLE_ARN:                 arn:aws:iam::<AWS_ACCOUNT_ID>:role/keda-operator-kinesis-role
    AWS_WEB_IDENTITY_TOKEN_FILE:  /var/run/secrets/eks.amazonaws.com/serviceaccount/token
    

    为 KCL 消费者应用程序配置 IRSA,以应对

    首先创建一个 Kubernetes 服务帐户:

    kubectl apply -f - <<EOF
    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: kcl-consumer-app-sa
    EOF
    

    JSON为角色创建包含受信任实体的文件:

    read -r -d '' TRUST_RELATIONSHIP <<EOF
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Federated": "arn:aws:iam::${ACCOUNT_ID}:oidc-provider/${OIDC_PROVIDER}"
          },
          "Action": "sts:AssumeRoleWithWebIdentity",
          "Condition": {
            "StringEquals": {
              "${OIDC_PROVIDER}:aud": "sts

    com“,
    “${OIDC_PROVIDER}:sub”: “system:serviceaccount:default:kcl-consumer-app-sa”
    }
    }
    }
    ]
    }
    EOF学位
    echo “${TRUST_RELATIONSHIP}” > trust.json

    现在,创建 IAM 角色并附加策略(有关详细信息,请查看 policy.json 文件):

    export ROLE_NAME=kcl-consumer-app-role
    
    aws iam create-role --role-name $ROLE_NAME --assume-role-policy-document file://trust.json --description "IRSA for KCL consumer app on EKS"
    
    aws iam create-policy --policy-name kcl-consumer-app-policy --policy-document file://policy.json
    
    aws iam attach-role-policy --role-name $ROLE_NAME --policy-arn=arn:aws:iam::${ACCOUNT_ID}:policy/kcl-consumer-app-policy
    

    关联 IAM 角色和服务账户:

    kubectl annotate serviceaccount -n default kcl-consumer-app-sa eks.amazonaws.com/role-arn=arn:aws:iam::${ACCOUNT_ID}:role/${ROLE_NAME}
    
    # verify the annotation
    kubectl describe serviceaccount/kcl-consumer-app-sa
    

    核心基础结构现已准备就绪。让我们准备和部署使用者应用程序。

    将 KCL 消费者应用程序部署到 EKS

    您首先需要构建 Docker 映像并将其推送到 Amazon Elastic Container Registry (ECR)(有关详细信息,请参阅 )。Dockerfile

    构建 Docker 镜像并将其推送到 ECR

    # create runnable JAR file
    mvn clean compile assembly\:single
    
    # build docker image
    docker build -t kcl-consumer-app .
    
    AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query "Account" --output text)
    
    # create a private ECR repo
    aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin $AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com
    
    aws ecr create-repository --repository-name kcl-consumer-app --region us-east-1
    
    # tag and push the image
    docker tag kcl-consumer-app:latest $AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com/kcl-consumer-app:latest
    docker push $AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com/kcl-consumer-app:latest
    

    部署使用者应用程序

    更新 以 consumer.yaml 包含刚刚推送到 ECR 的 Docker 映像。清单的其余部分保持不变:

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: kcl-consumer
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: kcl-consumer
      template:
        metadata:
          labels:
            app: kcl-consumer
        spec:
          serviceAccountName: kcl-consumer-app-sa
          containers:
            - name: kcl-consumer
              image: AWS_ACCOUNT_ID

    ecr.us-east-1.amazonaws.com/kcl-consumer-app:latest
    图像拉策略:始终
    环境:
    – 名称: STREAM_NAME
    值:运动-凯达-演示
    – 名称: TABLE_NAME
    值:用户
    – 名称:APPLICATION_NAME
    值:运动-凯达-演示
    – 名称:AWS_REGION
    值:美国东部-1
    – 名称: INSTANCE_NAME
    值来自:
    字段参考:
    字段路径:metadata.name

    Deployment创建 :

    kubectl apply -f consumer.yaml
    
    # verify Pod transition to Running state
    kubectl get pods -w
    

    KCL 应用自动缩放与 KEDA 的运行

    现在,你已经部署了使用者应用程序,库 KCL 应该可以开始操作了。它要做的第一件事是在 DynamoDB 中创建一个“控制表”,这应该与 KCL 应用程序的名称相同(在本例 kinesis-keda-demo中为 )。

    进行初始协调和创建表可能需要几分钟时间。您可以检查使用者应用程序的日志以跟踪进度。

    kubectl logs -f $(kubectl get po -l=app=kcl-consumer --output=jsonpath={.items..metadata.name})
    

    租约分配完成后,检查表并记下 leaseOwner 属性:

    aws dynamodb describe-table --table-name kinesis-keda-demo
    aws dynamodb scan --table-name kinesis-keda-demo
    

    kinesis-keda-demo

    现在,让我们使用 AWS CLI 将一些数据发送到 Kinesis 流。

    export KINESIS_STREAM=kinesis-keda-demo
    
    aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user1@foo.com --data $(echo -n '{"name":"user1", "city":"new york"}' | base64)
    aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user2@foo.com --data $(echo -n '{"name":"user2", "city":"tel aviv"}' | base64)
    aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user3@foo

    com –data $(echo -n ‘{“name”:“user4”, “city”:“seattle”}’ | base64)

    KCL 应用程序将每条记录保存到目标 DynamoDB 表(在本例中为该表命名 users )。您可以查看该表以验证记录。

    aws dynamodb scan --table-name users
    

    请注意,属性的值 processed_by ?它与 KCL 消费者 Pod相同。这将使我们能够更轻松地验证端到端自动缩放过程。

    users - items returned

    创建用于 Kinesis 的 KEDA 缩放器

    这是 ScaledObject 定义。请注意,它的目标是 kcl-consumer Deployment (我们刚刚创建的那个), shardCount 并且设置为 1

    apiVersion: keda.sh/v1alpha1
    kind: ScaledObject
    metadata:
      name: aws-kinesis-stream-scaledobject
    spec:
      scaleTargetRef:
        name: kcl-consumer
      triggers:
        - type: aws-kinesis-stream
          metadata:
            # Required
            streamName: kinesis-keda-demo
            # Required
            awsRegion: "us-east-1"
            shardCount: "1"
            identityOwner: "operator"
    

    KEDA创建 Kinesis 缩放器:

    kubectl apply -f keda-kinesis-scaler.yaml
    

    验证 KCL 应用程序自动扩展

    我们从我们的一个 Pod KCL 应用程序开始。但是,多亏了 KEDA,我们现在应该看到第二个 Pod 出现。

    kubectl get pods -l=app=kcl-consumer -w
    
    # check logs of the new pod
    kubectl logs -f <enter Pod name>
    

    我们的应用程序能够自动缩放到两个Pods,因为我们在定义中ScaledObject指定shardCount: "1"

    检查 kinesis-keda-demo 中的 DynamoDB控制表:您应该会看到 的 leaseOwner更新。

    kinesis-keda-demo - items returned

    让我们向 Kinesis 流发送更多数据。

    export KINESIS_STREAM=kinesis-keda-demo
    
    aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user5@foo.com --data $(echo -n '{"name":"user5", "city":"new york"}' | base64)
    aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user6@foo.com --data $(echo -n '{"name":"user6", "city":"tel aviv"}' | base64)
    aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user7@foo.com --data $(echo -n '{"name":"user7", "city":"new delhi"}' | base64)
    aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user8@foo.com --data $(echo -n '{"name":"user8", "city":"seattle"}' | base64)
    

    验证属性的值 processed_by 。由于我们已经扩展到两个 Pods,因此每条记录的值应该不同,因为每 Pod 条记录都将处理来自 Kinesis 流的记录子集。

    增加 Kinesis 流容量

    让我们将分片数量从两个扩展到三个,并继续监视 KCL 应用程序的自动扩展。

    aws kinesis update-shard-count --stream-name kinesis-keda-demo --target-shard-count 3 --scaling-type UNIFORM_SCALING
    

    一旦 Kinesis 重新分片完成,缩放器将立即采取行动, KEDA 并将 KCL 应用程序扩展到三个 Pods。

    kubectl get pods -l=app=kcl-consumer -w
    

    与之前一样,确认 Kinesis 分片租约已在 的控制DynamoDB表中更新kinesis-keda-demoleaseOwner检查属性。

    继续向 Kinesis 流发送更多数据。正如预期的那样,Pods 将共享记录处理,这将反映在表中的属性中processed_by userscom –data $(echo -n ‘{“name”:“user9”, “city”:“new york”}’ | base64)
    AWS Kinesis put-record –stream-name $KINESIS_STREAM –partition-key user10@foo.com –data $(echo -n ‘{“name”:“user10”, “city”:“tel aviv”}’ | base64)
    AWS Kinesis put-record –stream-name $KINESIS_STREAM –partition-key user11@foo.com –data $(echo -n ‘{“name”:“user11”, “city”:“new delhi”}’ | base64)
    AWS Kinesis put-record –stream-name $KINESIS_STREAM –partition-key user12@foo.com –data $(echo -n ‘{“name”:“user12”, “city”:“seattle”}’ | base64)
    AWS Kinesis put-record –stream-name $KINESIS_STREAM –partition-key user14@foo.com –data $(echo -n ‘{“name”:“user14”, “city”:“tel aviv”}’ | base64)
    AWS Kinesis put-record –stream-name $KINESIS_STREAM –分区键 user15@foo.com –data $(echo -n ‘{“name”:“user15”, “city”:“New Delhi”}’ | base64)
    AWS Kinesis put-record –stream-name $KINESIS_STREAM –partition-key user16@foo.com –data $(echo -n ‘{“name”:“user16”, “city”:“seattle”}’ | base64)

    缩小规模

    到目前为止,我们只在一个方向上扩展。当我们减少 Kinesis 流的分片容量时会发生什么?亲自尝试一下:将分片计数从三个减少到两个,看看 KCL 应用程序会发生什么。

    验证端到端解决方案后,应清理资源以避免产生任何额外费用。

    删除资源

    删除 EKS 集群、Kinesis 流和 DynamoDB 表。

    eksctl delete cluster --name keda-kinesis-demo
    aws kinesis delete-stream --stream-name kinesis-keda-demo
    aws dynamodb delete-table --table-name users
    

    结论

    在这篇文章中,您学习了如何使用 KEDA 自动扩展 KCL 使用 Kinesis 流中的数据的应用程序。

    您可以根据应用程序要求配置 KEDA 缩放器。例如,您可以将 to 设置为 并在 shardCount 3 Kinesis 流中为每三个分片设置一个 Pod 分片。但是,如果要维护一对一映射,则可以设置 shardCount to 1 和 将负责分布式协调和 KCL 租约分配,从而确保每个 Pod 实例都有一个记录处理器实例。这是一种有效的方法,允许您横向扩展 Kinesis 流处理管道以满足应用程序的需求。

    Comments are closed.