主机参考:VPS测评参考推荐/专注分享VPS服务器优惠信息!若您是商家可以在本站进行投稿,查看详情!此外我们还提供软文收录、PayPal代付、广告赞助等服务,查看详情! |
我们发布的部分优惠活动文章可能存在时效性,购买时建议在本站搜索商家名称可查看相关文章充分了解该商家!若非中文页面可使用Edge浏览器同步翻译!PayPal代付/收录合作 |
1。安装并启动kafka
确保已安装它并启动Kafka群集。 对于安装和启动,您可以参考官方的Kafka文档。
2。创建一个主题
如果您尚未创建主题,则可以使用以下命令创建它:配置消费者组
消费者组主要在消费者配置文件中配置。 以下是一些常见的布局:
3.1 group.ID
每个消费者组都必须具有一个唯一的组。ID
Bootstrap.server.servers = localhost:9092 3.3 key.Deserialializer and value.Deserializer
键。
auto.offset.reset =初始#或最新
3.5 enable.auto.commit
是否自动发送偏移。
eNabable.auto.commit = true
auto.commit.interval.ms = 1000
3.6 session.timeout.ms
session.timeout.ms = 30000
3.7 max.poll.records
每个呼叫返回到每个呼叫()方法的最大记录数量。
max.poll.records = 500
4。写消费者代码
在Java中创建消费者代码,一个示例是:org.apache.kafka.clients.consumer.consumer.kafkaconsumer;
导入java.time.duration;
导入java.util.collections;
导入java.util.properties;特性();
props.put(Bootstrap.Servers,Localhost:9092);
props.put(group.id,your_group_id);
props.put(key.deserial(key.deserial)ivator,org.apache.kafka.common.serialization.stringDeserializer);
props.put(value.deserializer,org.apache.kafka.common.common.common.serialization.stringDeserializer); props.put(session.timeout.ms,30000);
props.put(session.timeout.ms,30000);
props.put(max.poll.records,500);
kafkaconsumer字符串,字符串消费者=新的kafkaconsumer(props); ] {
消费者se(); 5。消费者
编译并运行消费者代码以运行。使用Kafka专有命令行工具监视和管理消费者组
监视和管理消费者组。
kafka -consumer -groups.sh -bootstrap -server localhost:9092 -group your_group_id-描述
您可以使用上述步骤成功地配置并运行KAFKA消费者组。
这几篇文章你可能也喜欢:
- Debian和Redhat之间默认文件系统之间的差异(Redhat和Debian之间的差异)
- 易于配置Debian和Redhat网络(Debian和Linux)
- 如何配置Kafka副本因子(Kafka副本和分区的数量)
- Debian和Redhat软件来源最多的人(Redhatyum来源)
- 更友好,debian或redhat用户界面(redhat桌面)
本文由主机参考刊发,转载请注明:如何配置Kafka消费者组(KAFKA SET消费者组) https://zhujicankao.com/147082.html
评论前必须登录!
注册