kafka consumer groups 설명

안녕하세요 codeshow입니다.
이번 시간에는 kafka consumer groups에 대해 학습하겠습니다.
실무에서는 consumer를 직접 partition에 연결하는 것보다 consumer groups를 주로 사용합니다.
consumer groups는 consumer를 partition에 직접 연결하는 방식보다 더 자동화된 partition consume을 할 수 있습니다.
예를 들면 4개의 partition으로 구성된 topic을 consume하기 위해서는 0번부터 3번까지 partition을 바라보는 4개의 consumer를 만들어야합니다.
하나의 consumer는 하나의 partition만 consume 할 수 있기 때문입니다.
하지만 consumer groups를 사용하면 하나의 consumer로 4개의 partition 모두를 consume 할 수 있습니다.
그리고 같은 group의 consumer가 늘어나면 consume하는 partition을 자동으로 rebalancing을 해줍니다.
partition을 직접 consume하는 것과 다르게 자동으로 consume하기 때문에 운영에 많은 이점이 있습니다.

consumer groups 실습을 하겠습니다.
환경 설정은 이전 영상에 이어 진행하겠습니다.
kafka 실습을 위한 devcontainers 를 실행해주세요.
container 실행될때까지 기다려주세요.
container가 준비되면 docker desktop을 열겠습니다.
kafka container에서 shell을 실행합니다.

이전에 만든 topic을 삭제하고 실습을 시작하겠습니다.
hello topic이 없다면 이 과정은 생략해도 됩니다.

kafka-topics --bootstrap-server kafka:9092 --delete --topic hello

kafka topics 명령으로 topic을 삭제합니다.

kafka topics 명령으로 partition이 두개인 hello topic을 생성합니다.

kafka-topics --bootstrap-server kafka:9092 --create --topic hello --partitions 2

AKHQ의 topic 메뉴에서 생성된 topic을 확인합니다.
partition이 두개인 것을 확인했습니다.

kafka console consumer 명령으로 0,1번 partition을 consume합니다.

kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --partition 0
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --partition 1

이전 영상의 실습처럼 하이픈을 기준으로 key와 value를 produce하겠습니다.
kafka console producer를 실행합니다.

kafka-console-producer --bootstrap-server kafka:9092 --topic hello --property "key.separator=-" --property "parse.key=true"

두개의 consumer에서 메세지를 consume하는 것을 볼 수 있습니다.
consumer는 partition option을 통해 consume하는 경우는 하나의 partition만 consume 가능합니다.
그래서 이전에 종료한 consumer의 partition에 쌓인 topic은 consume을 할 수 없습니다.

그럼 지금부터는 partition을 직접 consume하지 않고 consumer groups을 사용하겠습니다.
실행중인 kafka console consumer를 종료합니다.

기존 명령어에 group option을 추가합니다.
group option은 consumer가 있는 group의 id를 지정합니다.
group id는 work로 입력합니다.

kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --partition 0 --group work

의도한대로 명령어를 실행하면 오류가 발생합니다.
왜냐하면 partition option과 group option은 동시에 입력할 수 없습니다.
두 option중 하나만 입력해야합니다.
partition option을 빼고 다시 kafka console consumer 명령을 내립니다.

kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work

이제는 AKHQ를 통해 consumer groups의 동작을 확인하겠습니다.
localhost 8080 port로 AKHQ를 접속합니다.
AKHQ topics 메뉴를 확인합니다.
topic 목록에서 hello topic을 선택합니다.
consumer groups 탭을 선택합니다.
id가 work인 consumer groups를 선택합니다.
topics 탭에는 현재 consume중인 topic과 partition의 정보를 볼 수 있습니다.
참고로 consumer groups는 한개 이상의 topic을 consume할 수 있습니다.
이번 실습에서는 하나의 topic만 consume 하겠습니다.

