Kafka Quick Start

[x] how does the producer know who is the leader? Ask zookeeper, leader is partition based. [x] each topic has its own set of partitions, each partiton is assigned subset of coming events. [x] producer send message to which topic partition leader? depends on parition stragegies. [x] how to ensure the message order that treats entire topic as a whole? impossible, but order is guarantee in partition level. [x] kafka command bootstrap-server vs zookeeper

The Vagrant demo file please see my git repo Infratree, reading with Zookeeper Quick Start

I took the course from PluralSight and Udemy, both are good, Udemy course is some what out-of-date. There are other tutorial videos:

Introduction

Important Terms

  • partition (divisions of a topic)
  • topic (partitioned and distributed in broker, each partition has one leader broker + followers(replicas))
  • producer (generate data, producer client only writes to the leader broker of that partition)
  • consumer (read data from topic(partition) within a consumer group)
  • broker (node in kafka cluster)

Producers and consumers are both kafka clients.

A topic is similar to a folder in a filesystem, and the events are the files in that folder. An example topic name could be “payments”. Topics in Kafka are always multi-producer and multi-subscriber: a topic can have zero, one, or many producers that write events to it, as well as zero, one, or many consumers that subscribe to these events.

Topics are partitioned, meaning a topic is spread over a number of “buckets” located on different Kafka brokers. This distributed placement of your data is very important for scalability because it allows client applications to both read and write the data from/to many brokers at the same time. When a new event is published to a topic, it is actually appended to one of the topic’s partitions. Events with the same event key (e.g., a customer or vehicle ID) are written to the same partition, Kafka guarantees that any consumer of a given topic-partition will always read that partition’s events in exactly the same order as they were written.

Note that message ordering for the entire topic is not guaranteed.

Kafka clents have 3 different levels of consistency, see Consistency as a Kafka Client in Kafka in a Nutshell.

Partition

Producer sends event to broker and broker places the event in one of the partitions. 比如这里有3个partitions, 2个consumers 在一个consumer group中,则consumer2 会对应到p2, p3读取事件,如果后来增加一个consumer3在这个consumer group中,则kafka 会partition rebalance p2 或 p3 到新增的消费者中,于是就变成了每个consumer 对应一个partiton。这样处理就不会因为consumer 数量的变化而导致event被重复读取或者没有被处理,也增加了throughput.

如果number of consumer > number of partition 则多余的consumer 被闲置了,因此partition 的数量几乎决定了consumer 可有的最大数量。

1
2
3
4
5
6
7
8
                              broker              consumer group
+---------------+ +------------+
| partition1---|--------|->consumer1 |
(batch send) |/ | | |
producer -------------->|- partition2---|--------|->consumer2 |
|\ | |/ |
| partition3---|--------| |
+---------------+ +------------+

关于event送到哪个 partition 是 Partition strategy 决定的,for example, in the logstash Kafka output plugin it has partitioner config, the default is round-robin, if message key is specified, then hash will be used. 所以如何选择 key 很重要,它决定了送到partition 中的event 数量是否平衡,还要注意,kafka event order matters only within a single partition. 所以如果要保证某一类event 的顺序,则需要把它们划分到同一个partition中, so custom partitioning is possible and is reserved for special cases.

I have encountered the issue that uneven districution of message among partitions with round-robin strategy, the discussion and possible cause can be seen here

Multiple consumer groups can consume the same topic, each group is isolated: A topic can be consumed by many consumer groups and each consumer group will have many consumers. Also see this question.

Producer

For example: the logstash output kafka plugin is the producer client that sends log/message to kafka cluster.

The producer has its own set of configuration, for example the logstash kafka output config, they are aligned with the kafka producer config

1
2
3
# It denotes the number of brokers that must receive the record before we consider the write as successful.
# this is important, the default is 1 in some producer config and it may introduce less explicit data loss sometimes
acks=0,1,all(-1)

Could config the batch size, compress, acks for producer. This is a tradeoff between latency and throughput. In kafka host data folder, partitions are stored here. 可以通过设置batch size大小去观察固定数量的event增加的空间大小,一般batch size越大,越节约broker的disk space.

There is a article talks about acks: Kafka Acks Explained

