主机参考:VPS测评参考推荐/专注分享VPS服务器优惠信息!若您是商家可以在本站进行投稿,查看详情!此外我们还提供软文收录、PayPal代付、广告赞助等服务,查看详情! |
我们发布的部分优惠活动文章可能存在时效性,购买时建议在本站搜索商家名称可查看相关文章充分了解该商家!若非中文页面可使用Edge浏览器同步翻译!PayPal代付/收录合作 |
要查询Kafka中的主题数据,可以使用Kafka的命令行工具或编程API。 下面简单介绍一下两种常用的方法。
使用Kafka命令行工具kafka-console-consumer.sh。
首先,确保 Kafka 已安装并启动。 然后运行以下命令以使用指定主题中的数据:
./kafka-console-consumer.sh --bootstrap-server --topic --from-beginning [ h]
其中 是 Kafka 代理的地址(例如 localhost:9092) 将其替换为 将 替换为您要查询的主题的名称。 --from-beginning 参数表示从主题中最旧的记录开始消费。
这将启动一个交互式使用者,允许您查看和使用主题中的数据。 要停止使用程序,请按 Ctrl+C。
使用Kafka客户端库编程API:
您可以使用Kafka客户端库(Java、Python、Go等) 。 )编写一个程序,查询Kafka中的主题数据。 下面是使用 Python 和 confluence_kafka 库的示例。
首先,安装 confluence_kafka 库。
pip install confluence-kafka
然后,编写以下 Python 代码来消费指定主题的数据:
[h ]from confluence_kafka import Consumer, KafkaError
def Consumer_messages(kafka_broker, topic):
conf = {
'bootstrap.servers': kafka_broker,
'group.id': 'my_consumer_group',[h ] 'auto.offset.reset': '最早'
}
Consumer = Consumer(conf)
Consumer.subscribe([topic])
try:
while True:
msg = Consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
print(f" offset 已到达分区 {msg.topic()} [{msg.partition()}] 位于 {msg.offset()}")
else:
引发 KafkaException(msg.error( ))
[小时] print(f"收到消息: {msg.value().decode('utf-8')}")
排除键盘中断:
pass
finally:[ h ] Consumer.close()
if __name__ == '__main__':
kafka_broker = ''
topic = ''
Consumer_messages(kafka_broker, topic)
将 替换为您的 Kafka 代理的地址,并将 替换为您要查询的主题的名称。 当您运行该程序时,它将显示从主题的最旧记录开始接收的消息。 要停止该程序,请按 Ctrl+C。
这几篇文章你可能也喜欢:
- kafka数据库(kafka dataflow)的性能如何?
- 如何安装Kafka数据库(kafka安装)
- kafka数据库有什么优势(kafka数据处理)
- kafka数据库适合什么场景?(kafka适合应用场景)
- 如何扩展kafka数据库(将kafka数据写入mysql)
本文由主机参考刊发,转载请注明:Kafka如何查询主题数据(kafka查询主题数据命令) https://zhujicankao.com/140923.html
评论前必须登录!
注册