AKHQ 페이지에서 Members 탭을 선택합니다.
client id, id, host, assignments 컬럼이 있습니다.
client id는 consumer를 식별하기위해 넣는 값입니다.
실습에서는 shell 명령어의 기본값인 ‘console consumer’가 입력된 것을 확인할 수 있습니다.
client id는 중복을 허용합니다.
반면에 id 컬럼의 값은 consumer group 내에서 유일한 id가 자동으로 할당됩니다.
host 컬럼을 통해 consumer의 host 정보를 확인할 수 있습니다.
assignments 컬럼에서는 consumer가 consume하는 topic의 partition 정보를 확인할 수 있습니다.
지금은 hello topic의 0과 1번 partition 두개를 consume하고 있습니다.
그럼 하단에 terminal로 group id가 work인 consumer를 실행합니다.

kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work

AKHQ 페이즈를 새로고침합니다.
console consumer가 하나 더 추가되는 것을 확인 할 수 있습니다.
client id는 console consumer가 두개가 나오고,
id 컬럼을 통해 consumer들은 고유한 id 값을 가지고 있는 것을 확인할 수 있습니다.

신경써서 봐야할것은 assignments 컬럼입니다.
기존에 한개의 consumer가 0과 1번 partition을 consume했는데,
이제 consumer가 두개가 되었기때문에, 두개의 consumer는 0번 1번 partition을 각각 consume하게 됩니다.

consumer groups에 참여하는 consumer들의 구성이 바뀌면, 자동으로 consumer는 partition rebalancing 할 수 있습니다.
참고로, consumer 수가 두배가 되었으므로 이 consumer groups의 처리량도 두배로 늘게되었습니다.

partition rebalancing은 consumer의 수를 partition size로 나눠서 자동으로 할당합니다.
partition size가 10이고, consumer가 두개면, 각각 5개씩 consume합니다.
그럼, 반대로 partition size보다 consumer size가 큰경우에 대해 알아보겠습니다.
partition이 2개고 consumer가 3개로 consumer가 한개 더 큰 경우를 가정하겠습니다.
terminal로 새로운 consumer를 추가해서 work consumer groups에 consumer가 3개가 되도록합니다.

kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work

AKHQ 페이지를 새로고침합니다.
group id가 work인 client는 3개인 것을 확인 할 수 있습니다.
그런데 assignments 컬럼을 확인하면 2개의 consumer는 partition이 할당되었지만, 한개 consumer는 할당되지 않은 것을 확인할 수 있습니다.
이처럼 partition 수보다 consumer 수가 많으면 남는 consumer는 partition을 할당 받지 못합니다.
불필요하게 서버를 낭비할 수 있으니 주의가 필요합니다.

실습으로 돌아가 terminal에서 consumer 하나를 종료합니다.
빠르게 AKHQ 페이지를 새로고침합니다.
assignments 컬럼에 할당된 partition이 없는 것을 확인할 수 있습니다.
consumer group의 consumer 구성에 변화가 생기면,
모든 consumer는 partition consume을 중지하고 rebalancing되기 전까지 대기합니다.
그리고 partition rebalancing이 완료되면 consumer는 다시 consume을 합니다.
이 과정이 짧기 때문에 빠르게 확인을 해야 볼 수 있습니다.

그럼 kafka topics 명령어로 consumer가 아닌,
kafka 노드가 가지고 있는 partition의 size를 늘리면 어떻게 될까요?
남는 consumer가 바로 새로운 partition을 consume할까요?
확인을 위해 실습을 하겠습니다.
consumer를 추가합니다.

kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work

AKHQ 페이지를 새로고침합니다.
assignments 컬럼에서 할당되지 않은 consumer를 확인할 수 있습니다.
이제 kafka topics로 partition 수를 3개로 늘리겠습니다.

kafka-topics --bootstrap-server kafka:9092 --alter --topic hello --partitions 3

