stethoscope-on-bed

当你种草你的牛时,你通常会配置一个健康检查,以保持你的牛群活着。一个很常见 livenessProbe 的是关于对 GET 终结点执行请求,如果服务使用 回复, two hundred 则我们没事,否则,pod 将被销毁,并且将一个新的 pod 投入使用:

livenessProbe:
  httpGet:
    path: /health
    port: http

卡夫卡流,它不是那么简单。基本流应用程序从主题读取数据,执行转换,并将其放回另一个主题。特别是,它不公开任何有关其健康的信息。

让我们通过几种可能性,其中没有一个是完美的,选择最适合您的具体情况的方法。如果您有其他或更好的想法,请随时发表评论;我很乐意延长这个职位。

您可能还喜欢:在生产中监控库伯内特:如何指导(第 1 部分,共 5)。

1. 创建专用 HTTP 终结点

这听起来很容易。与 Kafka Streams 应用一起运行Java HTTP Server,它公开运行状况检查终结点以报告流状态:

import java.io.IOException;
import java.net.InetSocketAddress;

import com.sun.net.httpserver.HttpServer;
import org.apache.kafka.streams.KafkaStreams;

class Health {

  private static final int OK = 200;
  private static final int ERROR = 500;

  private final KafkaStreams streams;
  private HttpServer server;

  Health(KafkaStreams streams) {
    this.streams = streams;
  }

  void start() {
    try {
      server = HttpServer.create(new InetSocketAddress(Config.HOST, Config.PORT), 0);
    } catch (IOException ioe) {
      throw new RuntimeException("Could not setup http server: ", ioe);
    }
    server.createContext("/health", exchange -> {
      int responseCode = streams.state().isRunning() ? OK : ERROR;
      exchange.sendResponseHeaders(responseCode, 0);
      exchange.close();
    });
    server.start();
  }

  void stop() {
    server.stop(0);
  }
}

然后,相应地配置流应用:

    KafkaStreams streams = new KafkaStreams(...);
    streams.start();

    Health health = new Health(streams);
    health.start();

    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
      try {
        streams.close();
        log.info("Stream stopped");
        health.stop();
      } catch (Exception exc) {
        log.error("Got exception while executing shutdown hook: ", exc);
      }
    }));

这工作得很好。如果流正在运行,这意味着其状态为 或 RUNNING REBALANCING ,则应用将回复 200 响应代码,而 Kubernetes 不会触摸该窗格。如果发生故障,将重新实例化窗格。

此方法的缺点是在每个 Kafka Streams 应用程序中包括一个 HTTP 服务器。

2. 基于 JMX 的健康检查 = 第一次尝试

Kafka Streams 应用程序通过 JMX 公开指标(如果使用以下参数启动):

-Dcom.sun.management.jmxremote.port=5555 \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom

管理.jmxremote.ssl_false

当您使用 Java 监视和管理控制台(即 j.k.a. jconsole)连接时,您将可以访问几个指标:

Using jconsole to connect to the MBean Server exposed by the Kafka Streams application

使用 jconsole 连接到 Kafka Streams 应用程序公开的 MBean 服务器

不,不幸的是,我们还没有完成,因为应用程序的状态不在其中。

一种解决方法是监视 count kafka.streams:type=kafka-metrics-count 对象中的指标。如果它高于 1.0 ,则我假设流正在运行:

count metric for a healthy stream

盘点正常流的指标

我们发现,当流死时,计数值为 1.0:

count metric for a dead stream

计算死流的指标

我们如何围绕这些知识进行健康检查?Kubernetes 允许我们运行 shell 命令,当成功完成时,该命令会退出 0 并视为正常运行的应用程序。

要阅读MBeans,我们可以使用Jmxterm,这是可供下载的。它可以在非交互模式下运行,读取特定的 MBean 属性 – 这正是我们的情况。运行状况检查的命令如下所示:

livenessProbe:
  exec:
    command:
    - healthcheck.sh

healthcheck.sh脚本包含一个命令:

if [ `echo "get -s -b kafka.streams:type=kafka-metrics-count count" | java -jar jmxterm-1.0.0-uber.jar -l localhost:5555 -v silent -n` = "1.0" ] ; then exit 1; fi

这种方法有缺点:我们需要 jmxterm 提供jar文件,以及Kubernetes窗格中的脚本文件。让我们尝试先删除脚本。

3. 基于 JMX 的健康检查 = 第二次尝试

由于我们只需要知道一个特定的指标,我编写了一个专用的Java应用程序,可以从这里下载。如果属性的值为 count 1.0 则它引发异常,并结束非零退出代码。

运行状况检查命令不再复杂,可以直接在 command 节中键入:

livenessProbe:
  exec:
    command:
    - java
    - -jar
    - kafka-streams-health-check-1

0.jar

尽管如此,jar 文件仍需要成为容器的一部分。如果您不愿意这样做,让我们来探索另一个解决方案:

4. 人不能独自通过健康检查生活

如果流死亡,可能并不总是想要杀死坏的吊舱并启动一个新的吊舱。这取决于流遇到的故障。在再次启动应用之前,您可能希望收到警报,表明应用不再运行并手动修复问题,而不是以自动方式恢复窗格。

库伯内特斯定义了一种相当常见的模式,称为 sidecar 。创建一个 pod,该窗格由主容器、Kafka Streams 应用程序和伴奏(jmx 导出器应用程序)组成。由 Confluent提供的官方掌舵图表遵循此风格。

Kafka 代理、架构注册表、休息代理和 KSQL 在一侧有一个 jmx 导出器。只需看看这个部署描述符,在 prometheus-jmx-exporter 运行 KSQL 的主容器的一侧配置容器。

好处是我们遵循一种标准方法。最后,我们确实有更多指标需要研究。