Also batch.size and linger.ms explained and the behavior when linger.ms = 0 but batch.size > 0.

Consumer

For example, the logstash input kafka plugin is the consumer to read the data from kafka cluster.

The consumer has its own set of configuration, for example the lostash input kafak config, they are aligned with the kafka consumer config

High Availability

New Kafka node will register itself via zookeeper and learn about other broker in the cluster. In production zookeeper usually has 3 ~ 5 nodes to form a cluster. Kafka broker will discover each other through zookeeper.

Producer should set acks to receive ack from broker for message commitment.

Performance

It is related to disk I/O, network, RAM, CPU, OS, etc.

export KAFKA_HEAP_OPTS="-Xmx4g", to set the Java heap size for Kafka (max amount), then start the Kafka. You can monitor the heap size overtime, increase it if needed.

Make sure swapping is disabled for Kafka entirely.

Increase the file descriptor limits on your linux, at least 100,000 as a starting point.

Run Kafka only on your macine, anything else will slow down the machine.

Encountered Issue

Lose messages in Kafka, on both producer and consumer side.

Quick Start

Single node

Use one Vanguard VM with Java pre-installed to do the expirement: http://kafka.apache.org/quickstart not using the self-contained zookeeper: https://dzone.com/articles/kafka-setup

Cluster Setup

Follow up the zk cluster setup, I use the same cluster nodes, kafka version is 2.6.0. 3 node IPs, sudo su to use root user.

1
2
3
4
5
6
7
8
9
10
11
12
// zk leader or follower not reflect real scenario

192.168.20.20 192.168.20.21 192.168.20.22
|--------------| |-----------------| |----------------|
| zk server1<--|----|--> zk server2<--|----|-->zk server3 |
| follower | | leader | | follower |
| | | / | \ | | |
| | | / | \ | | |
| /--|----|---/ | \---|----|--\ |
| kafka / | | kafka | | \kafka |
| broker1 | | broker2 | | broker3 |
|--------------| |-----------------| |----------------|

Kafka archive download link.

Download kafka binary and untar:

1
2
3
4
wget -q https://mirrors.sonic.net/apache/kafka/2.6.0/kafka_2.12-2.6.0.tgz
tar -zxf kafka_2.12-2.6.0.tgz

cd /root/kafka_2.12-2.6.0/config

Create custom log.dir folder, update in configuration file:

1
mkdir -p /root/kafka/log

Updates zookeeper.connect, num.partitions and log.dir. they are the same for all brokers. broker.id and advertised.listeners, these two need to be unique for each broker, this is not a exhausted list, more info see here:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1

# delete topic enable, default is true
delete.topic.enable=true


############################# Socket Server Settings #############################
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://192.168.20.20:9092

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.

# zookeeper.connect=192.168.20.20:2181,192.168.20.21:2181,192.168.20.22:2181/kafka
# /kafka的作用就是把所有kafka相关的folder都放在zk的kafka目录下, 从zk CLI中可以看到
zookeeper.connect=192.168.20.20:2181,192.168.20.21:2181,192.168.20.22:2181

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=536870912

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=60000

# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=true
############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/root/kafka/log

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
# 指的是每个topic 分成多少份partition, can > broker number
num.partitions=5

# 指的是每个partition的备份, 小于或等于 broker的个数
default.replication.factor=3

# in-sync replica (ISR) is a broker that has the latest data for a given partition.
# A leader itself is always an in-sync replica.
# A follower is an in-sync replica only if it has fully caught up to the partition it’s following.
# 注意,每个replica 都可以达到in-sync的状态(slow or quick),这个min.insync.replicas only works with the acks=all config in producer side!
# it denotes that at least this number is met, acks=all can succeed
# default is 1, typicially = default.replication.factor - 1
min.insync.replicas=2

The log retention log.retention.hours=168 is for deleting message in Kafka, whether or not the messages have been consumed, see this question

1
2
3
4
5
6
7
8
9
10
cd /root/kafka_2.12-2.6.0
# create a log file
touch /root/kafka/run.log
# start kafka
bin/kafka-server-start.sh config/server.properties > /root/kafka/run.log 2>1 &

# or run as daemon
bin/kafka-server-start.sh -daemon config/server.properties
# stop kafka
bin/kafka-server-stop.sh

