groups consumer kafka объяснили

Здравствуйте, это codeshow.
За это время мы узнаем о groups consumer kafka .
На практике groups consumer часто используются вместо прямого подключения consumer к partition .
groups consumer обеспечивают более автоматизированное consume partition , чем прямое подключение consumer к partition .
Например, чтобы consume topic , состоящую из 4 partition , вам нужно создать 4 consumer , которые просматривают partition с 0 по 3.
Это связано с тем, что один consumer может consume только один partition .
Однако если используются groups consumer , один consumer может consume все четыре partition .
А когда количество consumer в одной группе увеличивается, partition для consume автоматически rebalancing.
В отличие от прямого consume partition , у операции есть много преимуществ, поскольку они consume автоматически.

Мы будем практиковать consumer groups .
Настройка среды продолжится с предыдущего видео.
Запустите devcontainers для практики kafka .
Дождитесь, пока container запустится.
Когда container будет готов, мы откроем desktop docker .
Выполнить shell в container kafka .

Давайте удалим ранее созданную topic и приступим к практике.
Если topic hello нет, этот шаг можно пропустить.

kafka-topics --bootstrap-server kafka:9092 --delete --topic hello

Удалите topic с помощью команды topics kafka .

Создайте hello topic с двумя partition с помощью команды kafka topics .

kafka-topics --bootstrap-server kafka:9092 --create --topic hello --partitions 2

Проверьте созданную topic в меню topic AKHQ .
Я подтвердил, что есть два partition.

consume partition 0 и 1 с помощью команды consumer console kafka .

kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --partition 0
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --partition 1

Как и в предыдущем видео, мы будем produce key и value на основе дефисов.
Запустите producer console kafka .

kafka-console-producer --bootstrap-server kafka:9092 --topic hello --property "key.separator=-" --property "parse.key=true"

Вы можете видеть, что два consumer consume сообщения.
Когда consumer consume option partition , может быть consume только один partition .
Следовательно, topic , накопленные в partition ранее ликвидированного consumer, не могут быть consume.

Затем, с этого момента, мы будем использовать groups consumer вместо прямого consume partition .
Убейте работающего consumer console kafka .

Добавляет option группы к существующей команде.
option group указывает id группы, в которой находится consumer .
Введите id группы как work.

kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --partition 0 --group work

Выполнение команды по назначению приведет к ошибке.
Поскольку option partition и option группы не могут быть введены одновременно.
Необходимо ввести только один из двух option.
Исключите option partition и снова введите команду consumer console kafka .

kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work

Теперь проверим работу groups consumer через AKHQ .
Подключитесь к AKHQ через port localhost 8080.
Ознакомьтесь с меню topics AKHQ .
Выберите topic hello из списка topic .
Выберите вкладку groups consumer .
Выберите groups consumer с id work .
На вкладке «topics» вы можете увидеть информацию об consume в данный момент topic и partition.
Обратите внимание, что groups consumer могут consume одну или несколько topic .
В этом упражнении мы будем consume только одну topic .

На странице AKHQ выберите вкладку Участники.
Есть столбцы id client , id, host и assignments .
id client — это значение, введенное для идентификации consumer.
На практике вы можете видеть, что было введено значение ‘ console consumer ‘, значение по умолчанию для команды shell .
id client допускает дублирование.
С другой стороны, значению столбца id автоматически присваивается уникальный id в группе consumer .
Вы можете проверить информацию о host consumer через столбец host .
В столбце assignments вы можете проверить информацию о partition topic, consume consumer .
В настоящее время он consume два partition 0 и 1 topic hello .
Затем запустите consumer , чей id группы work , в terminal внизу.

kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work

Обновите фазу AKHQ .
Вы можете видеть, что добавлен еще один consumer console .
Что касается id client , есть два consumer console ,
Вы можете проверить, что consumer имеют уникальные значения id через столбец id .

На что следует обратить внимание, так это на столбец assignments .
Раньше один consumer consume partition 0 и 1,
Поскольку теперь есть два consumer, два consumer consume partition 0 и 1 соответственно.

При изменении конфигурации consumer , участвующих в groups consumer , consumer могут автоматически выполнять rebalancing partition .
Обратите внимание, что поскольку количество consumer удвоилось, пропускная способность этих groups consumer также удвоилась.

rebalancing partition автоматически распределяет количество consumer , деленное на size partition .
Если size partition равен 10 и есть два consumer, каждый consume 5.
И наоборот, давайте рассмотрим случай, когда size consumer больше, чем size partition .
Предположим, что есть 2 partition и 3 consumer, причем consumer на один больше.
Добавьте в terminal нового consumer так, чтобы в work groups consumer было 3 consumer.

kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work

Обновите страницу AKHQ .
Вы можете видеть, что есть 3 client с id группы work .
Кстати, если вы проверите столбец assignments , то увидите, что двум consumer назначены partition, а одному consumer— нет.
Таким образом, если количество consumer превышает количество partition , оставшимся consumer не назначается partition.
Вы должны быть осторожны, потому что вы можете тратить свой сервер без необходимости.

Вернемся в нашу лабораторию и завершим одного consumer в terminal .
Быстро обновите страницу AKHQ .
Вы можете видеть, что в столбце assignments нет назначенных partition.
В случае изменения consumer состава группы consumer ,
Все consumer перестают consume partition и ждут rebalancing.
И когда rebalancing partition завершена, consumer снова consume.
Поскольку этот процесс короткий, вам нужно быстро проверить его, чтобы увидеть.

Затем вместо consumer с командой topics kafka ,
Что произойдет, если вы увеличите size partition , который имеет узел kafka ?
Будут ли оставшиеся consumer немедленно consume новый partition ?
Я сделаю упражнение, чтобы подтвердить.
Добавьте consumer.

kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work

Обновите страницу AKHQ .
Вы можете проверить неназначенных consumer в столбце assignments .
Теперь увеличим количество partition до 3 с topics kafka .

kafka-topics --bootstrap-server kafka:9092 --alter --topic hello --partitions 3

Однако, в отличие от добавления или удаления consumer в группе consumer ,
Даже если увеличить partition командой kafka topics , он не сразу назначается consumer.
Для справки: в этом упражнении назначение partition группе consumer заняло около 2 минут.
Может быть задержка, поэтому, если вы хотите быстро обработать, после использования команды alter size partition ,
Другой способ — сразу добавить consumer.

Причина, по которой мы имеем дело с задержкой, заключается в порядке следования topic в kafka .
Например, предположим, что topic A необходимо обработать до того, как можно будет обработать topic B.
Однако A находится в разделе 0,
Предполагается, что B хранится в разделе № 1.
Если partition 0 задерживается на 1 минуту, B в partition 1 consume первым,
Когда задержка заканчивается и topic consume через partition 0,
A должен быть сохранен перед B, но B сохраняется первым, поэтому возникает ошибка.
Поскольку kafka гарантирует порядок по partition ,
topic A и B, требующие порядка, используют один и тот же key , чтобы их можно было хранить в одном partition .
Как группа consumer , когда один consumer consume два или более partition ,
Вам нужно проверить метаинформацию в topic, прочитать информацию о partition и обработать каждый partition отдельно.
Я проверю это через код в другом видео.

Наконец, давайте взглянем на управление offset в groups consumer .
При consume напрямую с существующим номером partition информация о offset не обрабатывается отдельно для каждого consumer .
Однако groups consumer записывают информацию о последнем offset, обработанном для каждой partition, на основе id группы.
Итак, если конкретный consumer завершает работу из-за сбоя, partition немедленно назначается другому consumer , и consume может быть запущено с последнего обработанного offset .
Запустите в terminal двух consumer и producer.

kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work
kafka-console-producer --bootstrap-server kafka:9092 --topic hello --property "key.separator=-" --property "parse.key=true"

Опубликуйте topic, введя тот же ключ.
Убедитесь, что один consumer consume.
Убейте consumer , получившего topic .
Опубликуйте topic с тем же key , что и producer .
Вы можете видеть, что consumer ранее не получал topic , теперь обрабатывает topic.
Тогда мы закроем всех consumer.
Публикуйте topic с различными key в качестве producer .

Снова запустите consumer как consumer console kafka .

kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work

Вы можете видеть, что необработанные topic consume.

Таким образом, если вы используете kafka, даже если consumer выйдет из строя, вы сможете работать со offset после завершения обработки, чтобы добиться согласованности в конечном итоге.
С kafka вы можете получить стабильную работу сервиса.

Для справки, информацию о offset для групп consumer можно найти в topic offsets consumer .
Вы можете найти topic consumer offsets , выбрав « show all topics » на странице topics AKHQ и выполнив поиск по ней.
Если вы нажмете кнопку « live tail » в нижней части topic consumer offsets , вы сможете просматривать topic в режиме реального времени.
Если у вас есть id группы, topic, номер partition в качестве key в topic и щелкните подробный просмотр, вы можете искать информацию о offset .
Это позволяет groups consumer искать обработанный номер offset .

На этом объяснение consumer groups заканчивается.

Настройка подобных уведомлений очень полезна для создателей контента.
Спасибо