kafkaconsumergroups解释
大家好,这里是codeshow。
这一次,我们将了解kafkaconsumergroups。
在实践中,consumergroups而不是将consumer直接连接到partition。
与将consumer直接连接到partition相比,consumergroups允许更多的自动化partitionconsume。
例如,要consume由 4 个partition组成的topic,您需要创建 4 个查看partition0 到 3 的consumer。
这是因为一个consumer只能consume一个partition。
但是,如果使用consumergroups,一个consumer可以consume所有四个partition。
并且当同组consumer数量增加时,consumepartition会自动rebalancing。
与直接consumepartition不同,因为它们是自动consume的,所以对操作有很多好处。
我们会实践consumergroups。
环境设置将从上一个视频继续。
运行devcontainers进行kafka练习。
等到container运行。
container准备就绪后,我们将打开docker desktop。
在kafkacontainer中执行shell。
让我们删除之前创建的topic并开始练习。
如果没有hello topic,可以跳过这一步。
kafka-topics --bootstrap-server kafka:9092 --delete --topic hello
使用kafka topics命令删除topic。
使用kafka topics命令创建一个包含两个partition的hellotopic。
kafka-topics --bootstrap-server kafka:9092 --create --topic hello --partitions 2
在AKHQ的topic菜单中查看创建的topic。
我已经确认有两个partition。
consumekafkaconsoleconsumer命令消费partition0 和 1。
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --partition 0
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --partition 1
和之前视频中的做法一样,我们会根据连字符producekey和value 。
运行kafkaconsoleproducer。
kafka-console-producer --bootstrap-server kafka:9092 --topic hello --property "key.separator=-" --property "parse.key=true"
可以看到有两个consumer在consume消息。
当consumer通过partitionoptionconsume时,只能consume一个partition。
因此,先前终止的consumer的partition中累积的topic无法被consume。
那么,从现在开始,我们将使用consumergroups,而不是直接consumepartition。
杀死正在运行的kafka console consumer。
向现有命令添加组option。
groupoption指定consumer所在的组的id。
输入 group id作为work。
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --partition 0 --group work
按预期执行命令将导致错误。
因为partitionoption和组option不能同时输入。
只需输入两个option之一。
排除partitionoption并再次发出kafkaconsoleconsumer命令。
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work
现在,我们将通过AKHQ查看consumergroups的运行情况。
使用localhost8080port连接到AKHQ。
查看AKHQtopics菜单。
从topic列表中选择hellotopic。
选择consumergroups选项卡。
选择id为work 的consumergroups。
在topics选项卡中,您可以看到有关当前consume的topic和partition的信息。
请注意,consumergroups可以consume一个或多个topic。
在本练习中,我们将仅consume一个topic。
在AKHQ页面上,选择“成员”选项卡。
有client id、 id、 host和assignments列。
client id是为识别consumer而输入的值。
实践中可以看到已经输入了shell命令的默认值‘console consumer’。
clientid允许重复。
另一方面, id列的值在consumer组中自动分配一个唯一的id。
可以通过host栏查看consumer的host信息。
在assignments栏可以查看consumerconsume的topic的partition信息。
目前,它正在consumehellotopic的两个partition0 和 1。
然后在最下面的terminal中运行group id为work的consumer。
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work
刷新AKHQ阶段。
您可以看到又添加了一个consoleconsumer。
至于clientid,有两个consoleconsumer,
可以通过id列查看consumer是否有唯一的id值。
需要注意的一件事是assignments栏。
以前,一个consumerconsumepartition0 和 1,
由于现在有两个consumer,这两个consumer分别consumepartition0和1。
当参与consumergroups的consumer配置发生变化时,consumer可以自动进行partitionrebalancing。
请注意,由于consumer数量增加了一倍,这些consumergroups的吞吐量也增加了一倍。
partitionrebalancing自动分配consumer数量除以partitionsize。
如果partitionsize为 10 并且有两个consumer,每个消费者consume5。
反过来,我们来看一下consumersize大于partitionsize的情况。
假设有 2 个partition和 3 个consumer,consumer更大。
在terminal添加一个新的consumer,使workconsumergroups中有3个consumer。
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work
刷新AKHQ页面。
你可以看到有 3 个client的 group id是work 。
顺便说一句,如果你检查assignments列,你可以看到两个consumer被分配了partition,但一个consumer没有。
因此,如果consumer的数量超过partition的数量,则不会为剩余的consumer分配partition。
您需要小心,因为您可能会不必要地浪费服务器。
让我们回到我们的实验室,在terminal终止一个consumer。
快速刷新AKHQ页面。
可以看到assignments列中没有分配的partition。
如果consumer群体的consumer构成发生变化,
所有consumer停止consumepartition并等待rebalancing。
并且当partition rebalancing完成后, consumer再次consume。
由于这个过程很短,您需要快速检查才能看到它。
然后,代替使用kafka topics命令的consumer,
如果增加kafka节点的partitionsize会怎样?
剩下的consumer会立即consume新的partition吗?
我会做一个练习来确认。
添加consumer。
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work
刷新AKHQ页面。
您可以在assignments列中检查未分配的consumer。
现在让我们使用kafkatopics将partition数增加到 3。
kafka-topics --bootstrap-server kafka:9092 --alter --topic hello --partitions 3
但是,不同于consumer组中consumer的添加或移除,
即使你用kafka topics命令增加partition,也不会立即分配给consumer。
作为参考,在本练习中,将partition分配给consumer组大约需要 2 分钟。
可能会有延迟,所以如果你想快速处理,使用partition size alter命令后,
另一种方法是立即添加consumer。
我们处理延迟的原因是因为kafka中topic的顺序。
例如,假设必须先处理topicA,然后才能处理topicB。
但是,A 在分区 0 上,
假定 B 存储在分区 #1 上。
如果partition 0延迟1分钟, partition 1中的B先被consume,
当延迟结束,A topic通过partition 0 被consume时,
A应该先保存B,但是B先保存,所以出错。
由于kafka按partition保证顺序,
需要排序的topicA和B使用相同的key ,这样它们就可以存储在同一个partition中。
作为consumer组,当一个consumerconsume两个或多个partition时,
需要查看topic中的元信息,读取partition信息,对每个partition分别进行处理。
我将通过另一个视频中的代码来检查这一点。
最后,让我们看一下管理consumergroups中的offset。
直接使用现有partition号consume时,不会为每个consumer单独管理offset信息。
但是,consumergroups根据组id记录每个partition处理的最后一个offset信息。
因此,如果特定consumer因故障而终止,则立即将partition分配给另一个consumer,并且可以从最后处理的offset开始consume。
在terminal中运行两个consumer和producer。
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work
kafka-console-producer --bootstrap-server kafka:9092 --topic hello --property "key.separator=-" --property "parse.key=true"
通过输入相同的键来发布topic。
检查是否有一个consumer正在consume.
杀死接收到topic的consumer。
再次发布与producer相同key的topic。
可以看到之前没有收到topic的consumer现在正在处理topic。
然后我们将关闭所有consumer。
作为producer发布各种key的topic。
再次将consumer作为kafkaconsoleconsumer运行。
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work
可以看到未处理的topic被consume了。
这样,如果使用kafka,即使consumer失败了,也可以在处理完成后从offset开始工作,这样就可以做到最终一致性。
使用kafka可以获得稳定的服务运行。
作为参考,consumer组的offset信息可以在consumeroffsetstopic中找到。
您可以通过在AKHQtopics页面上选择“showalltopics”并进行搜索来查找consumeroffsetstopic。
如果你按下consumeroffsetstopic底部的’ live tail’按钮,你可以实时查看topic。
如果topic中有group id, topic, partition number作为key ,点击detail view,就可以搜索offset信息。
这允许consumergroups查找处理后的offset量。
consumergroups的解释到此结束。
设置点赞通知对内容创作者非常有帮助。
谢谢