kafka topic, partition 설명(producer, consumer, replication-factor, offset)

안녕하세요 codeshow입니다.
이번 시간에는 kafka 의 topic과 partition에 대해 학습하겠습니다.
kafka를 사용하기 위해서는 반드시 topic이 필요합니다.
이 topic을 기준으로 producer는 메세지를 publish하고,
consumer는 메세지를 구독합니다.
topic을 생성하기 위해 kafka를 실행합니다.
환경 설정은 이전 영상을 참고해주세요.
kafka 실습을 위한 devcontainers 를 실행해주세요.
container 실행될때까지 기다려주세요.
container가 다 실행되면 docker desktop을 열겠습니다.
kafka container에서 shell을 실행합니다.
topic 조회는 kafka topics의 list option을 사용합니다.

kafka-topics --bootstrap-server kafka:9092 --list

기본으로 생성되어있는 topic consumer offsets와 schemas를 제외하고는 topic이 없는 것을 확인했습니다.
kafka-topics 명령어의 create option을 사용해 topic을 생성합니다.

kafka-topics --bootstrap-server kafka:9092 --create --topic hello --partitions 1 --replication-factor 1

kafka topics 명령어로 hello란 이름으로 topic을 생성했습니다.
topic의 이름은 name option을 사용해 지정합니다.
replication factor option은 partition 데이터를 몇개의 복사본을 만들지 선택하는 option입니다.
kafka 노드의 수만큼 replica factor 값을 설정했습니다.
값이 높을 수록 노드간에 파티션 복사 비용이 발생해 성능에 영향을 미칩니다.
대신 값이 적으면 kafka 장애시에 데이터를 유실할 수 있습니다.
보통 3 이상의 값을 권장합니다.

참고로 create 당시 kafka 노드의 수보다 많은 replication factor를 넣으면 오류가 발생합니다.

kafka-topics --bootstrap-server kafka:9092 --create --topic error --partitions 1 --replication-factor 4

다른 option으로 partitions 에 대해 설명하겠습니다.
partition은 kafka에서 매우 중요한 정보입니다.
partitions option은 topic을 몇개로 나눌지를 설정하는 정보입니다.
만약 producer가 100개의 topic을 publish하고 이 topic은 1개의 partition만 있다면,
100개의 topic은 1개의 partition에 쌓입니다.
만약 consumer가 1개의 topic을 처리하는데 1분이 걸린다면.
topic 100개를 처리하는데 100분이 걸리게 됩니다.
만약, partition을 4개로 설정한다면 topic은 4개의 파티션 각각 25개씩 쌓입니다.
하나의 partition에는 최대 한개의 consumer만 둘수 있기 때문에,
이제 늘어난 파티션 개수만큼 3개의 consumer를 추가할 수 있습니다.
처리시간을 100분에서 25분으로 1/4 줄일수 있습니다.

여태까지의 내용을 shell을 이용해 실습하겠습니다.
hello topic에 producer와 consumer를 실습하겠습니다.
kafka container에서 terminal을 실행합니다.
좌측에는 producer, 우측에는 consumer를 배치합니다.
kafka console producer 명령을 이용해 kafka에 연결합니다.

kafka-console-producer --bootstrap-server kafka:9092 --topic hello 

producer가 kafka broker에 연결이 되었습니다.

kafka console consumer 명령을 이용해 kafka에 연결합니다.
partition은 0번 index부터 순차적으로 증가합니다.
실습에서 partition size가 1이므로 0번 partition만 사용 가능합니다.
option으로 partition 숫자 0을 입력합니다.

kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --partition 0

producer에 hello message를 입력합니다.
consumer에 hello를 확인할 수 있습니다.

그럼 하나의 partition에는 여러개의 consumer가 연결할수 있을까요?
terminal을 추가로 실행하고 consumer를 추가하겠습니다.

kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --partition 0

consumer가 문제 없이 연결됩니다.

그럼 producer로 메세지를 publish하면 어떻게 될까요?
두가지 시나리오가 있습니다.

  1. consumer 순서대로 한번씩 순차적으로 메세지를 전달합니다.
    이를 round robin 방식이라고 부릅니다.
  2. 모든 consumer에서 모든 메세지를 전달합니다.
    이름 broadcasting이라고 합니다.

동작을 확인하기 위해 producer로 hello1 메세지를 입력합니다.
두개의 consumer에 모두 hello1 메세지가 나옵니다.
다시 producer로 hello2 메세지를 입력합니다.
두개의 consumer에 모두 hello2 메세지가 나옵니다.
kafka는 같은 partition에 연결된 모든 consumer는 broadcasting 방식을 통해 메세지를 전달 받습니다.

참고로 다른 message broker인 rabbit mq 는 하나의 queue에 여러개의 worker를 연결할 수 있습니다.
그리고 round robin으로 worker는 메세지를 순차적으로 메세지를 전달 받습니다.
이 부분이 kafka와 동작이 달라서 rabbit mq 를 먼저 경험한 사람들은 헷갈리 수 있습니다.
rabbit mq 는 하나의 queue에 worker의 size를 늘려서 처리량을 늘리고,
kafka는 하나의 topic의 partition size를 늘려서 처리량을 늘립니다.

이번 실습은 partition의 수를 변경하겠습니다.
partition 수를 1에서 2로 늘리겠습니다.
ctrl c를 눌러 producer를 종료합니다.
kafka topics 명령어의 alter option을 사용해 partition size를 2로 늘리겠습니다.