无论使用副车如何,我们需要为额外的容器提供额外的资源和预算,每个 Kafka Streams 应用程序都需要一个。

使用我们的 Kafka Streams 应用程序中的副车,我们可能不再需要健康检查。仅仅依靠指标和警报并放弃运行状况检查的好处是,我们不会将 Kafka Streams 应用程序容器与其他 jar 文件弄乱。

生活中没有免费的午餐!我们必须配置一个监控系统和警报,通知我们,当Kafka流应用程序死亡 – 我们需要深入。

添加侧车容器

首先,让我们将 Kafka Streams 应用程序配置为在 docker 容器内运行。如前所述,我们还需要传递一些 com.sun.management.jmxremote JVM 参数来公开 JMX Mbean。由于自动化 Docker 映像的构建和发布是一个好主意,因此让我们使用出色的分级插件,并在 build.gradle 文件中配置 docker 映像和应用程序本身:

docker {
    javaApplication {
        if (System.getenv('GOOGLE_APPLICATION_CREDENTIALS')) {
            registryCredentials {
                url = 'https://eu.gcr.io'
                username = '_json_key'
                password = file(System.getenv('GOOGLE_APPLICATION_CREDENTIALS')).text
            }
        }
        baseImage = 'openjdk:11-jdk'
        maintainer = 'Grzegorz Kocur "grzegorz.kocur@softwaremill.com"'
        tag = 'eu.gcr.io/myproject/' + project.name.toLowerCase() + ':' + dockerVersion
        jvmArgs = [ "-Dcom.sun.management.jmxremote.port=5555",
                    "-Dcom.sun.management.jmxremote.authenticate=false",
                    "-Dcom.sun.management.jmxremote.ssl=false"]
    }
}

请注意 – 无需设置 java.rmi.server.hostname 属性,因为容器在 pod 内共享其网络命名空间并使用 相互 localhost 通信。这大大简化了事情它需要一个配置文件。最好将应用程序及其配置分离。在 Kubernetes 中,有一个专用对象 , ConfigMap

apiVersion: v1
kind: ConfigMap
metadata:
  name: jmx-exporter-config
data:
  jmx-prometheus.yml: |+
    jmxUrl: service:jmx:rmi:///jndi/rmi://localhost:5555/jmxrmi
    lowercaseOutputName: true
    lowercaseOutputLabelNames: true
    ssl: false
    rules:
      - pattern: ".*"

这是一个非常基本的示例 – jmx 导出器将在 localhost 端口 5555 上连接到 Kafka Streams 应用程序,并读取所有指标。使用与主应用程序配置中的端口相同的端口非常重要。

在 Kubernetes 上创建 时,我们可以 ConfigMap deployment 在 中使用它,将其安装为卷:

spec:
  template:
    spec:
      volumes:
        - name: jmx-config
          configMap:
            name: jmx-exporter-config

最后一步是将侧车容器添加到吊舱:

spec:
  template:
    spec:
      containers:
        - name: prometheus-jmx-exporter
          image: solsson/kafka-prometheus-jmx-exporter
          command:
            - java
            - -jar
            - jmx_prometheus_httpserver.jar
            - 5556
            - /etc/jmx-exporter/jmx-prometheus.yml
          ports:
            - 5556
              name: metrics
          volumeMounts:
            - name: jmx-config
              mountPath: /etc/jmx-exporter

并使用Kubernetes 服务公开它 :

apiVersion: v1
kind: Service
metadata:
  name: kafka-stream-service
  labels:
    app: kafka-streams
spec:
  type: ClusterIP
  ports:
    {{ .Values.prometheus.jmx.port }}
      targetPort: metrics
      name: metrics
      protocol: TCP
  selector:
    app: kafka-streams

我们建议在 Kubernetes 实例中使用普罗米塞斯运算符。然后,您可以将 配置为 ServiceMonitor 将自动将 jmx 导出器添加为 Prometheus 实例的目标:

apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: jmx-exporter-service
spec:
  selector:
    matchLabels:
      app: kafka-streams
  endpoints:
    - port: metrics

Prometheus 读取指标后,我们可以创建警报并配置警报管理器,以便使用您选择的通信通道发送所需的通知:Slack 或短信似乎是个好主意。

同样,普罗米西乌斯运算符简化了许多事情。我们要做的就是创建一个 Kubernetes 类型的 PrometheusRule 对象:

apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  labels:
    app: prometheus-operator
    release: prometheus-operator
  name: kafka-streams
spec:
  groups:
    - name: kafka-streams.rules
      rules:
        - alert: 
          annotations:
            {{ $labels.job }}."
          expr: sum(kafka_streams_kafka_metrics_count_count) by (job) / count(kafka_streams_kafka_metrics_count_count) by (job) == 1
          for: 5m
          labels:
            severity: critical

expr 字段定义警报触发器

如前所言,除了警报,我们还在普罗米西乌斯有卡夫卡流应用程序指标,我们可以使用 Grafana 可视化它们:

Kafka streams

卡夫卡溪流

吃你的蛋糕,也吃它

两者,指标以及健康检查,我们可以保持Kubernetes吊舱的自我修复功能,并在恢复持续失败时得到通知。

当应用程序因数据不匹配而消亡时,第三次尝试后可能会通知我们,如果没有我们的干预,应用程序将无法运行。

足够说

监视 Kafka Streams 应用程序证明并非易事。您需要确定查看指标和可能定义警报是否满足您的 SLA 要求。在某些情况下,需要以自动方式重新启动,这就是 livenessProbe 功能启动的地方。最后,混合这两种方法应提供对应用程序的可用性和运行状况的最有信心。

如果你想向前迈出一步,看看使用Kubernetes自动缩放KafkaStreams应用程序。

进一步阅读

Comments are closed.