kafka consumer groups erklärt
Hallo, das ist codeshow.
In dieser Zeit werden wir etwas über kafka groups consumer .
In der Praxis werden häufig consumer, groups consumer direkt mit partition zu verbinden.
consumer ermöglichen einen stärker automatisierten partition , als consumer direkt mit partition zu verbinden.
Um beispielsweise ein topic zu consume , das aus 4 partition besteht, müssen Sie 4 consumer erstellen, die die partition 0 bis 3 betrachten.
Dies liegt daran, dass ein consumer nur eine partition consume kann.
Wenn jedoch consumer verwendet werden, kann ein consumer alle vier partition consume .
Und wenn die Anzahl der consumer in derselben Gruppe zunimmt, wird die zu consume partition automatisch rebalancing.
Im Gegensatz zu direkt consume partition bietet der Betrieb viele Vorteile, da sie automatisch consume werden.
Wir werden consumer üben .
Die Umgebungseinstellung wird vom vorherigen Video fortgesetzt.
Führen Sie devcontainers für kafka Übungen aus.
Warten Sie, bis der container ausgeführt wird.
Wenn der container fertig ist, öffnen wir docker desktop.
Führen Sie die shell im kafka container aus.
Lassen Sie uns das zuvor erstellte topic löschen und mit dem Üben beginnen.
Wenn es kein hello topic gibt, können Sie diesen Schritt überspringen.
kafka-topics --bootstrap-server kafka:9092 --delete --topic hello
Löschen Sie ein topic mit dem kafka topics .
Erstellen Sie ein hello topic mit zwei partition mit dem kafka topics .
kafka-topics --bootstrap-server kafka:9092 --create --topic hello --partitions 2
Überprüfen Sie das erstellte topic im topic von AKHQ .
Ich habe bestätigt, dass es zwei partition gibt.
consume die partition 0 und 1 mit dem kafka console consumer Befehl.
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --partition 0
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --partition 1
Wie in der Übung im vorherigen Video werden wir key und value basierend auf Bindestrichen produce.
Führen Sie kafka console producer aus.
kafka-console-producer --bootstrap-server kafka:9092 --topic hello --property "key.separator=-" --property "parse.key=true"
Sie können sehen, dass zwei consumer Nachrichten consume.
Wenn ein consumer über die partition option , kann nur eine partition consume werden.
Daher können die in der partition des zuvor beendeten consumer angesammelten topic nicht consume werden.
Dann werden wir von nun an consumer verwenden , anstatt partition direkt zu consume .
Beenden Sie den laufenden consumer der kafka console .
Fügt einem vorhandenen Befehl eine option hinzu.
Die option gibt die id der Gruppe an, in der sich der consumer befindet.
Geben Sie die Gruppen id als work ein.
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --partition 0 --group work
Die bestimmungsgemäße Ausführung des Befehls führt zu einem Fehler.
Weil partition und option nicht gleichzeitig eingegeben werden können.
Es muss nur eine der beiden option eingegeben werden.
Schließen Sie die partition aus, und führen Sie den kafka console consumer Befehl erneut aus.
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work
Jetzt werden wir den Betrieb von consumer durch AKHQ überprüfen.
Verbinden Sie sich mit AKHQ mit dem port localhost 8080 .
Sehen Sie sich das AKHQ topics an.
Wählen Sie hello topic aus der topic aus.
Wählen Sie die consumer aus .
Wählen Sie consumer aus , deren id work ist.
Auf der topics“ können Sie Informationen zu den aktuell consume topic und partition sehen.
Beachten Sie , dass consumer ein oder mehrere topic consume können.
In dieser Übung consume wir nur ein topic .
Wählen Sie auf der AKHQ Seite die Registerkarte Mitglieder.
Es gibt die Spalten client id, id, host und assignments .
Die client id ist ein Wert, der zur Identifizierung des consumer eingegeben wird.
In der Praxis sieht man, dass ‘ console consumer ‘, der Standardwert des shell Befehls, eingetragen ist.
Die client id erlaubt Duplikate.
Andererseits wird dem Wert der id Spalte automatisch eine eindeutige id innerhalb der consumer zugewiesen.
Sie können die host Informationen des Verbrauchers über die host Spalte überprüfen.
In der assignments können Sie die Partitionsinformationen des topic überprüfen, das vom consumer consume wird .
Derzeit consume es zwei partition 0 und 1 des hello topic .
Führen Sie dann den consumer , dessen Gruppen- id work ist, im terminal unten aus.
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work
Aktualisieren Sie die AKHQ Phase.
Sie können sehen, dass ein weiterer console consumer wird.
Bezüglich der client id gibt es consumer console ,
Sie können anhand der id Spalte überprüfen, ob consumer eindeutige id Werte haben.
Eine Sache, auf die Sie achten sollten, ist die assignments .
Zuvor consume ein consumer die partition 0 und 1,
Da nun zwei consumer vorhanden sind, consume die beiden consumer die partition 0 bzw. 1.
Wenn die Konfiguration von consumer, die an consumer teilnehmen , geändert wird, können die consumer automatisch einen partition durchführen.
Beachten groups consumer hat, da sich die Anzahl der consumer verdoppelt hat.
Beim partition wird automatisch die Anzahl der consumer dividiert durch die partition size.
Wenn die partition size ist und es zwei consumer gibt, consume jeder 5.
Schauen wir uns umgekehrt den Fall an, in dem die consumer size als die partition ist .
Nehmen wir an, dass es 2 partition und 3 consumer gibt, und der consumer ist eins größer.
Fügen Sie dem terminal einen neuen consumer hinzu , sodass es 3 consumer in den work consumer .
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work
Aktualisieren Sie die AKHQ Seite.
Sie können sehen, dass es 3 client gibt, deren Gruppen- id work ist.
Übrigens, wenn Sie die assignments überprüfen, können Sie sehen, dass zwei consumer partition zugewiesen sind, einem consumer jedoch nicht.
Wenn also die Anzahl der consumer die Anzahl der partition übersteigt, wird den verbleibenden consumer keine partition zugewiesen.
Sie müssen vorsichtig sein, da Sie Ihren Server unnötig verschwenden können.
Gehen wir zurück zu unserem Labor und terminieren einen consumer in terminal .
Aktualisieren Sie die AKHQ Seite schnell.
Sie können sehen, dass es in der assignments keine zugewiesenen partition gibt.
Wenn sich die Verbraucherzusammensetzung der consumer ändert,
Alle consumer hören auf, partition zu consume, und warten bis zum rebalancing.
Und wenn der partition abgeschlossen ist, consume der consumer erneut.
Da dieser Vorgang kurz ist, müssen Sie ihn schnell überprüfen, um ihn zu sehen.
Dann statt consumer mit dem kafka topics ,
Was passiert, wenn Sie die size der partition erhöhen, die der kafka Knoten hat?
Werden die verbleibenden consumer die neue partition sofort consume?
Ich mache eine Übung zur Bestätigung.
consumer hinzufügen.
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work
Aktualisieren Sie die AKHQ Seite.
Nicht zugeordnete consumer können Sie in der assignments prüfen.
Lassen Sie uns nun die Anzahl der partition mit kafka topics auf 3 erhöhen.
kafka-topics --bootstrap-server kafka:9092 --alter --topic hello --partitions 3
Anders als beim Hinzufügen oder Entfernen von consumer in der consumer
Auch wenn Sie die partition mit dem kafka topics vergrößern, wird sie nicht sofort dem consumer zugewiesen.
Als Referenz: In dieser Übung dauerte es ungefähr 2 Minuten, bis die partition der consumer zugewiesen wurde.
Es kann eine Verzögerung geben, wenn Sie also schnell verarbeiten möchten, nachdem Sie den Befehl „partition size alter“ verwendet haben,
Eine andere Möglichkeit besteht darin, sofort einen consumer hinzuzufügen.
Der Grund, warum wir uns mit Verzögerungen befassen, liegt in der Reihenfolge der topic in kafka .
Angenommen, topic A muss verarbeitet werden, bevor topic B verarbeitet werden kann.
A befindet sich jedoch auf Partition 0,
Es wird angenommen, dass B auf Partition Nr. 1 gespeichert ist.
Wenn partition 0 um 1 Minute verzögert wird, wird B in partition 1 zuerst consume,
Wenn die Verzögerung endet und ein topic durch partition 0 consume wird,
A sollte vor B gespeichert werden, aber B wird zuerst gespeichert, sodass ein Fehler auftritt.
Da kafka die Ordnung nach partition garantiert,
A- und B- topic , die eine Reihenfolge erfordern, verwenden denselben key , sodass sie in derselben partition gespeichert werden können.
Als consumer , wenn ein consumer zwei oder mehr partition consume,
Sie müssen die Metainformationen im topic überprüfen, die Partitionsinformationen lesen und jede partition separat verarbeiten.
Ich werde dies anhand des Codes in einem anderen Video überprüfen.
Werfen wir abschließend einen Blick auf die Verwaltung von offset in consumer .
Beim direkten consume mit der vorhandenen partition werden offset Informationen nicht für jeden consumer separat verwaltet.
consumer zeichnen jedoch die letzten offset Informationen auf, die pro partition basierend auf der Gruppen- id verarbeitet groups .
Wenn also ein bestimmter consumer aufgrund eines Fehlers beendet wird, wird eine partition sofort einem anderen consumer zugewiesen, und der consume kann ab dem zuletzt verarbeiteten offset gestartet werden.
Führen Sie im terminal zwei consumer und einen producer aus.
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work
kafka-console-producer --bootstrap-server kafka:9092 --topic hello --property "key.separator=-" --property "parse.key=true"
Veröffentlichen Sie das topic, indem Sie denselben Schlüssel eingeben.
Überprüfen Sie, ob ein consumer consume.
Töten Sie den consumer , der das topic erhalten hat.
Veröffentlichen Sie das topic erneut mit demselben key wie der producer .
Sie können sehen, dass der consumer, der das topic zuvor nicht erhalten hat, das topic jetzt verarbeitet.
Dann schließen wir alle consumer.
Veröffentlichen Sie als producer topic mit verschiedenen key .
Führen Sie den consumer consumer als kafka console aus.
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work
Sie können sehen, dass unverarbeitete topic consume werden.
Auf diese Weise können Sie, wenn Sie kafka verwenden, auch wenn der consumer ausfällt, nach Abschluss der Verarbeitung vom offset aus arbeiten, um letztendlich Konsistenz zu erreichen.
Mit kafka erhalten Sie einen stabilen Servicebetrieb.
Als Referenz finden Sie offset Informationen für consumer im topic consumer offsets .
Sie können das topic „offsets “ nachschlagen, indem Sie auf der AKHQ topics „ all topics show “ auswählen und danach suchen.
Wenn Sie auf die Schaltfläche „ live tail “ am Ende des topic „ offsets “ klicken, können Sie die topic in Echtzeit überprüfen.
Wenn Sie die Gruppen- id, das topic und die partition als key im topic haben und auf Detailansicht klicken, können Sie offset Informationen suchen.
Dadurch können consumer die verarbeitete offset Nummer nachschlagen.
Damit ist die Erläuterung der consumer abgeschlossen.
Das Einstellen von ähnlichen Benachrichtigungen ist für Inhaltsersteller sehr hilfreich.
Danke