groups de consumer kafka explicados

Hola, esto es codeshow.
En este momento, aprenderemos sobre los groups de consumer de kafka .
En la práctica , a menudo se utilizan groups de consumidores en lugar de conectar a los consumer directamente a las partition .
Los groups de consumer permiten un consume de partition más automatizado que conectar a los consumer directamente a las partition .
Por ejemplo, para consume un topic compuesto por 4 partition , debe crear 4 consumer que observen las partition 0 a 3.
Esto se debe a que un consumer puede consume solo una partition .
Sin embargo, si se utilizan groups de consumer , un consumer puede consume las cuatro partition .
Y cuando aumenta el número de consumer en un mismo grupo, la partition a consume se rebalancing automáticamente.
A diferencia del consume directo de partition , la operación ofrece muchos beneficios porque se consume automáticamente.

Practicaremos groups de consumer .
La configuración del entorno continuará desde el video anterior.
Ejecute devcontainers para practicar kafka .
Espere hasta que el container se ejecute.
Cuando el container esté listo, abriremos docker desktop.
Ejecute shell en el container kafka .

Eliminemos el topic creado anteriormente y comencemos a practicar.
Si no hay un topic de hello , puede omitir este paso.

kafka-topics --bootstrap-server kafka:9092 --delete --topic hello

Elimine un topic con el comando de topics de kafka.

Cree un topic de saludo con dos partition con el comando de topics kafka .

kafka-topics --bootstrap-server kafka:9092 --create --topic hello --partitions 2

Verifique el topic creado en el topic de AKHQ .
He confirmado que hay dos partition.

consume las partition 0 y 1 con el comando del consumer de la console kafka .

kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --partition 0
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --partition 1

Al igual que la práctica en el video anterior, produce key y value basados en guiones.
Ejecute el producer de la console kafka .

kafka-console-producer --bootstrap-server kafka:9092 --topic hello --property "key.separator=-" --property "parse.key=true"

Puede ver que dos consumer están consume mensajes.
Cuando un consumer consume a través de la option de partition , solo se puede consume una partition .
Por lo tanto, los topic acumulados en la partition del consumer previamente terminado no se pueden consume.

Entonces, de ahora en adelante, usaremos groups de consumer en lugar de consume partition directamente.
Mata al consumer de la console kafka en ejecución.

Agrega una option de grupo a un comando existente.
La option de grupo especifica la id del grupo donde se encuentra el consumer .
Ingrese la id del grupo como work.

kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --partition 0 --group work

Ejecutar el comando según lo previsto dará como resultado un error.
Porque la option de partition y la option de grupo no se pueden ingresar al mismo tiempo.
Solo se debe ingresar una de las dos option.
Excluya la option de partición y vuelva a emitir el comando del consumer de la console kafka .

kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work

Ahora, comprobaremos el funcionamiento de los groups de consumer a través de AKHQ .
Conéctese a AKHQ con el port localhost 8080.
Consulte el menú de topics de AKHQ.
Seleccione hello topic de la lista de topic .
Seleccione la pestaña de groups de consumidores.
Seleccione groups de consumer cuyo id sea work .
En la topics , puede ver información sobre los topic y partition que consume actualmente.
Tenga en cuenta que los groups de consumer pueden consume uno o más topic .
En este ejercicio, consume solo un topic .

En la página de AKHQ , seleccione la pestaña Miembros.
Hay columnas id de client , id, host y assignments .
El id de cliente es un valor ingresado para identificar al consumer.
En la práctica, puede ver que se ha ingresado ‘ console consumer ‘, el valor predeterminado del comando de shell .
La id del cliente permite duplicados.
Por otro lado, al valor de la id se le asigna automáticamente una id única dentro del consumer .
Puede verificar la información del host del consumidor a través de la host .
En la assignments , puede verificar la partition del topic consume por el consumer .
Actualmente, está consume dos partition 0 y 1 del topic hello .
Luego, ejecute el consumer cuya id de grupo es work en la terminal en la parte inferior.

kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work

Actualizar la AKHQ .
Puede ver que se agrega un consumer de console más.
En cuanto a la id del client , hay dos consumer de console ,
Puede verificar que los consumer tengan valores de id únicos a través de la id .

Una cosa a la que debe prestar atención es la columna de assignments .
Anteriormente, un consumer consume las partition 0 y 1,
Como ahora hay dos consumer, los dos consumer consume las partition 0 y 1, respectivamente.

Cuando se cambia la configuración de los consumer que participan en groups de consumer , los consumer pueden realizar el rebalancing de partition automáticamente.
Tenga en cuenta que dado que la cantidad de consumer se duplicó, el rendimiento de estos groups de consumer también se duplicó.

El rebalancing de la partition asigna automáticamente el número de consumer dividido por el size de la partición.
Si el size de la partición es 10 y hay dos consumer, cada uno consume 5.
Por el contrario, veamos el caso en el que el size del consumidor es mayor que el size de la partición.
Supongamos que hay 2 partition y 3 consumer, y el consumer es uno más grande.
Agregue un nuevo consumer al terminal para que haya 3 consumer en los groups de consumer de trabajo.

kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work

Actualice la página de AKHQ .
Puede ver que hay 3 client cuya id de grupo es work .
Por cierto, si revisa la assignments , puede ver que a dos consumer se les asignan partition, pero a un consumer no.
Como tal, si la cantidad de consumer excede la cantidad de partition , a los consumer restantes no se les asigna una partition.
Debe tener cuidado porque puede desperdiciar su servidor innecesariamente.

Volvamos a nuestro laboratorio y finalicemos un consumer en la terminal .
Actualice la página de AKHQ rápidamente.
Puede ver que no hay partition asignadas en la assignments .
Si hay un cambio en la consumer del grupo de consumer ,
Todos los consumer dejan de consume partition y esperan hasta el rebalancing.
Y cuando se completa el rebalancing de la partition , el consumer vuelve a consume.
Dado que este proceso es corto, debe verificarlo rápidamente para verlo.

Luego, en lugar de consumer con el comando de topics kafka ,
¿Qué sucede si aumenta el size de la partition que tiene el kafka ?
¿Los consumer restantes consume inmediatamente la nueva partition ?
Haré un ejercicio para confirmar.
Agregar consumer.

kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work

Actualice la página de AKHQ .
Puede verificar los consumer no asignados en la assignments .
Ahora aumentemos el número de partition a 3 con topics kafka .

kafka-topics --bootstrap-server kafka:9092 --alter --topic hello --partitions 3

Sin embargo, a diferencia de la adición o eliminación de consumer en el consumer ,
Incluso si aumenta la partition con el comando de topics de kafka, no se asigna inmediatamente al consumer.
Como referencia, en este ejercicio, la partition tardó aproximadamente 2 minutos en asignarse al consumer .
Puede haber un retraso, por lo que si desea procesar rápidamente, después de usar el comando de alter del size de la partition ,
Otra forma es agregar un consumer de inmediato.

La razón por la que tratamos con el retraso es por el orden de los topic en kafka .
Por ejemplo, suponga que el topic A debe procesarse antes de que el topic B pueda procesarse.
Sin embargo, A está en la partición 0,
Se supone que B está almacenado en la partición #1.
Si la partition 0 se retrasa 1 minuto, B en la partition 1 se consume primero,
Cuando finaliza el retraso y se consume un topic a través de la partition 0,
A debe guardarse antes que B, pero B se guarda primero, por lo que se produce un error.
Dado que kafka garantiza el orden por partition ,
Los topic A y B que requieren orden usan la misma key para que puedan almacenarse en la misma partition .
Como grupo de consumer , cuando un consumer consume dos o más partition ,
Debe verificar la metainformación en el topic, leer la partition y procesar cada partition por separado.
Comprobaré esto a través del código en otro video.

Por último, echemos un vistazo a la gestión de offset en groups de consumer .
Cuando consume directamente con el número de partition existente, la información de offset no se administra por separado para cada consumer .
Sin embargo, los groups de consumer registran la última información de offset procesada por partition según la id del grupo.
Por lo tanto, si un consumer específico finaliza debido a una falla, una partition se asigna inmediatamente a otro consumer y el consume se puede iniciar desde la última offset procesada.
Ejecute dos consumer y un producer en la terminal .

kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work
kafka-console-producer --bootstrap-server kafka:9092 --topic hello --property "key.separator=-" --property "parse.key=true"

Publica el topic introduciendo la misma clave.
Compruebe que un consumer está consume.
Mata al consumer que recibió el topic .
Vuelve a publicar el topic con la misma key que el producer .
Puede ver que el consumer que anteriormente no recibió el topic ahora lo está procesando.
Entonces cerraremos todos los consumer.
Publica topic con varias key como producer .

Vuelva a ejecutar el consumer como consumer de la console kafka .

kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work

Puede ver que los topic sin procesar se consume.

De esta manera, si usa kafka, incluso si el consumer falla, puede trabajar desde el offset una vez que se completa el procesamiento, para que pueda lograr la consistencia eventual.
Con kafka puede obtener una operación de servicio estable.

Como referencia, la información de offset para grupos de consumer se puede encontrar en el topic de offsets de consumidores.
Puede buscar el topic de las offsets del consumidor seleccionando ‘ show todos los topics ‘ en la página de topics de AKHQ y buscándolo.
Si presiona el botón ‘ live tail ‘ en la parte inferior del topic de las offsets del consumer , puede consultar los topic en tiempo real.
Si tiene id de grupo, topic, número de partition como key en el topic y hace clic en la vista detallada, puede buscar información de offset .
Esto permite que los groups de consumer busquen el número de offset procesado.

Con esto concluye la explicación de los groups de consumer .

Configurar notificaciones similares es muy útil para los creadores de contenido.
gracias