하지만 consumer group의 consumer가 추가 또는 제거되는 것과는 다르게,
kafka topics 명령어로 partition을 늘려도 consumer에게 즉시 할당 되지 않습니다.
참고로 지금 실습에서는 consumer group에 partition이 할당되기까지 2분정도 걸렸습니다.
지연이 있을수 있으니 빠르게 처리하고 싶다면 partition size alter 명령을 사용후,
바로 consumer를 추가하는 것도 방법입니다.

지연에 대해서 다루는 이유는 kafka의 topic의 순서때문입니다.
예를 들면, A topic은 반드시 B topic이 처리되기전에 처리되어야한다고 가정하겠습니다.
그런데 A는 0번 파티션에,
B는 1번파티션에 저장된가고 가정합니다.
만약 0번 partition이 1분간 지연이 발생해, 1번 partition의 B가 먼저 consume이 되고,
지연이 끝나고 A topic이 0번 partition을 통해 consume될 때,
A가 B보다 먼저 저장되어야하는데 B가 먼저 저장되어있어 오류가 발생하게 됩니다.
kafka는 partition 단위로 순서가 보장되기 때문에
순서가 필요한 A와 B topic은 같은 partition에 저장할 수 있도록 동일한 key를 사용하세요.
consumer group으로 하나의 consumer가 2개 이상의 partition을 consume할 때는,
topic에서 meta 정보를 확인해 partition 정보를 읽고 partition 마다 따로 처리를 해줘야합니다.
이것은 다른 영상에서 code를 통해 확인하겠습니다.

마지막으로 consumer groups에서 offset 관리하는 것을 살펴보겠습니다.
기존 partition 번호로 직접 consume 했을때는 consumer 마다 offset 정보를 따로 관리하지 않습니다.
하지만 consumer groups는 group id를 기준으로 partition마다 처리한 마지막 offset 정보를 기록합니다.
그래서 특정 consumer가 장애로 종료되었다면 바로 다른 consumer에게 partition이 할당되고, 마지막으로 처리한 offset 이후부터 consume을 할 수 있습니다.
terminal로 consumer 두개와 producer를 실행합니다.

kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work
kafka-console-producer --bootstrap-server kafka:9092 --topic hello --property "key.separator=-" --property "parse.key=true"

같은 키를 입력해서 topic을 발행합니다.
한 consumer에서 consume하는 것을 확인합니다.
topic을 받은 consumer를 종료합니다.
다시 producer로 같은 key로 topic을 발행합니다.
기존에 topic을 받지 못했던 consumer가 이제는 topic을 처리하는 것을 확인할수 있습니다.
그럼 모든 consumer를 종료 시키겠습니다.
producer로 다양한 key로 topic을 발행합니다.

다시 kafka console consumer로 consumer를 실행합니다.

kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work

처리되지 않은 topic들을 consume하는 것을 확인할 수 있습니다.

이렇듯 kafka를 사용하면 consumer가 실패하더라도 처리가 완료된 offset 이후부터 작업을 할 수 있으므로 최종적 일관성을 달성할 수 있습니다.
kafka를 사용하면 안정적인 서비스 운영을 얻을 수 있습니다.

참고로 consumer group의 offset 정보는 consumer offsets topic에서 확인할 수 있습니다.
AKHQ topics 페이지에서 ‘show all topics’ 를 선택하고 검색하면 consumer offsets topic을 조회할 수 있습니다.
consumer offsets topic을 하단의 ‘live tail’ 버튼을 누르면 topic들을 실시간으로 확인할 수 있습니다.
topic에서 key로 group id, topic, partition 번호가 있고 상세보기를 누르면 offset 정보를 조회할 수 있습니다.
이를 통해 consumer groups는 처리한 offset 번호를 조회할 수 있습니다.

이상으로 consumer groups에 대한 설명을 마치겠습니다.

좋아요 알림 설정은 컨텐츠 제작자에게 많은 도움이 됩니다.
감사합니다.