- [x] How does the producer know who the leader is? Ask zookeeper; leadership is per partition.
- [x] Each topic has its own set of partitions, and each partition is assigned a subset of incoming events.
- [x] Which topic partition leader does the producer send a message to? It depends on the partition strategy.
- [x] How to ensure message order across the entire topic? Impossible, but order is guaranteed at the partition level.
- [x] Kafka command `bootstrap-server` vs `zookeeper`
For the Vagrant demo file, see my git repo Infratree and read it together with the Zookeeper Quick Start.
I took courses from PluralSight and Udemy; both are good, though the Udemy course is somewhat out-of-date. There are other tutorial videos:
Introduction
- Kafka简明教程 (a concise Kafka tutorial, in Chinese)
- Kafka in a Nutshell
- Apache Kafka Official Doc
- Learn Kafka Guide
- kcat, which is to Kafka what netcat is to the network.
- Understanding Kafka Topics and Partitions
Important Terms
- partition (division of a topic)
- topic (partitioned and distributed across brokers; each partition has one leader broker + followers (replicas))
- producer (generates data; the producer client only writes to the leader broker of a partition)
- consumer (reads data from a topic's partitions within a consumer group)
- broker (node in the Kafka cluster)
Producers and consumers are both Kafka clients.
A topic is similar to a folder in a filesystem, and the events are the files in that folder. An example topic name could be “payments”. Topics in Kafka are always multi-producer and multi-subscriber: a topic can have zero, one, or many producers that write events to it, as well as zero, one, or many consumers that subscribe to these events.
Topics are partitioned, meaning a topic is spread over a number of “buckets” located on different Kafka brokers. This distributed placement of your data is very important for scalability because it allows client applications to both read and write the data from/to many brokers at the same time. When a new event is published to a topic, it is actually appended to one
of the topic’s partitions. Events with the same event key (e.g., a customer or vehicle ID) are written to the same partition, and Kafka guarantees that any consumer of a given topic-partition
will always read that partition’s events in exactly the same order as they were written.
Note that message ordering for the entire topic is not guaranteed.
Kafka clients have 3 different levels of consistency; see Consistency as a Kafka Client in Kafka in a Nutshell.
Partition
The producer sends an event to a broker and the broker places the event in one of the partitions. For example, with 3 partitions and 2 consumers in one consumer group, consumer2 reads events from both p2 and p3. If a consumer3 is later added to the same consumer group, Kafka rebalances the partitions and reassigns p2 or p3 to the new consumer, so each consumer then maps to exactly one partition. This way a change in the number of consumers does not cause events to be read twice or left unprocessed, and it also increases throughput.
If the number of consumers > the number of partitions, the extra consumers sit idle, so the partition count effectively caps the number of useful consumers.
(diagram lost in extraction: a broker's partitions mapped to the consumers in a consumer group)
Which partition an event lands in is decided by the partition strategy. For example, the logstash Kafka output plugin has a partitioner config whose default is round-robin; if a message key is specified, hashing is used instead. Choosing the key is therefore important: it determines whether events are balanced across partitions. Also note that Kafka event order matters only within a single partition, so to preserve the order of a certain class of events you must route them to the same partition; custom partitioning is possible and is reserved for special cases. A keyed-producer sketch is shown below.
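As a quick illustration with the console producer (the topic name `test` and the key separator are my own example values): messages sharing a key hash to the same partition, preserving their relative order.

```
# send keyed messages; records with the same key always land in the same partition
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test \
  --property parse.key=true \
  --property key.separator=:
# then type lines like:  customer42:order-created
#                        customer42:order-paid
```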
I have encountered the issue of uneven distribution of messages among partitions with the round-robin strategy; the discussion and possible cause can be seen here.
Multiple consumer groups can consume the same topic, and each group is isolated: a topic can be consumed by many consumer groups, and each consumer group can have many consumers. Also see this question, and the sketch below.
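For example (group and topic names are illustrative), running two console consumers with different group ids delivers every message to both, since each group tracks its own offsets:

```
# terminal 1: consumer in group app-a
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group app-a
# terminal 2: consumer in group app-b receives the same messages independently
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group app-b
```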
Producer
For example: the logstash output kafka plugin is the producer client that sends logs/messages to the kafka cluster.
The producer has its own set of configuration, for example the logstash kafka output config; they are aligned with the kafka producer config.
```
# It denotes the number of brokers that must receive the record before we consider the write as successful.
acks => "1"
```
You can configure the batch size, compression, and acks for the producer; this is a tradeoff between latency and throughput. Partitions are stored in the data folder on the Kafka host. You can vary the batch size and observe how much disk space a fixed number of events adds; generally, the larger the batch size, the more broker disk space is saved. A quick way to check is sketched below.
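For instance (assuming the custom `log.dir` used later in this post and an example topic named `test`), you can measure the partition folders directly:

```
# size of each partition folder for topic "test" under the broker's log.dir
du -sh /root/kafka/log/test-*
```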
There is an article that talks about acks: Kafka Acks Explained. It also explains `batch.size` and `linger.ms`, and the behavior when `linger.ms` = 0 but `batch.size` > 0.
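To make the latency/throughput tradeoff concrete, here is a sketch of the relevant producer settings (the names are the standard Kafka producer configs; the values are illustrative, not recommendations):

```
acks=all                # every in-sync replica must ack the write: safest, highest latency
compression.type=lz4    # compress whole batches to save network and broker disk
batch.size=32768        # max bytes per partition batch; larger batches compress better
linger.ms=20            # wait up to 20 ms to fill a batch before sending
# with linger.ms=0 but batch.size>0, batches still form whenever records
# arrive faster than the sender thread can flush them
```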
Consumer
For example, the logstash input kafka plugin is the consumer that reads data from the kafka cluster.
The consumer has its own set of configuration, for example the logstash input kafka config; they are aligned with the kafka consumer config.
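A minimal sketch of such a logstash input (the server list, topic, and group id are placeholder values):

```
input {
  kafka {
    bootstrap_servers => "10.0.0.1:9092,10.0.0.2:9092"
    topics            => ["test"]
    group_id          => "logstash"   # consumers sharing this id split the partitions
  }
}
```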
High Availability
A new Kafka node registers itself via zookeeper and learns about the other brokers in the cluster. In production zookeeper usually has 3 ~ 5 nodes forming a cluster, and Kafka brokers discover each other through zookeeper.
The producer should set acks so it receives an acknowledgment from the broker that the message is committed.
Performance
It is related to disk I/O, network, RAM, CPU, OS, etc.
Set the Java heap size for Kafka (the max amount) before starting it, e.g. `export KAFKA_HEAP_OPTS="-Xmx4g"`. You can monitor the heap size over time and increase it if needed.
Make sure swapping is disabled for Kafka entirely.
Increase the file descriptor limit on your Linux machine, at least 100,000 as a starting point.
Run only Kafka on the machine; anything else running alongside will slow it down. Example commands for the first two items are sketched below.
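A sketch of the corresponding OS tweaks (make them permanent via /etc/fstab and /etc/security/limits.conf in a real setup):

```
# disable swap immediately
sudo swapoff -a
# raise the open-file limit for the current shell (persist it in /etc/security/limits.conf)
ulimit -n 100000
```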
Encountered Issue
Losing messages in Kafka, on both the producer and the consumer side.
Quick Start
Single node
Use one Vagrant VM with Java pre-installed to do the experiment: http://kafka.apache.org/quickstart, not using the self-contained zookeeper: https://dzone.com/articles/kafka-setup
Cluster Setup
Following up the zk cluster setup, I use the same cluster nodes; the kafka version is `2.6.0`.
3 node IPs; `sudo su` to use the root user.
```
// zk leader or follower does not reflect the real scenario
```
Kafka archive download link.
Download the kafka binary and untar it:
```
wget -q https://mirrors.sonic.net/apache/kafka/2.6.0/kafka_2.12-2.6.0.tgz
tar -xzf kafka_2.12-2.6.0.tgz
```
Create a custom `log.dir` folder and update it in the configuration file:
```
mkdir -p /root/kafka/log
```
Update `zookeeper.connect`, `num.partitions` and `log.dir`; they are the same for all brokers. `broker.id` and `advertised.listeners` need to be unique for each broker. This is not an exhaustive list; for more info see here:
```
############################# Server Basics #############################
```
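Only the section banner of my config snippet survived here; a minimal sketch of the settings just discussed might look like this (the 10.0.0.x IPs are placeholders for the three node IPs):

```
broker.id=1                                      # unique per broker: 1, 2, 3
advertised.listeners=PLAINTEXT://10.0.0.1:9092   # unique per broker; public IP or DNS for external clients
zookeeper.connect=10.0.0.1:2181,10.0.0.2:2181,10.0.0.3:2181
num.partitions=3
log.dir=/root/kafka/log
log.retention.hours=168
```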
The log retention `log.retention.hours=168` controls when messages are deleted in Kafka, whether or not they have been consumed; see this question.
```
cd /root/kafka_2.12-2.6.0
bin/kafka-server-start.sh -daemon config/server.properties
```
Kafka's own logging is configured by `config/log4j.properties`; by default the logs go to the `<kafka package>/logs/server.log` file.
Create a topic and output the topic description; this can run on any one of the nodes. `localhost` can be replaced by one of the broker IPs or a comma-separated list of IPs. In old versions you use the `--zookeeper` option to list all zk ip:port, or ip:port/kafka (if you use a chroot).
To connect to a Kafka broker from outside the cluster, `advertised.listeners` must use a public IP or DNS hostname; with the cluster-internal private IP or localhost, external clients cannot reach it.
The partition number and replication factor can both be overridden from the command line, ignoring the defaults.
```
# create topic ("test" is an example name; the flags override the server defaults)
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 3 --replication-factor 3 --topic test
```
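And to output the topic description (same example topic as above):

```
# show leader, replicas, and ISR for every partition of the topic
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
```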
Producer and consumer can run on any one of the nodes:
```
# produce message
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
```
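The matching consumer side (a sketch; `--from-beginning` replays the partition from the earliest offset):

```
# consume message
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
```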
Update topic settings:
```
# partition number can only be increased ("6" is an example new count)
bin/kafka-topics.sh --alter --bootstrap-server localhost:9092 --topic test --partitions 6
```
How to know all Kafka brokers are ready and running? You can check the zk ephemeral nodes, see here:
```
echo dump | nc localhost 2181 | grep brokers
```
Perf Test
To see Kafka's resiliency, we can run a perf test:
```
# generate 10KB of random data (the file name is an example)
base64 /dev/urandom | head -c 10240 > file10KB.txt
```
Then open another terminal to consume the data; meanwhile kill the Kafka brokers one by one to see the result. A sketch of both sides follows.
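One way to drive the test (topic and file names are the example values above; this assumes the topic was created with replication factor 3 so it survives broker kills):

```
# terminal 1: keep producing the random payload in a loop
while true; do
  bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test < file10KB.txt
done

# terminal 2: watch messages keep arriving while brokers are killed one at a time
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
```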
Kafka UI
There is an open source Web UI tool for managing a Kafka cluster, CMAK; you can build a docker image of it. Another open source tool for viewing topic messages is Kowl.
Run as Daemon
Similar to the Zookeeper daemon setup, set Kafka up as a system daemon so that it is launched every time the system boots.
For example, generate the service file `kafka.service` in the `/etc/systemd/system` folder.
Double curly brackets are placeholders in the jinja2 template.
```
[Unit]
```
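Only the block's first line survived; a full sketch, repeating the `[Unit]` header, might look like this (the install paths match the location used earlier; `{{ kafka_heap_size }}` is a hypothetical jinja2 placeholder, and `zookeeper.service` assumes the unit from the zk setup):

```
[Unit]
Description=Apache Kafka broker
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
Environment="KAFKA_HEAP_OPTS=-Xmx{{ kafka_heap_size }}"
ExecStart=/root/kafka_2.12-2.6.0/bin/kafka-server-start.sh /root/kafka_2.12-2.6.0/config/server.properties
ExecStop=/root/kafka_2.12-2.6.0/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target
```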
For more detail about `systemd`, please search and see my systemd blog.
Then you must enable Kafka to start on boot:
```
systemctl daemon-reload
systemctl enable kafka
```
Other systemd commands:
```
systemctl start kafka
systemctl status kafka
```