On-Call Kafka Quick Check

On any of Kafka cluster nodes runs following commands.

Topic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# list topics created
./bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--list

# display:
# number of partitions of this topic
# relica factor
# overridden configs
# in-sync replicas
./bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--topic <topic name> \
--describe
# the output for example:
# "Isr" is a status, it shows which replica is in-sync, the below means all replicas are good
# Configs field shows the override settings of default
Topic: apple PartitionCount: 3 ReplicationFactor: 3 Configs: cleanup.policy=delete,segment.bytes=536870912,retention.ms=172800000,retention.bytes=2000000000
Topic: apple Partition: 0 Leader: 28 Replicas: 28,29,27 Isr: 28,29,27
Topic: apple Partition: 1 Leader: 29 Replicas: 29,27,28 Isr: 29,27,28
Topic: apple Partition: 2 Leader: 27 Replicas: 27,28,29 Isr: 27,28,29

# only show overridden config
./bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--topics-with-overrides \
--topic <topic name> \
--describe
# or using kafka-config
./bin/kafka-configs.sh \
--zookeeper <zookeeper>:2181 \
--entity-type topics \
--entity-name <topic name> \
--describe

# add or update topic config
./bin/kafka-configs.sh \
--zookeeper <zookeeper>:2181 \
--entity-type topics \
--entity-name <topic name> \
--alter \
--add-config 'max.message.bytes=16777216'

# remove topic overrides
./bin/kafka-configs.sh \
--zookeeper <zookeeper>:2181 \
--entity-type topics \
--entity-name <topic name> \
--alter \
--delete-config 'max.message.bytes'

# list all consumer groups of a topic
# no direct command

Consumer Group

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# list consumer groups
./bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--list

# check partition/offset/lag messages in each consumer group/topic
# also see topics consumed by the group

# Consumer lag indicates the lag between Kafka producers and consumers. If the rate of
# production of data far exceeds the rate at which it is getting consumed, consumer
# groups will exhibit lag.
# From column name:
# LAG = LOG-END-OFFSET - CURRENT-OFFSET
# CLIENT-ID: xxx-0-0: means consumer 0 and its worker thread 0
./bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group <consumer group name> \
--describe

# read last one message in topic of consumer group
# note that one topic can be consumed by different consumer group
# each has separate consumer offset
./bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic <topic name> \
--group <consumer group name> \
--max-messages 1

Partition

1
2
3
4
5
6
7
# increase partition number
# partition number can only grow up
./bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--topic <topic name> \
--partitions <new partition number> \
--alter

Delete Messages

If there is bad data in message that stucks the consumer, we can delete them from the specified partition:

1
2
3
./bin/kafka-delete-records.sh \
--bootstrap-server localhost:9092 \
--offset-json-file deletion.json
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{
"partitions": [
{
"topic": "<topic name>",
// partition number, such as 0
"partition": 0,
// offset, delete all message from the beginning of partition till this
// offset(excluded).
// The offset specified is one higher than the problematic offset reported
// in the log
"offset": 149615102
}
],
// check ./bin/kafka-delete-records.sh --help to see the version
"version": 1
}

Note that if all messages need deleting from the topic, then specify in the JSON an offset of -1.

0%