Kafka log is configured by conf/log4j.properties, by default is under <kafka package>/logs/server.log file.

Create and output topic description, can run in anyone of the nodes, localhost can be one of the broker IPs or list of IPs combination, separate by comma. In old version you use the --zookeeper option to list all zk ip:port or ip:port/kafka(if you use chroot).

如果想从外部connect kafka broker, advertised.listeners 必须使用public IP or DNS hostname, 如果用集群内部的private IP 或者 localhost 则外部不能访问了。

partition number and replic factor在 command 都可以override default的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# create topic
# --bootstrap-server: Kafka server to connect to
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 \
--topic test

# list topics created
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

# topic verbose
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 \
--topic <topic name>

# delete topic
# delete.topic.enable=true by default
# deletion is not reversible
bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 \
--topic <topic name>

Producer and consumer, can run on anyone of the nodes:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# produce message 
bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092

# read last one message in topic test of consume1 group
# note that one topic can be consumed by different consumer group
# each has separate offset
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group consume1 --max-messages 1

# view message based on consumer group
# --group: consumer group
# will not remove messge in topic, just view
# --from-beginning: read from topic beginning
# --max-messages: max message to show
# --max-messages: formater
bin/kafka-console-consumer.sh --topic test --group consume1 --from-beginning --bootstrap-server localhost:9092 --max-messages 2 --property print.timestamp=true

# read consumer groups
bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092

# check partition/offset/lag messages in each consumer group/topic
# Consumer lag indicates the lag between Kafka producers and consumers. If the rate of
# production of data far exceeds the rate at which it is getting consumed, consumer
# groups will exhibit lag.
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group <group name>
# or
# they have the same output
bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand \
--group <group name> \
--bootstrap-server localhost:9092 \
--describe

Update topic settings:

1
2
# partition number can only be increase
bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic test --partitions 4

How to know all Kafka brokers are ready and running? Can check with zk ephemeral node, see here

1
echo dump | nc localhost 2181 | grep brokers

Perf Test

To see the kafka resiliency, we can have a perf test:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# generate 10KB of random data
base64 /dev/urandom | head -c 10000 | egrep -ao "\w" | tr -d '\n' > file10KB.txt

# in a new shell: start a continuous random producer
./kafka-producer-perf-test.sh --topic perf \
--num-records 10000 \
--throughput 10 \
--payload-file file10KB.txt \
--producer-props acks=1 bootstrap.servers=localhost:9092 \
--payload-delimiter A

# in a new shell: start a consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic perf

# then do:
# kill one kafka server
# kill another kafka server
# kill the last server

# start back the servers one by one

Then open another terminal to consume the data, meanwhile go to kill the kafka broker one by one to see the result.

Kafka UI

There is a open source Web UI tool for managing Kafka cluster, CMAK. You can build a docker image of it. Another open source for viewing topic messages Kowl

Run as Daemon

Similar to Zookeeper daemon set. Set Kafka as system daemon so that it will be launched every time system boots.

For example, generate service file kafka.service in /etc/systemd/system folder. Double curly brackets is placeholder in jinja2 template.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
[Unit]
Description=Apache Kafka server (broker)
Documentation=http://kafka.apache.org/documentation.html
# after zookeeper is up
After=network-online.target consul.service zookeeper-server.service dnsmasq.service
StartLimitInterval=200
StartLimitBurst=5

[Service]
# long-runing without forking
Type=simple
User={{ kafka_user }}
Group={{ kafka_group }}
# start, stop
ExecStart=/opt/kafka/bin/kafka-server-start.sh {{ kafka_conf_dir }}/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh

OOMScoreAdjust=-500
Restart=on-failure
RestartSec=30

[Install]
WantedBy=multi-user.target

More detail about systemd please search and see my systemd blog.

Then you must enable kafka starts on boot:

1
2
3
systemctl daemon-reload
# enable start on boot
systemctl enable kafka

Other systemd commands:

1
2
3
4
5
6
7
8
systemctl start kafka
systemctl stop kafka
systemctl restart kafka

# reload config without restart
systemctl reload kafka
# first try relaod, if not supports then restart
systemctl reload-or-restart kafka
0%