Kafka producers

Chia sẻ qua:

Overview

Một khi topics được tạo, chúng ta cần gửi dữ liệu vào đó. Vậy làm thế nào để gửi dữ liệu vào kafka topic, đó là chức năng của kafka producers.

Kafka producers là gì?

Tất cả các ứng dụng gửi dữ liệu lên kafka đều được gọi là kafka producers. Chi tiết hơn, đó là các ứng dụng sử dụng thư viện kafka client để ghi dữ liệu vào Kafka. Kafka client hỗ trợ hầu hết các ngôn ngữ phổ biến như Java, Go, Python ...

Kafka producers

Một producer gửi bản tin tới topic, và các bản tin được chia ra các partitions theo một cơ chế nào đó (Ví dụ: Key hashing) chúng ta sẽ tìm hiểu trong phần key hashing.

Khi gửi bản tin tới kafka bạn sẽ phải cấu hình ack level(ack=0,1 hoặc all) để định nghĩa như thế nào là gửi thành công. Chúng ta sẽ tìm hiểu chi tiết hơn trong bài topic replication.

Message Keys

Một bản tin được gửi lên kafka gồm key(có thể có hoặc không) và value

Nếu một bản tin được gửi lên:

  • Được chỉ định partition -> bản tin sẽ gửi tới partition được chỉ định
  • Không chỉ định partition nhưng có key != null -> tất cả các bản tin cùng key sẽ được gửi tới cùng partition theo cơ chế key hashing của DefaultPartitioner
  • Không chỉ định partition và key == null các bản tin sẽ round-robin tới các partition. Điều đó có nghĩ nếu topic có partition từ 0-2 thì bản tin sẽ lần lượt vào p0 -> p1 -> p2 và quay lại p0.

Message keys được ứng dụng với các bản tin cần đảm bảo thứ tự xử lý. Ví dụ: giả sử với một giao dịch gồm các phase sau:

  1. Khởi tạo
  2. Trừ tiền khách hàng
  3. Cộng tiền cho đơn vị bán
  4. Tạo đơn hàng vận chuyển.

Vậy khi xử lý chúng ta cần đảm bảo thứ tự các phase này và đơn giản nhất chúng ta sử dụng key là transactionId của luồng giao dịch. Khi đó tất cả các phase của cùng một giao dịch sẽ được gửi tới cùng một partition và được xử lý bởi cùng một consumer

Cấu trúc của một bản tin

Bản tin được tạo bởi kafka producers và bao gồm các thành phần như sau:

Cấu trúc bản tin kafka

  • Key là không bắt buộc. Có thể là số, chữ hoặc định dạng dữ liệu khác và được chuyển về định dạng binary
  • Value cũng có thể null. Value có thể là bất kỳ dữ liệu gì và cũng sẽ được chuyển về binary.
  • Compression Type. Bản tin được gửi lên có thể nén lại. Và Compression Type có thể là none, gzip, lz4, snappy,vàà zstd
  • Headers là danh sách bộ key-value. Thường được sử dụng để gửi các metadata, thông tin thêm về dữ liệu.
  • Partition + Offset khi bản tin được gửi lên kafka nó sẽ được lưu vào một partition với offset nào đó. Bộ Topic+partition+offset là định danh cho 1 bản tin và nó unique trên kafka.
  • Timestamp timestamp được kafka hoặc người dùng gán vào bản tin

Kafka Message Serializers

Trong bất kỳ ngôn ngữ nào, key hay value đều được thể hiện dưới dạng object tuy nhiên kafka thực hiện việc xử lý bản tin với định dạng byte arrays. Và việc chuyển đổi từ định dạng object thành định dạng binary được gọi là message serialization.

Message serialization

Bản tin với Key kiểu Integer và Value kiểu String chúng ta sẽ có Serializer tương ứng là KeySerializer=IntegerSerializer và ValueSerializer=StringSerializer để chuyển đổi dữ liệu về định dạng binary.

Kafka Message Key Hashing

Kafka partitioner là phần sẽ quyết định bản tin sẽ được lưu vào partition nào. Nếu bạn thay đổi thì DefaultPartitioner sẽ được sử dụng.

1public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
2    if (keyBytes == null) {
3        return stickyPartitionCache.partition(topic, cluster);
4    } 
5    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
6    int numPartitions = partitions.size();
7    // hash the keyBytes to choose a partition
8    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
9}

Đây là đoạn code được sử dụng để lựa chọn partition, nó sử dụng thuật toán MurmurHash để hash key và sau đó chia lấy dư với số lượng partition. Với thuật toán này cùng 1 key sẽ cho ra kết quả là cùng 1 partition đây chính là lý do tất cả các bản tin cùng key sẽ được routing tới cùng partition.