主机参考:VPS测评参考推荐/专注分享VPS服务器优惠信息!若您是商家可以在本站进行投稿,查看详情!此外我们还提供软文收录、PayPal代付、广告赞助等服务,查看详情! |
我们发布的部分优惠活动文章可能存在时效性,购买时建议在本站搜索商家名称可查看相关文章充分了解该商家!若非中文页面可使用Edge浏览器同步翻译!PayPal代付/收录合作 |
Kafka消费者组是Kafka的重要概念,允许消费者群体共同努力以在一个或多个主题上消费消息。 使用Kafka的消费者组的基本步骤如下:
1。创建一个消费者组
创建消费者时,必须指定消费者组ID。 该ID用于识别共享相同偏移的消费者组。
props = new properties();
props.put(Bootstrap.Servers,Localhost:9092);
props.put(group.id,my-conmumer-group); //消费者组ID
props.put(key.deserializer,key.deserializer,org.apache.kafka.common.common.serializatiat.stringdeserialializer)kafkaconsumer(props);
2。主题
消费者群体可以订阅一个或多个主题。 订阅主题后,消费者开始消费这些主题的消息。
消费者。 3。消费者消息
消费者总是投票(投票)有关主题的消息并处理它们。
while(true){
computerrecords string,字符串记录s = consumer.poll(持续时间。处理偏移
KAFKA消费者组自动管理偏移。 当消费者处理大量消息时,他们会提交偏移以使这些消息继续从正确的位置消费。
5。消费者
当消费者不再需要它时,应将其关闭以释放资源。
consumer.close(); 消费者组ID:具有相同消费者组ID的消费者共享偏移,因此消耗了相同的数据。 分区分配:KAFKA根据消费者组ID和主题中的分区数分配分区。 每个分区只能由一个消费者消费。
自动发送偏移:您可以配置enable.auto.commit,以控制是否自动发送偏移。 默认情况下,Kafka会自动发送偏移。
手动发送偏移:如果您想要更复杂的控制,则可以手动发送偏移。
props.put(enable.auto.commit,false);
while(true){
computerrocords string,字符串记录= computer.poll(lisation.ofmillis(100)); //手动同步发送偏移
您可以使用Kafka消费者组通过上述步骤消费消息。
这几篇文章你可能也喜欢:
- KAFKA消息压缩的优点是什么?
- 如何在Debian(Debian Connection WiFi命令)中实现交换加密
- 如何在Debian HDFS中实施数据加密和解密
- 如何在Debian(Debian Mount SMB)上安装加密分区
- Debian消息是否包含补丁信息?
本文由主机参考刊发,转载请注明:如何使用Kafka消费者组 https://zhujicankao.com/147926.html
评论前必须登录!
注册