kafkatopic,partition描述(producer,consumer,replicationfactor,offset)
大家好,这里是codeshow。
本次我们将学习kafka中的topic和partition。
要使用kafka,您必须有topic。
producer基于这个topicpublish消息,
consumer订阅消息。
运行kafka创建topic。
环境设置请参考之前的视频。
运行devcontainers进行kafka练习。
等到container运行。
当container运行完成后,我们将打开docker desktop。
在kafkacontainer中执行shell。
对于topic查找,请使用kafkatopics的listoption。
kafka-topics --bootstrap-server kafka:9092 --list
我确认除了topicconsumeroffsets和默认创建的schemas之外没有topic。
使用kafka topics命令的createoption创建topic。
kafka-topics --bootstrap-server kafka:9092 --create --topic hello --partitions 1 --replication-factor 1
我使用kafka topics命令创建了topic名为hello的主题。
topic的名称是使用nameoption指定的。
replicationfactoroption是option,用于选择要制作多少个partition数据副本。
我将副本factor值设置为与kafka节点数一样多。
较高的值会产生在节点之间复制分区的成本,从而影响性能。
相反,如果该值较小,则在kafka发生故障时可能会丢失数据。
通常建议使用 3 或更高的值。
作为参考,如果您在create时放置的replicationfactor大于kafka节点数,则会发生错误。
kafka-topics --bootstrap-server kafka:9092 --create --topic error --partitions 1 --replication-factor 4
我将用另一个option解释partitions。
partition是kafka中非常重要的信息。
partitionsoption是设置要划分多少topic的信息。
如果producerpublish100 个topic,而这个topic只有 1 个partition,
100 个topic堆叠在 1 个partition中。
如果consumer花费 1 分钟来处理 1 个topic。
处理 100 个topic需要 100 分钟。
如果将partition设置为 4,则topic在 4 个分区中的每个分区中堆叠 25 个。
由于一个partition最多可以有一个consumer,
现在您可以添加 3 个consumer,与分区数量增加的数量一样多。
处理时间可以减少 1/4,从 100 分钟减少到 25 分钟。
我们将使用shell练习到目前为止的内容。
我将在hello topic上练习producer和consumer。
在kafkacontainer中运行terminal。
左边是producer,右边是consumer。
使用kafka console producer命令连接到kafka。
kafka-console-producer --bootstrap-server kafka:9092 --topic hello
producer连接到kafka broker。
使用kafkaconsoleconsumer命令连接到kafka。
partition从索引 0 开始按顺序增加。
实际上,由于partitionsize为 1,因此只能使用partition0。
输入partition号 0 作为option。
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --partition 0
输入producer的hellomessage。
你可以在consumer中查看hello。
那么,多个consumer可以连接到一个partition吗?
让我们运行一个额外的terminal并添加consumer。
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --partition 0
consumer连接没有问题。
那么当您作为producerpublish消息时会发生什么?
有两种情况。
1.消息是顺序传递的,一次是按照consumer的顺序。
这称为round robin方法。
2.所有的消息都传递给所有的consumer。
这个名字叫做broadcasting。
以producer身份输入消息hello1,查看运行情况。
一条 hello1 消息出现在两个consumer上。
作为producer再次输入 hello2 消息。
一条 hello2 消息出现在两个consumer上。
在kafka中,连接到同一个partition的所有consumer都通过broadcasting接收消息。
作为参考,另一个messagebrokerrabbit mq可以将多个worker连接到一个queue。
通过round robin,worker按顺序接收消息。
这部分的工作原理与kafka不同,所以先体验过rabbit mq的人可能会一头雾水。
rabbit mq通过增加一个queue中worker的size来增加吞吐量,
kafka通过增加一个topic的partitionsize来增加吞吐量。
在本练习中,我们将更改partition数。
让我们将partition数从 1 增加到 2。
按 ctrl c 退出producer。
让我们使用kafka topics命令的alteroption将partitionsize增加到 2。
kafka-topics --bootstrap-server kafka:9092 --alter --partitions 2 --topic hello
使用kafka topics的describe选项查看分区信息。
kafka-topics --bootstrap-server kafka:9092 --topic hello --describe
作为参考,您还可以在AKHQ的topic屏幕上查看增加的partitionsize2。
再次执行producer并将consumer添加到新添加的partition#1。
kafka-console-producer --bootstrap-server kafka:9092 --topic hello
终止连接到现有partition0 的consumer,并将consumer连接到新partition1。
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --partition 0
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --partition 1
使用producer创建随机数量的消息。
有两个consumer,但topic被划分并只传递给一个consumer。
这是kafka在没有key时工作的stickypartitioncache方式。
如果没有key,第一个发送的partition号cache,topic只转发给这个partition。
如果没有key ,增加partition是没有意义的,因为只用了一个partition 。
也就是说, key是决定topic在kafka中存储partition的重要信息。
我们将添加key,以便将topic很好地划分并存储在partition之间。
向producer添加propertyoption并再次运行。
kafka-console-producer --bootstrap-server kafka:9092 --topic hello --property "key.separator=-" --property "parse.key=true"
让我们把一个随机值作为producer。
现在你可以看到topic被划分为partition并保存了。
特别是,您可以检查相同的key是否通过consumerpublish到相同的partition。
使用keyseparatoroption,破折号之前的key和破折号之后的value被publish。
a 是key,b 是value。
作为参考,过去只发布valuehello而没有publishkey 。
如果破折号前面的值相同,key相同。
重要的是,相同的key总是存储在相同的partition中。
但是,如果key不同,则它们存储在相同的partition或不同的partition中。
为了理解这部分,让我们看看如何处理kafka中的key。
kafka对keyhash计算,取kafka节点个数的余数,并将结果值确定为partition号。
由于现在有两个partition,只有 0 和 1 可以作为partition号出现。
作为参考,hash算法使用murmur2 hash。
了解partition的一个重点是, kafka仅保证partition的topic顺序。
即在publish顺序重要的topic时,将它们放在同一个partition中。
我检查过key总是存储在同一个partition中,
对于顺序很重要的topic,请务必使用相同的key!
接下来,我将解释partition的offset。
kafka与现有的messagequeue不同的是,它在consume后不会从queue中删除消息。
分区类似于数组。
partition中的topic按顺序存储,就像数组的元素一样。
就像通过索引访问数组值一样,您可以通过offset从partition的特定索引consume。
作为参考,每个partition都有最后一个offset。
因此,如果新consumer在没有offsetoption下连接,它将从最后一个offsetconsume。
但是,如果给出offsetoption,则首先consumeoffset信息。
作为练习,我们将向terminal发出两个命令。
1.命令consumer从offset0开始执行。
2. 执行consumer不带offset命令的命令。
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --partition 1 --offset 0
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --partition 1
如果你给offset索引0option,你可以检查它是否从头开始再次执行。
如果不存在offsetoption,offset选项默认为最新。
仅consume新publish的topic。
最后,作为partition的一个重要约束,
您不能为partition size设置较小的值。
partition size只能输入较大的值。
我将给出alter命令以将partition从 2 减少到 1。
kafka-topics --bootstrap-server kafka:9092 --alter --partitions 1 --topic hello
您会看到一条错误消息,告诉您将partitionoption设置为大于 2 的值。
kafka的topic和partition的讲解到此结束。
下一次,我们将看看如何使用consumergroups。
作为参考,而不是consumer直接consumepartition,
consumergroups在实践中使用较多。
为了更好地了解这些consumergroups,这堂课
我有时间学习partition。
订阅和点赞通知设置对内容创作者非常有帮助。
谢谢