拨开荷叶行,寻梦已然成。仙女莲花里,翩翩白鹭情。
IMG-LOGO
主页 文章列表 检查Apache Kafka 服务器是否正在运行的指南

检查Apache Kafka 服务器是否正在运行的指南

白鹭 - 2022-08-19 2169 0 2

一、概述

使用Apache Kafka 的客户端应用程序通常属于两类,即生产者和消费者。生产者和消费者都要求底层Kafka 服务器启动并运行,然后才能分别开始生产和消费工作

在本文中,我们将学习一些确定Kafka 服务器是否正在运行的策略。

2. 使用Zookeeper 命令

找出是否存在活动代理的最快方法之一是使用Zookeeper 的dump命令。dump命令是可用于管理Zookeeper 服务器的4LW命令之一****

让我们继续使用nc命令通过正在侦听2181 端口的Zookeeper 服务器发送转储命令:

$ echo dump | nc localhost 2181 | grep -i broker | xargs
/brokers/ids/0

在执行命令时,我们会看到在Zookeeper 服务器上注册的临时代理ID 列表。如果不存在临时ID,则没有任何代理节点正在运行。

此外,重要的是要注意,需要在zookeeper.propertieszoo.cfg配置文件中通常可用的配置中明确允许dump命令:

lw.commands.whitelist=dump

或者,我们也可以使用Zookeeper API 来查找活动代理列表。

3. 使用Apache Kafka 的AdminClient

如果我们的生产者或消费者是Java 应用程序,那么我们可以使用Apache Kafka 的AdminClient类来确定Kafka 服务器是否已启动

让我们定义KafkaAdminClient类来包装AdminClient类的实例,以便我们可以快速测试我们的代码:

public class KafkaAdminClient {
private final AdminClient client;
public KafkaAdminClient(String bootstrap) {
Properties props = new Properties();
props.put("bootstrap.servers", bootstrap);
props.put("request.timeout.ms", 3000);
props.put("connections.max.idle.ms", 5000);
this.client = AdminClient.create(props);
}
}

接下来,让我们在KafkaAdminClient类中定义verifyConnection()方法来验证客户端是否可以连接到正在运行的代理服务器:

public boolean verifyConnection() throws ExecutionException, InterruptedException {
Collection<Node> nodes = this.client.describeCluster()
.nodes()
.get();
return nodes != null && nodes.size() > 0;
}

最后,让我们通过连接到正在运行的Kafka 集群来测试我们的代码:

@Test
void givenKafkaIsRunning_whenCheckedForConnection_thenConnectionIsVerified() throws Exception {
boolean alive = kafkaAdminClient.verifyConnection();
assertThat(alive).isTrue();
}

4. 使用kcat实用程序

我们可以使用kcat(以前的kafkacat)命令来查看是否有正在运行的Kafka 代理节点。为此,让我们使用-L选项来显示现有主题的元数据

$ kcat -b localhost:9092 -t demo-topic -L
Metadata for demo-topic (from broker -1: localhost:9092/bootstrap):
1 brokers:
broker 0 at 192.168.1.53:9092 (controller)
1 topics:
topic "demo-topic" with 1 partitions:
partition 0, leader 0, replicas: 0, isrs: 0

接下来,让我们在代理节点关闭时执行相同的命令:

$ kcat -b localhost:9092 -t demo-topic -L -m 1
%3|1660579562.937|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 1ms in state CONNECT)
% ERROR: Failed to acquire metadata: Local: Broker transport failure (Are the brokers reachable? Also try increasing the metadata timeout with -m <timeout>?)

对于这种情况,我们会收到“连接被拒绝”错误,因为没有正在运行的代理节点。此外,我们必须注意,通过使用-m选项将请求超时限制为1 秒,我们能够快速失败。

5. 使用UI 工具

对于不需要自动检查的实验性POC 项目,我们可以依靠诸如Offset Explorer之类的UI 工具但是,如果我们要验证企业级Kafka 客户端的代理节点状态,不建议使用这种方法。

让我们使用Offset Explorer 使用Zookeeper 主机和端口详细信息连接到Kafka 集群: 使用偏移资源管理器检查连接

我们可以在左侧窗格中看到正在运行的代理列表。而已。我们只需单击一下按钮即可获得它。

六,结论

在本教程中,我们探索了一些使用Zookeeper 命令、Apache 的AdminClientkcat实用程序的命令行方法,然后使用基于UI 的方法来确定Kafka 服务器是否已启动。



标签:

0 评论

发表评论

您的电子邮件地址不会被公开。 必填的字段已做标记 *