kafka-topics --bootstrap-server kafka:9092 --alter --partitions 2 --topic hello

kafka topics의 describe 옵션으로 파티션 정보를 확인합니다.

kafka-topics --bootstrap-server kafka:9092 --topic hello --describe

참고로, AKHQ의 topic 화면에서도 늘어난 partition size 2를 확인 할 수 있습니다.
다시 producer를 실행하고, 새로 추가된 1번 partition에 consumer를 추가합니다.

kafka-console-producer --bootstrap-server kafka:9092 --topic hello 

기존에 partition 0에 연결된 consumer를 종료하고, 새로 partition 1에 consumer를 연결합니다.

kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --partition 0
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --partition 1

producer로 무작위로 여러번 메세지를 작성합니다.
consumer는 두개지만 topic은 한 consumer에게만 나뉘어 전달됩니다.

이는 key가 없는 경우 kafka가 작동하는 sticky partition cache 방식입니다.
key가 없는 경우 처음 보낸 partition 번호를 cache하고 이 partition으로만 topic을 전달합니다.
key가 없다면 한 partition만 사용하기 때문에 partition을 늘려도 의미가 없습니다.
즉, key는 kafka에서 topic이 저장될 partition을 결정하는 중요한 정보입니다.
partition들간에 topic이 잘 나뉘어 저장되도록 key를 추가하겠습니다.

producer에 property option을 추가해 다시 실행합니다.

kafka-console-producer --bootstrap-server kafka:9092 --topic hello --property "key.separator=-" --property "parse.key=true"

producer로 무작위로 값을 넣어보겠습니다.
이제 topic이 partition 나뉘어 저장되는 것을 확인할 수 있습니다.
특히, 같은 key는 같은 partition으로 publish되는 것을 consumer를 통해 확인 할 수 있습니다.

key separator option을 이용해 dash 앞에는 key, 뒤에는 value가 publish됩니다.
a는 key, b는 value가 됩니다.
참고로 기존에는 key 없이, hello라는 value만 publish했습니다.

dash를 기준으로 앞의 값이 같으면 key가 같은 것입니다.
중요한점은 같은 key는 항상 같은 partition에 저장됩니다.
하지만 key가 다르다면 같은 partition 또는 다른 partition에 저장됩니다.
이 부분에 대한 이해를 위해 kafka에서 key를 다루는 법을 살펴보겠습니다.
kafka는 key를 hash해서 kafka 노드의 수만큼 나머지 연산을 해서 나오는 결과 값을 partition 번호로 결정합니다.
지금은 partition이 두개이므로, 0과 1만이 partition 번호로 나올 수 있습니다.
참고로 hash 알고리즘은 murmur2 hash를 사용합니다.

partition을 이해할 때 중요한 사항으로 kafka는 partition 단위로만 topic의 순서를 보장합니다.
즉, 순서가 중요한 topic을 publish할 때는 같은 partition에 넣어야합니다.
key 값은 항상 같은 partition에 저장되는 것을 확인했으니,
순서가 중요한 topic의 경우는 꼭 같은 key를 사용하세요!

다음으로 partition의 offset에 대해서 설명하겠습니다.
kafka가 기존의 message queue와 다른 특징으로는 consume후에 queue에서 메세지를 삭제하지 않습니다.

파티션은 배열과 비슷합니다.
partition 안에 topic들은 배열의 원소처럼 순서대로 저장되어있습니다.
index로 배열의 값을 접근하듯, offset을 통해 partition의 특정 index 이후부터 consume을 할 수 있습니다.
참고로 partition마다 마지막 offset을 가지고 있습니다.
그래서 신규 consumer가 offset option없이 연결되면 마지막 offset부터 consume을 합니다.
하지만 offset option을 주면 offset 정보부터 consume을 합니다.

실습으로 terminal에 두 명령을 내리겠습니다.

  1. 0번 offset부터 consumer가 실행하도록 명령 합니다.
  2. offset 명령 없이 consumer 실행하도록 명령 합니다.
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --partition 1 --offset 0
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --partition 1 

offset index 0 option을 주면 처음부터 다시 실행하는 것을 확인 할 수 있습니다.
offset option이 없는 경우는 offset 옵션 기본값은 latest입니다.
새로 publish된 topic만 consume합니다.

마지막으로 partition에 대한 중요한 제약사항으로,
partition size에 기존보다 작은 값을 넣을 수 없습니다.
partition size는 더 큰 값만 입력 가능합니다.
partition을 2개에서 1개로 줄이는 alter 명령을 내리겠습니다.

kafka-topics --bootstrap-server kafka:9092 --alter --partitions 1 --topic hello

partition option 값을 2보다 큰 값을 넣으라는 오류 메세지를 확인 할 수 있습니다.

이상으로 kafka의 topic과 partition에 대한 설명을 마치겠습니다.
다음 시간에는 consumer groups를 사용하는 방법에 대해 살펴 보겠습니다.
참고로, consumer가 partition을 직접 consume하는 것보다,
consumer groups를 실무에서 더 많이 사용합니다.
이번 수업은 이 consumer groups를 잘 이해하기 위해,
partition을 학습하는 시간을 갖었습니다.

구독과 좋아요 알림 설정은 컨텐츠 제작자에게 많은 도움이 됩니다.

감사합니다.