This quick revisit is driven by query examples.

Query Example:

To run a query, you can go through this typical order:

-- find resource type
resource.type="k8s_container"
-- any label to narrow down the scope
resource.labels.cluster_name="us-east4"
resource.labels.namespace_name="default"
resource.labels.container_name="foo"
-- strings in json or text payload
textPayload:"there is a high bar"

-- apply logical operators or regexp in searches

For other query examples, please see here.

Query Language

It is recommended to read through the detailed syntax here.

Key Takeaways

  1. Comment lines start with -- in query expressions.

  2. The Boolean operator precedence order is NOT, OR, and AND; they must be upper case in queries.

  3. The left-side field in a query expression is a path from the LogEntry structure; you can also explore the fields in the unfolded query results. For example, we usually want to check whether jsonPayload or textPayload contains the desired substrings.

  4. Regular expressions can also be used in text queries; see the examples.

  5. Timestamps in queries are in UTC; you can generate one with the date command:

# 2023-07-04T18:50:36+00:00
date --rfc-3339=s --date="5 hours ago" | sed -e 's/ /T/g'
# Then use it in query, for example:
timestamp >= "2023-07-04T18:50:36+00:00"

Sometimes there is a need to move a pod to another node in order to rebalance the k8s node load: context switches, CPU load average, network traffic, etc.

For pods controlled by a Deployment or StatefulSet, just deleting the pod will usually end up recreating it on the same node again; this can be fixed with cordon:

# make the node unschedulable for new pods
kubectl cordon <pod host node name>

# delete the target pod, it will be recreated in another node
kubectl delete pod <pod name>

# revert node status
kubectl uncordon <pod host node name>

But please note that sometimes the deleted pod will not be rescheduled onto the idle node due to its big resource request; from the documentation:

Although actual memory or CPU resource usage on nodes is very low, the scheduler still refuses to place a Pod on a node if the capacity check fails. This protects against a resource shortage on a node when resource usage later increases, for example, during a daily peak in request rate.

In this case, you can move some pods with smaller resource requests rather than the original one.

By the way, if you are working on GCP, you can check the node capacity and requested resources from the UI.
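
You can also check this from the command line; for example, kubectl describe shows each node's allocatable capacity and the currently requested resources (the node name is a placeholder):

# the "Allocated resources" section lists CPU/memory requests and limits per node
kubectl describe node <node name> | grep -A 10 "Allocated resources"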

If the VPC subnet address space in a region runs out, the simplest way is to create a new subnet with the same mask in the same region (depending on your needs). Or you can expand the original subnet's IP range.

VPC Overview Highlight

https://cloud.google.com/vpc/docs/vpc

  • VPC is a global resource
  • resources within a VPC (regardless of subnet) can communicate with each other, subject to firewall rules
  • Shared VPC: keep a VPC in a common host project

Subnet Overview Highlight

https://cloud.google.com/vpc/docs/subnets

  • a subnet is a regional resource
  • subnet creation modes: auto and custom
  • you can create more than one subnet per region (for example, to extend subnet capacity)
  • subnet IPv4 ranges: primary and secondary

Note that there is no need to create a secondary subnet IP range for Alias IP yourself. From observation, the subnet will have secondary IP ranges auto-created if GKE is used in that network: it creates pods and services secondary IP ranges.

For example:

# check which nodes have the pods secondary IP range
# (the attribute path can be found through --format flattened):
gcloud compute instances list \
--filter="networkInterfaces[0].aliasIpRanges[0].subnetworkRangeName~'pods'" \
--project <project name>

Find VMs attached to a specified subnet:

gcloud compute instances list \
--filter="networkInterfaces[].subnetwork~'regions/us-east4/subnetworks/us-east4'" \
--project <project name>

Create and Modify Network

The primary IPv4 range of a subnet can be expanded, but not replaced or shrunk, after the subnet has been created. For example, the original primary IP range is 192.168.2.0/24 (in the private address space defined by gcloud); now set the prefix length to 21:

gcloud compute networks subnets expand-ip-range <subnet name> \
--region us-east4 \
--prefix-length=21 \
--project <project name>

Then the new IP range will be 192.168.0.0/21 (bits set beyond the mask length are dropped as they no longer make sense); the expansion will fail if the new IP range conflicts with other subnets.
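
To verify the result, you can describe the subnet and check its primary range, for example:

gcloud compute networks subnets describe <subnet name> \
--region us-east4 \
--project <project name> \
--format="value(ipCidrRange)"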

For a recap of the Logstash introduction, please have a look at Elastic Stack Quick Start.

Issue description: over the past few months, we have been seeing logs that were indeed generated but got lost on the way to the Elasticsearch database, which results in false PD alerts because the alert expression relies on these missing logs.

The data path is common and typical:

Data source 
=> Logstash(UDP input + filter + Kafka output)
=> Kafka
=> Logstash(Kafka input + filter + Elasticsearch output)
=> Elasticsearch(Kibana)

The loss could happen in transit at any point; let's check and narrow down the scope.

At the very beginning, I just suspected the data was lost in Kafka, without any proof, and adjusted the ack level on the Logstash Kafka output plugin; it turned out not to be the case.

Does the UDP Packet Reach the VM?

As UDP transfer is not reliable, the packets could be lost before they reach the Logstash VM. We can run tcpdump in the background to sniff the UDP packets from the target host and port to verify, for example:

# time limit the process run
# -k/--kill-after: send KILL after 5s if the TERM signal does not work
# 12h: duration, run tcpdump for 12 hours
# -i: interface
# -nn: don't convert protocol and port numbers to names
# -w: write to pcap file
# -U: with -w, write each packet to the file immediately
# -C: rollover file size limit: 200MB
# filter: capture only src IPs starting with 172.16 and port 514
timeout -k 5 12h \
tcpdump -i eth0 \
-nn \
-U -w /home/example/logstash-0.pcap \
-C 200 \
'src net 172.16 and port 514' \
&> /dev/null &

It is important to limit the pcap file size and roll over to new files when necessary; for example, the pcap files generated from the above command will be:

# suffix number starts from 1, auto appended.
/home/example/logstash-0.pcap
/home/example/logstash-0.pcap1
/home/example/logstash-0.pcap2
/home/example/logstash-0.pcap3

Each will have a file size <= 200MB, as -C specifies.

Then you can use Wireshark or tcpdump itself to read/filter the pcap file:

# -r: read pcap file
# -nn: don't convert addresses and ports to names
# -v: show verbose packet info
# the filter expression comes last
tcpdump -r /home/example/logstash-0.pcap2 \
-nn \
-v \
'src 172.16.53.4 and port 514' \
| grep "target info"

Then we know the UDP packets actually reached the VM; move on to the next question.

Other tcpdump Usages

If you want file creation at strict time intervals:

# -G: rotate to a new file every 1800 seconds
# must use a strftime filename, otherwise the new file will overwrite the previous one
tcpdump -i eth0 -G 1800 -w /tmp/trace-%Y-%m-%d-%H:%M:%S.pcap
# -W: limit the file count to 3, then exit 0
# so it will only create 3 files, each covering 30 minutes
tcpdump -i eth0 -W 3 -G 1800 -w /tmp/trace-%Y-%m-%d-%H:%M:%S.pcap

We can use -C (size) and -G (time) together, but the timestamped file name is required! For example:

tcpdump -i eth0 -w /tmp/trace-%Y-%m-%d-%H:%M:%S.pcap -G 3 -C 2

The file creation is either every 3s or every 2MB, whichever comes first; note that the size rollover happens within each time interval:

-rw-r--r--. 1 tcpdump 2.0M  May 2 06:30 trace-2022-05-02-06:30:20.pcap
-rw-r--r--. 1 tcpdump 2.0M  May 2 06:30 trace-2022-05-02-06:30:20.pcap1
-rw-r--r--. 1 tcpdump 1003K May 2 06:30 trace-2022-05-02-06:30:20.pcap2
-rw-r--r--. 1 tcpdump 2.0M  May 2 06:30 trace-2022-05-02-06:30:23.pcap
-rw-r--r--. 1 tcpdump 2.0M  May 2 06:30 trace-2022-05-02-06:30:23.pcap1
-rw-r--r--. 1 tcpdump 1.1M  May 2 06:30 trace-2022-05-02-06:30:23.pcap2
-rw-r--r--. 1 tcpdump 1.5M  May 2 06:30 trace-2022-05-02-06:30:26.pcap
-rw-r--r--. 1 tcpdump 1.7M  May 2 06:30 trace-2022-05-02-06:30:29.pcap
-rw-r--r--. 1 tcpdump 301K  May 2 06:30 trace-2022-05-02-06:30:32.pcap

You can rotate the file by size and limit the number of files created:

# -W: file count limit 3
# -C: rollover file size limit: 2MB
tcpdump -i eth0 -w /tmp/trace.pcap -W 3 -C 2

The result could be:

# rotated among these 3 files
-rw-r--r--. 1 tcpdump tcpdump 2.0M May 2 06:55 trace.pcap1
-rw-r--r--. 1 tcpdump tcpdump 2.0M May 2 06:55 trace.pcap2
-rw-r--r--. 1 tcpdump tcpdump 1.5M May 2 06:55 trace.pcap0

Or by time and size together; note that rotation happens only within each timeslice: every 3s, and whenever the size exceeds the limit within it!

# must use timestamp file name
tcpdump -i eth0 -w /tmp/trace-%Y-%m-%d-%H:%M:%S.pcap -W 5 -C 2 -G 3

Does the Logstash UDP Input Drop Packets?

This can be verified by adding another file output plugin to store the target info in a local file, for example in the Logstash config file:

input {}
filter {}

output {
  kafka {}
  if "target info" in [message] {
    file {
      # 'sourceip' is a field from the record
      path => "/var/log/logstash/highlight/%{sourceip}.log"
    }
  }
}

By checking the log file, I know the message never made it to the output stage; since the filter is good, it must be the input {} UDP plugin that dropped the data.

Why Does UDP Drop Packets?

UDP is not a reliable transport. By design, it will drop messages if it does not have space to buffer them.

There is a blog that talks about the UDP packet errors and UDP receive buffer errors seen in netstat -us; they are the first-hand indicators of packet drops.
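
For example, to check these counters on the VM (the exact wording of the netstat output varies by distro):

# "packet receive errors" / "receive buffer errors" growing over time means drops
netstat -us | grep -i error
# the same counters (InErrors, RcvbufErrors) live in /proc
grep Udp: /proc/net/snmp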

How to Solve It?

Luckily, we can increase the queue size and buffer size, for example:

input {
  udp {
    port => 514
    type => syslog
    queue_size => 100000 # default is 2000
    receive_buffer_bytes => 16777216 # 16 MB, default uses the system sysctl value
    workers => 4 # depends on the vCPU number, default is 2
  }
}

The values really depend on your traffic; you can run sar -n DEV 1 to find a reasonable estimate. Moreover, you need to raise the system socket receive buffer for Logstash's receive_buffer_bytes to take effect if it is larger than the system default buffer, for example:

echo "net.core.rmem_max=16777216" >> /etc/sysctl.conf
sysctl -p

Finally, increase the Logstash JVM heap size accordingly; usually set both Xms and Xmx to half of the RAM size.

Then restart and check the status of new config:

systemctl restart logstash
# verbose to see the config change
systemctl status logstash -l

Horizontal Scaling

If the traffic is heavy, vertical scaling of CPU cores or RAM is not enough and the packet drops continue. It turns out that Logstash does not scale well with increasing CPU cores; see this comment.

So in this case you have to do horizontal scaling.

Monitoring/Alert

It is helpful to have a Grafana dashboard to display the drop rate relative to inbound traffic, for example:

label_replace(rate(node_netstat_Udp_InErrors{instance=~".*-example-.*"}[5m])
/ on(instance)
rate(node_network_receive_packets_total{device="eth0", instance=~".*-example-.*"}[5m])
, "new_name", "$1", "instance", "prefix-(.+)")
* 100

Here is the list of metrics exposed by the node exporter.

Postscript

There are other things I practiced:

  1. Logstash config and syntax for the input, filter, and output plugins.
  2. Using nc as a UDP client to test the Logstash UDP input and filter.
  3. Revisiting the Kafka config and understanding possible data loss on the Kafka side.
  4. Revisiting PromQL.

Finally, I went through the networking part of system optimization once more to reinforce it :D

  [x] How does the producer know who the leader is? Ask ZooKeeper; the leader is per partition.
  [x] Each topic has its own set of partitions; each partition is assigned a subset of the incoming events.
  [x] Which topic partition leader does the producer send messages to? It depends on the partition strategy.
  [x] How to ensure message order treating the entire topic as a whole? Impossible, but order is guaranteed at the partition level.
  [x] Kafka commands: bootstrap-server vs zookeeper

For the Vagrant demo file, please see my git repo Infratree; read along with Zookeeper Quick Start.

I took courses from Pluralsight and Udemy; both are good, though the Udemy course is somewhat out of date. There are other tutorial videos:

Introduction

Important Terms

  • partition (a division of a topic)
  • topic (partitioned and distributed across brokers; each partition has one leader broker + followers (replicas))
  • producer (generates data; the producer client only writes to the leader broker of a partition)
  • consumer (reads data from a topic (partition) within a consumer group)
  • broker (a node in the Kafka cluster)

Producers and consumers are both kafka clients.

A topic is similar to a folder in a filesystem, and the events are the files in that folder. An example topic name could be “payments”. Topics in Kafka are always multi-producer and multi-subscriber: a topic can have zero, one, or many producers that write events to it, as well as zero, one, or many consumers that subscribe to these events.

Topics are partitioned, meaning a topic is spread over a number of “buckets” located on different Kafka brokers. This distributed placement of your data is very important for scalability because it allows client applications to both read and write the data from/to many brokers at the same time. When a new event is published to a topic, it is actually appended to one of the topic’s partitions. Events with the same event key (e.g., a customer or vehicle ID) are written to the same partition, and Kafka guarantees that any consumer of a given topic-partition will always read that partition’s events in exactly the same order as they were written.

Note that message ordering for the entire topic is not guaranteed.

Kafka clients have 3 different levels of consistency; see Consistency as a Kafka Client in Kafka in a Nutshell.

Partition

The producer sends an event to a broker and the broker places the event in one of the partitions. For example, with 3 partitions and 2 consumers in one consumer group, consumer2 will read events from p2 and p3. If a consumer3 is later added to this consumer group, Kafka will rebalance p2 or p3 to the new consumer, so each consumer ends up with exactly one partition. This way, a change in the number of consumers does not cause events to be read twice or left unprocessed, and it also increases throughput.

If the number of consumers > the number of partitions, the extra consumers are left idle, so the number of partitions pretty much determines the maximum number of consumers.

                        broker                  consumer group
                  +----------------+          +---------------+
                  |  partition1 ---|----------|-> consumer1   |
 (batch send)     | /              |          |               |
 producer --------|-- partition2 --|----------|-> consumer2   |
                  | \              |          |  /            |
                  |  partition3 ---|----------|-/             |
                  +----------------+          +---------------+

Which partition an event goes to is decided by the partition strategy. For example, the Logstash Kafka output plugin has a partitioner config: the default is round-robin, but if a message key is specified, hashing is used. So how you choose the key is important; it determines whether the number of events sent to each partition is balanced. Also note that Kafka event order matters only within a single partition, so if you need to preserve the order of a certain class of events, you have to route them to the same partition; custom partitioning is possible and is reserved for special cases.

I have encountered the issue of uneven distribution of messages among partitions with the round-robin strategy; the discussion and possible cause can be seen here.

Multiple consumer groups can consume the same topic and each group is isolated: a topic can be consumed by many consumer groups, and each consumer group can have many consumers. Also see this question.

Producer

For example, the Logstash Kafka output plugin is the producer client that sends logs/messages to the Kafka cluster.

The producer has its own set of configuration, for example the Logstash Kafka output config; it is aligned with the Kafka producer config:

# acks denotes the number of brokers that must receive the record before we consider the write successful.
# This is important: the default is 1 in some producer configs, which may introduce subtle data loss.
acks=0,1,all(-1)

You can configure the batch size, compression, and acks for the producer; this is a tradeoff between latency and throughput. The partitions are stored in the Kafka host data folder. You can vary the batch size and observe how much disk space a fixed number of events adds: generally, the larger the batch size, the more broker disk space is saved.

There is an article that talks about acks: Kafka Acks Explained.

batch.size and linger.ms are also explained there, including the behavior when linger.ms = 0 but batch.size > 0.
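
To tie this back to Logstash, here is a minimal sketch of how these knobs map onto the Logstash kafka output plugin; the option names follow that plugin, while the broker addresses, topic, and values are illustrative assumptions only:

output {
  kafka {
    bootstrap_servers => "broker1:9092,broker2:9092"
    topic_id => "syslog"
    # wait for all in-sync replicas, see min.insync.replicas on the broker side
    acks => "all"
    # batch size in bytes, and how long to wait before sending a partial batch
    batch_size => 65536
    linger_ms => 10
    compression_type => "snappy"
  }
}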

Consumer

For example, the Logstash Kafka input plugin is the consumer that reads the data from the Kafka cluster.

The consumer has its own set of configuration, for example the Logstash Kafka input config; it is aligned with the Kafka consumer config.

High Availability

A new Kafka node registers itself via ZooKeeper and learns about the other brokers in the cluster. In production, ZooKeeper usually has 3 to 5 nodes forming a cluster. Kafka brokers discover each other through ZooKeeper.

The producer should set acks to receive acknowledgements from brokers for message commitment.

Performance

It is related to disk I/O, network, RAM, CPU, OS, etc.

export KAFKA_HEAP_OPTS="-Xmx4g" sets the Java heap size (maximum amount) for Kafka; then start Kafka. You can monitor the heap size over time and increase it if needed.

Make sure swapping is disabled for Kafka entirely.

Increase the file descriptor limits on your Linux machine; at least 100,000 as a starting point.

Run only Kafka on your machine; anything else will slow the machine down.

Encountered Issue

Losing messages in Kafka, on both the producer and the consumer side.

Quick Start

Single node

Use one Vanguard VM with Java pre-installed to do the experiment: http://kafka.apache.org/quickstart, not using the self-contained zookeeper: https://dzone.com/articles/kafka-setup

Cluster Setup

Following up on the zk cluster setup, I use the same cluster nodes; the Kafka version is 2.6.0. There are 3 node IPs; sudo su to use the root user.

// zk leader or follower placement does not reflect the real scenario

 192.168.20.20          192.168.20.21          192.168.20.22
|--------------|       |-----------------|    |----------------|
| zk server1 <-|-------|--> zk server2 <-|----|--> zk server3  |
|  follower    |       |     leader      |    |    follower    |
|              |       |    /   |   \    |    |                |
|              |       |   /    |    \   |    |                |
|      /-------|-------|--/     |     \--|----|--\             |
|  kafka       |       |      kafka      |    |   kafka        |
|  broker1     |       |      broker2    |    |   broker3      |
|--------------|       |-----------------|    |----------------|

Kafka archive download link.

Download kafka binary and untar:

wget -q https://mirrors.sonic.net/apache/kafka/2.6.0/kafka_2.12-2.6.0.tgz
tar -zxf kafka_2.12-2.6.0.tgz

cd /root/kafka_2.12-2.6.0/config

Create a custom log.dirs folder and update it in the configuration file:

mkdir -p /root/kafka/log

Update zookeeper.connect, num.partitions and log.dirs; they are the same for all brokers. broker.id and advertised.listeners need to be unique for each broker. This is not an exhaustive list; for more info see here:

############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1

# delete topic enable, default is true
delete.topic.enable=true


############################# Socket Server Settings #############################
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://192.168.20.20:9092

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.

# zookeeper.connect=192.168.20.20:2181,192.168.20.21:2181,192.168.20.22:2181/kafka
# The /kafka chroot puts all Kafka-related znodes under the /kafka path in zookeeper, which you can see from the zk CLI
zookeeper.connect=192.168.20.20:2181,192.168.20.21:2181,192.168.20.22:2181

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=536870912

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=60000

# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=true
############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/root/kafka/log

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
# the number of partitions each topic is split into; it can be > the number of brokers
num.partitions=5

# the number of replicas for each partition; must be <= the number of brokers
default.replication.factor=3

# in-sync replica (ISR) is a broker that has the latest data for a given partition.
# A leader itself is always an in-sync replica.
# A follower is an in-sync replica only if it has fully caught up to the partition it’s following.
# Note: every replica can reach the in-sync state (slowly or quickly); min.insync.replicas only works with the acks=all config on the producer side!
# It denotes that acks=all can only succeed when at least this number of in-sync replicas is met.
# default is 1, typically = default.replication.factor - 1
min.insync.replicas=2

The log retention setting log.retention.hours=168 is for deleting messages in Kafka, whether or not the messages have been consumed; see this question.

cd /root/kafka_2.12-2.6.0
# create a log file
touch /root/kafka/run.log
# start kafka
bin/kafka-server-start.sh config/server.properties > /root/kafka/run.log 2>&1 &

# or run as daemon
bin/kafka-server-start.sh -daemon config/server.properties
# stop kafka
bin/kafka-server-stop.sh

Kafka logging is configured by config/log4j.properties; by default it goes to the <kafka package>/logs/server.log file.

Create a topic and output the topic description; this can run on any one of the nodes. localhost can be one of the broker IPs or a comma-separated list of IPs. In old versions you use the --zookeeper option to list all zk ip:port or ip:port/kafka (if you use a chroot).

If you want to connect to a Kafka broker from outside, advertised.listeners must use a public IP or DNS hostname; if it uses the cluster-internal private IP or localhost, external clients cannot access it.

The partition number and replication factor can both be overridden on the command line to replace the defaults; see the create example after the block below.

# create topic
# --bootstrap-server: Kafka server to connect to
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 \
--topic test

# list topics created
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

# topic verbose
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 \
--topic <topic name>

# delete topic
# delete.topic.enable=true by default
# deletion is not reversible
bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 \
--topic <topic name>
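
For example, a sketch of overriding the defaults when creating a topic (the topic name here is just an illustration):

# override num.partitions and default.replication.factor at creation time
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 \
--topic test2 \
--partitions 10 \
--replication-factor 2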

Producer and consumer commands; they can run on any one of the nodes:

# produce message 
bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092

# read last one message in topic test of consume1 group
# note that one topic can be consumed by different consumer group
# each has separate offset
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group consume1 --max-messages 1

# view messages based on consumer group
# --group: consumer group
# will not remove messages from the topic, just view them
# --from-beginning: read from the topic beginning
# --max-messages: max messages to show
# --property: output formatter options, e.g. print the timestamp
bin/kafka-console-consumer.sh --topic test --group consume1 --from-beginning --bootstrap-server localhost:9092 --max-messages 2 --property print.timestamp=true

# read consumer groups
bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092

# check partition/offset/lag messages in each consumer group/topic
# Consumer lag indicates the lag between Kafka producers and consumers. If the rate of
# production of data far exceeds the rate at which it is getting consumed, consumer
# groups will exhibit lag.
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group <group name>
# or
# they have the same output
bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand \
--group <group name> \
--bootstrap-server localhost:9092 \
--describe

Update topic settings:

# the partition number can only be increased
bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic test --partitions 4

How do we know all Kafka brokers are ready and running? We can check the zk ephemeral nodes, see here:

echo dump | nc localhost 2181 | grep brokers

Perf Test

To see Kafka's resiliency, we can run a perf test:

# generate 10KB of random data
base64 /dev/urandom | head -c 10000 | egrep -ao "\w" | tr -d '\n' > file10KB.txt

# in a new shell: start a continuous random producer
./kafka-producer-perf-test.sh --topic perf \
--num-records 10000 \
--throughput 10 \
--payload-file file10KB.txt \
--producer-props acks=1 bootstrap.servers=localhost:9092 \
--payload-delimiter A

# in a new shell: start a consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic perf

# then do:
# kill one kafka server
# kill another kafka server
# kill the last server

# start back the servers one by one

Then open another terminal to consume the data; meanwhile, go kill the Kafka brokers one by one to see the result.

Kafka UI

There is an open source web UI tool for managing Kafka clusters, CMAK; you can build a docker image of it. Another open source tool for viewing topic messages is Kowl.

Run as Daemon

Similar to the Zookeeper daemon setup: set Kafka up as a system daemon so that it is launched every time the system boots.

For example, generate the service file kafka.service in the /etc/systemd/system folder. The double curly brackets are placeholders in the jinja2 template.

[Unit]
Description=Apache Kafka server (broker)
Documentation=http://kafka.apache.org/documentation.html
# after zookeeper is up
After=network-online.target consul.service zookeeper-server.service dnsmasq.service
StartLimitInterval=200
StartLimitBurst=5

[Service]
# long-running without forking
Type=simple
User={{ kafka_user }}
Group={{ kafka_group }}
# start, stop
ExecStart=/opt/kafka/bin/kafka-server-start.sh {{ kafka_conf_dir }}/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh

OOMScoreAdjust=-500
Restart=on-failure
RestartSec=30

[Install]
WantedBy=multi-user.target

For more detail about systemd, please search for and see my systemd blog.

Then you must enable Kafka to start on boot:

systemctl daemon-reload
# enable start on boot
systemctl enable kafka

Other systemd commands:

systemctl start kafka
systemctl stop kafka
systemctl restart kafka

# reload config without restart
systemctl reload kafka
# first try reload; if not supported, then restart
systemctl reload-or-restart kafka

Proj1 description: build a small Go docker app to fetch and display service A's token from a specified GKE cluster through port forwarding. The arguments in the token request path come from a configMap in the same GKE cluster.

I structure this blog in the order of the problems solved during the development process.

Caveat

VS Code may have hiccups with Go package imports (annoying errors at the package import statements which prevent code intelligence suggestions from working); troubleshooting steps could be:

  1. check that the package is downloaded/installed, run go get
  2. open the editor at the Go project root directory rather than elsewhere
  3. make sure the Go tools are up to date
  4. try moving the Go project out of GOPATH and set up go.mod for it
  5. restart the editor

How to get GKE cluster kubeconfig file?

To talk to a GKE cluster, the first thing to do is run the gcloud command:

gcloud container clusters get-credentials <cluster> --region <region> --project <project>

I don’t want to install the gcloud CLI; instead, let golang do this job. The idea is to figure out how the gcloud command generates the kubeconfig, for example:

KUBECONFIG=kubeconfig.yml gcloud container clusters get-credentials <cluster> --region <region> --project <project>

The result is a single-cluster kubeconfig.yml file in the current directory, and exporting this KUBECONFIG will make subsequent kubectl commands work on the cluster specified in this yaml file.

To understand it in depth, I use the gcloud option --log-http to dump the command log:

gcloud container clusters get-credentials <cluster> --region <region> --project <project> --log-http

Displaying the redacted log here:

=======================
==== request start ====
uri: https://oauth2.googleapis.com/token
method: POST
== headers start ==
# header info
== headers end ==
== body start ==
Body redacted: Contains oauth token. Set log_http_redact_token property to false to print the body of this request.
== body end ==
==== request end ====
---- response start ----
status: 200
-- headers start --
# header info
-- headers end --
-- body start --
Body redacted: Contains oauth token. Set log_http_redact_token property to false to print the body of this response.
-- body end --
total round trip time (request+response): 0.084 secs
---- response end ----

Fetching cluster endpoint and auth data.
=======================
==== request start ====
uri: https://container.googleapis.com/v1/projects/<cluster>/locations/us-west1/clusters/<cluster>?alt=json
method: GET
== headers start ==
# header info
== headers end ==
== body start ==

== body end ==
==== request end ====
---- response start ----
status: 200
-- headers start --
# header info
-- headers end --
-- body start --
# big json file which contains necessary data to generate kubeconfig
-- body end --
total round trip time (request+response): 0.067 secs
---- response end ----
----------------------
kubeconfig entry generated for <cluster>.

The first HTTP call is about OAuth 2.0, used to authorize the caller’s request; the gcloud client will do this automatically for you if the env variable GOOGLE_APPLICATION_CREDENTIALS is set correctly, for example:

export GOOGLE_APPLICATION_CREDENTIALS="path to adc.json or service account key file"

I will mount this credential file into docker when I run the container.

Note that I use GOOGLE_APPLICATION_CREDENTIALS because the app runs outside of the gcloud environment; if it runs inside, the attached service account will be used. See the demo about Authenticating as a service account.

Next I went to figure out how to use the gcloud K8s Go client to get the GKE cluster info; from the log dump above the URL is known as:

uri: https://container.googleapis.com/v1/projects/<cluster>/locations/us-west1/clusters/<cluster>?alt=json

It is the gcloud K8s engine REST API v1; you can experiment with it live here. Once you fill in the name field and click the EXECUTE button (uncheck the API key), the OAuth 2.0 consent will pop up to let you authorize the request, and then the call will succeed.

Next, find the corresponding Go client call for this REST API: Go Cloud Client Libraries => search in the search bar for Kubernetes Engine API => search for the related func name func (*ClusterManagerClient) GetCluster.

Note that the gcloud K8s Go client is for managing the GKE cluster, not the K8s resources; it is not the kubernetes/client-go mentioned later.

From the sample code, the required field structure is the same as the REST API path:

// The name (project, location, cluster) of the cluster to retrieve.
// Specified in the format `projects/*/locations/*/clusters/*`.
Name string `protobuf:"bytes,5,opt,name=name,proto3" json:"name,omitempty"`

Now everything is ready: with GOOGLE_APPLICATION_CREDENTIALS exported and the gcloud K8s Go client library, it is easy to get what gcloud container clusters get-credentials does for us and generate the kubeconfig yaml file from a template.
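
As an illustration, here is a minimal sketch (not the project's exact code) of that GetCluster call, assuming the cloud.google.com/go/container/apiv1 client and the GOOGLE_APPLICATION_CREDENTIALS setup described above; the project/location/cluster names are placeholders:

package main

import (
	"context"
	"fmt"
	"log"

	container "cloud.google.com/go/container/apiv1"
	containerpb "google.golang.org/genproto/googleapis/container/v1"
)

func main() {
	ctx := context.Background()
	// picks up GOOGLE_APPLICATION_CREDENTIALS automatically
	c, err := container.NewClusterManagerClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	// same format as the REST API path dumped by --log-http
	req := &containerpb.GetClusterRequest{
		Name: "projects/<project>/locations/us-west1/clusters/<cluster>",
	}
	cluster, err := c.GetCluster(ctx, req)
	if err != nil {
		log.Fatal(err)
	}
	// the endpoint and CA certificate are what the kubeconfig template needs
	fmt.Println(cluster.GetEndpoint())
	fmt.Println(cluster.GetMasterAuth().GetClusterCaCertificate())
}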

What is OAuth2.0

The OAuth (Open Authorization) 2.0 Google doc and example.

Usage: authorization between services; the OAuth access token is a JWT.

  • Intro to OAuth: access delegation, limited access.
  • OAuth deeper: terms: resource, resource owner, resource server, client, authorization server (issues access tokens)
  • JWT explained, vs session token (reference token). The JWT (value token) contains the complete request info; that’s why it uses a JSON object. A session token is just a key into a session map on the server side.
  • JWT structure explained; encode and decode a JWT object

How to use K8s go client to access K8s resource?

Note that the K8s Go client (kubernetes/client-go) is a standalone project used to talk to K8s; K8s itself is another Go project (kubernetes/kubernetes).

I have the kubeconfig yaml file ready and the KUBECONFIG env variable set; then use the client to make the API calls, for example this reference code; you can also reference the Go client example for running outside a K8s cluster.

In my project I need to get data from a configMap; use the Go client for configMaps.
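
A minimal sketch of that configMap read with client-go, assuming the generated kubeconfig is exported via KUBECONFIG; the namespace and configMap name are hypothetical:

package main

import (
	"context"
	"fmt"
	"log"
	"os"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// build a rest.Config from the kubeconfig generated earlier
	config, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
	if err != nil {
		log.Fatal(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatal(err)
	}
	// read a configMap and print its data fields
	cm, err := clientset.CoreV1().ConfigMaps("default").
		Get(context.TODO(), "example-config", metav1.GetOptions{})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(cm.Data)
}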

Tutorial

Youtube: Getting Started with Kubernetes client-go
Youtube: client-go K8s native development

How to port forward in pure golang?

Next, I need to query a K8s service; to make it easy I forward its port onto localhost. The kubectl command is:

# service port forward
kubectl port-forward svc/example 9200:9200 -v=8 &> verbose.txt

I add the -v=8 flag to dump the log into the verbose.txt file.

Then I see there are consecutive API calls: it first gets the service detail (GET), then looks for the pod (GET) and pod details (GET) managed by that service, and uses that pod to do the port forwarding (POST). So it actually does:

# pod port forward
kubectl port-forward example-0 9200:9200

The Go client has a port forward package; to use it, import k8s.io/client-go/tools/portforward (the import path just follows the directory layout).

The usage of the port forward package is not obvious; I referenced the 2 posts below to make it work:

Go channels (to start and stop the port forwarding) and goroutines are used here; you can check lsof or netstat to see that the target localhost port is listening.

Additionally, you can also see how kubectl implements port-forward.
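
The rough shape, as a minimal sketch pieced together from those posts (namespace, pod name, and port are placeholders; it assumes KUBECONFIG points at the kubeconfig generated earlier):

package main

import (
	"fmt"
	"log"
	"net/http"
	"net/url"
	"os"
	"strings"

	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/portforward"
	"k8s.io/client-go/transport/spdy"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
	if err != nil {
		log.Fatal(err)
	}

	// the POST portforward subresource URL observed in the -v=8 log
	path := "/api/v1/namespaces/default/pods/example-0/portforward"
	host := strings.TrimPrefix(config.Host, "https://")

	transport, upgrader, err := spdy.RoundTripperFor(config)
	if err != nil {
		log.Fatal(err)
	}
	dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST",
		&url.URL{Scheme: "https", Host: host, Path: path})

	stopCh := make(chan struct{})
	readyCh := make(chan struct{})
	// forward localhost:9200 to pod port 9200
	fw, err := portforward.New(dialer, []string{"9200:9200"}, stopCh, readyCh, os.Stdout, os.Stderr)
	if err != nil {
		log.Fatal(err)
	}

	go func() {
		// blocks until stopCh is closed
		if err := fw.ForwardPorts(); err != nil {
			fmt.Println(err)
		}
	}()

	<-readyCh // localhost:9200 is listening now
	// ... issue requests against localhost:9200, then close(stopCh) to stop
}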

How to convert curl to golang?

Next, I need to convert curl to golang; it is easy, as the underlying mechanism is all about HTTP requests.

There is an interesting project that does exactly what I need: https://mholt.github.io/curl-to-go/

How to unmarshal only a small set of fields from an HTTP response JSON?

I find 2 options:

  1. Use a struct type which has only the target fields; you have to construct multiple structs and embed them to reflect the JSON field nesting.
  2. Use interface{} + type assertion to receive and convert the target field; no struct is needed!

For a single JSON object, don’t use json.Decoder; use json.Unmarshal instead.
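
A minimal sketch of both options, using a hypothetical response body just for illustration:

package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// Option 1: nested structs declaring only the target fields.
type tokenResp struct {
	Token struct {
		ID string `json:"id"`
	} `json:"token"`
}

func main() {
	raw := []byte(`{"token": {"id": "abc", "expires_in": 3600}, "extra": "ignored"}`)

	var t tokenResp
	if err := json.Unmarshal(raw, &t); err != nil {
		log.Fatal(err)
	}
	fmt.Println(t.Token.ID) // abc

	// Option 2: interface{} + type assertions, no struct needed.
	var m map[string]interface{}
	if err := json.Unmarshal(raw, &m); err != nil {
		log.Fatal(err)
	}
	if tok, ok := m["token"].(map[string]interface{}); ok {
		fmt.Println(tok["id"].(string)) // abc
	}
}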

How to reduce go docker image size?

Lastly, I want to build a Go docker app to simplify usage and minimize the image size.

The docker official doc for building a Go docker image is not great; you end up with a big image.

The solution is easy: use a multi-stage Dockerfile. Build the Go binary in one Go docker image and copy the binary to a new base image of the same Linux distro but with a much smaller size:

# syntax=docker/dockerfile:1

# Need to have google-cloud-sdk-gke-gcloud-auth-plugin
# for go-client to access GKE resource
# https://cloud.google.com/blog/products/containers-kubernetes/kubectl-auth-changes-in-gke
FROM golang:1.18-buster AS auth-plugin
# version > 381.0.0-0 asks to install gcloud cli
# which is not desired and make image size bigger!
ENV AUTH_VERSION="381.0.0-0"
RUN echo "deb https://packages.cloud.google.com/apt cloud-sdk main" \
| tee -a /etc/apt/sources.list.d/google-cloud-sdk.list \
&& \
curl https://packages.cloud.google.com/apt/doc/apt-key.gpg \
| apt-key add -

RUN apt-get update \
&& \
apt-get install google-cloud-sdk-gke-gcloud-auth-plugin=$AUTH_VERSION

# build binary executable base on alpine image
FROM golang:1.18-alpine AS binary
WORKDIR /deployment
COPY cmd ./cmd
COPY go.mod ./
COPY go.sum ./
RUN go mod download
RUN go build -o ./cmd/app ./cmd

# minimum app image creation
FROM alpine:3.15
WORKDIR /deployment
COPY --from=binary \
/deployment/cmd/app \
./cmd/app
COPY --from=auth-plugin \
/usr/lib/google-cloud-sdk/bin/gke-gcloud-auth-plugin \
/usr/local/bin/gke-gcloud-auth-plugin
COPY config ./config
COPY template ./template

WORKDIR /deployment/cmd
ENTRYPOINT [ "./app"]
CMD ["--help"]

This approach helped me reduce the docker image size from nearly 2GB to 60MB.

There is a post with a similar idea: Build a super minimalistic Docker Image to run your Golang App.

When working with the gcloud CLI, sometimes there is a strong need to filter and format the output for subsequent processing.

An example use case:

gcloud compute instances \
list \
--project example \
--filter='name~test*' \
--format="value(name,zone,disks[].deviceName)"

The filter and format are explained under the gcloud topic group, where you can find other useful topics about gcloud.

Resource Available Key

To find the keys for filter and format, check the --format flattened option; it lists all keys for the specified resource.
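
For example, dumping a single instance with --format flattened shows every key path that --filter and --format will accept (the project name is illustrative):

# list every available key/value pair for one instance
gcloud compute instances list --project example --limit=1 --format=flattened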

Filter Syntax

https://cloud.google.com/sdk/gcloud/reference/topic/filters

Format Syntax

https://cloud.google.com/sdk/gcloud/reference/topic/formats

Projection Syntax:

https://cloud.google.com/sdk/gcloud/reference/topic/projections

A strange issue happened without a clear cause, and at the time I didn’t fully understand why/how the solution worked; it is documented here for revisiting if necessary.

Note: running docker commands with sudo is not encouraged; adding the user to the docker group is preferred.

Issue: the Docker daemon startup fails continuously; checking the daemon status and journal:

× docker.service - Docker Application Container Engine
Loaded: loaded (/lib/systemd/system/docker.service; enabled; vendor preset: enabled)
Active: failed (Result: exit-code) since Mon 2022-02-28 04:42:32 UTC; 94ms ago
TriggeredBy: × docker.socket
Docs: https://docs.docker.com
Process: 6747 ExecStart=/usr/bin/dockerd -H fd:// --containerd=/run/containerd/containerd.sock (code=exited, status=1/FAILURE)
Main PID: 6747 (code=exited, status=1/FAILURE)
CPU: 266ms

Feb 28 04:42:32 chengdolgob.c.googlers.com systemd[1]: docker.service: Scheduled restart job, restart counter is at 2.
Feb 28 04:42:32 chengdolgob.c.googlers.com systemd[1]: Stopped Docker Application Container Engine.
Feb 28 04:42:32 chengdolgob.c.googlers.com systemd[1]: docker.service: Start request repeated too quickly.
Feb 28 04:42:32 chengdolgob.c.googlers.com systemd[1]: docker.service: Failed with result 'exit-code'.
Feb 28 04:42:32 chengdolgob.c.googlers.com systemd[1]: Failed to start Docker Application Container Engine

First, adjust the docker daemon unit file to disable the restart by commenting out:

#TimeoutSec=0
#RestartSec=10
#Restart=always

Then run systemctl daemon-reload and restart the docker daemon to see the detailed error message:

...
Feb 28 04:43:46 chengdolgob.c.googlers.com dockerd[7890]: failed to start daemon: Error initializing network controller: list bridge addresses failed: PredefinedLocalScopeDefaultNetworks List: [192.168.11.0/24]: no availabl
...

Check the network interfaces with ip a s; there is no docker0 bridge.

Solution: to fix it, create a new docker network bridge, reference ticket:

# delete docker0 bridge
sudo ip link delete docker0

# 192.168.9.1/24 is from docker daemon.json bip field
sudo ip link add name docker0 type bridge
sudo ip addr add dev docker0 192.168.9.1/24
# or default
sudo ip addr add dev docker0 172.17.0.1/16

The inet IP is from /etc/docker/daemon.json bip field, for example:

{
  "data-root": "/usr/local/google/docker",
  "bip": "192.168.9.1/24",
  "default-address-pools": [
    {
      "base": "192.168.11.0/24",
      "size": 24
    }
  ],
  "storage-driver": "overlay2",
  "debug": true,
  "registry-mirrors": ["https://mirror.gcr.io"]
}

See daemon.json attribute description.

Then, after the bridge is created, restart the docker daemon:

sudo systemctl restart docker

Search standard Go packages here: https://pkg.go.dev/std

What are the packages belonging to golang.org/x? These packages are part of the Go Project but outside the main Go tree. They are developed under looser compatibility requirements than the Go core. Install them with “go get”.

List of commonly used packages:

fmt: common placeholders: %q, %v, %d, %c (one character), %T (type), %p (address); fmt.Errorf(), fmt.Sprintf()
logrus: structured logger for Go
strings: strings.LastIndex(s, "/"), string([]byte{'a', 'b'})
strconv: strconv.Itoa(55), strconv.Atoi("1"), strconv.ParseInt("23", 10, 64)
bytes: bytes.Buffer, []byte("hello")
unicode/utf8: utf8.RuneCountInString("Hello, 世界")

kubernetes: https://github.com/kubernetes/kubernetes
kubernetes/client-go: https://pkg.go.dev/k8s.io/client-go/kubernetes

os: os.Args, os.Stdin, os.Open, os.Exit(1)
flag: extract options/flags from the CLI, like Python Click
cobra: powerful CLI for modern apps: k8s, etcd, istio, etc.
viper: a complete configuration solution for Go apps
path/filepath

httprouter
net/http: minimal server: http.HandleFunc(x, handler), http.ListenAndServe(x, x), func handler(w http.ResponseWriter, r *http.Request) {}
net/url
context
encoding/json: https://pkg.go.dev/encoding/json, json.Marshal(), json.MarshalIndent()
yaml: gopkg.in/yaml.v3

math/rand: rand is a subdirectory of the math package
bufio: some helpers for textual I/O: bufio.NewScanner(os.Stdin), bufio.NewReader(os.Stdin)
runtime: runtime.Stack()
io/ioutil: ioutil.ReadAll()
sync: sync.Mutex

sort: sort.Interface, sort.Strings(<[]string>), sort.Ints(<[]int>), sort.Reverse(sort.IntSlice(<[]int>)), sort.IsSorted()
text/template
text/tabwriter
html/template

errors
time: time.Now(), time.Since(xx).Seconds()

testify

hugo: a static site website generator

Makefile tutorial; GNU Make website

Make function used: shell

We are using a Makefile in each repo to help:

  1. Download/sync dependencies.
  2. Run unit tests.
  3. Build binary.
  4. Package binary and build docker image.
  5. Push tagged docker image to repository.

Thus the Makefile can easily be integrated into a CICD pipeline or run manually for regular development, for example (a sketch of such a Makefile follows the commands below):

# download dependencies
make vendor

# run unit tests
make test

# build, package and publish are 3 phonies defined in the Makefile
make build package publish service=example version=chengdol-example-0.0.1
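
A minimal sketch of what such a Makefile can look like (target and variable names here are illustrative, not our exact file; recipe lines must be indented with a tab):

# variables can be overridden from the command line, e.g. make build service=... version=...
service ?= example
version ?= dev
# the shell function captures command output at parse time
GIT_SHA := $(shell git rev-parse --short HEAD)

.PHONY: vendor test build package publish

vendor:
	go mod vendor

test:
	go test ./...

build:
	go build -o bin/$(service) ./cmd

package: build
	docker build -t $(service):$(version)-$(GIT_SHA) .

publish: package
	docker push $(service):$(version)-$(GIT_SHA)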