当你种草你的牛时,你通常会配置一个健康检查,以保持你的牛群活着。一个很常见 livenessProbe
的是关于对 GET
终结点执行请求,如果服务使用 回复, two hundred
则我们没事,否则,pod 将被销毁,并且将一个新的 pod 投入使用:
livenessProbe:
httpGet:
path: /health
port: http
与卡夫卡流,它不是那么简单。基本流应用程序从主题读取数据,执行转换,并将其放回另一个主题。特别是,它不公开任何有关其健康的信息。
让我们通过几种可能性,其中没有一个是完美的,选择最适合您的具体情况的方法。如果您有其他或更好的想法,请随时发表评论;我很乐意延长这个职位。
您可能还喜欢:在生产中监控库伯内特:如何指导(第 1 部分,共 5)。
1. 创建专用 HTTP 终结点
这听起来很容易。与 Kafka Streams 应用一起运行Java HTTP Server,它公开运行状况检查终结点以报告流状态:
import java.io.IOException;
import java.net.InetSocketAddress;
import com.sun.net.httpserver.HttpServer;
import org.apache.kafka.streams.KafkaStreams;
class Health {
private static final int OK = 200;
private static final int ERROR = 500;
private final KafkaStreams streams;
private HttpServer server;
Health(KafkaStreams streams) {
this.streams = streams;
}
void start() {
try {
server = HttpServer.create(new InetSocketAddress(Config.HOST, Config.PORT), 0);
} catch (IOException ioe) {
throw new RuntimeException("Could not setup http server: ", ioe);
}
server.createContext("/health", exchange -> {
int responseCode = streams.state().isRunning() ? OK : ERROR;
exchange.sendResponseHeaders(responseCode, 0);
exchange.close();
});
server.start();
}
void stop() {
server.stop(0);
}
}
然后,相应地配置流应用:
KafkaStreams streams = new KafkaStreams(...);
streams.start();
Health health = new Health(streams);
health.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
streams.close();
log.info("Stream stopped");
health.stop();
} catch (Exception exc) {
log.error("Got exception while executing shutdown hook: ", exc);
}
}));
这工作得很好。如果流正在运行,这意味着其状态为 或 RUNNING
REBALANCING
,则应用将回复 200
响应代码,而 Kubernetes 不会触摸该窗格。如果发生故障,将重新实例化窗格。
此方法的缺点是在每个 Kafka Streams 应用程序中包括一个 HTTP 服务器。
2. 基于 JMX 的健康检查 = 第一次尝试
Kafka Streams 应用程序通过 JMX 公开指标(如果使用以下参数启动):
-Dcom.sun.management.jmxremote.port=5555 \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom
管理.jmxremote.ssl_false
当您使用 Java 监视和管理控制台(即 j.k.a. jconsole)连接时,您将可以访问几个指标:
不,不幸的是,我们还没有完成,因为应用程序的状态不在其中。
一种解决方法是监视 count
kafka.streams:type=kafka-metrics-count
对象中的指标。如果它高于 1.0
,则我假设流正在运行:
我们发现,当流死时,计数值为 1.0:
我们如何围绕这些知识进行健康检查?Kubernetes 允许我们运行 shell 命令,当成功完成时,该命令会退出 0
并视为正常运行的应用程序。
要阅读MBeans,我们可以使用Jmxterm,这是可供下载的。它可以在非交互模式下运行,读取特定的 MBean 属性 – 这正是我们的情况。运行状况检查的命令如下所示:
livenessProbe:
exec:
command:
- healthcheck.sh
healthcheck.sh脚本包含一个命令:
if [ `echo "get -s -b kafka.streams:type=kafka-metrics-count count" | java -jar jmxterm-1.0.0-uber.jar -l localhost:5555 -v silent -n` = "1.0" ] ; then exit 1; fi
这种方法有缺点:我们需要 jmxterm
提供jar文件,以及Kubernetes窗格中的脚本文件。让我们尝试先删除脚本。
3. 基于 JMX 的健康检查 = 第二次尝试
由于我们只需要知道一个特定的指标,我编写了一个专用的Java应用程序,可以从这里下载。如果属性的值为 count
1.0
,则它引发异常,并结束非零退出代码。
运行状况检查命令不再复杂,可以直接在 command
节中键入:
livenessProbe:
exec:
command:
- java
- -jar
- kafka-streams-health-check-1
0.jar
尽管如此,jar 文件仍需要成为容器的一部分。如果您不愿意这样做,让我们来探索另一个解决方案:
4. 人不能独自通过健康检查生活
如果流死亡,可能并不总是想要杀死坏的吊舱并启动一个新的吊舱。这取决于流遇到的故障。在再次启动应用之前,您可能希望收到警报,表明应用不再运行并手动修复问题,而不是以自动方式恢复窗格。
库伯内特斯定义了一种相当常见的模式,称为 sidecar
。创建一个 pod,该窗格由主容器、Kafka Streams 应用程序和伴奏(jmx 导出器应用程序)组成。由 Confluent提供的官方掌舵图表遵循此风格。
Kafka 代理、架构注册表、休息代理和 KSQL 在一侧有一个 jmx 导出器。只需看看这个部署描述符,在 prometheus-jmx-exporter
运行 KSQL 的主容器的一侧配置容器。
好处是我们遵循一种标准方法。最后,我们确实有更多指标需要研究。
无论使用副车如何,我们需要为额外的容器提供额外的资源和预算,每个 Kafka Streams 应用程序都需要一个。
使用我们的 Kafka Streams 应用程序中的副车,我们可能不再需要健康检查。仅仅依靠指标和警报并放弃运行状况检查的好处是,我们不会将 Kafka Streams 应用程序容器与其他 jar 文件弄乱。
生活中没有免费的午餐!我们必须配置一个监控系统和警报,通知我们,当Kafka流应用程序死亡 – 我们需要深入。
添加侧车容器
首先,让我们将 Kafka Streams 应用程序配置为在 docker 容器内运行。如前所述,我们还需要传递一些 com.sun.management.jmxremote
JVM 参数来公开 JMX Mbean。由于自动化 Docker 映像的构建和发布是一个好主意,因此让我们使用出色的分级插件,并在 build.gradle
文件中配置 docker 映像和应用程序本身:
docker {
javaApplication {
if (System.getenv('GOOGLE_APPLICATION_CREDENTIALS')) {
registryCredentials {
url = 'https://eu.gcr.io'
username = '_json_key'
password = file(System.getenv('GOOGLE_APPLICATION_CREDENTIALS')).text
}
}
baseImage = 'openjdk:11-jdk'
maintainer = 'Grzegorz Kocur "grzegorz.kocur@softwaremill.com"'
tag = 'eu.gcr.io/myproject/' + project.name.toLowerCase() + ':' + dockerVersion
jvmArgs = [ "-Dcom.sun.management.jmxremote.port=5555",
"-Dcom.sun.management.jmxremote.authenticate=false",
"-Dcom.sun.management.jmxremote.ssl=false"]
}
}
请注意 – 无需设置 java.rmi.server.hostname
属性,因为容器在 pod 内共享其网络命名空间并使用 相互 localhost
通信。这大大简化了事情它需要一个配置文件。最好将应用程序及其配置分离。在 Kubernetes 中,有一个专用对象 , ConfigMap
apiVersion: v1
kind: ConfigMap
metadata:
name: jmx-exporter-config
data:
jmx-prometheus.yml: |+
jmxUrl: service:jmx:rmi:///jndi/rmi://localhost:5555/jmxrmi
lowercaseOutputName: true
lowercaseOutputLabelNames: true
ssl: false
rules:
- pattern: ".*"
这是一个非常基本的示例 – jmx 导出器将在 localhost
端口 5555
上连接到 Kafka Streams 应用程序,并读取所有指标。使用与主应用程序配置中的端口相同的端口非常重要。
在 Kubernetes 上创建 时,我们可以 ConfigMap
deployment
在 中使用它,将其安装为卷:
spec:
template:
spec:
volumes:
- name: jmx-config
configMap:
name: jmx-exporter-config
最后一步是将侧车容器添加到吊舱:
spec:
template:
spec:
containers:
- name: prometheus-jmx-exporter
image: solsson/kafka-prometheus-jmx-exporter
command:
- java
- -jar
- jmx_prometheus_httpserver.jar
- 5556
- /etc/jmx-exporter/jmx-prometheus.yml
ports:
- 5556
name: metrics
volumeMounts:
- name: jmx-config
mountPath: /etc/jmx-exporter
并使用Kubernetes 服务公开它 :
apiVersion: v1
kind: Service
metadata:
name: kafka-stream-service
labels:
app: kafka-streams
spec:
type: ClusterIP
ports:
{{ .Values.prometheus.jmx.port }}
targetPort: metrics
name: metrics
protocol: TCP
selector:
app: kafka-streams
我们建议在 Kubernetes 实例中使用普罗米塞斯运算符。然后,您可以将 配置为 ServiceMonitor
将自动将 jmx 导出器添加为 Prometheus 实例的目标:
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
name: jmx-exporter-service
spec:
selector:
matchLabels:
app: kafka-streams
endpoints:
- port: metrics
Prometheus 读取指标后,我们可以创建警报并配置警报管理器,以便使用您选择的通信通道发送所需的通知:Slack 或短信似乎是个好主意。
同样,普罗米西乌斯运算符简化了许多事情。我们要做的就是创建一个 Kubernetes 类型的 PrometheusRule
对象:
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
labels:
app: prometheus-operator
release: prometheus-operator
name: kafka-streams
spec:
groups:
- name: kafka-streams.rules
rules:
- alert:
annotations:
{{ $labels.job }}."
expr: sum(kafka_streams_kafka_metrics_count_count) by (job) / count(kafka_streams_kafka_metrics_count_count) by (job) == 1
for: 5m
labels:
severity: critical
该 expr
字段定义警报触发器
如前所言,除了警报,我们还在普罗米西乌斯有卡夫卡流应用程序指标,我们可以使用 Grafana 可视化它们:
吃你的蛋糕,也吃它
两者,指标以及健康检查,我们可以保持Kubernetes吊舱的自我修复功能,并在恢复持续失败时得到通知。
当应用程序因数据不匹配而消亡时,第三次尝试后可能会通知我们,如果没有我们的干预,应用程序将无法运行。
足够说
监视 Kafka Streams 应用程序证明并非易事。您需要确定查看指标和可能定义警报是否满足您的 SLA 要求。在某些情况下,需要以自动方式重新启动,这就是 livenessProbe
功能启动的地方。最后,混合这两种方法应提供对应用程序的可用性和运行状况的最有信心。
如果你想向前迈出一步,看看使用Kubernetes自动缩放KafkaStreams应用程序。