kafka consumer groups説明
こんにちはcodeshowです。
今回はkafka consumer groupsについて学びます。
実務では、 consumerを直接partitionに接続するよりもconsumer groupsを主に使用します。
consumer groupsはconsumerをpartitionに直接接続する方法よりも自動化されたpartition consumeを行うことができます。
たとえば、4 つのpartitionで構成されたtopicをconsumeするには、0 番から 3 回までpartitionを眺める 4 つのconsumerを作成しなければなりません。
1 つのconsumerは 1 つのpartitionだけconsumeできるからです。
ただし、 consumer groupsを使用すると、1 つのconsumerで 4 つのpartitionすべてをconsumeできます。
そして同じグループのconsumerが増えればconsumeするpartitionを自動的にrebalancingをしてくれます。
partitionを直接consumeとは異なり自動的にconsumeするため、運用に多くの利点があります。
consumer groups実習をします。
環境設定は前の映像に続いて進めます。
kafka練習のためのdevcontainersを実行してください。
container実行されるまでお待ちください。
containerが準備ができたらdocker desktop を開きます。
kafka containerでshellを実行します。
以前に作成したtopicを削除して実習を始めましょう。
hello topicがない場合、このプロセスは省略してもかまいません。
kafka-topics --bootstrap-server kafka:9092 --delete --topic hello
kafka topicsコマンドでtopicを削除します。
kafka topicsコマンドでpartitionが2つのhello topicを生成します。
kafka-topics --bootstrap-server kafka:9092 --create --topic hello --partitions 2
AKHQのtopicメニューで生成されたtopicを確認します。
partitionが2つであることを確認しました。
kafka console consumerコマンドで 0,1 回partition をconsumeします。
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --partition 0
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --partition 1
前の映像の実習のようにハイフンを基準にkeyとvalueをproduceします。
kafka console producerを実行します。
kafka-console-producer --bootstrap-server kafka:9092 --topic hello --property "key.separator=-" --property "parse.key=true"
2 つのconsumerでメッセージをconsumeすることがわかります。
consumerはpartition optionを通じてconsumeする場合は 1 つのpartitionだけconsume可能です。
だから、以前に終了したconsumerのpartitionにたまったtopicはconsumeをすることはできません。
それではこれからはpartitionを直接consumeせずにconsumer groups を使います。
実行中のkafka console consumerを終了します。
既存の命令にgroup optionを追加します。
group optionはconsumerがある group のidを指定します。
group idはworkと入力します。
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --partition 0 --group work
意図したとおりに命令を実行するとエラーが発生します。
なぜなら、 partition optionと group option は同時に入力することはできません。
どちらのoption入力する必要があります。
partition optionを減算し、再度kafka console consumerコマンドを発行します。
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work
では、 AKHQを通じてconsumer groupsの動作を確認します。
localhost 8080 portでAKHQ を接続します。
AKHQ topicsメニューを確認してください。
topicリストからhello topic を選択します。
consumer groupsタブを選択します。
idがworkであるconsumer groupsを選択します。
topicsタブには、現在consume中のtopicとpartitionの情報を表示できます。
ちなみに、 consumer groupsは 1 つ以上のtopicをconsumeできます。
今回の実習では、1つのtopicだけをconsumeます。
AKHQページで、Membersタブを選択します。
client id、 id、 host、 assignmentsカラムがあります。
client idはconsumerを識別するために入れる値です。
練習では、 shellコマンドのデフォルト値である「console consumer」が入力されていることを確認できます。
client idは重複を許可します。
一方、 id列の値はconsumerグループ内で唯一のidが自動的に割り当てられます。
host列を使用して、consumerのhost情報を確認できます。
assignmentsカラムではconsumerがconsumeするtopicのpartition情報を確認できます。
今はhello topicの0と1番partitionの2つをconsumeています。
次に、下部にterminalとして group idがwork であるconsumerを実行します。
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work
AKHQフェイズを更新します。
console consumerがもう 1 つ追加されることを確認できます。
client idはconsole consumerが二つが出て、
id列を使用すると、consumerは一意のid値を持っていることを確認できます。
気にして見なければならないのはassignmentsカラムです。
従来、一つのconsumerが 0 と 1 回partitionをconsumeしたが、
これでconsumerが 2 つになったので、2 つのconsumerは 0 回 1 回partitionをそれぞれconsumeすることになります。
consumer groupsに参加するconsumerの構成が変わると、自動的にconsumerはpartition rebalancingすることができます。
ちなみに、consumer数が倍増したため、このconsumergroupsのスループットも倍増しました。
partition rebalancingはconsumerの数をpartition sizeで割って自動的に割り当てます。
partition sizeが 10 で、 consumerが 2 つの場合、それぞれ 5 つずつconsumeします。
では、逆にpartition sizeよりconsumer sizeが大きい場合について調べましょう。
partitionが 2 個でconsumerが 3 個でconsumerが 1 個より大きい場合を仮定します。
terminalに新しいconsumerを追加して、 work consumer groupsにconsumerが 3 つになるようにします。
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work
AKHQページを更新してください。
group idがworkのclientは 3 つであることを確認できます。
ところで、 assignmentsカラムを確認すると、2 つのconsumerはpartition が割り当てられましたが、1 つconsumerは割り当てられていないことを確認できます。
このようにpartition数よりconsumer数が多い場合、残るconsumerはpartitionを割り当てられません。
不必要にサーバーを無駄にする可能性があるため、注意が必要です。
練習に戻り、 terminalでconsumer 1 つを終了します。
すばやくAKHQページを更新します。
assignments列に割り当てられたpartitionがないことを確認できます。
consumerグループのconsumer構成に変化が生じた場合、
すべてのconsumerはpartition consume を停止し、 rebalancingされるまで待機します。
そしてpartition rebalancingが完了するとconsumerは再consumeをします。
このプロセスは短いため、すばやく確認する必要があります。
では、 kafka topics命令でconsumerではなく、
kafkaノードが持っているpartitionのsizeを増やすとどうなりますか?
残りのconsumerはすぐに新しいpartitionをconsumeますか?
確認のため実習をさせていただきます。
consumerを追加します。
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work
AKHQページを更新してください。
assignments列で割り当てられていないconsumerを確認できます。
今、 kafka topicsでpartitionの数を3つに増やします。
kafka-topics --bootstrap-server kafka:9092 --alter --topic hello --partitions 3
しかし、consumerグループのconsumerが追加または削除されるのとは異なり、
kafka topics命令でpartitionを増やしてもconsumerにすぐに割り当てられません。
ちなみに今の練習ではconsumer group にpartitionが割り当てられるまで 2 分ほどかかりました。
遅延がある可能性があるため、迅速に処理したい場合は、 partition size alterコマンドを使用した後、
すぐにconsumerを追加するのも方法です。
遅延について扱う理由はkafkaのtopicの順序のためです。
たとえば、A topicは必ず B topicが処理される前に処理されなければならないと仮定します。
さてAは0番パーティションに、
Bは1番目のパーティションに保存されていると仮定します。
もし0番partitionが1分間遅延が発生し、1番partitionのBが先にconsumeとなり、
遅延が終了し、A topicが 0 回partitionを通じてconsumeされるとき、
A は B より先に保存する必要がありますが、B が最初に保存されているためエラーが発生します。
kafkaはpartition単位で順序が保証されるため
順序が必要なAとBのtopicは、同じpartitionに保存できるように同じkeyを使用してください。
consumer group で 1 つのconsumerが 2 つ以上のpartition をconsumeするときは、
topicで meta 情報を確認してpartition情報を読み、 partitionごとに別々に処理をしてくれなければなりません。
これは他のビデオでコードを通して確認します。
最後に、 consumer groupsでoffsetを管理することを見てみましょう。
既存のpartition番号で直接consumeした時はconsumerごとにoffset情報を別に管理しません。
しかし、 consumer groupsは group idに基づいてpartitionごとに処理した最後のoffset情報を記録します。
そのため、特定のconsumerが障害で終了した場合、すぐに別のconsumerにpartitionが割り当てられ、最後に処理したoffset以降からconsumeを行うことができます。
terminalでconsumer二つとproducerを実行します。
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work
kafka-console-producer --bootstrap-server kafka:9092 --topic hello --property "key.separator=-" --property "parse.key=true"
同じキーを入力してtopicを発行します。
1 つのconsumerでconsumeすることを確認します。
topicを受け取ったconsumerを終了します。
再度producerと同じkeyでtopic を発行します。
既存のtopicを受け取っていなかったconsumerが、今度はtopicを処理していることを確認できます。
それではすべてのconsumerを終了させます。
producerでさまざまなkeyでtopicを発行します。
再度kafka console consumerでconsumerを実行します。
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work
未処理のtopicをconsumeことを確認できます。
このように、 kafkaを使用すると、consumerが失敗しても処理が完了したoffset以降の作業を実行できるため、最終的な一貫性を達成できます。
kafkaを使用すると、安定したサービス運営を得ることができます。
ちなみに、consumerグループのoffset情報は、consumeroffsetstopicで確認できます。
AKHQ topicsページで「show all topics」を選択して検索すると、 consumer offsets topicを検索できます。
consumer offsets topicを下部の「live tail」ボタンを押すと、 topicをリアルタイムで確認できます。
topicからkeyへ group id, topic, partition番号があり、詳細ビューをクリックするとoffset情報を照会できます。
これにより、consumergroupsは処理されたoffset番号を照会できます。
以上でconsumer groupsの説明を終えます。
「いいね!」通知設定は、コンテンツ制作者に多くの役に立ちます。
ありがとうございます。