groups consumer kafka giải thích
Xin chào, đây là codeshow.
Trong thời gian này, chúng ta sẽ tìm hiểu về groups consumer kafka .
Trong thực tế , groups người tiêu dùng thường được sử dụng hơn là kết nối consumer trực tiếp với partition .
groups consumer cho phép consume dụng nhiều partition tự động hơn là kết nối consumer trực tiếp với partition .
Ví dụ: để consume topic bao gồm 4 partition , bạn cần tạo 4 consumer xem xét partition từ 0 đến 3.
Điều này là do một consumer chỉ có thể consume một partition .
Tuy nhiên, nếu groups consumer được sử dụng, thì một consumer có thể consume cả bốn partition .
Và khi số lượng consumer trong cùng một nhóm tăng lên, partition để consume sẽ tự động rebalancing.
Không giống như partition consume trực tiếp, có nhiều lợi ích khi vận hành vì chúng được consume tự động.
Chúng tôi sẽ thực hành groups consumer .
Cài đặt môi trường sẽ tiếp tục từ video trước.
Chạy devcontainers để thực hành kafka .
Chờ cho đến khi container chạy.
Khi container đã sẵn sàng, chúng tôi sẽ mở docker desktop.
Thực thi shell trong container kafka .
Hãy xóa topic đã tạo trước đó và bắt đầu thực hành.
Nếu không có topic hello , bạn có thể bỏ qua bước này.
kafka-topics --bootstrap-server kafka:9092 --delete --topic hello
Xóa topic bằng lệnh topics kafka .
Tạo hello topic xin chào với hai partition bằng lệnh topics kafka .
kafka-topics --bootstrap-server kafka:9092 --create --topic hello --partitions 2
Kiểm tra topic đã tạo trong menu topic của AKHQ .
Tôi đã xác nhận rằng có hai partition.
consume partition 0 và 1 bằng lệnh consumer console điều khiển kafka .
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --partition 0
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --partition 1
Giống như cách làm trong video trước, chúng ta sẽ produce key và value dựa trên dấu gạch ngang.
Chạy producer console kafka .
kafka-console-producer --bootstrap-server kafka:9092 --topic hello --property "key.separator=-" --property "parse.key=true"
Bạn có thể thấy rằng hai consumer đang consume tin nhắn.
Khi consumer consume thông qua option partition , chỉ một partition có thể được consume .
Do đó, không thể consume topic được tích lũy trong partition của consumer đã chấm dứt trước đó .
Sau đó, từ giờ trở đi, chúng tôi sẽ sử dụng groups consumer thay vì consume trực tiếp partition .
Giết consumer console kafka đang chạy.
Thêm một option nhóm vào một lệnh hiện có.
option nhóm chỉ định id của nhóm nơi consumer được đặt.
Nhập id nhóm là work.
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --partition 0 --group work
Thực hiện lệnh như dự định sẽ dẫn đến lỗi.
Bởi vì option partition và option nhóm không thể được nhập cùng một lúc.
Chỉ một trong hai option phải được nhập.
Loại trừ option partition và phát lại lệnh consumer console điều khiển kafka .
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work
Bây giờ, chúng ta sẽ kiểm tra hoạt động của groups consumer thông qua AKHQ .
Kết nối với AKHQ bằng port 8080 localhost .
Kiểm tra menu topics AKHQ .
Chọn topic hello từ danh sách topic .
Chọn tab groups consumer .
Chọn groups consumer có id là work .
Trong tab topics , bạn có thể xem thông tin về topic và partition hiện đang consume .
Lưu ý rằng groups consumer có thể consume một hoặc nhiều topic .
Trong bài tập này, chúng ta sẽ chỉ consume một topic .
Trên trang AKHQ , chọn tab Thành viên.
Có các cột client id, id, host và assignments .
id client là một giá trị được nhập để xác định consumer.
Trong thực tế, bạn có thể thấy rằng ‘ console consumer ‘, giá trị mặc định của lệnh shell , đã được nhập.
id client cho phép trùng lặp.
Mặt khác, giá trị của cột id được tự động gán một id duy nhất trong nhóm consumer .
Bạn có thể kiểm tra thông tin host của consumer thông qua cột host .
Trong cột assignments , bạn có thể kiểm tra thông tin partition của topic được consumer consume .
Hiện tại nó đang consume 2 partition 0 và 1 của topic hello .
Sau đó, chạy consumer có id nhóm đang work trong terminal ở phía dưới.
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work
Làm mới giai đoạn AKHQ .
Bạn có thể thấy rằng một consumer console khác đã được thêm vào.
Đối với id client , có hai consumer console ,
Bạn có thể kiểm tra xem consumer có giá trị id duy nhất thông qua cột id .
Một điều cần chú ý là cột assignments .
Trước đây, một consumer consume partition 0 và 1,
Vì bây giờ có hai consumer, nên hai consumer lần lượt consume partition 0 và 1.
Khi cấu hình của consumer tham gia vào groups consumer bị thay đổi, consumer có thể tự động thực hiện rebalancing partition .
Lưu ý rằng vì số lượng consumer tăng gấp đôi, thông lượng của groups consumer này cũng tăng gấp đôi.
rebalancing partition tự động phân bổ số lượng consumer chia cho size partition .
Nếu size partition là 10 và có hai consumer, mỗi người consume 5.
Ngược lại, hãy xem trường hợp size consumer lớn hơn size partition .
Giả sử rằng có 2 partition và 3 consumer và consumer lớn hơn.
Thêm một consumer mới vào terminal để có 3 consumer trong groups consumer work .
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work
Làm mới trang AKHQ .
Bạn có thể thấy rằng có 3 client có id nhóm là work .
Nhân tiện, nếu bạn kiểm tra cột assignments , bạn có thể thấy rằng hai consumer được chỉ định partition, nhưng một consumer thì không.
Như vậy, nếu số lượng consumer vượt quá số lượng partition , consumer còn lại sẽ không được chỉ định partition.
Bạn cần cẩn thận vì bạn có thể lãng phí máy chủ của mình một cách không cần thiết.
Hãy quay trở lại phòng thí nghiệm của chúng tôi và kết thúc một consumer trong terminal .
Làm mới trang AKHQ một cách nhanh chóng.
Bạn có thể thấy rằng không có partition nào được gán trong cột assignments .
Nếu có sự thay đổi trong thành phần consumer của nhóm consumer ,
Tất cả consumer ngừng consume partition và đợi cho đến khi rebalancing.
Và khi rebalancing partition hoàn tất, consumer consume trở lại.
Vì quá trình này ngắn nên bạn cần kiểm tra nhanh để xem.
Sau đó, thay vì consumer bằng lệnh topics kafka ,
Điều gì xảy ra nếu bạn tăng size của partition mà nút kafka có?
consumer còn lại sẽ ngay lập tức consume partition mới?
Tôi sẽ làm một bài tập để xác nhận.
Thêm consumer.
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work
Làm mới trang AKHQ .
Bạn có thể kiểm tra consumer chưa được chỉ định trong cột assignments .
Bây giờ, hãy tăng số lượng partition lên 3 với topics kafka .
kafka-topics --bootstrap-server kafka:9092 --alter --topic hello --partitions 3
Tuy nhiên, không giống như việc thêm hoặc loại bỏ consumer trong nhóm consumer ,
Ngay cả khi bạn tăng partition bằng lệnh topics kafka , nó sẽ không được gán ngay cho consumer.
Để tham khảo, trong bài tập này, mất khoảng 2 phút để gán partition cho nhóm consumer .
Có thể xảy ra độ trễ nên muốn xử lý nhanh thì sau khi dùng lệnh partition size alter ,
Một cách khác là thêm consumer ngay lập tức.
Lý do chúng tôi xử lý sự chậm trễ là do thứ tự của topic trong kafka .
Ví dụ: giả sử topic A phải được xử lý trước khi topic B có thể được xử lý.
Tuy nhiên, A nằm trên phân vùng 0,
B được cho là được lưu trữ trên phân vùng #1.
Nếu partition 0 bị trì hoãn trong 1 phút, B trong partition 1 sẽ được consume trước,
Khi độ trễ kết thúc và một topic được consume thông qua partition 0,
A nên được lưu trước B, nhưng B được lưu trước, do đó xảy ra lỗi.
Vì kafka đảm bảo thứ tự theo partition ,
topic A và B yêu cầu thứ tự sử dụng cùng key để chúng có thể được lưu trữ trong cùng một partition .
Là consumer , khi một consumer consume hai partition trở lên,
Bạn cần kiểm tra thông tin meta trong topic, đọc thông tin partition và xử lý riêng từng partition .
Tôi sẽ kiểm tra điều này thông qua mã trong một video khác.
Cuối cùng, chúng ta hãy xem việc quản lý offset trong groups consumer .
Khi consume trực tiếp với số partition hiện có, thông tin offset không được quản lý riêng cho từng consumer .
Tuy nhiên, groups consumer ghi lại thông tin offset cuối cùng được xử lý trên mỗi partition dựa trên id nhóm.
Vì vậy, nếu một consumer cụ thể chấm dứt do lỗi, partition sẽ ngay lập tức được gán cho một consumer khác và consume có thể được bắt đầu từ offset được xử lý cuối cùng.
Chạy hai consumer và producer trong terminal .
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work
kafka-console-producer --bootstrap-server kafka:9092 --topic hello --property "key.separator=-" --property "parse.key=true"
Xuất bản topic bằng cách nhập cùng một khóa.
Kiểm tra xem một consumer có đang consume.
Giết consumer đã nhận được topic .
Xuất bản topic với cùng một key với producer một lần nữa.
Bạn có thể thấy rằng consumer trước đây không nhận được topic hiện đang xử lý topic.
Sau đó, chúng tôi sẽ đóng cửa tất cả consumer.
Xuất bản topic với nhiều key khác nhau với tư cách là producer .
Chạy lại consumer với tư cách là consumer console kafka .
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello --group work
Bạn có thể thấy rằng topic chưa được xử lý đã được consume.
Theo cách này, nếu bạn sử dụng kafka, ngay cả khi consumer không thành công, bạn có thể làm việc từ offset sau khi quá trình xử lý hoàn tất, do đó bạn có thể đạt được tính nhất quán cuối cùng.
Với kafka, bạn có thể vận hành dịch vụ ổn định.
Để tham khảo, bạn có thể tìm thấy thông tin offset cho các nhóm consumer trong topic offsets consumer .
Bạn có thể tra cứu topic offsets consumer bằng cách chọn ‘ show all topics ‘ trên trang topics AKHQ và tìm kiếm chủ đề đó.
Nếu bạn nhấn nút ‘ tail live ‘ ở cuối topic offsets consumer , bạn có thể kiểm tra topic trong thời gian thực.
Nếu bạn có id nhóm, topic, số partition key trong topic và nhấp vào xem chi tiết, bạn có thể tìm kiếm thông tin offset .
Điều này cho phép groups consumer tra cứu số offset được xử lý.
Điều này kết thúc lời giải thích của groups consumer .
Đặt thông báo thích rất hữu ích cho người tạo nội dung.
Cảm ơn